Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventLoop#wait_[readable|writable] methods #15376

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/crystal/event_loop/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ abstract class Crystal::EventLoop
# Returns 0 when EOF is reached.
abstract def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for read.
abstract def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil

# Writes at least one byte from *slice* to the file descriptor.
#
# Blocks the current fiber if the file descriptor isn't ready for writing,
Expand All @@ -17,6 +20,9 @@ abstract class Crystal::EventLoop
# Returns the number of bytes written (up to `slice.size`).
abstract def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for write.
abstract def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil

# Closes the file descriptor resource.
abstract def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
end
Expand Down
22 changes: 22 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,21 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end.to_i32
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(FileDescriptor)")
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end.to_i32
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(FileDescriptor)")
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
LibC.CancelIoEx(file_descriptor.windows_handle, nil) unless file_descriptor.system_blocking?
end
Expand All @@ -220,6 +228,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes_read.to_i32
end

def wait_readable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(Socket)")
end

def write(socket : ::Socket, slice : Bytes) : Int32
wsabuf = wsa_buffer(slice)

Expand All @@ -231,6 +246,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes.to_i32
end

def wait_writable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(Socket)")
end

def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32
wsabuf = wsa_buffer(slice)
bytes_written = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped|
Expand Down
36 changes: 32 additions & 4 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -94,6 +100,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -104,12 +116,24 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0).to_i32
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
Expand Down Expand Up @@ -142,7 +166,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
socket.wait_writable(timeout: timeout) do
socket.evented_wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
Expand Down Expand Up @@ -174,7 +198,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
if socket.closed?
return
elsif Errno.value == Errno::EAGAIN
socket.wait_readable(raise_if_closed: false) do
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
return if socket.closed?
Expand All @@ -200,7 +224,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_readable
target.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand All @@ -218,7 +244,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_writable
target.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand Down
24 changes: 24 additions & 0 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : System::FileDescriptor) : Nil
wait_readable(file_descriptor, file_descriptor.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32
size = evented_write(file_descriptor, slice, file_descriptor.@write_timeout)

Expand All @@ -160,6 +166,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : System::FileDescriptor) : Nil
wait_writable(file_descriptor, file_descriptor.@write_timeout) do
raise IO::TimeoutError.new
end
end

def close(file_descriptor : System::FileDescriptor) : Nil
evented_close(file_descriptor)
end
Expand All @@ -176,12 +188,24 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
size
end

def wait_readable(socket : ::Socket) : Nil
wait_readable(socket, socket.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
size = evented_write(socket, slice, socket.@write_timeout)
raise IO::Error.from_errno("write", target: socket) if size == -1
size
end

def wait_writable(socket : ::Socket) : Nil
wait_writable(socket, socket.@write_timeout) do
raise IO::TimeoutError.new
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
loop do
client_fd =
Expand Down
6 changes: 6 additions & 0 deletions src/crystal/event_loop/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ abstract class Crystal::EventLoop
# Use `#receive_from` for capturing the source address of a message.
abstract def read(socket : ::Socket, slice : Bytes) : Int32

# Blocks the current fiber until the socket is ready for read.
abstract def wait_readable(socket : ::Socket) : Nil

# Writes at least one byte from *slice* to the socket.
#
# Blocks the current fiber if the socket is not ready for writing,
Expand All @@ -25,6 +28,9 @@ abstract class Crystal::EventLoop
# Use `#send_to` for sending a message to a specific target address.
abstract def write(socket : ::Socket, slice : Bytes) : Int32

# Blocks the current fiber until the socket is ready for write.
abstract def wait_writable(socket : ::Socket) : Nil

# Accepts an incoming TCP connection on the socket.
#
# Blocks the current fiber if no connection is waiting, continuing when one
Expand Down
32 changes: 30 additions & 2 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -49,6 +55,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable(raise_if_closed: false) do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -59,12 +71,24 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0)
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from"
end
Expand Down Expand Up @@ -94,7 +118,9 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_readable
target.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand All @@ -112,7 +138,9 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_writable
target.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand Down
14 changes: 2 additions & 12 deletions src/io/evented.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ module IO::Evented
end

# :nodoc:
def wait_readable(timeout = @read_timeout) : Nil
wait_readable(timeout: timeout) { raise TimeoutError.new("Read timed out") }
end

# :nodoc:
def wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
def evented_wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
readers = @readers.get { Deque(Fiber).new }
readers << Fiber.current
add_read_event(timeout)
Expand All @@ -59,12 +54,7 @@ module IO::Evented
end

# :nodoc:
def wait_writable(timeout = @write_timeout) : Nil
wait_writable(timeout: timeout) { raise TimeoutError.new("Write timed out") }
end

# :nodoc:
def wait_writable(timeout = @write_timeout, &) : Nil
def evented_wait_writable(timeout = @write_timeout, &) : Nil
writers = @writers.get { Deque(Fiber).new }
writers << Fiber.current
add_write_event(timeout)
Expand Down