diff --git a/Cargo.toml b/Cargo.toml index 9afeb319d..41d165d52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ path = "src/main.rs" [workspace.dependencies] # This has to line up with the workspace version above -steel-core = { path = "./crates/steel-core", version = "0.6.0", features = ["dylibs", "markdown", "stacker", "sync"] } +steel-core = { path = "./crates/steel-core", version = "0.6.0", features = ["dylibs", "markdown", "stacker", "sync", "rooted-instructions"] } [features] default = ["mimalloc"] diff --git a/crates/steel-core/src/steel_vm/engine.rs b/crates/steel-core/src/steel_vm/engine.rs index d6629fd1c..c1d390e5c 100644 --- a/crates/steel-core/src/steel_vm/engine.rs +++ b/crates/steel-core/src/steel_vm/engine.rs @@ -3,7 +3,7 @@ use super::{ builtin::{BuiltInModule, FunctionSignatureMetadata}, primitives::{register_builtin_modules, CONSTANTS}, - vm::{SteelThread, ThreadStateController}, + vm::{SteelThread, Synchronizer, ThreadStateController}, }; #[cfg(feature = "dylibs")] @@ -217,9 +217,25 @@ pub struct Engine { pub(crate) id: EngineId, } +impl Engine { + pub fn enter_safepoint T>(&mut self, mut thunk: F) -> T { + let mut res = None; + + self.virtual_machine.enter_safepoint(|_| { + res = Some((thunk)()); + Ok(SteelVal::Void) + }); + + res.unwrap() + } +} + impl Clone for Engine { fn clone(&self) -> Self { let mut virtual_machine = self.virtual_machine.clone(); + + virtual_machine.synchronizer = Synchronizer::new(); + let compiler = Arc::new(RwLock::new(self.virtual_machine.compiler.write().clone())); // virtual_machine.compiler = Some(Arc::downgrade(&compiler)); @@ -2025,12 +2041,6 @@ impl Engine { // .execute_program::(program) // } - // TODO this does not take into account the issues with - // people registering new functions that shadow the original one - // fn constants(&mut self) -> ImmutableHashMap { - // CONSTANT_PRIMITIVES.clone() - // } - pub fn add_module(&mut self, path: String) -> Result<()> { self.virtual_machine.compiler.write().compile_module( path.into(), diff --git a/crates/steel-core/src/steel_vm/vm.rs b/crates/steel-core/src/steel_vm/vm.rs index f292f1c68..cd727934a 100644 --- a/crates/steel-core/src/steel_vm/vm.rs +++ b/crates/steel-core/src/steel_vm/vm.rs @@ -60,7 +60,6 @@ use std::{cell::RefCell, collections::HashMap, iter::Iterator, rc::Rc}; use super::engine::EngineId; -use arc_swap::ArcSwap; use crossbeam::atomic::AtomicCell; #[cfg(feature = "profiling")] use log::{debug, log_enabled}; @@ -343,6 +342,10 @@ pub enum ThreadState { /// The thread execution context #[derive(Clone)] pub struct SteelThread { + // TODO: Figure out how to best broadcast changes + // to the rest of the world? Right now pausing threads + // means we can get away with one environment that is + // shared, but in reality this should just be pub(crate) global_env: Env, pub(crate) stack: Vec, @@ -477,8 +480,14 @@ impl Synchronizer { pub(crate) unsafe fn call_per_ctx(&mut self, mut func: impl FnMut(&mut SteelThread)) { let guard = self.threads.lock().unwrap(); + // IMPORTANT - This needs to be all threads except the currently + // executing one. for ThreadContext { ctx, .. } in guard.iter() { if let Some(ctx) = ctx.upgrade() { + if Arc::ptr_eq(&ctx, &self.ctx) { + continue; + } + // TODO: Have to use a condvar loop { if let Some(ctx) = ctx.load() { @@ -493,6 +502,8 @@ impl Synchronizer { } else { log::debug!("Waiting for thread...") + // println!("Waiting for thread..."); + // TODO: Some kind of condvar or message passing // is probably a better scheme here, but the idea is to just // wait until all the threads are done. @@ -511,6 +522,11 @@ impl Synchronizer { // Wait for all the threads to be legal for ThreadContext { ctx, .. } in guard.iter() { if let Some(ctx) = ctx.upgrade() { + // Don't pause myself, enter safepoint from main thread? + if Arc::ptr_eq(&ctx, &self.ctx) { + continue; + } + // TODO: Have to use a condvar loop { if let Some(ctx) = ctx.load() { @@ -584,6 +600,16 @@ impl Synchronizer { impl SteelThread { pub fn new(sources: Sources, compiler: std::sync::Arc>) -> SteelThread { + let synchronizer = Synchronizer::new(); + let weak_ctx = Arc::downgrade(&synchronizer.ctx); + + // TODO: Entering safepoint should happen often + // for the main thread? + synchronizer.threads.lock().unwrap().push(ThreadContext { + ctx: weak_ctx, + handle: SteelVal::Void, + }); + SteelThread { global_env: Env::root(), stack: Vec::with_capacity(128), @@ -604,7 +630,7 @@ impl SteelThread { // with the executables constant_map: DEFAULT_CONSTANT_MAP.with(|x| x.clone()), interrupted: Default::default(), - synchronizer: Synchronizer::new(), + synchronizer, thread_local_storage: Vec::new(), sources, compiler, @@ -3694,6 +3720,7 @@ impl<'a> VmCore<'a> { // TODO: Do the same thing here: self.thread.synchronizer.stop_threads(); + // println!("Pausing threads to define new variable"); self.thread .global_env .repl_define_idx(payload_size, value.clone()); @@ -3707,6 +3734,8 @@ impl<'a> VmCore<'a> { }); } + // println!("Finished broadcasting new variable"); + // Resume. // Apply these to all of the things. self.thread.synchronizer.resume_threads(); diff --git a/crates/steel-repl/src/highlight.rs b/crates/steel-repl/src/highlight.rs index 720771d0d..ea32ff9a2 100644 --- a/crates/steel-repl/src/highlight.rs +++ b/crates/steel-repl/src/highlight.rs @@ -1,7 +1,9 @@ extern crate rustyline; use colored::*; +use steel_parser::interner::InternedString; use steel_parser::parser::SourceId; +use std::collections::HashSet; use std::sync::{Arc, Mutex}; use rustyline::highlight::Highlighter; @@ -17,22 +19,20 @@ use rustyline::completion::Pair; use std::borrow::Cow; -use steel::steel_vm::engine::Engine; - impl Completer for RustylineHelper { type Candidate = Pair; } #[derive(Helper)] pub struct RustylineHelper { - engine: Arc>, + globals: Arc>>, bracket: crossbeam::atomic::AtomicCell>, // keywords: HashSet<&'static str>, } impl RustylineHelper { - pub fn new(engine: Arc>) -> Self { + pub fn new(globals: Arc>>) -> Self { Self { - engine, + globals, bracket: crossbeam::atomic::AtomicCell::new(None), } } @@ -195,18 +195,21 @@ impl Highlighter for RustylineHelper { } TokenType::Identifier(ident) => { // If its a free identifier, nix it? - if self.engine.lock().unwrap().global_exists(ident) { - // println!("before length: {}", token.source().as_bytes().len()); - let highlighted = format!("{}", token.source().bright_blue()); - // println!("After length: {}", highlighted.as_bytes().len()); - - // println!("paren pos: {:?}", self.bracket.get()); + if self + .globals + .lock() + .unwrap() + .contains(&InternedString::from(*ident)) + { + let highlighted = format!("{}", token.source().bright_blue()); ranges_to_replace.push((token.span().range(), highlighted)); } - // else if self.engine.borrow().in_scope_macros().contains_key(*ident) { - // let highlighted = format!("{}", token.source().bright_cyan()); + // TODO: + // if self.engine.lock().unwrap().global_exists(ident) { + // // println!("before length: {}", token.source().as_bytes().len()); + // let highlighted = format!("{}", token.source().bright_blue()); // ranges_to_replace.push((token.span().range(), highlighted)); // } } diff --git a/crates/steel-repl/src/repl.rs b/crates/steel-repl/src/repl.rs index 4fa6da51b..837378190 100644 --- a/crates/steel-repl/src/repl.rs +++ b/crates/steel-repl/src/repl.rs @@ -219,8 +219,10 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> { vm.register_fn("quit", cancellation_function); let safepoint = vm.get_thread_state_controller(); - let engine = Arc::new(Mutex::new(vm)); - rl.set_helper(Some(RustylineHelper::new(engine.clone()))); + let mut engine = vm; + let globals = Arc::new(Mutex::new(engine.globals().iter().copied().collect())); + + rl.set_helper(Some(RustylineHelper::new(globals.clone()))); let safepoint = safepoint.clone(); let ctrlc_safepoint = safepoint.clone(); @@ -235,7 +237,20 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> { }; while rx.try_recv().is_err() { - let readline = rl.readline(&prompt); + // Update globals for highlighting + // TODO: Come up with some kind of subscription API? + let known_globals_length = globals.lock().unwrap().len(); + let updated_globals_length = engine.globals().len(); + if updated_globals_length > known_globals_length { + let mut guard = globals.lock().unwrap(); + if let Some(range) = engine.globals().get(known_globals_length..) { + for var in range { + guard.insert(*var); + } + } + } + + let readline = engine.enter_safepoint(|| rl.readline(&prompt)); match readline { Ok(line) => { @@ -281,11 +296,7 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> { clear_interrupted(); - finish_load_or_interrupt( - &mut engine.lock().unwrap(), - exprs, - path.to_path_buf(), - ); + finish_load_or_interrupt(&mut engine, exprs, path.to_path_buf()); } _ => { // TODO also include this for loading files @@ -293,7 +304,7 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> { clear_interrupted(); - finish_or_interrupt(&mut engine.lock().unwrap(), line); + finish_or_interrupt(&mut engine, line); if print_time { println!("Time taken: {:?}", now.elapsed()); diff --git a/pmap.scm b/pmap.scm index 3e3612e8d..63941a57c 100644 --- a/pmap.scm +++ b/pmap.scm @@ -2,6 +2,10 @@ (require "steel/time/time.scm") (require-builtin steel/time) +;; Dedicated internal thread pool - sits and waits for things. +;; How does eval on another thread behave? +;; The ultimate question... + ;; Thread pool for parallel map - will just be static for all pmaps. (define tp (make-thread-pool 16)) @@ -54,17 +58,23 @@ (define inputs (range 0 100000)) (define (looper x) - (if (= x 100) - x - (looper (+ x 1)))) + (if (= x 100) x (looper (+ x 1)))) (define (expensive-add1 x) (looper 0) (add1 x)) +;; List chunks -> Most elegant way of submitting the values in the list? +;; Also, more or less guaranteed to be sequential in memory. Big memory savings by +;; putting things together. + +;; Where is the contention? +;; Contention on function calls. (for-each (lambda (_) - ; (time! (pmap expensive-add1 inputs)) - (time! (map expensive-add1 inputs))) + ;; Rooted instructions - very important. + ;; Need to have a lot more tests around this. + (displayln (equal? (time! (pmap expensive-add1 inputs)) + (time! (map expensive-add1 inputs))))) (range 0 10)) ; (time! (pmap expensive-add1 inputs))