diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index 4fb89b88..29bf0884 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -27,6 +27,13 @@ def for_resource(resource) scheme = resource.split("://").first resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]") end + + def runners + @runners.each_value.map do |runner| + runner = runner.call if runner.kind_of?(Proc) + runner + end + end end # Run a command asynchronously and create a runner_context diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index cb0993ef..012d1631 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - run_until = Time.now.utc + timeout if timeout.to_i > 0 - ready = [] - queue = Queue.new - wait_thread = Thread.new do - loop do - Runner.for_resource("docker").wait do |event, runner_context| - queue.push([event, runner_context]) + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_threads = + Runner.runners.map do |runner| + next unless runner.respond_to?(:wait) + + Thread.new do + loop do + runner.wait do |event, runner_context| + queue.push([event, runner_context]) + end + end end end - end loop do ready = workflows.select(&:step_nonblock_ready?) @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block) logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure - wait_thread&.kill + wait_threads.compact.map(&:kill) end end