diff --git a/crates/compiler/load_internal/src/file.rs b/crates/compiler/load_internal/src/file.rs index 870d1769139..d183f157d53 100644 --- a/crates/compiler/load_internal/src/file.rs +++ b/crates/compiler/load_internal/src/file.rs @@ -68,7 +68,7 @@ use roc_solve_problem::TypeError; use roc_target::Target; use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable}; use roc_types::types::{Alias, Types}; -use roc_worker::{ChannelProblem, WorkerMsg}; +use roc_worker::ChannelProblem; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::io; @@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender>; /// Add a task to the queue, and notify all the listeners. fn enqueue_task<'a>( injector: &Injector>, - listeners: &[Sender], + listeners: &[Sender<()>], task: BuildTask<'a>, ) -> Result<(), LoadingProblem<'a>> { injector.push(task); for listener in listeners { listener - .send(WorkerMsg::TaskAdded) + .send(()) .map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?; } @@ -1569,9 +1569,9 @@ pub fn load_single_threaded<'a>( // We'll add tasks to this, and then worker threads will take tasks from it. let injector = Injector::new(); - let (worker_msg_tx, worker_msg_rx) = bounded(1024); - let worker_listener = worker_msg_tx; - let worker_listeners = arena.alloc([worker_listener]); + let (worker_wakup_tx, worker_wakup_rx) = bounded(1024); + let worker_waker = worker_wakup_tx; + let worker_wakers = [worker_waker]; let worker = Worker::new_fifo(); let stealer = worker.stealer(); @@ -1579,7 +1579,7 @@ pub fn load_single_threaded<'a>( // now we just manually interleave stepping the state "thread" and the worker "thread" loop { - match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) { + match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) { Ok(ControlFlow::Break(done)) => return Ok(done), Ok(ControlFlow::Continue(new_state)) => { state = new_state; @@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>( // then check if the worker can step let control_flow = - roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| { + roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| { run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target) }); @@ -1603,10 +1603,10 @@ pub fn load_single_threaded<'a>( } } -fn state_thread_step<'a>( +fn state_thread_step<'a, 'b>( arena: &'a Bump, state: State<'a>, - worker_listeners: &'a [Sender], + worker_wakers: &'b [Sender<()>], injector: &Injector>, msg_tx: &crossbeam::channel::Sender>, msg_rx: &crossbeam::channel::Receiver>, @@ -1712,14 +1712,8 @@ fn state_thread_step<'a>( let render = state.render; let palette = state.palette; - let res_state = update( - state, - msg, - msg_tx.clone(), - injector, - worker_listeners, - arena, - ); + let res_state = + update(state, msg, msg_tx.clone(), injector, worker_wakers, arena); match res_state { Ok(new_state) => Ok(ControlFlow::Continue(new_state)), @@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>( { let thread_result = thread::scope(|thread_scope| { - let mut worker_listeners = - bumpalo::collections::Vec::with_capacity_in(num_workers, arena); + // Careful! It's important that worker listeners aren't allocated in the arena, + // since they need to be correctly dropped if we have a panic in this thread::scope code. + // Making sure they're owned means they'll be dropped correctly on either normal exit + // of this thread::scope block or on panicking. When they're dropped, the worker threads + // will correctly exit their message processing loops. + // If these were allocated in the arena, we might panic without shutting down the worker threads, + // causing the thread::scope block to hang while it waits for the worker threads to exit. + let mut worker_wakers = Vec::with_capacity(num_workers); for worker_arena in it { let msg_tx = msg_tx.clone(); let worker = worker_queues.pop().unwrap(); - let (worker_msg_tx, worker_msg_rx) = bounded(1024); - worker_listeners.push(worker_msg_tx); + let (worker_wakup_tx, worker_wakup_rx) = bounded(1024); + worker_wakers.push(worker_wakup_tx); // We only want to move a *reference* to the main task queue's // injector in the thread, not the injector itself @@ -2015,16 +2015,22 @@ fn load_multi_threaded<'a>( .stack_size(EXPANDED_STACK_SIZE) .spawn(move |_| { // will process messages until we run out - roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| { - run_task( - task, - worker_arena, - src_dir, - msg_tx.clone(), - roc_cache_dir, - target, - ) - }) + roc_worker::worker_task( + worker, + injector, + stealers, + worker_wakup_rx, + |task| { + run_task( + task, + worker_arena, + src_dir, + msg_tx.clone(), + roc_cache_dir, + target, + ) + }, + ) }); res_join_handle.unwrap_or_else(|_| { @@ -2039,31 +2045,13 @@ fn load_multi_threaded<'a>( // Grab a reference to these Senders outside the loop, so we can share // it across each iteration of the loop. - let worker_listeners = worker_listeners.into_bump_slice(); let msg_tx = msg_tx.clone(); - macro_rules! shut_down_worker_threads { - () => { - for listener in worker_listeners { - // We intentionally don't propagate this Result, because even if - // shutting down a worker failed (which can happen if a a panic - // occurred on that thread), we want to continue shutting down - // the others regardless. - if listener.send(WorkerMsg::Shutdown).is_err() { - log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked."); - } - } - }; - } - // The root module will have already queued up messages to process, // and processing those messages will in turn queue up more messages. loop { - match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) - { + match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) { Ok(ControlFlow::Break(load_result)) => { - shut_down_worker_threads!(); - return Ok(load_result); } Ok(ControlFlow::Continue(new_state)) => { @@ -2071,8 +2059,6 @@ fn load_multi_threaded<'a>( continue; } Err(e) => { - shut_down_worker_threads!(); - return Err(e); } } @@ -2106,18 +2092,18 @@ fn load_multi_threaded<'a>( } } -fn start_tasks<'a>( +fn start_tasks<'a, 'b>( arena: &'a Bump, state: &mut State<'a>, work: MutSet<(ModuleId, Phase)>, injector: &Injector>, - worker_listeners: &'a [Sender], + worker_wakers: &'b [Sender<()>], ) -> Result<(), LoadingProblem<'a>> { for (module_id, phase) in work { let tasks = start_phase(module_id, phase, arena, state); for task in tasks { - enqueue_task(injector, worker_listeners, task)? + enqueue_task(injector, worker_wakers, task)? } } @@ -2174,12 +2160,12 @@ fn extend_module_with_builtin_import(module: &mut ParsedModule, module_id: Modul module.initial_scope.extend(types); } -fn update<'a>( +fn update<'a, 'b>( mut state: State<'a>, msg: Msg<'a>, msg_tx: MsgSender<'a>, injector: &Injector>, - worker_listeners: &'a [Sender], + worker_wakers: &'b [Sender<()>], arena: &'a Bump, ) -> Result, LoadingProblem<'a>> { use self::Msg::*; @@ -2305,7 +2291,7 @@ fn update<'a>( work.extend(state.dependencies.notify(home, Phase::LoadHeader)); work.insert((home, Phase::Parse)); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2382,7 +2368,7 @@ fn update<'a>( } }; - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; state .module_cache @@ -2393,7 +2379,7 @@ fn update<'a>( let work = state.dependencies.notify(module_id, Phase::Parse); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2445,7 +2431,7 @@ fn update<'a>( .dependencies .notify(module_id, Phase::CanonicalizeAndConstrain); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2652,7 +2638,7 @@ fn update<'a>( work }; - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; } Ok(state) @@ -2700,7 +2686,7 @@ fn update<'a>( .dependencies .notify(module_id, Phase::FindSpecializations); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } @@ -2990,13 +2976,13 @@ fn update<'a>( let work = state.dependencies.reload_make_specialization_pass(); - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } NextStep::MakingInPhase => { - start_tasks(arena, &mut state, work, injector, worker_listeners)?; + start_tasks(arena, &mut state, work, injector, worker_wakers)?; Ok(state) } diff --git a/crates/compiler/worker/src/worker.rs b/crates/compiler/worker/src/worker.rs index 71cb267be9f..8c9d2bcd601 100644 --- a/crates/compiler/worker/src/worker.rs +++ b/crates/compiler/worker/src/worker.rs @@ -7,12 +7,6 @@ use roc_module::symbol::ModuleId; use roc_work::Phase; use std::ops::ControlFlow; -#[derive(Debug)] -pub enum WorkerMsg { - Shutdown, - TaskAdded, -} - #[derive(Debug)] pub enum ChannelProblem { FailedToSendRootMsg, @@ -29,41 +23,31 @@ pub fn worker_task_step( worker: &Worker, injector: &Injector, stealers: &[Stealer], - worker_msg_rx: &Receiver, + worker_wakeup_rx: &Receiver<()>, run_task: impl Fn(Task) -> Result<(), ChannelProblem>, ) -> Result, ChannelProblem> { - match worker_msg_rx.try_recv() { - Ok(msg) => { - match msg { - WorkerMsg::Shutdown => { - // We've finished all our work. It's time to - // shut down the thread, so when the main thread - // blocks on joining with all the worker threads, - // it can finally exit too! - Ok(ControlFlow::Break(())) - } - WorkerMsg::TaskAdded => { - // Find a task - either from this thread's queue, - // or from the main queue, or from another worker's - // queue - and run it. - // - // There might be no tasks to work on! That could - // happen if another thread is working on a task - // which will later result in more tasks being - // added. In that case, do nothing, and keep waiting - // until we receive a Shutdown message. - if let Some(task) = find_task(worker, injector, stealers) { - run_task(task)?; - } - - Ok(ControlFlow::Continue(())) - } + match worker_wakeup_rx.try_recv() { + Ok(()) => { + // Find a task - either from this thread's queue, + // or from the main queue, or from another worker's + // queue - and run it. + // + // There might be no tasks to work on! That could + // happen if another thread is working on a task + // which will later result in more tasks being + // added. In that case, do nothing, and keep waiting + // until we receive a Shutdown message. + if let Some(task) = find_task(worker, injector, stealers) { + run_task(task)?; } + + Ok(ControlFlow::Continue(())) } Err(err) => match err { crossbeam::channel::TryRecvError::Empty => Ok(ControlFlow::Continue(())), crossbeam::channel::TryRecvError::Disconnected => { - Err(ChannelProblem::ChannelDisconnected) + // The channel sender has been dropped, which means we want to shut down + Ok(ControlFlow::Break(())) } }, } @@ -73,33 +57,22 @@ pub fn worker_task( worker: Worker, injector: &Injector, stealers: &[Stealer], - worker_msg_rx: crossbeam::channel::Receiver, + worker_wakeup_rx: crossbeam::channel::Receiver<()>, run_task: impl Fn(Task) -> Result<(), ChannelProblem>, ) -> Result<(), ChannelProblem> { // Keep listening until we receive a Shutdown msg - for msg in worker_msg_rx.iter() { - match msg { - WorkerMsg::Shutdown => { - // We've finished all our work. It's time to - // shut down the thread, so when the main thread - // blocks on joining with all the worker threads, - // it can finally exit too! - return Ok(()); - } - WorkerMsg::TaskAdded => { - // Find a task - either from this thread's queue, - // or from the main queue, or from another worker's - // queue - and run it. - // - // There might be no tasks to work on! That could - // happen if another thread is working on a task - // which will later result in more tasks being - // added. In that case, do nothing, and keep waiting - // until we receive a Shutdown message. - if let Some(task) = find_task(&worker, injector, stealers) { - run_task(task)?; - } - } + for () in worker_wakeup_rx.iter() { + // Find a task - either from this thread's queue, + // or from the main queue, or from another worker's + // queue - and run it. + // + // There might be no tasks to work on! That could + // happen if another thread is working on a task + // which will later result in more tasks being + // added. In that case, do nothing, and keep waiting + // until we receive a Shutdown message. + if let Some(task) = find_task(&worker, injector, stealers) { + run_task(task)?; } } @@ -110,17 +83,17 @@ pub fn start_tasks>( state: &mut State, work: MutSet<(ModuleId, Phase)>, injector: &Injector, - worker_listeners: &[Sender], + worker_wakers: &[Sender<()>], mut start_phase: impl FnMut(ModuleId, Phase, &mut State) -> Tasks, -) -> Result<(), SendError> { +) -> Result<(), SendError<()>> { for (module_id, phase) in work { let tasks = start_phase(module_id, phase, state); for task in tasks { injector.push(task); - for listener in worker_listeners { - listener.send(WorkerMsg::TaskAdded)?; + for listener in worker_wakers { + listener.send(())?; } } }