Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mattwparas committed Jan 13, 2025
1 parent 0432c9b commit 2566f27
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ path = "src/main.rs"

[workspace.dependencies]
# This has to line up with the workspace version above
steel-core = { path = "./crates/steel-core", version = "0.6.0", features = ["dylibs", "markdown", "stacker", "sync"] }
steel-core = { path = "./crates/steel-core", version = "0.6.0", features = ["dylibs", "markdown", "stacker", "sync", "rooted-instructions"] }

[features]
default = ["mimalloc"]
Expand Down
24 changes: 17 additions & 7 deletions crates/steel-core/src/steel_vm/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{
builtin::{BuiltInModule, FunctionSignatureMetadata},
primitives::{register_builtin_modules, CONSTANTS},
vm::{SteelThread, ThreadStateController},
vm::{SteelThread, Synchronizer, ThreadStateController},
};

#[cfg(feature = "dylibs")]
Expand Down Expand Up @@ -217,9 +217,25 @@ pub struct Engine {
pub(crate) id: EngineId,
}

impl Engine {
pub fn enter_safepoint<T, F: FnMut() -> T>(&mut self, mut thunk: F) -> T {
let mut res = None;

self.virtual_machine.enter_safepoint(|_| {
res = Some((thunk)());
Ok(SteelVal::Void)
});

res.unwrap()
}
}

