Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement runner#wait in a generic way #179

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/floe/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ def for_resource(resource)
scheme = resource.split("://").first
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
end

def runners
@runners.each_value.map do |runner|
runner = runner.call if runner.kind_of?(Proc)
runner
end
end
Comment on lines +31 to +36
Copy link
Member Author

@kbrock kbrock May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is essentially @runners.values resolving the procs.

I'm realizing that on startup 100% of runners are procs.
But after 1 task is run, 0% of runners are procs.
Wonder if we can do a resolve up front so we don't have to handle this case during regular runtime activity.


I feel supporting multiple schemes by a single runner is currently far fetched.
Ignoring that use case allows us to keep to our current simple runner registration process: Floe::Runner.register_scheme.

In the future, if we come up with a case where there are multiple schemes, we can make our registration process more complicated.

end

# Run a command asynchronously and create a runner_context
Expand Down
23 changes: 14 additions & 9 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")

run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_thread = Thread.new do
loop do
Runner.for_resource("docker").wait do |event, runner_context|
queue.push([event, runner_context])
run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_threads =
Runner.runners.map do |runner|
next unless runner.respond_to?(:wait)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kbrock if we don't have a wait thread what is going to unblock the queue.pop

            # Block until an event is raised
            event, runner_context = queue.pop

Copy link
Member Author

@kbrock kbrock May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agrare That thread is only needed to handle the async nature of the "docker" runner. I commented out the "docker" thread and was able to run a workflow just fine:

{
  "StartAt": "a",
  "States": {
    "a":{
      "Type": "Pass",
      "Next": "b"
    },
    "b": {
      "Type": "Wait",
      "Seconds": 1,
      "Next": "c"
    },
      "c": {
        "Type": "Succeed"
    }
  }
}

ASIDE: was also able to call with an awesome:// Task without this thread


Thread.new do
loop do
runner.wait do |event, runner_context|
queue.push([event, runner_context])
end
end
end
end
end

loop do
ready = workflows.select(&:step_nonblock_ready?)
Expand Down Expand Up @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
wait_threads.compact.map(&:kill)
end
end

Expand Down