diff --git a/src/compiler/crystal/compiler.cr b/src/compiler/crystal/compiler.cr index 899ef242f318..ddcd9e205c40 100644 --- a/src/compiler/crystal/compiler.cr +++ b/src/compiler/crystal/compiler.cr @@ -535,10 +535,13 @@ module Crystal wants_stats_or_progress = @progress_tracker.stats? || @progress_tracker.progress? - # If threads is 1 and no stats/progress is needed we can avoid - # fork/spawn/channels altogether. This is particularly useful for - # CI because there forking eventually leads to "out of memory" errors. - if @n_threads == 1 + # Don't start more processes than compilation units + n_threads = @n_threads.clamp(1..units.size) + + # If threads is 1 we can avoid fork/spawn/channels altogether. This is + # particularly useful for CI because there forking eventually leads to + # "out of memory" errors. + if n_threads == 1 units.each do |unit| unit.compile all_reused << unit.name if wants_stats_or_progress && unit.reused_previous_compilation? @@ -546,62 +549,104 @@ module Crystal return all_reused end - {% if !Crystal::System::Process.class.has_method?("fork") %} + {% if !LibC.has_method?("fork") %} raise "Cannot fork compiler. `Crystal::System::Process.fork` is not implemented on this system." {% elsif flag?(:preview_mt) %} raise "Cannot fork compiler in multithread mode" {% else %} - jobs_count = 0 - wait_channel = Channel(Array(String)).new(@n_threads) + workers = fork_workers(n_threads) do |input, output| + while i = input.gets(chomp: true).presence + unit = units[i.to_i] + unit.compile + result = {name: unit.name, reused: unit.reused_previous_compilation?} + output.puts result.to_json + end + end + + overqueue = 1 + indexes = Atomic(Int32).new(0) + channel = Channel(String).new(n_threads) + completed = Channel(Nil).new(n_threads) - units.each_slice(Math.max(units.size // @n_threads, 1)) do |slice| - jobs_count += 1 + workers.each do |pid, input, output| spawn do - # For stats output we want to count how many previous - # .o files were reused, mainly to detect performance regressions. - # Because we fork, we must communicate using a pipe. - reused = [] of String - if wants_stats_or_progress - pr, pw = IO.pipe - spawn do - pr.each_line do |line| - unit = JSON.parse(line) - reused << unit["name"].as_s if unit["reused"].as_bool - @progress_tracker.stage_progress += 1 - end + overqueued = 0 + + overqueue.times do + if (index = indexes.add(1)) < units.size + input.puts index + overqueued += 1 end end - codegen_process = Crystal::System::Process.fork do - pipe_w = pw - slice.each do |unit| - unit.compile - if pipe_w - unit_json = {name: unit.name, reused: unit.reused_previous_compilation?}.to_json - pipe_w.puts unit_json - end - end + while (index = indexes.add(1)) < units.size + input.puts index + + response = output.gets(chomp: true).not_nil! + channel.send response end - Process.new(codegen_process).wait - if pipe_w = pw - pipe_w.close - Fiber.yield + overqueued.times do + response = output.gets(chomp: true).not_nil! + channel.send response end - wait_channel.send reused + input << '\n' + input.close + output.close + + Process.new(pid).wait + completed.send(nil) end end - jobs_count.times do - reused = wait_channel.receive - all_reused.concat(reused) + spawn do + n_threads.times { completed.receive } + channel.close + end + + while response = channel.receive? + next unless wants_stats_or_progress + + result = JSON.parse(response) + all_reused << result["name"].as_s if result["reused"].as_bool + @progress_tracker.stage_progress += 1 end all_reused {% end %} end + private def fork_workers(n_threads) + workers = [] of {Int32, IO::FileDescriptor, IO::FileDescriptor} + + n_threads.times do + iread, iwrite = IO.pipe + oread, owrite = IO.pipe + + iwrite.flush_on_newline = true + owrite.flush_on_newline = true + + pid = Crystal::System::Process.fork do + iwrite.close + oread.close + + yield iread, owrite + + iread.close + owrite.close + exit 0 + end + + iread.close + owrite.close + + workers << {pid, iwrite, oread} + end + + workers + end + private def print_macro_run_stats(program) return unless @progress_tracker.stats? return if program.compiled_macros_cache.empty?