Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Codegen: on demand distribution to forked processes #14273

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 76 additions & 34 deletions src/compiler/crystal/compiler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -546,62 +546,104 @@ module Crystal
return all_reused
end

{% if !Crystal::System::Process.class.has_method?("fork") %}
{% if !LibC.has_method?("fork") %}
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
raise "Cannot fork compiler. `Crystal::System::Process.fork` is not implemented on this system."
{% elsif flag?(:preview_mt) %}
raise "Cannot fork compiler in multithread mode"
{% else %}
jobs_count = 0
wait_channel = Channel(Array(String)).new(@n_threads)
workers = fork_workers do |input, output|
while i = input.gets(chomp: true).presence
unit = units[i.to_i]
unit.compile
result = {name: unit.name, reused: unit.reused_previous_compilation?}
output.puts result.to_json
end
end

overqueue = 1
indexes = Atomic(Int32).new(0)
channel = Channel(String).new(@n_threads)
completed = Channel(Nil).new(@n_threads)

units.each_slice(Math.max(units.size // @n_threads, 1)) do |slice|
jobs_count += 1
workers.each do |pid, input, output|
spawn do
# For stats output we want to count how many previous
# .o files were reused, mainly to detect performance regressions.
# Because we fork, we must communicate using a pipe.
reused = [] of String
if wants_stats_or_progress
pr, pw = IO.pipe
spawn do
pr.each_line do |line|
unit = JSON.parse(line)
reused << unit["name"].as_s if unit["reused"].as_bool
@progress_tracker.stage_progress += 1
end
overqueued = 0

overqueue.times do
if (index = indexes.add(1)) < units.size
input.puts index
overqueued += 1
end
end

codegen_process = Crystal::System::Process.fork do
pipe_w = pw
slice.each do |unit|
unit.compile
if pipe_w
unit_json = {name: unit.name, reused: unit.reused_previous_compilation?}.to_json
pipe_w.puts unit_json
end
end
while (index = indexes.add(1)) < units.size
input.puts index

response = output.gets(chomp: true).not_nil!
channel.send response
end
Process.new(codegen_process).wait

if pipe_w = pw
pipe_w.close
Fiber.yield
overqueued.times do
response = output.gets(chomp: true).not_nil!
channel.send response
end

wait_channel.send reused
input << '\n'
input.close
output.close

Process.new(pid).wait
completed.send(nil)
end
end

jobs_count.times do
reused = wait_channel.receive
all_reused.concat(reused)
spawn do
@n_threads.times { completed.receive }
channel.close
end

while response = channel.receive?
next unless wants_stats_or_progress

result = JSON.parse(response)
all_reused << result["name"].as_s if result["reused"].as_bool
@progress_tracker.stage_progress += 1
end

all_reused
{% end %}
end

private def fork_workers
workers = [] of {Int32, IO::FileDescriptor, IO::FileDescriptor}

@n_threads.times do
iread, iwrite = IO.pipe
oread, owrite = IO.pipe

iwrite.flush_on_newline = true
owrite.flush_on_newline = true

pid = Crystal::System::Process.fork do
iwrite.close
oread.close

yield iread, owrite

iread.close
owrite.close
exit 0
end

iread.close
owrite.close

workers << {pid, iwrite, oread}
end

workers
end

private def print_macro_run_stats(program)
return unless @progress_tracker.stats?
return if program.compiled_macros_cache.empty?
Expand Down
Loading