From f6c425922bdb18068affc6a3c8cff88429a078fd Mon Sep 17 00:00:00 2001 From: Keenan Brock Date: Tue, 30 Apr 2024 16:09:32 -0400 Subject: [PATCH] Async version of the awesome runner --- lib/floe/awesome_runner.rb | 61 ++++++++++++++++++++++++++++++++++--- spec/awesome_runner_spec.rb | 2 ++ 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/lib/floe/awesome_runner.rb b/lib/floe/awesome_runner.rb index 3246d5395..9ebb54414 100644 --- a/lib/floe/awesome_runner.rb +++ b/lib/floe/awesome_runner.rb @@ -1,14 +1,54 @@ # frozen_string_literal: true +require "concurrent/array" + module Floe + class AwesomeProcess < Thread + attr_reader :result + attr_accessor :error + + def initialize(queue, context, *args) + self.report_on_exception = true + @processed = false + @context = context + + # don't like changing context here + # but want to make sure thread is setup put it here to prevent a race condition where this runs too quickly + # if this completes right away, it will trigger the queue + context["thread"] = self + + super do + @result = AwesomeSpawn.run(*args) + + # this is potentially a race condition + Floe::AwesomeRunner.populate_results!(@context, :result => @result) + # trugger an event + queue.push(["delete", context]) + rescue => err + # don't think we ever get in here + @error = err + + Floe::AwesomeRunner.populate_results!(@context, :error => err) + queue.push(["delete", context]) + end + end + end + class AwesomeRunner < Floe::Runner SCHEME = "awesome" SCHEME_PREFIX = "#{SCHEME}://" SCHEME_OFFSET = SCHEME.length + 3 + # only exposed for tests + # use wait instead + attr_reader :queue + def initialize(_options = {}) require "awesome_spawn" + # events triggered + @queue = Queue.new + super end @@ -21,13 +61,18 @@ def run_async!(resource, params = {}, _secrets = {}, _context = {}) runner_context = {} - # TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1) - result = AwesomeSpawn.run(method, :env => params, :params => args) - self.class.populate_results!(runner_context, :result => result) + # NOTE: this adds itself to the runner_context + AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args) + runner_context end def status!(runner_context) + # check if it has no output (i.e.: we think it is running) but it is not running + if !runner_context.key?("Output") && !runner_context["thread"]&.alive? + runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"} + runner_context["Error"] = true + end end def running?(runner_context) @@ -43,9 +88,17 @@ def output(runner_context) end def cleanup(runner_context) + runner_context["thread"] = nil end - def wait(_timeout: nil, _events: %i[create update delete]) + def wait(timeout: nil, _events: %i[create update delete]) + # TODO: implement whole interface + raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given? + + loop do + event_context = @queue.pop + yield event_context if block_given? + end end # internal methods diff --git a/spec/awesome_runner_spec.rb b/spec/awesome_runner_spec.rb index 94bdf923d..aafedc326 100644 --- a/spec/awesome_runner_spec.rb +++ b/spec/awesome_runner_spec.rb @@ -20,12 +20,14 @@ stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n") subject.run_async!("awesome://ls") + subject.queue.pop end it "passes environment variables to command run" do stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n") subject.run_async!("awesome://ls", {"FOO" => "BAR"}) + subject.queue.pop end end