From 8f956876c14a37db0852b0dc658177165f24767a Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Wed, 5 Mar 2025 16:28:45 -0500 Subject: [PATCH] Thread wakeup: replace serial wakeup with tree wakeup --- src/julia_threads.h | 1 + src/safepoint.c | 9 ++++++++- src/scheduler.c | 46 +++++++++++++++++++++++++++++++++++---------- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/julia_threads.h b/src/julia_threads.h index dbe9166f288a9..d2d4f1d864568 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -219,6 +219,7 @@ typedef struct _jl_tls_states_t { // some hidden state (usually just because we don't have the type's size declaration) #ifdef JL_LIBRARY_EXPORTS uv_mutex_t sleep_lock; + int wake_next; uv_cond_t wake_signal; #endif } jl_tls_states_t; diff --git a/src/safepoint.c b/src/safepoint.c index 970c48875d790..ecfeacd2831a9 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -270,6 +270,8 @@ void jl_safepoint_wait_gc(jl_task_t *ct) JL_NOTSAFEPOINT } } +int wake_next_threads(int16_t) JL_NOTSAFEPOINT; + // equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead void jl_safepoint_wait_thread_resume(jl_task_t *ct) { @@ -288,8 +290,13 @@ void jl_safepoint_wait_thread_resume(jl_task_t *ct) uv_cond_broadcast(&safepoint_cond_begin); uv_mutex_unlock(&safepoint_lock); uv_mutex_lock(&ct->ptls->sleep_lock); - while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) + while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) { uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock); + if (ct->ptls->wake_next) { + ct->ptls->wake_next = 0; + wake_next_threads(ct->ptls->tid); + } + } } // must exit gc while still holding the mutex_unlock, so we know other // threads in jl_safepoint_suspend_thread will observe this thread in the diff --git a/src/scheduler.c b/src/scheduler.c index 731a0c5146605..4e15e97b640ac 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -127,6 +127,7 @@ void jl_threadfun(void *arg) void jl_init_thread_scheduler(jl_ptls_t ptls) JL_NOTSAFEPOINT { uv_mutex_init(&ptls->sleep_lock); + ptls->wake_next = 0; uv_cond_init(&ptls->wake_signal); // record that there is now another thread that may be used to schedule work // we will decrement this again in scheduler_delete_thread, only slightly @@ -213,10 +214,13 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT return 0; } -static int wake_thread(int16_t tid) JL_NOTSAFEPOINT +int wake_next_threads(int16_t) JL_NOTSAFEPOINT; + +static int wake_thread(int16_t tid, int wake_next) JL_NOTSAFEPOINT { jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + // only do something if tid is sleeping if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) { int8_t state = sleeping; if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) { @@ -224,14 +228,35 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT assert(wasrunning); (void)wasrunning; JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state); uv_mutex_lock(&ptls2->sleep_lock); + ptls2->wake_next = wake_next; uv_cond_signal(&ptls2->wake_signal); uv_mutex_unlock(&ptls2->sleep_lock); return 1; } + // TODO: tid was sleeping, but we failed to transition its state?! } + + // tid wasn't sleeping so we might have to wake its children for it + if (wake_next) + return wake_next_threads(tid); + return 0; } +int wake_next_threads(int16_t tid) JL_NOTSAFEPOINT +{ + int any_asleep = 0; + int nthreads = jl_atomic_load_acquire(&jl_n_threads) - jl_n_gcthreads; + int16_t tid1 = (2 * tid) + 1; + if (tid1 < nthreads) { + any_asleep |= wake_thread(tid1, 1); + int16_t tid2 = (2 * tid) + 2; + if (tid2 < nthreads) { + any_asleep |= wake_thread(tid2, 1); + } + } + return any_asleep; +} static void wake_libuv(void) JL_NOTSAFEPOINT { @@ -262,7 +287,8 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls } else { // something added to the sticky-queue: notify that thread - if (wake_thread(tid) && uvlock != ct) { + wake_thread(tid, 0); + if (uvlock != ct) { // check if we need to notify uv_run too jl_fence(); jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; @@ -279,14 +305,9 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls // something added to the multi-queue: notify all threads // in the future, we might want to instead wake some fraction of threads, // and let each of those wake additional threads if they find work - int anysleep = 0; - int nthreads = jl_atomic_load_acquire(&jl_n_threads); - for (tid = 0; tid < nthreads; tid++) { - if (tid != self) - anysleep |= wake_thread(tid); - } - // check if we need to notify uv_run too - if (uvlock != ct && anysleep) { + wake_thread(0, 1); + if (uvlock != ct) { + // check if we need to notify uv_run too jl_fence(); if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL) wake_libuv(); @@ -518,6 +539,11 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, } // else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0? uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); + // we were woken up; should we wake others? + if (!may_sleep(ptls) && ptls->wake_next) { + wake_next_threads(ptls->tid); + ptls->wake_next = 0; + } } assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); assert(jl_atomic_load_relaxed(&n_threads_running));