Skip to content

Commit fcd38fd

Browse files
committed
Add method to terminate a worker
1 parent 17b01da commit fcd38fd

File tree

2 files changed

+136
-62
lines changed

2 files changed

+136
-62
lines changed

crates/worker/src/actor/bridge.rs

+125-19
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,99 @@ use std::rc::Weak;
88
use serde::{Deserialize, Serialize};
99

1010
use super::handler_id::HandlerId;
11+
use super::messages::FromWorker;
1112
use super::messages::ToWorker;
13+
use super::native_worker::DedicatedWorker;
1214
use super::native_worker::NativeWorkerExt;
1315
use super::traits::Worker;
14-
use super::{Callback, Shared};
16+
use super::Callback;
1517
use crate::codec::Codec;
1618

1719
pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
1820
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
1921

20-
struct WorkerBridgeInner<W>
22+
pub(crate) struct WorkerBridgeInner<W>
2123
where
2224
W: Worker,
2325
{
2426
// When worker is loaded, queue becomes None.
25-
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
26-
callbacks: Shared<CallbackMap<W>>,
27-
post_msg: Rc<dyn Fn(ToWorker<W>)>,
27+
pending_queue: RefCell<Option<ToWorkerQueue<W>>>,
28+
callbacks: RefCell<CallbackMap<W>>,
29+
native_worker: RefCell<Option<DedicatedWorker>>,
30+
post_msg: Box<dyn Fn(&DedicatedWorker, ToWorker<W>)>,
31+
}
32+
33+
impl<W> WorkerBridgeInner<W>
34+
where
35+
W: Worker + 'static,
36+
{
37+
pub(crate) fn new<CODEC>(
38+
native_worker: DedicatedWorker,
39+
callbacks: CallbackMap<W>,
40+
) -> Rc<Self>
41+
where
42+
CODEC: Codec,
43+
W::Input: Serialize + for<'de> Deserialize<'de>,
44+
W::Output: Serialize + for<'de> Deserialize<'de>,
45+
{
46+
let worker = native_worker.clone();
47+
48+
let pending_queue = RefCell::new(Some(Vec::new()));
49+
let callbacks = RefCell::new(callbacks);
50+
let native_worker = RefCell::new(Some(native_worker));
51+
let post_msg = move |worker: &DedicatedWorker, msg: ToWorker<W>|
52+
worker.post_packed_message::<_, CODEC>(msg);
53+
54+
let self_ = Self {
55+
pending_queue,
56+
callbacks,
57+
native_worker,
58+
post_msg: Box::new(post_msg),
59+
};
60+
let self_ = Rc::new(self_);
61+
62+
let handler = {
63+
let bridge_inner = Rc::downgrade(&self_);
64+
// If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`.
65+
move |msg: FromWorker<W>| if let Some(bridge_inner) = Weak::upgrade(&bridge_inner) {
66+
match msg {
67+
FromWorker::WorkerLoaded => {
68+
// Set pending queue to `None`. Unless `WorkerLoaded` is
69+
// sent twice, this will always be `Some`.
70+
if let Some(pending_queue) = bridge_inner.take_queue() {
71+
// Will be `None` if the worker has been terminated.
72+
if let Some(worker) = bridge_inner.native_worker.borrow_mut().as_ref() {
73+
// Send all pending messages.
74+
for to_worker in pending_queue.into_iter() {
75+
(bridge_inner.post_msg)(worker, to_worker);
76+
}
77+
}
78+
}
79+
}
80+
FromWorker::ProcessOutput(id, output) => {
81+
let mut callbacks = bridge_inner.callbacks.borrow_mut();
82+
83+
if let Some(m) = callbacks.get(&id) {
84+
if let Some(m) = Weak::upgrade(m) {
85+
m(output);
86+
} else {
87+
// The bridge has been dropped.
88+
callbacks.remove(&id);
89+
}
90+
}
91+
}
92+
}
93+
}
94+
};
95+
96+
worker.set_on_packed_message::<_, CODEC, _>(handler);
97+
98+
self_
99+
}
100+
101+
fn take_queue(&self) -> Option<ToWorkerQueue<W>> {
102+
self.pending_queue.borrow_mut().take()
103+
}
28104
}
29105

30106
impl<W> fmt::Debug for WorkerBridgeInner<W>
@@ -49,10 +125,24 @@ where
49125
m.push(msg);
50126
}
51127
None => {
52-
(self.post_msg)(msg);
128+
if let Some(worker) = self.native_worker.borrow().as_ref() {
129+
(self.post_msg)(worker, msg);
130+
}
53131
}
54132
}
55133
}
134+
135+
/// Terminate the worker, no more messages can be sent after this.
136+
fn terminate(&self) {
137+
if let Some(worker) = self.native_worker.borrow_mut().take() {
138+
worker.terminate();
139+
}
140+
}
141+
142+
/// Returns true if the worker is terminated.
143+
fn is_terminated(&self) -> bool {
144+
self.native_worker.borrow().is_none()
145+
}
56146
}
57147

58148
impl<W> Drop for WorkerBridgeInner<W>
@@ -66,6 +156,15 @@ where
66156
}
67157

