Skip to content

Commit 3bbf340

Browse files
authored
Merge pull request #83 from delan/fix-wait-with-pipe
Fix wait_with_pipe() so it waits for last child and uses exit status
2 parents 171690e + a0bb123 commit 3bbf340

File tree

3 files changed

+158
-74
lines changed

3 files changed

+158
-74
lines changed

src/child.rs

Lines changed: 143 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{info, warn};
22
use crate::{process, CmdResult, FunResult};
33
use os_pipe::PipeReader;
4+
use std::any::Any;
5+
use std::fmt::Display;
46
use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result};
57
use std::process::{Child, ExitStatus};
68
use std::thread::JoinHandle;
@@ -115,32 +117,58 @@ impl FunChildren {
115117
}
116118
}
117119

118-
/// Waits for the children processes to exit completely, pipe content will be processed by
119-
/// provided function.
120-
pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(Box<dyn Read>)) -> CmdResult {
121-
let child = self.children.pop().unwrap();
122-
let stderr_thread =
123-
StderrThread::new(&child.cmd, &child.file, child.line, child.stderr, false);
124-
match child.handle {
125-
CmdChildHandle::Proc(mut proc) => {
126-
if let Some(stdout) = child.stdout {
127-
f(Box::new(stdout));
128-
let _ = proc.kill();
129-
}
130-
}
131-
CmdChildHandle::Thread(_) => {
132-
if let Some(stdout) = child.stdout {
133-
f(Box::new(stdout));
134-
}
135-
}
136-
CmdChildHandle::SyncFn => {
137-
if let Some(stdout) = child.stdout {
138-
f(Box::new(stdout));
120+
/// Pipes stdout from the last child in the pipeline to the given function, which runs in
121+
/// the current thread, then waits for all of the children to exit.
122+
///
123+
/// If the function returns early, without reading from stdout until the last child exits,
124+
/// then the rest of stdout is automatically read and discarded to allow the child to finish.
125+
pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(&mut Box<dyn Read>)) -> CmdResult {
126+
let mut last_child = self.children.pop().unwrap();
127+
let mut stderr_thread = StderrThread::new(
128+
&last_child.cmd,
129+
&last_child.file,
130+
last_child.line,
131+
last_child.stderr.take(),
132+
false,
133+
);
134+
let last_child_res = if let Some(stdout) = last_child.stdout {
135+
let mut stdout: Box<dyn Read> = Box::new(stdout);
136+
f(&mut stdout);
137+
// The provided function may have left some of stdout unread.
138+
// Continue reading stdout on its behalf, until the child exits.
139+
let mut buf = vec![0; 65536];
140+
let outcome: Box<dyn ChildOutcome> = loop {
141+
match last_child.handle {
142+
CmdChildHandle::Proc(ref mut child) => {
143+
if let Some(result) = child.try_wait().transpose() {
144+
break Box::new(ProcWaitOutcome::from(result));
145+
}
146+
}
147+
CmdChildHandle::Thread(ref mut join_handle) => {
148+
if let Some(handle) = join_handle.take() {
149+
if handle.is_finished() {
150+
break Box::new(ThreadJoinOutcome::from(handle.join()));
151+
} else {
152+
join_handle.replace(handle);
153+
}
154+
}
155+
}
156+
CmdChildHandle::SyncFn => {
157+
break Box::new(SyncFnOutcome);
158+
}
139159
}
140-
}
160+
let _ = stdout.read(&mut buf);
161+
};
162+
outcome.to_io_result(&last_child.cmd, &last_child.file, last_child.line)
163+
} else {
164+
last_child.wait(true)
141165
};
142-
drop(stderr_thread);
143-
CmdChildren::wait_children(&mut self.children)
166+
let other_children_res = CmdChildren::wait_children(&mut self.children);
167+
let _ = stderr_thread.join();
168+
169+
self.ignore_error
170+
.then_some(Ok(()))
171+
.unwrap_or(last_child_res.and(other_children_res))
144172
}
145173

146174
/// Returns the OS-assigned process identifiers associated with these children processes.
@@ -253,62 +281,108 @@ impl CmdChild {
253281

254282
pub(crate) enum CmdChildHandle {
255283
Proc(Child),
256-
Thread(JoinHandle<CmdResult>),
284+
Thread(Option<JoinHandle<CmdResult>>),
257285
SyncFn,
258286
}
259287

260-
impl CmdChildHandle {
261-
fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
262-
match self {
263-
CmdChildHandle::Proc(mut proc) => {
264-
let status = proc.wait();
265-
match status {
266-
Err(e) => return Err(process::new_cmd_io_error(&e, cmd, file, line)),
267-
Ok(status) => {
268-
if !status.success() {
269-
return Err(Self::status_to_io_error(status, cmd, file, line));
270-
}
271-
}
272-
}
273-
}
274-
CmdChildHandle::Thread(thread) => {
275-
let status = thread.join();
276-
match status {
277-
Ok(result) => {
278-
if let Err(e) = result {
279-
return Err(process::new_cmd_io_error(&e, cmd, file, line));
280-
}
281-
}
282-
Err(e) => {
283-
return Err(Error::new(
284-
ErrorKind::Other,
285-
format!(
286-
"Running [{cmd}] thread joined with error: {e:?} at {file}:{line}"
287-
),
288-
))
289-
}
288+
#[derive(Debug)]
289+
struct ProcWaitOutcome(std::io::Result<ExitStatus>);
290+
impl From<std::io::Result<ExitStatus>> for ProcWaitOutcome {
291+
fn from(result: std::io::Result<ExitStatus>) -> Self {
292+
Self(result)
293+
}
294+
}
295+
impl Display for ProcWaitOutcome {
296+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297+
match &self.0 {
298+
Ok(status) => {
299+
if status.success() {
300+
write!(f, "Command process succeeded")
301+
} else if let Some(code) = status.code() {
302+
write!(f, "Command process exited normally with status code {code}")
303+
} else {
304+
write!(f, "Command process exited abnormally: {status}")
290305
}
291306
}
292-
CmdChildHandle::SyncFn => {}
307+
Err(error) => write!(f, "Failed to wait for command process: {error:?}"),
293308
}
294-
Ok(())
295309
}
296-
297-
fn status_to_io_error(status: ExitStatus, cmd: &str, file: &str, line: u32) -> Error {
298-
if let Some(code) = status.code() {
299-
Error::new(
300-
ErrorKind::Other,
301-
format!("Running [{cmd}] exited with error; status code: {code} at {file}:{line}"),
302-
)
310+
}
311+
#[derive(Debug)]
312+
enum ThreadJoinOutcome {
313+
Ok,
314+
Err(std::io::Error),
315+
Panic(Box<dyn Any + Send + 'static>),
316+
}
317+
impl From<std::thread::Result<CmdResult>> for ThreadJoinOutcome {
318+
fn from(result: std::thread::Result<CmdResult>) -> Self {
319+
match result {
320+
Ok(Ok(())) => Self::Ok,
321+
Ok(Err(error)) => Self::Err(error),
322+
Err(panic) => Self::Panic(panic),
323+
}
324+
}
325+
}
326+
impl Display for ThreadJoinOutcome {
327+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328+
match self {
329+
Self::Ok => write!(f, "Command thread succeeded"),
330+
Self::Err(error) => write!(f, "Command thread returned error: {error:?}"),
331+
Self::Panic(panic) => write!(f, "Command thread panicked: {panic:?}"),
332+
}
333+
}
334+
}
335+
#[derive(Debug)]
336+
struct SyncFnOutcome;
337+
impl Display for SyncFnOutcome {
338+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
339+
write!(f, "Command finished")
340+
}
341+
}
342+
trait ChildOutcome: Display {
343+
fn success(&self) -> bool;
344+
fn to_io_result(&self, cmd: &str, file: &str, line: u32) -> std::io::Result<()> {
345+
if self.success() {
346+
Ok(())
303347
} else {
304-
Error::new(
348+
Err(Error::new(
305349
ErrorKind::Other,
306-
format!(
307-
"Running [{cmd}] exited with error; terminated by {status} at {file}:{line}"
308-
),
309-
)
350+
format!("Running [{cmd}] exited with error; {self} at {file}:{line}"),
351+
))
310352
}
311353
}
354+
}
355+
impl ChildOutcome for ProcWaitOutcome {
356+
fn success(&self) -> bool {
357+
self.0.as_ref().is_ok_and(|status| status.success())
358+
}
359+
}
360+
impl ChildOutcome for ThreadJoinOutcome {
361+
fn success(&self) -> bool {
362+
matches!(self, Self::Ok)
363+
}
364+
}
365+
impl ChildOutcome for SyncFnOutcome {
366+
fn success(&self) -> bool {
367+
true
368+
}
369+
}
370+
371+
impl CmdChildHandle {
372+
fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
373+
let outcome: Box<dyn ChildOutcome> = match self {
374+
CmdChildHandle::Proc(mut proc) => Box::new(ProcWaitOutcome::from(proc.wait())),
375+
CmdChildHandle::Thread(mut thread) => {
376+
if let Some(thread) = thread.take() {
377+
Box::new(ThreadJoinOutcome::from(thread.join()))
378+
} else {
379+
unreachable!()
380+
}
381+
}
382+
CmdChildHandle::SyncFn => return Ok(()),
383+
};
384+
outcome.to_io_result(cmd, file, line)
385+
}
312386

313387
fn kill(self, cmd: &str, file: &str, line: u32) -> CmdResult {
314388
match self {

src/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ impl Cmd {
460460
if pipe_out || with_output {
461461
let handle = thread::Builder::new().spawn(move || internal_cmd(&mut env))?;
462462
Ok(CmdChild::new(
463-
CmdChildHandle::Thread(handle),
463+
CmdChildHandle::Thread(Some(handle)),
464464
full_cmds,
465465
self.file,
466466
self.line,

tests/test_macros.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ fn test_pipe() {
217217
test_case!(true, true, ($macro $bang (ignore true | false)) $($after)*),
218218
test_case!(true, true, ($macro $bang (ignore false | true)) $($after)*),
219219
test_case!(true, true, ($macro $bang (ignore false | false)) $($after)*),
220+
// Built-ins should work too, without locking up.
221+
test_case!(true, true, ($macro $bang (echo)) $($after)*),
222+
test_case!(true, true, ($macro $bang (echo | true)) $($after)*),
223+
test_case!(false, false, ($macro $bang (echo | false)) $($after)*),
224+
test_case!(true, true, ($macro $bang (true | echo)) $($after)*),
225+
test_case!(false, true, ($macro $bang (false | echo)) $($after)*),
226+
test_case!(true, true, ($macro $bang (cd /)) $($after)*),
227+
test_case!(true, true, ($macro $bang (cd / | true)) $($after)*),
228+
test_case!(false, false, ($macro $bang (cd / | false)) $($after)*),
229+
test_case!(true, true, ($macro $bang (true | cd /)) $($after)*),
230+
test_case!(false, true, ($macro $bang (false | cd /)) $($after)*),
220231
]
221232
};
222233
}
@@ -233,10 +244,9 @@ fn test_pipe() {
233244
test_cases_for_entry_point!((spawn_with_output!(...))
234245
.unwrap()
235246
.wait_with_raw_output(&mut vec![])),
236-
// FIXME: wait_with_pipe() is currently busted
237-
// test_cases_for_entry_point!((spawn_with_output!(...))
238-
// .unwrap()
239-
// .wait_with_pipe(&mut |_stdout| {})),
247+
test_cases_for_entry_point!((spawn_with_output!(...))
248+
.unwrap()
249+
.wait_with_pipe(&mut |_stdout| {})),
240250
];
241251

242252
macro_rules! check_eq {

0 commit comments

Comments
 (0)