Skip to content

Watchexec Async Example #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
[workspace]
resolver = "2"
members = [
"examples_common",
"example_crypto",
"example_parsing",
"example_watchexec",
]

[profile.bench]
Expand All @@ -11,12 +13,21 @@ debug = true
[workspace.dependencies]
anyhow = "1"
base64 = "0.21"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.8"
clap = { version = "4", features = ["derive"] }
criterion = "0.5"
futures = "0.3"
hex = "0.4"
inotify = "0.10"
nom = "7"
parking_lot = "0.12"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-subscriber = "0.3"
watchexec = "3"
watchexec-events = "2"
watchexec-signals = "2"
2 changes: 2 additions & 0 deletions example_crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html



[[bench]]
name = "keygen"
harness = false
Expand Down
1 change: 1 addition & 0 deletions example_watchexec/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
watched
21 changes: 21 additions & 0 deletions example_watchexec/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "example_watchexec"
version = "0.1.0"
edition = "2021"
publish = false

[[example]]
name = "watchexec_async"
path = "examples/watchexec_async.rs"

[dependencies]
anyhow.workspace = true
chrono.workspace = true
examples_common = { path = "../examples_common" }
futures.workspace = true
inotify.workspace = true
tokio.workspace = true
tracing.workspace = true
watchexec.workspace = true
watchexec-events.workspace = true
watchexec-signals.workspace = true
188 changes: 188 additions & 0 deletions example_watchexec/examples/watchexec_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::collections::BTreeMap;
use std::ops::AddAssign;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use tracing::Level;
use watchexec::action::ActionHandler;
use watchexec::Watchexec;
use watchexec_events::filekind::FileEventKind;
use watchexec_events::Tag;
use watchexec_signals::Signal;

use examples_common::logging::LogLevelFilter;

const CARGO_CRATE_NAME: &str = env!("CARGO_CRATE_NAME");
const CARGO_MANIFEST_DIR: &str = env!("CARGO_MANIFEST_DIR");
const WATCHED_FILE_NAME: &str = "watched";

/// Read/write storage available for use during response to [watchexec] events.
#[derive(Debug, Default)]
struct Runtime {
/// A map of events that occurred historically
event_history: BTreeMap<DateTime<Utc>, Vec<ChangeEvent>>,
/// A count of events that have been processed
event_count: usize,
}

impl Runtime {
pub fn new() -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Default::default()))
}

/// Process an event that occurred in [watchexec].
pub async fn on_event(&mut self, mut action: ActionHandler) -> ActionHandler {
tracing::info!("Received event: {:?}", action);

let ts = Utc::now();

// count events
self.event_count.add_assign(1);

// note shutdown
let mut must_exit = false;

// record event history
let mut events = Vec::new();

// process signals first
for signal in action.signals() {
events.push(ChangeEvent::SignalReceived(signal));

if signal == Signal::Interrupt || signal == Signal::Terminate {
tracing::debug!(ts = ts.to_rfc3339(), %signal, "Received signal to exit");
must_exit = true;
} else {
tracing::debug!(ts = ts.to_rfc3339(), %signal, "Received non-stop signal");
}
}

// then process file events
for event in action.events.iter() {
let mut file_path = Option::<PathBuf>::None;
let mut event_type = Option::<FileChangeKind>::None;

for tag in event.tags.iter() {
match tag {
Tag::Path { path, .. } => {
file_path = Some(path.clone());
}
Tag::FileEventKind(kind) => {
event_type = match kind {
FileEventKind::Access(_) => Some(FileChangeKind::Accessed),
FileEventKind::Create(_) => Some(FileChangeKind::Created),
FileEventKind::Modify(_) => Some(FileChangeKind::Modified),
FileEventKind::Remove(_) => Some(FileChangeKind::Removed),
_ => None
};
}
_ => {}
}
}

if file_path.is_none() || event_type.is_none() {
tracing::debug!(ts = ts.to_rfc3339(), "Not a file event, continuing");
continue;
} else {
let (file_path, event_type) = (file_path.unwrap(), event_type.unwrap());

tracing::debug!(ts = ts.to_rfc3339(), path = %file_path.display(), event_type = ?event_type, "Received file event");

events.push(ChangeEvent::FileChanged(FileChangeEvent {
path: file_path,
kind: event_type,
}));
}
}

// FIXME watchexec also makes it possible to monitor _processes_ and respond to their events

// record the events
self.event_history.insert(ts, events);

// if we need to shut down, make it so
if must_exit {
action.quit();
}

// return the action
action
}
}

#[derive(Debug)]
enum ChangeEvent {
FileChanged(FileChangeEvent),
SignalReceived(Signal),
}

#[derive(Debug)]
struct FileChangeEvent {
#[allow(unused)]
kind: FileChangeKind,
#[allow(unused)]
path: PathBuf,
}

#[derive(Debug)]
enum FileChangeKind {
Accessed,
Created,
Modified,
Removed,
}

#[tokio::main]
async fn main() -> Result<()> {
// initialize logging
examples_common::logging::init_logging(LogLevelFilter::builder()
.global(Level::WARN)
.level(CARGO_CRATE_NAME, Level::TRACE)
.level(examples_common::CRATE_NAME, Level::DEBUG)
.build());


// start work
let workdir = PathBuf::from(CARGO_MANIFEST_DIR);
let w = workdir.join(WATCHED_FILE_NAME);

if !w.is_file() {
// if the file doesn't exist, create it
tokio::fs::write(&w, "init").await?;
}

tracing::info!("Starting watchexec");
tracing::info!("Modify ./{} to trigger events", PathBuf::from(example_watchexec::CRATE_NAME).join(WATCHED_FILE_NAME).display());

let rt = Runtime::new();
let rt2 = rt.clone();

// NOTE this is the most difficult part of all of this
/*
Watchexec::nwe_async accept one argument:

impl Fn(ActionHandler) -> Box<dyn Future<Output=ActionHandler> + Send + Sync> + Send + Sync + 'static

So, what we need here is a regular (non-async) function as the first argument. This function must return a Box co
*/
let wx = Watchexec::new_async(move |action| {
let rt = rt2.clone();
Box::new(async move {
rt.lock().await.on_event(action).await
})
})?;

wx.config.pathset([w]);
wx.main().await??;

let total_events: usize = {
rt.lock().await.event_history.values().map(|v| v.len()).sum()
};

tracing::info!(events = rt.lock().await.event_count, total_events, "Shutting down");

Ok(())
}
2 changes: 2 additions & 0 deletions example_watchexec/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Convenience constant for the crate name
pub const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");