From 3852ce4b242167e5585deea48e07b8f1cc1f19a6 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 6 Dec 2024 15:01:08 +0100 Subject: [PATCH] Alt EventLoop#run(queue*, blocking) method The point is to avoid parallel enqueues while running the event loop, so we get better control to where and how the runnable fibers are enqueued; for example all at once instead of one by one (may not be as effective as it sounds). More importantly for Execution Contexts: it avoids parallel enqueues while the eventloop is running which sometimes leads to confusing behavior; for example when deciding to wake up a scheduler/thread we musn't interryupt the event loop (obviously). This is working correctly for the Polling (Epoll, Kqueue) and IOCP event loop implementations. I'm less confident with the libevent one where the external library executes arbitrary callbacks. --- src/crystal/event_loop.cr | 7 +++++ src/crystal/event_loop/iocp.cr | 8 ++++++ src/crystal/event_loop/libevent.cr | 43 +++++++++++++++++++++++++----- src/crystal/event_loop/polling.cr | 9 ++++++- src/io/evented.cr | 14 ++++++++-- 5 files changed, 71 insertions(+), 10 deletions(-) diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 0294abc2ef3d..1b2887a1c535 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -54,6 +54,13 @@ abstract class Crystal::EventLoop # events. abstract def run(blocking : Bool) : Bool + {% if flag?(:execution_context) %} + # Same as `#run` but collects runnable fibers into *queue* instead of + # enqueueing in parallel, so the caller is responsible and in control for + # when and how the fibers will be enqueued. + abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil + {% end %} + # Tells a blocking run loop to no longer wait for events to activate. It may # for example enqueue a NOOP event with an immediate (or past) timeout. Having # activated an event, the loop shall return, allowing the blocked thread to diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 6e4175e3daee..3c84b2ce927d 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop iocp end + # thread unsafe def run(blocking : Bool) : Bool enqueued = false @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop enqueued end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + run_impl(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. private def run_impl(blocking : Bool, &) : Nil diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 9c0b3d33b15c..b6a2f1baa745 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop(flags) end + {% if flag?(:execution_context) %} + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking + @runnables = queue + run(blocking) + ensure + @runnables = nil + end + + def callback_enqueue(fiber : Fiber) : Nil + if queue = @runnables + queue.value.push(fiber) + else + raise "BUG: libevent callback executed outside of #run(queue*, blocking) call" + end + end + {% end %} + def interrupt : Nil event_base.loop_exit end - # Create a new resume event for a fiber. + # Create a new resume event for a fiber (sleep). def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| - data.as(Fiber).enqueue + f = data.as(Fiber) + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} end end - # Creates a timeout_event. + # Creates a timeout event (timeout action of select expression). def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| f = data.as(Fiber) - if (select_action = f.timeout_select_action) + if select_action = f.timeout_select_action f.timeout_select_action = nil - select_action.time_expired(f) - else - f.enqueue + if select_action.time_expired? + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end end end end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index f7fc36082a0e..24d94af02bcc 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -112,7 +112,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end {% end %} - # NOTE: thread unsafe + # thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| {% if flag?(:execution_context) %} @@ -124,6 +124,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop true end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + system_run(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # fiber interface, see Crystal::EventLoop def create_resume_event(fiber : Fiber) : FiberEvent diff --git a/src/io/evented.cr b/src/io/evented.cr index b238830f284a..db601a83964f 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -20,7 +20,12 @@ module IO::Evented @read_timed_out = timed_out if reader = @readers.get?.try &.shift? - reader.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + reader.enqueue + {% end %} end end @@ -29,7 +34,12 @@ module IO::Evented @write_timed_out = timed_out if writer = @writers.get?.try &.shift? - writer.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + writer.enqueue + {% end %} end end