68158
/// A connection manager for components interaction with workers.
159+
///
160+
/// Dropping this object will send a disconnect message to the worker and drop
161+
/// the callback if set, but will have no effect on forked bridges. Note that
162+
/// the worker will still receive and process any messages sent over the bridge
163+
/// up to that point, but the reply will not trigger a callback. If all forked
164+
/// bridges for a worker are dropped, the worker will be sent a destroy message.
165+
///
166+
/// To terminate the worker and stop execution immediately, use
167+
/// [`terminate`](#method.terminate).
69168
pub struct WorkerBridge<W>
70169
where
71170
W: Worker,
@@ -84,26 +183,16 @@ where
84183
self.inner.send_message(ToWorker::Connected(self.id));
85184
}
86185

87-
pub(crate) fn new<CODEC>(
186+
pub(crate) fn new(
88187
id: HandlerId,
89-
native_worker: web_sys::Worker,
90-
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
91-
callbacks: Rc<RefCell<CallbackMap<W>>>,
188+
inner: Rc<WorkerBridgeInner<W>>,
92189
callback: Option<Callback<W::Output>>,
93190
) -> Self
94191
where
95-
CODEC: Codec,
96192
W::Input: Serialize + for<'de> Deserialize<'de>,
97193
{
98-
let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);
99-
100194
let self_ = Self {
101-
inner: WorkerBridgeInner {
102-
pending_queue,
103-
callbacks,
104-
post_msg: Rc::new(post_msg),
105-
}
106-
.into(),
195+
inner,
107196
id,
108197
_worker: PhantomData,
109198
_cb: callback,
@@ -146,6 +235,23 @@ where
146235

147236
self_
148237
}
238+
239+
/// Immediately terminates the worker and stops any execution in progress,
240+
/// for this and all forked bridges. All messages will be dropped without
241+
/// the worker receiving them. No disconnect or destroy message is sent. Any
242+
/// messages sent after this point are dropped (from this bridge or any
243+
/// forks).
244+
///
245+
/// For more details see
246+
/// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate).
247+
pub fn terminate(&self) {
248+
self.inner.terminate()
249+
}
250+
251+
/// Returns true if the worker is terminated.
252+
pub fn is_terminated(&self) -> bool {
253+
self.inner.is_terminated()
254+
}
149255
}
150256

151257
impl<W> Drop for WorkerBridge<W>

crates/worker/src/actor/spawner.rs

+11-43
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
use std::cell::RefCell;
21
use std::collections::HashMap;
32
use std::fmt;
43
use std::marker::PhantomData;
5-
use std::rc::{Rc, Weak};
4+
use std::rc::Rc;
65

76
use gloo_utils::window;
87
use js_sys::Array;
98
use serde::de::Deserialize;
109
use serde::ser::Serialize;
1110
use web_sys::{Blob, BlobPropertyBag, Url};
1211

13-
use super::bridge::{CallbackMap, WorkerBridge};
12+
use super::bridge::{WorkerBridge, WorkerBridgeInner};
1413
use super::handler_id::HandlerId;
15-
use super::messages::FromWorker;
16-
use super::native_worker::{DedicatedWorker, NativeWorkerExt};
14+
use super::native_worker::DedicatedWorker;
1715
use super::traits::Worker;
18-
use super::{Callback, Shared};
16+
use super::Callback;
1917
use crate::codec::{Bincode, Codec};
2018

2119
fn create_worker(path: &str) -> DedicatedWorker {
@@ -110,51 +108,21 @@ where
110108
W::Input: Serialize + for<'de> Deserialize<'de>,
111109
W::Output: Serialize + for<'de> Deserialize<'de>,
112110
{
113-
let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
114111
let handler_id = HandlerId::new();
115112
let mut callbacks = HashMap::new();
116113

117114
if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
118115
callbacks.insert(handler_id, m);
119116
}
120117

121-
let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
122-
123-
let handler = {
124-
let pending_queue = pending_queue.clone();
125-
let callbacks = callbacks.clone();
126-
127-
let worker = worker.clone();
128-
129-
move |msg: FromWorker<W>| match msg {
130-
FromWorker::WorkerLoaded => {
131-
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
132-
for to_worker in pending_queue.into_iter() {
133-
worker.post_packed_message::<_, CODEC>(to_worker);
134-
}
135-
}
136-
}
137-
FromWorker::ProcessOutput(id, output) => {
138-
let mut callbacks = callbacks.borrow_mut();
139-
140-
if let Some(m) = callbacks.get(&id) {
141-
if let Some(m) = Weak::upgrade(m) {
142-
m(output);
143-
} else {
144-
callbacks.remove(&id);
145-
}
146-
}
147-
}
148-
}
149-
};
150-
151-
worker.set_on_packed_message::<_, CODEC, _>(handler);
152-
153-
WorkerBridge::<W>::new::<CODEC>(
154-
handler_id,
118+
let inner = WorkerBridgeInner::<W>::new::<CODEC>(
155119
worker,
156-
pending_queue,
157-
callbacks,
120+
callbacks
121+
);
122+
123+
WorkerBridge::<W>::new(
124+
handler_id,
125+
inner,
158126
self.callback.clone(),
159127
)
160128
}

0 commit comments

Comments
 (0)