From 761349490b2b3e052e24f00d52f8424a77586151 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 19:39:22 +0100 Subject: [PATCH] Add Runnables and GlobalQueue for schedulers to keep fibers --- .../execution_context/global_queue_spec.cr | 225 +++++++++++++++ spec/std/execution_context/runnables_spec.cr | 264 ++++++++++++++++++ spec/std/execution_context/spec_helper.cr | 21 ++ src/execution_context/global_queue.cr | 104 +++++++ src/execution_context/runnables.cr | 210 ++++++++++++++ 5 files changed, 824 insertions(+) create mode 100644 spec/std/execution_context/global_queue_spec.cr create mode 100644 spec/std/execution_context/runnables_spec.cr create mode 100644 spec/std/execution_context/spec_helper.cr create mode 100644 src/execution_context/global_queue.cr create mode 100644 src/execution_context/runnables.cr diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/execution_context/global_queue_spec.cr new file mode 100644 index 000000000000..838a31406c01 --- /dev/null +++ b/spec/std/execution_context/global_queue_spec.cr @@ -0,0 +1,225 @@ +require "./spec_helper" + +describe ExecutionContext::GlobalQueue do + it "#initialize" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.empty?.should be_true + end + + it "#unsafe_push and #unsafe_pop" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f1) + q.size.should eq(1) + + q.unsafe_push(f2) + q.unsafe_push(f3) + q.size.should eq(3) + + q.unsafe_pop?.should be(f3) + q.size.should eq(2) + + q.unsafe_pop?.should be(f2) + q.unsafe_pop?.should be(f1) + q.unsafe_pop?.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + describe "#unsafe_grab?" do + it "can't grab from empty queue" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = ExecutionContext::Runnables(6).new(q) + q.unsafe_grab?(runnables, 4).should be_nil + end + + it "grabs fibers" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.unsafe_push(f) } + + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + + # returned the last enqueued fiber + fiber.should be(fibers[9]) + + # enqueued the next 2 fibers + runnables.size.should eq(2) + runnables.get?.should be(fibers[8]) + runnables.get?.should be(fibers[7]) + + # the remaining fibers are still there: + 6.downto(0).each do |i| + q.unsafe_pop?.should be(fibers[i]) + end + end + + it "can't grab more than available" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + + it "clamps divisor to 1" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 0) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "one by one" do + fibers = StaticArray(ExecutionContext::FiberCounter, 763).new do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + n = 7 + increments = 15 + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "ONE-#{i}") do |thread| + slept = 0 + ready.done + + loop do + if fiber = queue.pop? + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + queue.push(fiber) if fc.increment < increments + slept = 0 + elsif slept < 100 + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + else + break + end + end + rescue exception + Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + fibers.each_with_index do |fc, i| + queue.push(fc.@fiber) + Thread.sleep(10.nanoseconds) if i % 10 == 9 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times + fibers.each { |fc| fc.counter.should eq(increments) } + end + + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "BULK-#{i}") do |thread| + slept = 0 + + r = ExecutionContext::Runnables(3).new(queue) + + batch = Fiber::Queue.new + size = 0 + + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } + + ready.done + + loop do + if fiber = r.get? + execute.call(fiber) + slept = 0 + next + end + + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + q = Fiber::Queue.new + 5.times { |j| q.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/execution_context/runnables_spec.cr new file mode 100644 index 000000000000..6fa342675402 --- /dev/null +++ b/spec/std/execution_context/runnables_spec.cr @@ -0,0 +1,264 @@ +require "./spec_helper" + +describe ExecutionContext::Runnables do + it "#initialize" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(16).new(g) + r.capacity.should eq(16) + end + + describe "#push" do + it "enqueues the fiber in local queue" do + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # local dequeue + fibers.each { |f| r.get?.should be(f) } + r.get?.should be_nil + + # didn't push to global queue + g.pop?.should be_nil + end + + it "moves half the local queue to the global queue on overflow" do + fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # kept half of local queue + r.get?.should be(fibers[2]) + r.get?.should be(fibers[3]) + + # moved half of local queue + last push to global queue + g.pop?.should eq(fibers[0]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[4]) + end + + it "can always push up to capacity" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + + 4.times do + # local + 4.times { r.push(Fiber.new { }) } + 2.times { r.get? } + 2.times { r.push(Fiber.new { }) } + + # overflow (2+1 fibers are sent to global queue + 1 local) + 2.times { r.push(Fiber.new { }) } + + # clear + 3.times { r.get? } + end + + # on each iteration we pushed 2+1 fibers to the global queue + g.size.should eq(12) + + # grab fibers back from the global queue + fiber = g.unsafe_grab?(r, divisor: 1) + fiber.should_not be_nil + r.get?.should_not be_nil + r.get?.should be_nil + end + end + + describe "#bulk_push" do + it "fills the local queue" do + q = Fiber::Queue.new + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + fibers.reverse_each { |f| r.get?.should be(f) } + g.empty?.should be_true + end + + it "pushes the overflow to the global queue" do + q = Fiber::Queue.new + fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + # filled the local queue + r.get?.should eq(fibers[6]) + r.get?.should eq(fibers[5]) + r.get?.should be(fibers[4]) + r.get?.should be(fibers[3]) + + # moved the rest to the global queue + g.pop?.should eq(fibers[2]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[0]) + end + end + + describe "#get?" do + # TODO: need specific tests (though we already use it in the above tests?) + end + + describe "#steal_from" do + it "steals from another runnables" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + fibers.each { |f| r1.push(f) } + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole half of the runnable fibers + fiber.should be(fibers[2]) + r2.get?.should be(fibers[0]) + r2.get?.should be(fibers[1]) + r2.get?.should be_nil + + # left the other half + r1.get?.should be(fibers[3]) + r1.get?.should be(fibers[4]) + r1.get?.should be(fibers[5]) + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals the last fiber" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + lone = Fiber.new(name: "lone") { } + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + r1.push(lone) + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole the fiber & local queue is still empty + fiber.should be(lone) + r2.get?.should be_nil + + # left nothing in original queue + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals nothing" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = ExecutionContext::Runnables(16).new(g) + r2 = ExecutionContext::Runnables(16).new(g) + + fiber = r2.steal_from(r1) + fiber.should be_nil + r2.get?.should be_nil + r1.get?.should be_nil + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "stress test" do + n = 7 + increments = 7919 + + # less fibers than space in runnables (so threads can starve) + # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) + fibers = Array(ExecutionContext::FiberCounter).new(54) do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do + ExecutionContext::Runnables(16).new(global_queue) + end + + n.times do |i| + Thread.new(name: "RUN-#{i}") do |thread| + runnables = all_runnables[i] + slept = 0 + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + runnables.push(fiber) if fc.increment < increments + } + + ready.done + + loop do + # dequeue from local queue + if fiber = runnables.get? + execute.call(fiber) + slept = 0 + next + end + + # steal from another queue + while (r = all_runnables.sample) == runnables + end + if fiber = runnables.steal_from(r) + execute.call(fiber) + slept = 0 + next + end + + # dequeue from global queue + if fiber = global_queue.grab?(runnables, n) + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches + 0.step(to: fibers.size - 1, by: 9) do |i| + q = Fiber::Queue.new + 9.times { |j| q.push(fibers[i + j].@fiber) } + global_queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 2 == 1 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/execution_context/spec_helper.cr new file mode 100644 index 000000000000..9a1dbb881cee --- /dev/null +++ b/spec/std/execution_context/spec_helper.cr @@ -0,0 +1,21 @@ +require "../spec_helper" +require "crystal/system/thread_wait_group" +require "execution_context/runnables" +require "execution_context/global_queue" + +module ExecutionContext + class FiberCounter + def initialize(@fiber : Fiber) + @counter = Atomic(Int32).new(0) + end + + # fetch and add + def increment + @counter.add(1, :relaxed) + 1 + end + + def counter + @counter.get(:relaxed) + end + end +end diff --git a/src/execution_context/global_queue.cr b/src/execution_context/global_queue.cr new file mode 100644 index 000000000000..22535ab01ed6 --- /dev/null +++ b/src/execution_context/global_queue.cr @@ -0,0 +1,104 @@ +# The queue is a port of Go's `globrunq*` functions, distributed under a +# BSD-like license: +# + +require "../fiber/queue" +require "./runnables" + +module ExecutionContext + # Global queue of runnable fibers. + # Unbounded. + # Shared by all schedulers in an execution context. + # + # Basically a `Fiber::Queue` wrapped in a `Thread::Mutex`, at the exception of + # the `#grab?` method that tries to grab 1/Nth of the queue at once. + class GlobalQueue + def initialize(@mutex : Thread::Mutex) + @queue = Fiber::Queue.new + end + + # Grabs the lock and enqueues a runnable fiber on the global runnable queue. + def push(fiber : Fiber) : Nil + @mutex.synchronize { unsafe_push(fiber) } + end + + # Enqueues a runnable fiber on the global runnable queue. Assumes the lock + # is currently held. + def unsafe_push(fiber : Fiber) : Nil + @queue.push(fiber) + end + + # Grabs the lock and puts a runnable fiber on the global runnable queue. + def bulk_push(queue : Fiber::Queue*) : Nil + @mutex.synchronize { unsafe_bulk_push(queue) } + end + + # Puts a runnable fiber on the global runnable queue. Assumes the lock is + # currently held. + def unsafe_bulk_push(queue : Fiber::Queue*) : Nil + @queue.bulk_unshift(queue) + end + + # Grabs the lock and dequeues one runnable fiber from the global runnable + # queue. + def pop? : Fiber? + @mutex.synchronize { unsafe_pop? } + end + + # Dequeues one runnable fiber from the global runnable queue. Assumes the + # lock is currently held. + def unsafe_pop? : Fiber? + @queue.pop? + end + + # Grabs the lock then tries to grab a batch of fibers from the global + # runnable queue. Returns the next runnable fiber or `nil` if the queue was + # empty. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def grab?(runnables : Runnables, divisor : Int32) : Fiber? + @mutex.synchronize { unsafe_grab?(runnables, divisor) } + end + + # Try to grab a batch of fibers from the global runnable queue. Returns the + # next runnable fiber or `nil` if the queue was empty. Assumes the lock is + # currently held. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber? + # ported from Go: globrunqget + return if @queue.empty? + + divisor = 1 if divisor < 1 + size = @queue.size + + n = { + size, # can't grab more than available + size // divisor + 1, # divide + try to take at least 1 fiber + runnables.capacity // 2, # refill half the destination queue + }.min + + fiber = @queue.pop? + + # OPTIMIZE: q = @queue.split(n - 1) then `runnables.push(pointerof(q))` (?) + (n - 1).times do + break unless f = @queue.pop? + runnables.push(f) + end + + fiber + end + + @[AlwaysInline] + def empty? : Bool + @queue.empty? + end + + @[AlwaysInline] + def size : Int32 + @queue.size + end + end +end diff --git a/src/execution_context/runnables.cr b/src/execution_context/runnables.cr new file mode 100644 index 000000000000..6be2fda446c0 --- /dev/null +++ b/src/execution_context/runnables.cr @@ -0,0 +1,210 @@ +# The queue is a port of Go's `runq*` functions, distributed under a BSD-like +# license: +# +# The queue derivates from the chase-lev lock-free queue with adaptations: +# +# - single ring buffer (per scheduler); +# - on overflow: bulk push half the ring to `GlobalQueue`; +# - on empty: bulk grab up to half the ring from `GlobalQueue`; +# - bulk push operation; + +require "../fiber/queue" +require "./global_queue" + +module ExecutionContext + # :nodoc: + # + # Local queue or runnable fibers for schedulers. + # Bounded. + # First-in, first-out semantics (FIFO). + # Single producer, multiple consumers thread safety. + # + # Private to an execution context scheduler, except for stealing methods that + # can be called from any thread in the execution context. + class Runnables(N) + def initialize(@global_queue : GlobalQueue) + # head is an index to the buffer where the next fiber to dequeue is. + # + # tail is an index to the buffer where the next fiber to enqueue will be + # (on the next push). + # + # head is always behind tail (not empty) or equal (empty) but never after + # tail (the queue would have a negative size => bug). + @head = Atomic(UInt32).new(0) + @tail = Atomic(UInt32).new(0) + @buffer = uninitialized Fiber[N] + end + + @[AlwaysInline] + def capacity : Int32 + N + end + + # Tries to push fiber on the local runnable queue. If the run queue is full, + # pushes fiber on the global queue, which will grab the global lock. + # + # Executed only by the owner. + def push(fiber : Fiber) : Nil + # ported from Go: runqput + loop do + head = @head.get(:acquire) # sync with consumers + tail = @tail.get(:relaxed) + + if (tail &- head) < N + # put fiber to local queue + @buffer.to_unsafe[tail % N] = fiber + + # make the fiber available for consumption + @tail.set(tail &+ 1, :release) + return + end + + if push_slow(fiber, head, tail) + return + end + + # failed to advance head (another scheduler stole fibers), + # the queue isn't full, now the push above must succeed + end + end + + private def push_slow(fiber : Fiber, head : UInt32, tail : UInt32) : Bool + # ported from Go: runqputslow + n = (tail &- head) // 2 + raise "BUG: queue is not full" if n != N // 2 + + # first, try to grab half of the fibers from local queue + batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile + n.times do |i| + batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N] + end + _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return false unless success + + # append fiber to the batch + batch.to_unsafe[n] = fiber + + # link the fibers + n.times do |i| + batch.to_unsafe[i].schedlink = batch.to_unsafe[i &+ 1] + end + queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) + + # now put the batch on global queue (grabs the global lock) + @global_queue.bulk_push(pointerof(queue)) + + true + end + + # Tries to enqueue all the fibers in `queue` into the local queue. If the + # queue is full, the overflow will be pushed to the global queue; in that + # case this will temporarily acquire the global queue lock. + # + # Executed only by the owner. + def bulk_push(queue : Fiber::Queue*) : Nil + # ported from Go: runqputbatch + head = @head.get(:acquire) # sync with other consumers + tail = @tail.get(:relaxed) + + while !queue.value.empty? && (tail &- head) < N + fiber = queue.value.pop + @buffer.to_unsafe[tail % N] = fiber + tail &+= 1 + end + + # make the fibers available for consumption + @tail.set(tail, :release) + + # put any overflow on global queue + @global_queue.bulk_push(queue) unless queue.value.empty? + end + + # Dequeues the next runnable fiber from the local queue. + # + # Executed only by the owner. + # TODO: rename as `#shift?` + def get? : Fiber? + # ported from Go: runqget + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:relaxed) + return if tail == head + + fiber = @buffer.to_unsafe[head % N] + head, success = @head.compare_and_set(head, head &+ 1, :acquire_release, :acquire) + return fiber if success + end + end + + # Steals half the fibers from the local queue of `src` and puts them onto + # the local queue. Returns one of the stolen fibers, or `nil` on failure. + # + # Only executed from the owner (when the local queue is empty). + def steal_from(src : Runnables) : Fiber? + # ported from Go: runqsteal + + tail = @tail.get(:relaxed) + n = src.grab(@buffer.to_unsafe, tail) + return if n == 0 + + # 'dequeue' last fiber from @buffer + n &-= 1 + fiber = @buffer.to_unsafe[(tail &+ n) % N] + return fiber if n == 0 + + head = @head.get(:acquire) # sync with consumers + if tail &- head &+ n >= N + raise "BUG: local queue overflow" + end + + # make the fibers available for consumption + @tail.set(tail &+ n, :release) + + fiber + end + + # Grabs a batch of fibers from local queue into `buffer` of size N (normally + # the ring buffer of another `Runnables`) starting at `buffer_head`. Returns + # number of grabbed fibers. + # + # Can be executed by any scheduler. + protected def grab(buffer : Fiber*, buffer_head : UInt32) : UInt32 + # ported from Go: runqgrab + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:acquire) # sync with the producer + + n = tail &- head + n -= n // 2 + return 0_u32 if n == 0 # queue is empty + + if n > N // 2 + # read inconsistent head and tail + head = @head.get(:acquire) + next + end + + n.times do |i| + fiber = @buffer.to_unsafe[(head &+ i) % N] + buffer[(buffer_head &+ i) % N] = fiber + end + + # try to mark the fiber as consumed + head, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return n if success + end + end + + @[AlwaysInline] + def empty? : Bool + @head.get(:relaxed) == @tail.get(:relaxed) + end + + @[AlwaysInline] + def size : UInt32 + @tail.get(:relaxed) &- @head.get(:relaxed) + end + end +end