Skip to content

Commit

Permalink
Alt EventLoop#run(queue*, blocking) method
Browse files Browse the repository at this point in the history
The point is to avoid parallel enqueues while running the event loop, so
we get better control to where and how the runnable fibers are enqueued;
for example all at once instead of one by one (may not be as effective
as it sounds).

More importantly for Execution Contexts: it avoids parallel enqueues
while the eventloop is running which sometimes leads to confusing
behavior; for example when deciding to wake up a scheduler/thread we
musn't interryupt the event loop (obviously).

This is working correctly for the Polling (Epoll, Kqueue) and IOCP event
loop implementations. I'm less confident with the libevent one where the
external library executes arbitrary callbacks.
  • Loading branch information
ysbaddaden committed Jan 16, 2025
1 parent 35da3f9 commit 3852ce4
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 10 deletions.
7 changes: 7 additions & 0 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ abstract class Crystal::EventLoop
# events.
abstract def run(blocking : Bool) : Bool

{% if flag?(:execution_context) %}
# Same as `#run` but collects runnable fibers into *queue* instead of
# enqueueing in parallel, so the caller is responsible and in control for
# when and how the fibers will be enqueued.
abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil
{% end %}

# Tells a blocking run loop to no longer wait for events to activate. It may
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
# activated an event, the loop shall return, allowing the blocked thread to
Expand Down
8 changes: 8 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
iocp
end

# thread unsafe
def run(blocking : Bool) : Bool
enqueued = false

Expand All @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
enqueued
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
run_impl(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# Runs the event loop and enqueues the fiber for the next upcoming event or
# completion.
private def run_impl(blocking : Bool, &) : Nil
Expand Down
43 changes: 36 additions & 7 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
event_base.loop(flags)
end

{% if flag?(:execution_context) %}
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking
@runnables = queue
run(blocking)
ensure
@runnables = nil
end

def callback_enqueue(fiber : Fiber) : Nil
if queue = @runnables
queue.value.push(fiber)
else
raise "BUG: libevent callback executed outside of #run(queue*, blocking) call"
end
end
{% end %}

def interrupt : Nil
event_base.loop_exit
end

# Create a new resume event for a fiber.
# Create a new resume event for a fiber (sleep).
def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
data.as(Fiber).enqueue
f = data.as(Fiber)
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end

# Creates a timeout_event.
# Creates a timeout event (timeout action of select expression).
def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
f = data.as(Fiber)
if (select_action = f.timeout_select_action)
if select_action = f.timeout_select_action
f.timeout_select_action = nil
select_action.time_expired(f)
else
f.enqueue
if select_action.time_expired?
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
{% end %}

# NOTE: thread unsafe
# thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking) do |fiber|
{% if flag?(:execution_context) %}
Expand All @@ -124,6 +124,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
true
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::Queue*, blocking : Bool) : Nil
system_run(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# fiber interface, see Crystal::EventLoop

def create_resume_event(fiber : Fiber) : FiberEvent
Expand Down
14 changes: 12 additions & 2 deletions src/io/evented.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ module IO::Evented
@read_timed_out = timed_out

if reader = @readers.get?.try &.shift?
reader.enqueue
{% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(reader)
{% else %}
reader.enqueue
{% end %}
end
end

Expand All @@ -29,7 +34,12 @@ module IO::Evented
@write_timed_out = timed_out

if writer = @writers.get?.try &.shift?
writer.enqueue
{% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(reader)
{% else %}
writer.enqueue
{% end %}
end
end

Expand Down

0 comments on commit 3852ce4

Please sign in to comment.