Don't spin on the main mutex while waiting for new work #8433

Open · wants to merge 19 commits into base: main
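The core of the change: idle threads used to spin by unlocking the work queue mutex, yielding, and relocking it, which keeps the mutex contended. The new halide_cond_with_spinning spins on an atomic generation counter instead, and only parks on the underlying condition variable once a fixed spin budget runs out. Below is a minimal portable sketch of the same pattern, using standard C++ primitives as stand-ins for Halide's runtime types; only the spin budget of 40 and the overall shape come from the diff that follows.

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    // Illustrative sketch: a condition variable that spins on a generation
    // counter before sleeping, mirroring halide_cond_with_spinning.
    struct cond_with_spinning {
        std::condition_variable cond;
        std::atomic<std::uintptr_t> counter{0};

        void wait(std::unique_lock<std::mutex> &lock) {
            std::uintptr_t initial = counter.load(std::memory_order_relaxed);
            lock.unlock();
            // Spin without the lock, watching for a broadcast to bump the counter.
            for (int spin = 0; spin < 40; spin++) {
                std::this_thread::yield();
                if (counter.load(std::memory_order_relaxed) != initial) {
                    lock.lock();
                    return;
                }
            }
            lock.lock();
            // Re-check under the lock; the counter is only bumped with the
            // lock held, so this cannot miss a wakeup.
            if (counter.load(std::memory_order_relaxed) != initial) {
                return;
            }
            cond.wait(lock);
        }

        void broadcast() {  // call with the associated mutex held
            counter.fetch_add(1, std::memory_order_acq_rel);
            cond.notify_all();
        }
    };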
2 changes: 2 additions & 0 deletions src/runtime/synchronization_common.h
@@ -834,6 +834,7 @@ class fast_cond {

ALWAYS_INLINE void broadcast() {
if_tsan_pre_signal(this);

uintptr_t val;
atomic_load_relaxed(&state, &val);
if (val == 0) {
@@ -846,6 +847,7 @@
}

ALWAYS_INLINE void wait(fast_mutex *mutex) {
// Go to sleep until signaled
wait_parking_control control(&state, mutex);
uintptr_t result = control.park((uintptr_t)this);
if (result != (uintptr_t)mutex) {
104 changes: 69 additions & 35 deletions src/runtime/thread_pool_common.h
@@ -30,6 +30,57 @@ namespace Halide {
namespace Runtime {
namespace Internal {

// A condition variable, augmented with a bit of spinning on an atomic counter
// before going to sleep for real. This helps reduce overhead at the end of a
// parallel for loop when idle worker threads are waiting for other threads to
// finish so that the next parallel for loop can begin.
struct halide_cond_with_spinning {
halide_cond cond;
uintptr_t counter;

void wait(halide_mutex *mutex) {
// First, spin for a bit, waiting for another thread to bump the
// counter.
uintptr_t initial;
Synchronization::atomic_load_relaxed(&counter, &initial);
halide_mutex_unlock(mutex);
for (int spin = 0; spin < 40; spin++) {
halide_thread_yield();
uintptr_t current;
Synchronization::atomic_load_relaxed(&counter, &current);
if (current != initial) {
halide_mutex_lock(mutex);
return;
}
}

// Give up on spinning and relock the mutex, preparing to sleep for real.
halide_mutex_lock(mutex);

// Check one final time with the lock held. This guarantees we won't
// miss an increment of the counter because it is only ever incremented
// with the lock held.
uintptr_t current;
Synchronization::atomic_load_relaxed(&counter, &current);
if (current != initial) {
return;
}

halide_cond_wait(&cond, mutex);
}

void broadcast() {
// Release any spinning waiters
Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1);

// Release any sleeping waiters
halide_cond_broadcast(&cond);
}

// Note that this cond var variant doesn't have signal(), because it always
// wakes all spinning waiters.
};
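
// Usage sketch (illustrative, not part of the diff): the final re-check in
// wait() is only sound if the counter is bumped with the waiters' mutex
// held, i.e. broadcast() must be called under the same mutex that waiters
// pass to wait(). The pool below follows that protocol at every call site,
// e.g.:
//
//     halide_mutex_lock(&work_queue.mutex);
//     work_queue.shutdown = true;          // change the sleep predicate...
//     work_queue.wake_a_team.broadcast();  // ...then bump the counter and wake
//     halide_mutex_unlock(&work_queue.mutex);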

struct work {
halide_parallel_task_t task;

@@ -121,7 +172,7 @@ struct work_queue_t {
// may want to wake them up independently. Any code that may
// invalidate any of the reasons a worker or owner may have slept
// must signal or broadcast the appropriate condition variable.
halide_cond wake_a_team, wake_b_team, wake_owners;
halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners;

// The number of sleeping workers and owners. An over-estimate - a
// waking-up thread may not have decremented this yet.
@@ -203,9 +254,6 @@ WEAK void dump_job_state() {
WEAK void worker_thread(void *);

WEAK void worker_thread_already_locked(work *owned_job) {
int spin_count = 0;
const int max_spin_count = 40;

while (owned_job ? owned_job->running() : !work_queue.shutdown) {
work *job = work_queue.jobs;
work **prev_ptr = &work_queue.jobs;
@@ -226,7 +274,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
// The wakeup could likely be restricted to certain conditions, but it only
// happens when an error has already occurred, and it seems more important
// to ensure reliable termination than to optimize this path.
halide_cond_broadcast(&work_queue.wake_owners);
work_queue.wake_owners.broadcast();
continue;
}
}
@@ -283,38 +331,24 @@ WEAK void worker_thread_already_locked(work *owned_job) {
if (!job) {
// There is no runnable job. Go to sleep.
if (owned_job) {
if (spin_count++ < max_spin_count) {
// Give the workers a chance to finish up before sleeping
halide_mutex_unlock(&work_queue.mutex);
halide_thread_yield();
halide_mutex_lock(&work_queue.mutex);
} else {
work_queue.owners_sleeping++;
owned_job->owner_is_sleeping = true;
halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex);
owned_job->owner_is_sleeping = false;
work_queue.owners_sleeping--;
}
work_queue.owners_sleeping++;
owned_job->owner_is_sleeping = true;
work_queue.wake_owners.wait(&work_queue.mutex);
owned_job->owner_is_sleeping = false;
work_queue.owners_sleeping--;
} else {
work_queue.workers_sleeping++;
if (work_queue.a_team_size > work_queue.target_a_team_size) {
// Transition to B team
work_queue.a_team_size--;
halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex);
work_queue.wake_b_team.wait(&work_queue.mutex);
work_queue.a_team_size++;
} else if (spin_count++ < max_spin_count) {
// Spin waiting for new work
halide_mutex_unlock(&work_queue.mutex);
halide_thread_yield();
halide_mutex_lock(&work_queue.mutex);
} else {
halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex);
work_queue.wake_a_team.wait(&work_queue.mutex);
}
work_queue.workers_sleeping--;
}
continue;
} else {
spin_count = 0;
}

log_message("Working on job " << job->task.name);
@@ -432,7 +466,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
if (wake_owners ||
(job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
// The job is done or some owned job failed via sibling linkage. Wake up the owner.
halide_cond_broadcast(&work_queue.wake_owners);
work_queue.wake_owners.broadcast();
}
}
}
@@ -554,11 +588,11 @@ WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_paren
work_queue.target_a_team_size = workers_to_wake;
}

halide_cond_broadcast(&work_queue.wake_a_team);
work_queue.wake_a_team.broadcast();
if (work_queue.target_a_team_size > work_queue.a_team_size) {
halide_cond_broadcast(&work_queue.wake_b_team);
work_queue.wake_b_team.broadcast();
if (stealable_jobs) {
halide_cond_broadcast(&work_queue.wake_owners);
work_queue.wake_owners.broadcast();
}
}

@@ -707,9 +741,9 @@ WEAK void halide_shutdown_thread_pool() {
halide_mutex_lock(&work_queue.mutex);

work_queue.shutdown = true;
halide_cond_broadcast(&work_queue.wake_owners);
halide_cond_broadcast(&work_queue.wake_a_team);
halide_cond_broadcast(&work_queue.wake_b_team);
work_queue.wake_owners.broadcast();
work_queue.wake_a_team.broadcast();
work_queue.wake_b_team.broadcast();
halide_mutex_unlock(&work_queue.mutex);

// Wait until they leave
@@ -739,8 +773,8 @@ WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
if (old_val == 0 && n != 0) { // Don't wake if nothing released.
// We may have just made a job runnable
halide_mutex_lock(&work_queue.mutex);
halide_cond_broadcast(&work_queue.wake_a_team);
halide_cond_broadcast(&work_queue.wake_owners);
work_queue.wake_a_team.broadcast();
work_queue.wake_owners.broadcast();
halide_mutex_unlock(&work_queue.mutex);
}
return old_val + n;
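One more piece of the wake-up picture: halide_default_semaphore_release above only takes the queue mutex and broadcasts when the semaphore count moves off zero, since that is the only transition that can make a blocked job runnable. A rough self-contained sketch of that fast-path shape in standard C++, where the mutex and condition variable are stand-ins for work_queue.mutex and the pool's condition variables:

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    std::mutex queue_mutex;               // stand-in for work_queue.mutex
    std::condition_variable wake_a_team;  // stand-in for the pool's cond vars

    // Skip the lock and the broadcast unless the count moved off zero,
    // because only a 0 -> n transition can unblock a waiting job.
    int semaphore_release(std::atomic<int> &value, int n) {
        int old_val = value.fetch_add(n, std::memory_order_acq_rel);
        if (old_val == 0 && n != 0) {  // don't wake if nothing was released
            std::lock_guard<std::mutex> lock(queue_mutex);
            wake_a_team.notify_all();
        }
        return old_val + n;
    }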