Skip to content

Commit 17c8858

Browse files
authored
Merge pull request #780 from NobodyXu/optimize
Optimize `Build::compile_objects`: Only spawns one thread
2 parents 57853c4 + ff45d42 commit 17c8858

File tree

2 files changed

+135
-118
lines changed

2 files changed

+135
-118
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ edition = "2018"
1919

2020
[dependencies]
2121
jobserver = { version = "0.1.16", optional = true }
22+
os_pipe = "1"
2223

2324
[features]
2425
parallel = ["jobserver"]

src/lib.rs

+134-118
Original file line numberDiff line numberDiff line change
@@ -1218,8 +1218,7 @@ impl Build {
12181218
}
12191219

12201220
#[cfg(feature = "parallel")]
1221-
fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> {
1222-
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
1221+
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
12231222
use std::sync::Once;
12241223

12251224
// Limit our parallelism globally with a jobserver. Start off by
@@ -1242,56 +1241,28 @@ impl Build {
12421241
// Note that this jobserver is cached globally so we only used one per
12431242
// process and only worry about creating it once.
12441243
//
1245-
// * Next we use a raw `thread::spawn` per thread to actually compile
1246-
// objects in parallel. We only actually spawn a thread after we've
1247-
// acquired a token to perform some work
1248-
//
1249-
// * Finally though we want to keep the dependencies of this crate
1250-
// pretty light, so we avoid using a safe abstraction like `rayon` and
1251-
// instead rely on some bits of `unsafe` code. We know that this stack
1252-
// frame persists while everything is compiling so we use all the
1253-
// stack-allocated objects without cloning/reallocating. We use a
1254-
// transmute to `State` with a `'static` lifetime to persist
1255-
// everything we need across the boundary, and the join-on-drop
1256-
// semantics of `JoinOnDrop` should ensure that our stack frame is
1257-
// alive while threads are alive.
1244+
// * Next we use spawn the process to actually compile objects in
1245+
// parallel after we've acquired a token to perform some work
12581246
//
12591247
// With all that in mind we compile all objects in a loop here, after we
12601248
// acquire the appropriate tokens, Once all objects have been compiled
1261-
// we join on all the threads and propagate the results of compilation.
1262-
//
1263-
// Note that as a slight optimization we try to break out as soon as
1264-
// possible as soon as any compilation fails to ensure that errors get
1265-
// out to the user as fast as possible.
1266-
let error = AtomicBool::new(false);
1267-
let mut threads = Vec::new();
1268-
for obj in objs {
1269-
if error.load(SeqCst) {
1270-
break;
1271-
}
1272-
let token = server.acquire()?;
1273-
let state = State {
1274-
build: self,
1275-
obj,
1276-
error: &error,
1277-
};
1278-
let state = unsafe { std::mem::transmute::<State, State<'static>>(state) };
1279-
let thread = thread::spawn(|| {
1280-
let state: State<'me> = state; // erase the `'static` lifetime
1281-
let result = state.build.compile_object(state.obj);
1282-
if result.is_err() {
1283-
state.error.store(true, SeqCst);
1284-
}
1285-
drop(token); // make sure our jobserver token is released after the compile
1286-
return result;
1287-
});
1288-
threads.push(JoinOnDrop(Some(thread)));
1289-
}
1249+
// we wait on all the processes and propagate the results of compilation.
1250+
let print = PrintThread::new()?;
12901251

1291-
for mut thread in threads {
1292-
if let Some(thread) = thread.0.take() {
1293-
thread.join().expect("thread should not panic")?;
1294-
}
1252+
let children = objs
1253+
.iter()
1254+
.map(|obj| {
1255+
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
1256+
let token = server.acquire()?;
1257+
1258+
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;
1259+
1260+
Ok((cmd, program, KillOnDrop(child), token))
1261+
})
1262+
.collect::<Result<Vec<_>, Error>>()?;
1263+
1264+
for (cmd, program, mut child, _token) in children {
1265+
wait_on_child(&cmd, &program, &mut child.0)?;
12951266
}
12961267

12971268
// Reacquire our process's token before we proceed, which we released
@@ -1302,16 +1273,6 @@ impl Build {
13021273

13031274
return Ok(());
13041275

1305-
/// Shared state from the parent thread to the child thread. This
1306-
/// package of pointers is temporarily transmuted to a `'static`
1307-
/// lifetime to cross the thread boundary and then once the thread is
1308-
/// running we erase the `'static` to go back to an anonymous lifetime.
1309-
struct State<'a> {
1310-
build: &'a Build,
1311-
obj: &'a Object,
1312-
error: &'a AtomicBool,
1313-
}
1314-
13151276
/// Returns a suitable `jobserver::Client` used to coordinate
13161277
/// parallelism between build scripts.
13171278
fn jobserver() -> &'static jobserver::Client {
@@ -1357,26 +1318,30 @@ impl Build {
13571318
return client;
13581319
}
13591320

1360-
struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
1321+
struct KillOnDrop(Child);
13611322

1362-
impl Drop for JoinOnDrop {
1323+
impl Drop for KillOnDrop {
13631324
fn drop(&mut self) {
1364-
if let Some(thread) = self.0.take() {
1365-
drop(thread.join());
1366-
}
1325+
let child = &mut self.0;
1326+
1327+
child.kill().ok();
13671328
}
13681329
}
13691330
}
13701331

13711332
#[cfg(not(feature = "parallel"))]
13721333
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
1334+
let print = PrintThread::new()?;
1335+
13731336
for obj in objs {
1374-
self.compile_object(obj)?;
1337+
let (mut cmd, name) = self.create_compile_object_cmd(obj)?;
1338+
run_inner(&mut cmd, &name, print.pipe_writer_cloned()?.unwrap())?;
13751339
}
1340+
13761341
Ok(())
13771342
}
13781343

1379-
fn compile_object(&self, obj: &Object) -> Result<(), Error> {
1344+
fn create_compile_object_cmd(&self, obj: &Object) -> Result<(Command, String), Error> {
13801345
let asm_ext = AsmFileExt::from_path(&obj.src);
13811346
let is_asm = asm_ext.is_some();
13821347
let target = self.get_target()?;
@@ -1425,8 +1390,7 @@ impl Build {
14251390
self.fix_env_for_apple_os(&mut cmd)?;
14261391
}
14271392

1428-
run(&mut cmd, &name)?;
1429-
Ok(())
1393+
Ok((cmd, name))
14301394
}
14311395

14321396
/// This will return a result instead of panicing; see expand() for the complete description.
@@ -3463,21 +3427,19 @@ impl Tool {
34633427
}
34643428
}
34653429

3466-
fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
3467-
let (mut child, print) = spawn(cmd, program)?;
3430+
fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
34683431
let status = match child.wait() {
34693432
Ok(s) => s,
3470-
Err(_) => {
3433+
Err(e) => {
34713434
return Err(Error::new(
34723435
ErrorKind::ToolExecError,
34733436
&format!(
3474-
"Failed to wait on spawned child process, command {:?} with args {:?}.",
3475-
cmd, program
3437+
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
3438+
cmd, program, e
34763439
),
34773440
));
34783441
}
34793442
};
3480-
print.join().unwrap();
34813443
println!("{}", status);
34823444

34833445
if status.success() {
@@ -3493,63 +3455,62 @@ fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
34933455
}
34943456
}
34953457

3458+
fn run_inner(
3459+
cmd: &mut Command,
3460+
program: &str,
3461+
pipe_writer: os_pipe::PipeWriter,
3462+
) -> Result<(), Error> {
3463+
let mut child = spawn(cmd, program, pipe_writer)?;
3464+
wait_on_child(cmd, program, &mut child)
3465+
}
3466+
3467+
fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
3468+
let mut print = PrintThread::new()?;
3469+
run_inner(cmd, program, print.pipe_writer().take().unwrap())?;
3470+
3471+
Ok(())
3472+
}
3473+
34963474
fn run_output(cmd: &mut Command, program: &str) -> Result<Vec<u8>, Error> {
34973475
cmd.stdout(Stdio::piped());
3498-
let (mut child, print) = spawn(cmd, program)?;
3476+
3477+
let mut print = PrintThread::new()?;
3478+
let mut child = spawn(cmd, program, print.pipe_writer().take().unwrap())?;
3479+
34993480
let mut stdout = vec![];
35003481
child
35013482
.stdout
35023483
.take()
35033484
.unwrap()
35043485
.read_to_end(&mut stdout)
35053486
.unwrap();
3506-
let status = match child.wait() {
3507-
Ok(s) => s,
3508-
Err(_) => {
3509-
return Err(Error::new(
3510-
ErrorKind::ToolExecError,
3511-
&format!(
3512-
"Failed to wait on spawned child process, command {:?} with args {:?}.",
3513-
cmd, program
3514-
),
3515-
));
3516-
}
3517-
};
3518-
print.join().unwrap();
3519-
println!("{}", status);
35203487

3521-
if status.success() {
3522-
Ok(stdout)
3523-
} else {
3524-
Err(Error::new(
3525-
ErrorKind::ToolExecError,
3526-
&format!(
3527-
"Command {:?} with args {:?} did not execute successfully (status code {}).",
3528-
cmd, program, status
3529-
),
3530-
))
3531-
}
3488+
wait_on_child(cmd, program, &mut child)?;
3489+
3490+
Ok(stdout)
35323491
}
35333492

3534-
fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Error> {
3535-
println!("running: {:?}", cmd);
3493+
fn spawn(
3494+
cmd: &mut Command,
3495+
program: &str,
3496+
pipe_writer: os_pipe::PipeWriter,
3497+
) -> Result<Child, Error> {
3498+
struct ResetStderr<'cmd>(&'cmd mut Command);
35363499

3537-
// Capture the standard error coming from these programs, and write it out
3538-
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
3539-
// requiring the output to be UTF-8, we instead just ship bytes from one
3540-
// location to another.
3541-
match cmd.stderr(Stdio::piped()).spawn() {
3542-
Ok(mut child) => {
3543-
let stderr = BufReader::new(child.stderr.take().unwrap());
3544-
let print = thread::spawn(move || {
3545-
for line in stderr.split(b'\n').filter_map(|l| l.ok()) {
3546-
print!("cargo:warning=");
3547-
std::io::stdout().write_all(&line).unwrap();
3548-
println!("");
3549-
}
3550-
});
3551-
Ok((child, print))
3500+
impl Drop for ResetStderr<'_> {
3501+
fn drop(&mut self) {
3502+
// Reset stderr to default to release pipe_writer so that print thread will
3503+
// not block forever.
3504+
self.0.stderr(Stdio::inherit());
35523505
}
3506+
}
3507+
3508+
println!("running: {:?}", cmd);
3509+
3510+
let cmd = ResetStderr(cmd);
3511+
3512+
match cmd.0.stderr(pipe_writer).spawn() {
3513+
Ok(child) => Ok(child),
35533514
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
35543515
let extra = if cfg!(windows) {
35553516
" (see https://github.com/rust-lang/cc-rs#compile-time-requirements \
@@ -3562,11 +3523,11 @@ fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Er
35623523
&format!("Failed to find tool. Is `{}` installed?{}", program, extra),
35633524
))
35643525
}
3565-
Err(ref e) => Err(Error::new(
3526+
Err(e) => Err(Error::new(
35663527
ErrorKind::ToolExecError,
35673528
&format!(
35683529
"Command {:?} with args {:?} failed to start: {:?}",
3569-
cmd, program, e
3530+
cmd.0, program, e
35703531
),
35713532
)),
35723533
}
@@ -3769,3 +3730,58 @@ impl AsmFileExt {
37693730
None
37703731
}
37713732
}
3733+
3734+
struct PrintThread {
3735+
handle: Option<JoinHandle<()>>,
3736+
pipe_writer: Option<os_pipe::PipeWriter>,
3737+
}
3738+
3739+
impl PrintThread {
3740+
fn new() -> Result<Self, Error> {
3741+
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
3742+
3743+
// Capture the standard error coming from compilation, and write it out
3744+
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
3745+
// requiring the output to be UTF-8, we instead just ship bytes from one
3746+
// location to another.
3747+
let print = thread::spawn(move || {
3748+
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
3749+
let mut line = String::with_capacity(20);
3750+
let mut stdout = io::stdout();
3751+
3752+
// read_line returns 0 on Eof
3753+
while stderr.read_line(&mut line).unwrap() != 0 {
3754+
writeln!(&mut stdout, "cargo:warning={}", line).ok();
3755+
3756+
// read_line does not clear the buffer
3757+
line.clear();
3758+
}
3759+
});
3760+
3761+
Ok(Self {
3762+
handle: Some(print),
3763+
pipe_writer: Some(pipe_writer),
3764+
})
3765+
}
3766+
3767+
fn pipe_writer(&mut self) -> &mut Option<os_pipe::PipeWriter> {
3768+
&mut self.pipe_writer
3769+
}
3770+
3771+
fn pipe_writer_cloned(&self) -> Result<Option<os_pipe::PipeWriter>, Error> {
3772+
self.pipe_writer
3773+
.as_ref()
3774+
.map(os_pipe::PipeWriter::try_clone)
3775+
.transpose()
3776+
.map_err(From::from)
3777+
}
3778+
}
3779+
3780+
impl Drop for PrintThread {
3781+
fn drop(&mut self) {
3782+
// Drop pipe_writer first to avoid deadlock
3783+
self.pipe_writer.take();
3784+
3785+
self.handle.take().unwrap().join().unwrap();
3786+
}
3787+
}

0 commit comments

Comments
 (0)