Skip to content

Commit 0b07bff

Browse files
authored
V8: Let a worker thread own the isolate (#3401)
# Description of Changes 1. Moves work of `JsInstance` to a worker thread to avoid re-making the isolate all the time. 2. Does some minor refactoring. 3. Deals with `JsInstanceEnv` being potentially unset. Fixes #3460. # API and ABI breaking changes None # Expected complexity level and risk 3 # Testing Should be covered by exiting tests.
1 parent dd94991 commit 0b07bff

File tree

11 files changed

+628
-367
lines changed

11 files changed

+628
-367
lines changed

crates/core/src/host/host_controller.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -603,14 +603,14 @@ async fn make_module_host(
603603
let start = Instant::now();
604604
let module_host = match host_type {
605605
HostType::Wasm => {
606-
let actor = runtimes.wasmtime.make_actor(mcc)?;
606+
let (actor, init_inst) = runtimes.wasmtime.make_actor(mcc)?;
607607
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
608-
ModuleHost::new(actor, unregister, executor, database_identity)
608+
ModuleHost::new(actor, init_inst, unregister, executor, database_identity)
609609
}
610610
HostType::Js => {
611-
let actor = runtimes.v8.make_actor(mcc)?;
611+
let (actor, init_inst) = runtimes.v8.make_actor(mcc)?;
612612
trace!("v8::make_actor blocked for {:?}", start.elapsed());
613-
ModuleHost::new(actor, unregister, executor, database_identity)
613+
ModuleHost::new(actor, init_inst, unregister, executor, database_identity)
614614
}
615615
};
616616
Ok((program, module_host))

crates/core/src/host/module_common.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
wasm_common::{module_host_actor::DescribeError, DESCRIBE_MODULE_DUNDER},
99
Scheduler,
1010
},
11-
module_host_context::ModuleCreationContext,
11+
module_host_context::ModuleCreationContextLimited,
1212
replica_context::ReplicaContext,
1313
};
1414
use spacetimedb_lib::{Identity, RawModuleDef};
@@ -17,7 +17,7 @@ use std::sync::Arc;
1717

