Skip to content

Commit

Permalink
Thread wakeup: replace serial wakeup with tree wakeup
Browse files Browse the repository at this point in the history
  • Loading branch information
kpamnany committed Mar 6, 2025
1 parent beb928b commit 8f95687
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ typedef struct _jl_tls_states_t {
// some hidden state (usually just because we don't have the type's size declaration)
#ifdef JL_LIBRARY_EXPORTS
uv_mutex_t sleep_lock;
int wake_next;
uv_cond_t wake_signal;
#endif
} jl_tls_states_t;
Expand Down
9 changes: 8 additions & 1 deletion src/safepoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ void jl_safepoint_wait_gc(jl_task_t *ct) JL_NOTSAFEPOINT
}
}

int wake_next_threads(int16_t) JL_NOTSAFEPOINT;

// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
void jl_safepoint_wait_thread_resume(jl_task_t *ct)
{
Expand All @@ -288,8 +290,13 @@ void jl_safepoint_wait_thread_resume(jl_task_t *ct)
uv_cond_broadcast(&safepoint_cond_begin);
uv_mutex_unlock(&safepoint_lock);
uv_mutex_lock(&ct->ptls->sleep_lock);
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) {
uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
if (ct->ptls->wake_next) {
ct->ptls->wake_next = 0;
wake_next_threads(ct->ptls->tid);
}
}
}
// must exit gc while still holding the mutex_unlock, so we know other
// threads in jl_safepoint_suspend_thread will observe this thread in the
Expand Down
46 changes: 36 additions & 10 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void jl_threadfun(void *arg)
void jl_init_thread_scheduler(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
uv_mutex_init(&ptls->sleep_lock);
ptls->wake_next = 0;
uv_cond_init(&ptls->wake_signal);
// record that there is now another thread that may be used to schedule work
// we will decrement this again in scheduler_delete_thread, only slightly
Expand Down Expand Up @@ -213,25 +214,49 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
return 0;
}

static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
int wake_next_threads(int16_t) JL_NOTSAFEPOINT;

static int wake_thread(int16_t tid, int wake_next) JL_NOTSAFEPOINT
{
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];

// only do something if tid is sleeping
if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
int8_t state = sleeping;
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
uv_mutex_lock(&ptls2->sleep_lock);
ptls2->wake_next = wake_next;
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
return 1;
}
// TODO: tid was sleeping, but we failed to transition its state?!
}

// tid wasn't sleeping so we might have to wake its children for it
if (wake_next)
return wake_next_threads(tid);

return 0;
}

int wake_next_threads(int16_t tid) JL_NOTSAFEPOINT
{
int any_asleep = 0;
int nthreads = jl_atomic_load_acquire(&jl_n_threads) - jl_n_gcthreads;
int16_t tid1 = (2 * tid) + 1;
if (tid1 < nthreads) {
any_asleep |= wake_thread(tid1, 1);
int16_t tid2 = (2 * tid) + 2;
if (tid2 < nthreads) {
any_asleep |= wake_thread(tid2, 1);
}
}
return any_asleep;
}

static void wake_libuv(void) JL_NOTSAFEPOINT
{
Expand Down Expand Up @@ -262,7 +287,8 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
}
else {
// something added to the sticky-queue: notify that thread
if (wake_thread(tid) && uvlock != ct) {
wake_thread(tid, 0);
if (uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
Expand All @@ -279,14 +305,9 @@ void wakeup_thread(jl_task_t *ct, int16_t tid) JL_NOTSAFEPOINT { // Pass in ptls
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
int anysleep = 0;
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
for (tid = 0; tid < nthreads; tid++) {
if (tid != self)
anysleep |= wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != ct && anysleep) {
wake_thread(0, 1);
if (uvlock != ct) {
// check if we need to notify uv_run too
jl_fence();
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
wake_libuv();
Expand Down Expand Up @@ -518,6 +539,11 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
// else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
// we were woken up; should we wake others?
if (!may_sleep(ptls) && ptls->wake_next) {
wake_next_threads(ptls->tid);
ptls->wake_next = 0;
}
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&n_threads_running));
Expand Down

0 comments on commit 8f95687

Please sign in to comment.