impl Clone for Engine {
fn clone(&self) -> Self {
let mut virtual_machine = self.virtual_machine.clone();

virtual_machine.synchronizer = Synchronizer::new();

let compiler = Arc::new(RwLock::new(self.virtual_machine.compiler.write().clone()));

// virtual_machine.compiler = Some(Arc::downgrade(&compiler));
Expand Down Expand Up @@ -2025,12 +2041,6 @@ impl Engine {
// .execute_program::<DoNotUseCallback, ApplyContract>(program)
// }

// TODO this does not take into account the issues with
// people registering new functions that shadow the original one
// fn constants(&mut self) -> ImmutableHashMap<InternedString, SteelVal, FxBuildHasher> {
// CONSTANT_PRIMITIVES.clone()
// }

pub fn add_module(&mut self, path: String) -> Result<()> {
self.virtual_machine.compiler.write().compile_module(
path.into(),
Expand Down
33 changes: 31 additions & 2 deletions crates/steel-core/src/steel_vm/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use std::{cell::RefCell, collections::HashMap, iter::Iterator, rc::Rc};

use super::engine::EngineId;

use arc_swap::ArcSwap;
use crossbeam::atomic::AtomicCell;
#[cfg(feature = "profiling")]
use log::{debug, log_enabled};
Expand Down Expand Up @@ -343,6 +342,10 @@ pub enum ThreadState {
/// The thread execution context
#[derive(Clone)]
pub struct SteelThread {
// TODO: Figure out how to best broadcast changes
// to the rest of the world? Right now pausing threads
// means we can get away with one environment that is
// shared, but in reality this should just be
pub(crate) global_env: Env,
pub(crate) stack: Vec<SteelVal>,

Expand Down Expand Up @@ -477,8 +480,14 @@ impl Synchronizer {
pub(crate) unsafe fn call_per_ctx(&mut self, mut func: impl FnMut(&mut SteelThread)) {
let guard = self.threads.lock().unwrap();

// IMPORTANT - This needs to be all threads except the currently
// executing one.
for ThreadContext { ctx, .. } in guard.iter() {
if let Some(ctx) = ctx.upgrade() {
if Arc::ptr_eq(&ctx, &self.ctx) {
continue;
}

// TODO: Have to use a condvar
loop {
if let Some(ctx) = ctx.load() {
Expand All @@ -493,6 +502,8 @@ impl Synchronizer {
} else {
log::debug!("Waiting for thread...")

// println!("Waiting for thread...");

// TODO: Some kind of condvar or message passing
// is probably a better scheme here, but the idea is to just
// wait until all the threads are done.
Expand All @@ -511,6 +522,11 @@ impl Synchronizer {
// Wait for all the threads to be legal
for ThreadContext { ctx, .. } in guard.iter() {
if let Some(ctx) = ctx.upgrade() {
// Don't pause myself, enter safepoint from main thread?
if Arc::ptr_eq(&ctx, &self.ctx) {
continue;
}

// TODO: Have to use a condvar
loop {
if let Some(ctx) = ctx.load() {
Expand Down Expand Up @@ -584,6 +600,16 @@ impl Synchronizer {

impl SteelThread {
pub fn new(sources: Sources, compiler: std::sync::Arc<RwLock<Compiler>>) -> SteelThread {
let synchronizer = Synchronizer::new();
let weak_ctx = Arc::downgrade(&synchronizer.ctx);

// TODO: Entering safepoint should happen often
// for the main thread?
synchronizer.threads.lock().unwrap().push(ThreadContext {
ctx: weak_ctx,
handle: SteelVal::Void,
});

SteelThread {
global_env: Env::root(),
stack: Vec::with_capacity(128),
Expand All @@ -604,7 +630,7 @@ impl SteelThread {
// with the executables
constant_map: DEFAULT_CONSTANT_MAP.with(|x| x.clone()),
interrupted: Default::default(),
synchronizer: Synchronizer::new(),
synchronizer,
thread_local_storage: Vec::new(),
sources,
compiler,
Expand Down Expand Up @@ -3694,6 +3720,7 @@ impl<'a> VmCore<'a> {
// TODO: Do the same thing here:
self.thread.synchronizer.stop_threads();

// println!("Pausing threads to define new variable");
self.thread
.global_env
.repl_define_idx(payload_size, value.clone());
Expand All @@ -3707,6 +3734,8 @@ impl<'a> VmCore<'a> {
});
}

// println!("Finished broadcasting new variable");

// Resume.
// Apply these to all of the things.
self.thread.synchronizer.resume_threads();
Expand Down
29 changes: 16 additions & 13 deletions crates/steel-repl/src/highlight.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
extern crate rustyline;
use colored::*;
use steel_parser::interner::InternedString;
use steel_parser::parser::SourceId;

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use rustyline::highlight::Highlighter;
Expand All @@ -17,22 +19,20 @@ use rustyline::completion::Pair;

use std::borrow::Cow;

use steel::steel_vm::engine::Engine;

impl Completer for RustylineHelper {
type Candidate = Pair;
}

#[derive(Helper)]
pub struct RustylineHelper {
engine: Arc<Mutex<Engine>>,
globals: Arc<Mutex<HashSet<InternedString>>>,
bracket: crossbeam::atomic::AtomicCell<Option<(u8, usize)>>, // keywords: HashSet<&'static str>,
}

impl RustylineHelper {
pub fn new(engine: Arc<Mutex<Engine>>) -> Self {
pub fn new(globals: Arc<Mutex<HashSet<InternedString>>>) -> Self {
Self {
engine,
globals,
bracket: crossbeam::atomic::AtomicCell::new(None),
}
}
Expand Down Expand Up @@ -195,18 +195,21 @@ impl Highlighter for RustylineHelper {
}
TokenType::Identifier(ident) => {
// If its a free identifier, nix it?
if self.engine.lock().unwrap().global_exists(ident) {
// println!("before length: {}", token.source().as_bytes().len());
let highlighted = format!("{}", token.source().bright_blue());
// println!("After length: {}", highlighted.as_bytes().len());

// println!("paren pos: {:?}", self.bracket.get());

if self
.globals
.lock()
.unwrap()
.contains(&InternedString::from(*ident))
{
let highlighted = format!("{}", token.source().bright_blue());
ranges_to_replace.push((token.span().range(), highlighted));
}

// else if self.engine.borrow().in_scope_macros().contains_key(*ident) {
// let highlighted = format!("{}", token.source().bright_cyan());
// TODO:
// if self.engine.lock().unwrap().global_exists(ident) {
// // println!("before length: {}", token.source().as_bytes().len());
// let highlighted = format!("{}", token.source().bright_blue());
// ranges_to_replace.push((token.span().range(), highlighted));
// }
}
Expand Down
29 changes: 20 additions & 9 deletions crates/steel-repl/src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,10 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> {
vm.register_fn("quit", cancellation_function);
let safepoint = vm.get_thread_state_controller();

let engine = Arc::new(Mutex::new(vm));
rl.set_helper(Some(RustylineHelper::new(engine.clone())));
let mut engine = vm;
let globals = Arc::new(Mutex::new(engine.globals().iter().copied().collect()));

rl.set_helper(Some(RustylineHelper::new(globals.clone())));

let safepoint = safepoint.clone();
let ctrlc_safepoint = safepoint.clone();
Expand All @@ -235,7 +237,20 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> {
};

while rx.try_recv().is_err() {
let readline = rl.readline(&prompt);
// Update globals for highlighting
// TODO: Come up with some kind of subscription API?
let known_globals_length = globals.lock().unwrap().len();
let updated_globals_length = engine.globals().len();
if updated_globals_length > known_globals_length {
let mut guard = globals.lock().unwrap();
if let Some(range) = engine.globals().get(known_globals_length..) {
for var in range {
guard.insert(*var);
}
}
}

let readline = engine.enter_safepoint(|| rl.readline(&prompt));

match readline {
Ok(line) => {
Expand Down Expand Up @@ -281,19 +296,15 @@ pub fn repl_base(mut vm: Engine) -> std::io::Result<()> {

clear_interrupted();

finish_load_or_interrupt(
&mut engine.lock().unwrap(),
exprs,
path.to_path_buf(),
);
finish_load_or_interrupt(&mut engine, exprs, path.to_path_buf());
}
_ => {
// TODO also include this for loading files
let now = Instant::now();

clear_interrupted();

finish_or_interrupt(&mut engine.lock().unwrap(), line);
finish_or_interrupt(&mut engine, line);

if print_time {
println!("Time taken: {:?}", now.elapsed());
Expand Down
20 changes: 15 additions & 5 deletions pmap.scm
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
(require "steel/time/time.scm")
(require-builtin steel/time)

;; Dedicated internal thread pool - sits and waits for things.
;; How does eval on another thread behave?
;; The ultimate question...

;; Thread pool for parallel map - will just be static for all pmaps.
(define tp (make-thread-pool 16))

Expand Down Expand Up @@ -54,17 +58,23 @@
(define inputs (range 0 100000))

(define (looper x)
(if (= x 100)
x
(looper (+ x 1))))
(if (= x 100) x (looper (+ x 1))))

(define (expensive-add1 x)
(looper 0)
(add1 x))

;; List chunks -> Most elegant way of submitting the values in the list?
;; Also, more or less guaranteed to be sequential in memory. Big memory savings by
;; putting things together.

;; Where is the contention?
;; Contention on function calls.
(for-each (lambda (_)
; (time! (pmap expensive-add1 inputs))
(time! (map expensive-add1 inputs)))
;; Rooted instructions - very important.
;; Need to have a lot more tests around this.
(displayln (equal? (time! (pmap expensive-add1 inputs))
(time! (map expensive-add1 inputs)))))
(range 0 10))

; (time! (pmap expensive-add1 inputs))
Expand Down

0 comments on commit 2566f27

Please sign in to comment.