1818
/// Builds a [`ModuleCommon`] from a [`RawModuleDef`].
1919
pub fn build_common_module_from_raw(
20-
mcc: ModuleCreationContext,
20+
mcc: ModuleCreationContextLimited,
2121
raw_def: RawModuleDef,
2222
) -> Result<ModuleCommon, ValidationErrors> {
2323
// Perform a bunch of validation on the raw definition.
@@ -31,7 +31,7 @@ pub fn build_common_module_from_raw(
3131
def,
3232
replica_ctx.owner_identity,
3333
replica_ctx.database_identity,
34-
mcc.program.hash,
34+
mcc.program_hash,
3535
log_tx,
3636
replica_ctx.subscriptions.clone(),
3737
);

crates/core/src/host/module_host.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,9 @@ impl ReducersMap {
321321
/// A runtime that can create modules.
322322
pub trait ModuleRuntime {
323323
/// Creates a module based on the context `mcc`.
324-
fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<Module>;
324+
///
325+
/// Also returns the initial instance for the module.
326+
fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result<(Module, Instance)>;
325327
}
326328

327329
pub enum Module {
@@ -357,10 +359,10 @@ impl Module {
357359
Module::Js(module) => module.info(),
358360
}
359361
}
360-
fn create_instance(&self) -> Instance {
362+
async fn create_instance(&self) -> Instance {
361363
match self {
362364
Module::Wasm(module) => Instance::Wasm(Box::new(module.create_instance())),
363-
Module::Js(module) => Instance::Js(Box::new(module.create_instance())),
365+
Module::Js(module) => Instance::Js(Box::new(module.create_instance().await)),
364366
}
365367
}
366368
fn host_type(&self) -> HostType {
@@ -554,7 +556,7 @@ impl CreateInstanceTimeMetric {
554556
}
555557

556558
impl ModuleInstanceManager {
557-
fn new(module: Arc<Module>, database_identity: Identity) -> Self {
559+
fn new(module: Arc<Module>, init_inst: Instance, database_identity: Identity) -> Self {
558560
let host_type = module.host_type();
559561
let create_instance_time_metric = CreateInstanceTimeMetric {
560562
metric: WORKER_METRICS
@@ -563,19 +565,24 @@ impl ModuleInstanceManager {
563565
host_type,
564566
database_identity,
565567
};
568+
569+
// Add the first instance.
570+
let mut instances = VecDeque::new();
571+
instances.push_front(init_inst);
572+
566573
Self {
567-
instances: Default::default(),
574+
instances,
568575
module,
569576
create_instance_time_metric,
570577
}
571578
}
572-
fn get_instance(&mut self) -> Instance {
579+
async fn get_instance(&mut self) -> Instance {
573580
if let Some(inst) = self.instances.pop_back() {
574581
inst
575582
} else {
576583
let start_time = std::time::Instant::now();
577584
// TODO: should we be calling `create_instance` on the `SingleCoreExecutor` rather than the calling thread?
578-
let res = self.module.create_instance();
585+
let res = self.module.create_instance().await;
579586
let elapsed_time = start_time.elapsed();
580587
self.create_instance_time_metric.observe(elapsed_time);
581588
res
@@ -690,6 +697,7 @@ pub enum ClientConnectedError {
690697
impl ModuleHost {
691698
pub(super) fn new(
692699
module: Module,
700+
init_inst: Instance,
693701
on_panic: impl Fn() + Send + Sync + 'static,
694702
executor: SingleCoreExecutor,
695703
database_identity: Identity,
@@ -700,7 +708,8 @@ impl ModuleHost {
700708

701709
let module_clone = module.clone();
702710

703-
let instance_manager = Arc::new(Mutex::new(ModuleInstanceManager::new(module_clone, database_identity)));
711+
let instance_manager = ModuleInstanceManager::new(module_clone, init_inst, database_identity);
712+
let instance_manager = Arc::new(Mutex::new(instance_manager));
704713

705714
ModuleHost {
706715
info,
@@ -807,7 +816,7 @@ impl ModuleHost {
807816
(self.on_panic)();
808817
});
809818

810-
let mut instance = self.instance_manager.lock().await.get_instance();
819+
let mut instance = self.instance_manager.lock().await.get_instance().await;
811820

812821
let (res, instance) = self
813822
.executor

crates/core/src/host/v8/budget.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(dead_code)]
2+
13
//! Provides budget, energy, timeout, and long-running logging facilities.
24
//!
35
//! These are all driven by [`with_timeout_and_cb_every`] for V8 modules
@@ -47,7 +49,11 @@ pub(super) type InterruptCallback = extern "C" fn(&mut Isolate, *mut c_void);
4749
/// every [`EPOCH_TICKS_PER_SECOND`] ticks (~every 1 second)
4850
/// to log that the reducer is still running.
4951
pub(super) extern "C" fn cb_log_long_running(isolate: &mut Isolate, _: *mut c_void) {
50-
let env = env_on_isolate(isolate);
52+
let Some(env) = env_on_isolate(isolate) else {
53+
// All we can do is log something.
54+
tracing::error!("`JsInstanceEnv` not set");
55+
return;
56+
};
5157
let database = env.instance_env.replica_ctx.database_identity;
5258
let reducer = env.reducer_name();
5359
let dur = env.reducer_start().elapsed();
@@ -61,7 +67,6 @@ pub(super) extern "C" fn cb_noop(_: &mut Isolate, _: *mut c_void) {}
6167
/// when `budget` has been used up.
6268
///
6369
/// Every `callback_every` ticks, `callback` is called.
64-
#[allow(dead_code)]
6570
fn run_timeout_and_cb_every(
6671
handle: IsolateHandle,
6772
callback_every: u64,

crates/core/src/host/v8/error.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,33 @@ pub(super) fn log_traceback(func_type: &str, func: &str, e: &anyhow::Error) {
402402
pub(super) fn catch_exception<'scope, T>(
403403
scope: &mut PinScope<'scope, '_>,
404404
body: impl FnOnce(&mut PinScope<'scope, '_>) -> Result<T, ErrorOrException<ExceptionThrown>>,
405-
) -> Result<T, ErrorOrException<JsError>> {
405+
) -> Result<T, (ErrorOrException<JsError>, CanContinue)> {
406406
tc_scope!(scope, scope);
407407
body(scope).map_err(|e| match e {
408-
ErrorOrException::Err(e) => ErrorOrException::Err(e),
409-
ErrorOrException::Exception(_) => ErrorOrException::Exception(JsError::from_caught(scope)),
408+
ErrorOrException::Err(e) => (ErrorOrException::Err(e), CanContinue::Yes),
409+
ErrorOrException::Exception(_) => {
410+
let error = ErrorOrException::Exception(JsError::from_caught(scope));
411+
412+
let can_continue = if scope.can_continue() {
413+
// We can continue.
414+
CanContinue::Yes
415+
} else if scope.has_terminated() {
416+
// We can continue if we do `Isolate::cancel_terminate_execution`.
417+
CanContinue::YesCancelTermination
418+
} else {
419+
// We cannot.
420+
CanContinue::No
421+
};
422+
423+
(error, can_continue)
424+
}
410425
})
411426
}
427+
428+
/// Encodes whether it is safe to continue using the [`Isolate`]
429+
/// for further execution after [`catch_exception`] has happened.
430+
pub(super) enum CanContinue {
431+
Yes,
432+
YesCancelTermination,
433+
No,
434+
}

0 commit comments

Comments
 (0)