Skip to content

Commit

Permalink
Use current thread runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
flxo committed Jan 13, 2022
1 parent 06ba6c4 commit 23f2d75
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 34 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ url = "1.7.2"
which = "2.0.1"
zip = "0.5.2"
termcolor = "1.0.4"
tokio-signal = "0.2.7"

[target.'cfg(target_os = "linux")'.dependencies]
tokio-socketcan = "0.1.3"
Expand Down
53 changes: 21 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
// SOFTWARE.

use failure::Error;
use futures::{sync::oneshot, Future, Sink, Stream};
use futures::{Future, Sink, Stream};
use rogcat::{parser, record::Record};
use std::{process::exit, str::FromStr};
use tokio::runtime::Runtime;
use tokio_signal::ctrl_c;
use tokio::runtime::current_thread::Runtime;
use url::Url;

mod cli;
Expand Down Expand Up @@ -92,36 +91,26 @@ fn run() -> Result<(), Error> {
let filter = filter::from_args_profile(&args, &profile)?;
let mut parser = parser::Parser::default();

let mut runtime = Runtime::new()?;

let f = source
.map(move |a| match a {
StreamData::Line(l) => parser.parse(&l),
StreamData::Record(r) => r,
})
.filter(move |r| filter.filter(r))
.take_while(move |_| {
Ok(match head {
Some(0) => false,
Some(n) => {
head = Some(n - 1);
true
}
None => true,
Runtime::new()?.block_on(
source
.map(move |a| match a {
StreamData::Line(l) => parser.parse(&l),
StreamData::Record(r) => r,
})
})
.forward(sink)
.map(|_| exit(0))
.map_err(|e| eprintln!("{}", e));
let mut f = Some(oneshot::spawn(f, &runtime.executor()));

// Cancel stream processing on ctrl-c
runtime.block_on(ctrl_c().flatten_stream().take(1).for_each(move |()| {
f.take();
Ok(())
}))?;

Ok(())
.filter(move |r| filter.filter(r))
.take_while(move |_| {
Ok(match head {
Some(0) => false,
Some(n) => {
head = Some(n - 1);
true
}
None => true,
})
})
.forward(sink)
.map(|_| exit(0)),
)
}

fn main() {
Expand Down

0 comments on commit 23f2d75

Please sign in to comment.