From c70496d322c5d32811815cbc1506de23c95e51b8 Mon Sep 17 00:00:00 2001 From: Andrew Adams Date: Thu, 5 Dec 2024 08:57:25 -0800 Subject: [PATCH] Don't spin on the main mutex while waiting for new work (#8433) * Don't spin on the main mutex while waiting for new work Once they run out of work to do, Halide worker threads spin for a bit checking if new work has been enqueued before calling cond_wait, which puts them to sleep until signaled. Job owners also spin waiting for their job to complete before going to sleep on a different condition variable. I hate this, but all previous attempts I have made at removing or reducing the spinning have made things slower. One problem with this approach is that spinning is done by releasing the work queue lock, yielding, reacquiring the work queue lock, and doing the somewhat involved check to see if there's something useful for this thread to do, either because new work was enqueued, the last item on a job completed, or a semaphore was released. This hammering of the lock by idle worker threads can starve the thread that actually completed the last task, delaying its ability to tell the job owner the job is done, and can also starve the job owner, causing it to take extra time to realize the job is all done and return back into Halide code. So this adds some wasted time at the end of every parallel for loop. This PR gets these idle threads to spin off the main mutex. I did this by combining a counter with a condition variable. Any time they are signaled, the counter is atomically incremented. Before they first release the lock, the idlers atomically capture the value of this counter. Then in cond_wait they spin for a bit doing atomic loads of the counter in between yields until it changes, in which case they grab the lock and return, or until they reach the spin count limit, in which case they go to sleep. This improved performance quite a bit over main for the blur app, which is a fast pipeline (~100us) with fine-grained parallelism. The speed-up was 1.2x! Not much effect on the more complex apps. --- src/runtime/synchronization_common.h | 2 + src/runtime/thread_pool_common.h | 104 ++++++++++++++++++--------- 2 files changed, 71 insertions(+), 35 deletions(-) diff --git a/src/runtime/synchronization_common.h b/src/runtime/synchronization_common.h index 778c423e4046..9b19dc92d8cc 100644 --- a/src/runtime/synchronization_common.h +++ b/src/runtime/synchronization_common.h @@ -834,6 +834,7 @@ class fast_cond { ALWAYS_INLINE void broadcast() { if_tsan_pre_signal(this); + uintptr_t val; atomic_load_relaxed(&state, &val); if (val == 0) { @@ -846,6 +847,7 @@ class fast_cond { } ALWAYS_INLINE void wait(fast_mutex *mutex) { + // Go to sleep until signaled wait_parking_control control(&state, mutex); uintptr_t result = control.park((uintptr_t)this); if (result != (uintptr_t)mutex) { diff --git a/src/runtime/thread_pool_common.h b/src/runtime/thread_pool_common.h index b13427a4261c..961ed479060e 100644 --- a/src/runtime/thread_pool_common.h +++ b/src/runtime/thread_pool_common.h @@ -30,6 +30,57 @@ namespace Halide { namespace Runtime { namespace Internal { +// A condition variable, augmented with a bit of spinning on an atomic counter +// before going to sleep for real. This helps reduce overhead at the end of a +// parallel for loop when idle worker threads are waiting for other threads to +// finish so that the next parallel for loop can begin. +struct halide_cond_with_spinning { + halide_cond cond; + uintptr_t counter; + + void wait(halide_mutex *mutex) { + // First spin for a bit, checking the counter for another thread to bump + // it. + uintptr_t initial; + Synchronization::atomic_load_relaxed(&counter, &initial); + halide_mutex_unlock(mutex); + for (int spin = 0; spin < 40; spin++) { + halide_thread_yield(); + uintptr_t current; + Synchronization::atomic_load_relaxed(&counter, ¤t); + if (current != initial) { + halide_mutex_lock(mutex); + return; + } + } + + // Give up on spinning and relock the mutex preparing to sleep for real. + halide_mutex_lock(mutex); + + // Check one final time with the lock held. This guarantees we won't + // miss an increment of the counter because it is only ever incremented + // with the lock held. + uintptr_t current; + Synchronization::atomic_load_relaxed(&counter, ¤t); + if (current != initial) { + return; + } + + halide_cond_wait(&cond, mutex); + } + + void broadcast() { + // Release any spinning waiters + Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1); + + // Release any sleeping waiters + halide_cond_broadcast(&cond); + } + + // Note that this cond var variant doesn't have signal(), because it always + // wakes all spinning waiters. +}; + struct work { halide_parallel_task_t task; @@ -121,7 +172,7 @@ struct work_queue_t { // may want to wake them up independently. Any code that may // invalidate any of the reasons a worker or owner may have slept // must signal or broadcast the appropriate condition variable. - halide_cond wake_a_team, wake_b_team, wake_owners; + halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners; // The number of sleeping workers and owners. An over-estimate - a // waking-up thread may not have decremented this yet. @@ -203,9 +254,6 @@ WEAK void dump_job_state() { WEAK void worker_thread(void *); WEAK void worker_thread_already_locked(work *owned_job) { - int spin_count = 0; - const int max_spin_count = 40; - while (owned_job ? owned_job->running() : !work_queue.shutdown) { work *job = work_queue.jobs; work **prev_ptr = &work_queue.jobs; @@ -226,7 +274,7 @@ WEAK void worker_thread_already_locked(work *owned_job) { // The wakeup can likely be only done under certain conditions, but it is only happening // in when an error has already occured and it seems more important to ensure reliable // termination than to optimize this path. - halide_cond_broadcast(&work_queue.wake_owners); + work_queue.wake_owners.broadcast(); continue; } } @@ -283,38 +331,24 @@ WEAK void worker_thread_already_locked(work *owned_job) { if (!job) { // There is no runnable job. Go to sleep. if (owned_job) { - if (spin_count++ < max_spin_count) { - // Give the workers a chance to finish up before sleeping - halide_mutex_unlock(&work_queue.mutex); - halide_thread_yield(); - halide_mutex_lock(&work_queue.mutex); - } else { - work_queue.owners_sleeping++; - owned_job->owner_is_sleeping = true; - halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex); - owned_job->owner_is_sleeping = false; - work_queue.owners_sleeping--; - } + work_queue.owners_sleeping++; + owned_job->owner_is_sleeping = true; + work_queue.wake_owners.wait(&work_queue.mutex); + owned_job->owner_is_sleeping = false; + work_queue.owners_sleeping--; } else { work_queue.workers_sleeping++; if (work_queue.a_team_size > work_queue.target_a_team_size) { // Transition to B team work_queue.a_team_size--; - halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex); + work_queue.wake_b_team.wait(&work_queue.mutex); work_queue.a_team_size++; - } else if (spin_count++ < max_spin_count) { - // Spin waiting for new work - halide_mutex_unlock(&work_queue.mutex); - halide_thread_yield(); - halide_mutex_lock(&work_queue.mutex); } else { - halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex); + work_queue.wake_a_team.wait(&work_queue.mutex); } work_queue.workers_sleeping--; } continue; - } else { - spin_count = 0; } log_message("Working on job " << job->task.name); @@ -432,7 +466,7 @@ WEAK void worker_thread_already_locked(work *owned_job) { if (wake_owners || (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) { // The job is done or some owned job failed via sibling linkage. Wake up the owner. - halide_cond_broadcast(&work_queue.wake_owners); + work_queue.wake_owners.broadcast(); } } } @@ -554,11 +588,11 @@ WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_paren work_queue.target_a_team_size = workers_to_wake; } - halide_cond_broadcast(&work_queue.wake_a_team); + work_queue.wake_a_team.broadcast(); if (work_queue.target_a_team_size > work_queue.a_team_size) { - halide_cond_broadcast(&work_queue.wake_b_team); + work_queue.wake_b_team.broadcast(); if (stealable_jobs) { - halide_cond_broadcast(&work_queue.wake_owners); + work_queue.wake_owners.broadcast(); } } @@ -707,9 +741,9 @@ WEAK void halide_shutdown_thread_pool() { halide_mutex_lock(&work_queue.mutex); work_queue.shutdown = true; - halide_cond_broadcast(&work_queue.wake_owners); - halide_cond_broadcast(&work_queue.wake_a_team); - halide_cond_broadcast(&work_queue.wake_b_team); + work_queue.wake_owners.broadcast(); + work_queue.wake_a_team.broadcast(); + work_queue.wake_b_team.broadcast(); halide_mutex_unlock(&work_queue.mutex); // Wait until they leave @@ -739,8 +773,8 @@ WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) { if (old_val == 0 && n != 0) { // Don't wake if nothing released. // We may have just made a job runnable halide_mutex_lock(&work_queue.mutex); - halide_cond_broadcast(&work_queue.wake_a_team); - halide_cond_broadcast(&work_queue.wake_owners); + work_queue.wake_a_team.broadcast(); + work_queue.wake_owners.broadcast(); halide_mutex_unlock(&work_queue.mutex); } return old_val + n;