Prevent panics in worker threads from causing deadlocks
Occasionally, we'll run into panics during compilation which, prior to this diff,
would cause the compiler to hang indefinitely after printing the initial panic message.
joshuawarner32 committed Nov 14, 2024
1 parent 157aaae commit 5b368c6
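The core of the fix: worker threads previously blocked on a channel of WorkerMsg values (TaskAdded or Shutdown), so a panic on the coordinating thread could leave them waiting forever for a Shutdown that never arrived. After this diff, workers block on a plain Sender<()>/Receiver<()> wakeup channel and treat channel disconnection as the shutdown signal: once every owned Sender<()> is dropped, whether on normal exit or during panic unwinding, recv() returns Err and the worker loop ends. A minimal sketch of the pattern (simplified names, not the actual roc_worker API):

use crossbeam_channel::{bounded, Receiver, Sender};
use std::thread;

// Worker loop: runs until every Sender<()> for this channel is dropped.
// crossbeam's recv() returns Err(RecvError) once the channel disconnects,
// so no explicit Shutdown message is needed.
fn worker_loop(wakeup_rx: Receiver<()>) {
    while wakeup_rx.recv().is_ok() {
        // ...steal a task from the shared injector and run it...
    }
    // All wakers dropped: channel disconnected, exit cleanly.
}

fn main() {
    let mut wakers: Vec<Sender<()>> = Vec::new();
    let mut handles = Vec::new();

    for _ in 0..4 {
        let (wakeup_tx, wakeup_rx) = bounded::<()>(1024);
        wakers.push(wakeup_tx);
        handles.push(thread::spawn(move || worker_loop(wakeup_rx)));
    }

    // Enqueueing a task notifies every worker with a unit wakeup.
    for waker in &wakers {
        let _ = waker.send(());
    }

    // Dropping the wakers disconnects the channels. Crucially, this also
    // happens automatically if this thread panics and unwinds, because
    // `wakers` is an owned Vec -- which is why the diff below stops
    // allocating the senders in the bump arena.
    drop(wakers);

    for handle in handles {
        handle.join().unwrap();
    }
}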
Showing 2 changed files with 87 additions and 128 deletions.
118 changes: 52 additions & 66 deletions crates/compiler/load_internal/src/file.rs
@@ -68,7 +68,7 @@ use roc_solve_problem::TypeError;
use roc_target::Target;
use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable};
use roc_types::types::{Alias, Types};
- use roc_worker::{ChannelProblem, WorkerMsg};
+ use roc_worker::ChannelProblem;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::io;
@@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender<Msg<'a>>;
/// Add a task to the queue, and notify all the listeners.
fn enqueue_task<'a>(
injector: &Injector<BuildTask<'a>>,
- listeners: &[Sender<WorkerMsg>],
+ listeners: &[Sender<()>],
task: BuildTask<'a>,
) -> Result<(), LoadingProblem<'a>> {
injector.push(task);

for listener in listeners {
listener
- .send(WorkerMsg::TaskAdded)
+ .send(())
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?;
}

@@ -1569,17 +1569,17 @@ pub fn load_single_threaded<'a>(
// We'll add tasks to this, and then worker threads will take tasks from it.
let injector = Injector::new();

- let (worker_msg_tx, worker_msg_rx) = bounded(1024);
- let worker_listener = worker_msg_tx;
- let worker_listeners = arena.alloc([worker_listener]);
+ let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
+ let worker_waker = worker_wakup_tx;
+ let worker_wakers = [worker_waker];

let worker = Worker::new_fifo();
let stealer = worker.stealer();
let stealers = &[stealer];

// now we just manually interleave stepping the state "thread" and the worker "thread"
loop {
- match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) {
+ match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(done)) => return Ok(done),
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
@@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>(

// then check if the worker can step
let control_flow =
- roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| {
+ roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| {
run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target)
});

@@ -1603,10 +1603,10 @@
}
}

- fn state_thread_step<'a>(
+ fn state_thread_step<'a, 'b>(
arena: &'a Bump,
state: State<'a>,
- worker_listeners: &'a [Sender<WorkerMsg>],
+ worker_wakers: &'b [Sender<()>],
injector: &Injector<BuildTask<'a>>,
msg_tx: &crossbeam::channel::Sender<Msg<'a>>,
msg_rx: &crossbeam::channel::Receiver<Msg<'a>>,
@@ -1712,14 +1712,8 @@ fn state_thread_step<'a>(
let render = state.render;
let palette = state.palette;

- let res_state = update(
- state,
- msg,
- msg_tx.clone(),
- injector,
- worker_listeners,
- arena,
- );
+ let res_state =
+ update(state, msg, msg_tx.clone(), injector, worker_wakers, arena);

match res_state {
Ok(new_state) => Ok(ControlFlow::Continue(new_state)),
@@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>(

{
let thread_result = thread::scope(|thread_scope| {
- let mut worker_listeners =
- bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
+ // Careful! It's important that worker listeners aren't allocated in the arena,
+ // since they need to be correctly dropped if we have a panic in this thread::scope code.
+ // Making sure they're owned means they'll be dropped correctly on either normal exit
+ // of this thread::scope block or on panicking. When they're dropped, the worker threads
+ // will correctly exit their message processing loops.
+ // If these were allocated in the arena, we might panic without shutting down the worker threads,
+ // causing the thread::scope block to hang while it waits for the worker threads to exit.
+ let mut worker_wakers = Vec::with_capacity(num_workers);

for worker_arena in it {
let msg_tx = msg_tx.clone();
let worker = worker_queues.pop().unwrap();

- let (worker_msg_tx, worker_msg_rx) = bounded(1024);
- worker_listeners.push(worker_msg_tx);
+ let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
+ worker_wakers.push(worker_wakup_tx);

// We only want to move a *reference* to the main task queue's
// injector in the thread, not the injector itself
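A side note on the "Careful!" comment in the added lines above: bumpalo is a bump allocator that frees its memory all at once and, by design, never runs Drop for values allocated in the arena. Arena-allocated senders would therefore keep their channels connected even after a panic, which is exactly the hang this commit fixes. A minimal illustration of that drop behavior (the Waker type here is hypothetical, for demonstration only):

use bumpalo::Bump;

struct Waker;

impl Drop for Waker {
    fn drop(&mut self) {
        println!("waker dropped");
    }
}

fn main() {
    {
        let arena = Bump::new();
        // Allocated in the arena: the memory is reclaimed when `arena`
        // goes away, but Drop::drop is never invoked, so nothing prints.
        arena.alloc(Waker);
    }

    {
        // Owned value: Drop runs on scope exit and during panic
        // unwinding -- which is what lets a worker's wakeup channel
        // disconnect and the worker thread exit.
        let _waker = Waker;
    } // prints "waker dropped"
}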
@@ -2015,16 +2015,22 @@
.stack_size(EXPANDED_STACK_SIZE)
.spawn(move |_| {
// will process messages until we run out
- roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| {
- run_task(
- task,
- worker_arena,
- src_dir,
- msg_tx.clone(),
- roc_cache_dir,
- target,
- )
- })
+ roc_worker::worker_task(
+ worker,
+ injector,
+ stealers,
+ worker_wakup_rx,
+ |task| {
+ run_task(
+ task,
+ worker_arena,
+ src_dir,
+ msg_tx.clone(),
+ roc_cache_dir,
+ target,
+ )
+ },
+ )
});

res_join_handle.unwrap_or_else(|_| {
@@ -2039,40 +2045,20 @@

// Grab a reference to these Senders outside the loop, so we can share
// it across each iteration of the loop.
- let worker_listeners = worker_listeners.into_bump_slice();
let msg_tx = msg_tx.clone();

- macro_rules! shut_down_worker_threads {
- () => {
- for listener in worker_listeners {
- // We intentionally don't propagate this Result, because even if
- // shutting down a worker failed (which can happen if a a panic
- // occurred on that thread), we want to continue shutting down
- // the others regardless.
- if listener.send(WorkerMsg::Shutdown).is_err() {
- log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked.");
- }
- }
- };
- }
-
// The root module will have already queued up messages to process,
// and processing those messages will in turn queue up more messages.
loop {
- match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx)
- {
+ match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
Ok(ControlFlow::Break(load_result)) => {
- shut_down_worker_threads!();
-
return Ok(load_result);
}
Ok(ControlFlow::Continue(new_state)) => {
state = new_state;
continue;
}
Err(e) => {
- shut_down_worker_threads!();
-
return Err(e);
}
}
@@ -2106,18 +2092,18 @@ fn load_multi_threaded<'a>(
}
}

- fn start_tasks<'a>(
+ fn start_tasks<'a, 'b>(
arena: &'a Bump,
state: &mut State<'a>,
work: MutSet<(ModuleId, Phase)>,
injector: &Injector<BuildTask<'a>>,
- worker_listeners: &'a [Sender<WorkerMsg>],
+ worker_wakers: &'b [Sender<()>],
) -> Result<(), LoadingProblem<'a>> {
for (module_id, phase) in work {
let tasks = start_phase(module_id, phase, arena, state);

for task in tasks {
- enqueue_task(injector, worker_listeners, task)?
+ enqueue_task(injector, worker_wakers, task)?
}
}

@@ -2174,12 +2160,12 @@ fn extend_module_with_builtin_import(module: &mut ParsedModule, module_id: Modul
module.initial_scope.extend(types);
}

- fn update<'a>(
+ fn update<'a, 'b>(
mut state: State<'a>,
msg: Msg<'a>,
msg_tx: MsgSender<'a>,
injector: &Injector<BuildTask<'a>>,
- worker_listeners: &'a [Sender<WorkerMsg>],
+ worker_wakers: &'b [Sender<()>],
arena: &'a Bump,
) -> Result<State<'a>, LoadingProblem<'a>> {
use self::Msg::*;
@@ -2305,7 +2291,7 @@ fn update<'a>(
work.extend(state.dependencies.notify(home, Phase::LoadHeader));
work.insert((home, Phase::Parse));

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2382,7 +2368,7 @@ fn update<'a>(
}
};

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

state
.module_cache
@@ -2393,7 +2379,7 @@

let work = state.dependencies.notify(module_id, Phase::Parse);

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2445,7 +2431,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::CanonicalizeAndConstrain);

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2652,7 +2638,7 @@ fn update<'a>(
work
};

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;
}

Ok(state)
@@ -2700,7 +2686,7 @@ fn update<'a>(
.dependencies
.notify(module_id, Phase::FindSpecializations);

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
@@ -2990,13 +2976,13 @@ fn update<'a>(

let work = state.dependencies.reload_make_specialization_pass();

- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}

NextStep::MakingInPhase => {
- start_tasks(arena, &mut state, work, injector, worker_listeners)?;
+ start_tasks(arena, &mut state, work, injector, worker_wakers)?;

Ok(state)
}
[The diff for the second changed file did not load.]