diff --git a/acts/src/env/moudle/vars.rs b/acts/src/env/moudle/vars.rs index ef8e97b..a98a17b 100644 --- a/acts/src/env/moudle/vars.rs +++ b/acts/src/env/moudle/vars.rs @@ -15,12 +15,13 @@ impl UserVars { } pub fn get_data(&self, key: &str) -> Option { - if let Ok(ctx) = Context::current() { - if let Some(v) = ctx.task().find::(key) { - return Some(v); - } + if let Ok(ctx) = Context::current() + && let Some(v) = ctx.task().find::(key) + { + Some(v) + } else { + None } - None } } diff --git a/acts/src/event/mod.rs b/acts/src/event/mod.rs index 2f0a62f..42296b7 100644 --- a/acts/src/event/mod.rs +++ b/acts/src/event/mod.rs @@ -40,6 +40,7 @@ pub enum EventAction { Error, Push, Remove, + SetVars, SetProcessVars, } diff --git a/acts/src/export/executor/act_executor.rs b/acts/src/export/executor/act_executor.rs index c52da7c..0f2d3b2 100644 --- a/acts/src/export/executor/act_executor.rs +++ b/acts/src/export/executor/act_executor.rs @@ -50,6 +50,10 @@ impl ActExecutor { self.do_action(pid, tid, EventAction::Remove, options) } + pub fn set_task_vars(&self, pid: &str, tid: &str, options: &Vars) -> Result<()> { + self.do_action(pid, tid, EventAction::SetVars, options) + } + pub fn set_process_vars(&self, pid: &str, tid: &str, options: &Vars) -> Result<()> { self.do_action(pid, tid, EventAction::SetProcessVars, options) } diff --git a/acts/src/model/output.rs b/acts/src/model/output.rs index f2f01c4..1ddfb6f 100644 --- a/acts/src/model/output.rs +++ b/acts/src/model/output.rs @@ -30,13 +30,13 @@ fn get(name: &str, value: &Value) -> Option where T: for<'de> Deserialize<'de> + Clone, { - if let Some(v) = value.get(name) { - if let Ok(v) = serde_json::from_value::(v.clone()) { - return Some(v); - } + if let Some(v) = value.get(name) + && let Ok(v) = serde_json::from_value::(v.clone()) + { + Some(v) + } else { + None } - - None } impl fmt::Display for OutputType { diff --git a/acts/src/model/vars.rs b/acts/src/model/vars.rs index 5c52e38..f47a481 100644 --- a/acts/src/model/vars.rs +++ b/acts/src/model/vars.rs @@ -9,10 +9,6 @@ pub struct Vars { inner: Map, } -pub struct Iter<'a> { - iter: serde_json::map::Iter<'a>, -} - pub struct IterMut<'a> { iter: serde_json::map::IterMut<'a>, } @@ -58,14 +54,6 @@ impl FromIterator<(String, Value)> for Vars { } } -impl<'a> Iterator for Iter<'a> { - type Item = (&'a String, &'a Value); - - fn next(&mut self) -> Option { - self.iter.next() - } -} - impl<'a> Iterator for IterMut<'a> { type Item = (&'a String, &'a mut Value); @@ -163,13 +151,13 @@ impl Vars { where T: for<'de> Deserialize<'de> + Clone, { - if let Some(value) = self.inner.get(name) { - if let Ok(value) = serde_json::from_value::(value.clone()) { - return Some(value); - } + if let Some(value) = self.inner.get(name) + && let Ok(value) = serde_json::from_value::(value.clone()) + { + Some(value) + } else { + None } - - None } pub fn get_value(&self, name: &str) -> Option<&Value> { @@ -252,8 +240,8 @@ fn from_json_number(n: &serde_json::Number) -> Value { if n.is_i64() { Value::Number(serde_json::Number::from(n.as_i64().unwrap())) } else if n.is_u64() { - return Value::Number(serde_json::Number::from(n.as_u64().unwrap())); + Value::Number(serde_json::Number::from(n.as_u64().unwrap())) } else { - return Value::Number(serde_json::Number::from_f64(n.as_f64().unwrap()).unwrap()); + Value::Number(serde_json::Number::from_f64(n.as_f64().unwrap()).unwrap()) } } diff --git a/acts/src/scheduler/context.rs b/acts/src/scheduler/context.rs index 39a9fd0..d5774a2 100644 --- a/acts/src/scheduler/context.rs +++ b/acts/src/scheduler/context.rs @@ -275,11 +275,11 @@ impl Context { /// redo the task and dispatch directly pub fn redo_task(&self, task: &Arc) -> Result<()> { - if let Some(prev) = task.prev() { - if let Some(prev_task) = self.proc.task(&prev) { - let task = self.proc.create_task(task.node(), Some(prev_task)); - self.runtime.push(&task); - } + if let Some(prev) = task.prev() + && let Some(prev_task) = self.proc.task(&prev) + { + let task = self.proc.create_task(task.node(), Some(prev_task)); + self.runtime.push(&task); } Ok(()) @@ -398,13 +398,12 @@ impl Context { self.emit_task(&task)?; // after emitting, re-check the task state - if task.state().is_error() { - if let Some(err) = task.err() { - if let Some(parent) = task.parent() { - parent.set_err(&err); - return parent.error(self); - } - } + if task.state().is_error() + && let Some(err) = task.err() + && let Some(parent) = task.parent() + { + parent.set_err(&err); + return parent.error(self); } } @@ -415,26 +414,26 @@ impl Context { debug!("ctx::emit_task, task={:?}", task); // on workflow start - if let NodeContent::Workflow(_) = &task.node().content { - if task.state().is_created() { - if self.proc.state().is_none() { - self.proc.set_state(TaskState::Running); - } - self.runtime.scher().emit_proc_event(&self.proc); + if let NodeContent::Workflow(_) = &task.node().content + && task.state().is_created() + { + if self.proc.state().is_none() { + self.proc.set_state(TaskState::Running); } + self.runtime.scher().emit_proc_event(&self.proc); } self.runtime.scher().emit_task_event(task)?; // on workflow complete - if let NodeContent::Workflow(_) = &task.node().content { - if task.state().is_completed() { - self.proc.set_state(task.state()); - if let Some(err) = task.err() { - self.proc.set_err(&err); - } - self.runtime.scher().emit_proc_event(&self.proc); + if let NodeContent::Workflow(_) = &task.node().content + && task.state().is_completed() + { + self.proc.set_state(task.state()); + if let Some(err) = task.err() { + self.proc.set_err(&err); } + self.runtime.scher().emit_proc_event(&self.proc); } Ok(()) diff --git a/acts/src/scheduler/process/task.rs b/acts/src/scheduler/process/task.rs index dfddb14..fdd4672 100644 --- a/acts/src/scheduler/process/task.rs +++ b/acts/src/scheduler/process/task.rs @@ -256,12 +256,12 @@ impl Task { pub fn inputs(self: &Arc) -> Vars { let ctx = self.create_context(); let mut vars = Vars::new(); - if let Some(prev) = self.prev() { - if let Some(prev_task) = self.proc.task(&prev) { - // set the prev task's outputs as current inputs - for (ref k, v) in &prev_task.outputs() { - vars.set(k, v.clone()); - } + if let Some(prev) = self.prev() + && let Some(prev_task) = self.proc.task(&prev) + { + // set the prev task's outputs as current inputs + for (ref k, v) in &prev_task.outputs() { + vars.set(k, v.clone()); } } @@ -569,6 +569,16 @@ impl Task { task.set_data(&ctx.vars()); task.error(ctx)?; } + EventAction::SetVars => { + if self.state().is_completed() { + return Err(ActError::Action(format!( + "task '{}:{}' is already completed", + self.pid, self.id + ))); + } + + self.set_data(&ctx.vars()); + } EventAction::SetProcessVars => { if self.state().is_completed() { return Err(ActError::Action(format!( diff --git a/acts/src/scheduler/process/task/act.rs b/acts/src/scheduler/process/task/act.rs index c267c1b..3046ac1 100644 --- a/acts/src/scheduler/process/task/act.rs +++ b/acts/src/scheduler/process/task/act.rs @@ -128,11 +128,11 @@ impl ActTask for Act { return Ok(true); } } - } else if state.is_skip() || state.is_success() { - if let Some(next) = &task.node.next().upgrade() { - ctx.sched_task(next); - return Ok(true); - } + } else if (state.is_skip() || state.is_success()) + && let Some(next) = &task.node.next().upgrade() + { + ctx.sched_task(next); + return Ok(true); } Ok(is_next) } diff --git a/acts/src/scheduler/state.rs b/acts/src/scheduler/state.rs index 1e4ed46..1c3dd4f 100644 --- a/acts/src/scheduler/state.rs +++ b/acts/src/scheduler/state.rs @@ -37,7 +37,7 @@ pub enum TaskState { /// task is aborted by abort action Aborted, - /// task is skippted by exteral action or internal conditions + /// task is skipped by external action or internal conditions Skipped, /// task is removed diff --git a/acts/src/scheduler/tree/visit.rs b/acts/src/scheduler/tree/visit.rs index 6a1bed4..2d6e616 100644 --- a/acts/src/scheduler/tree/visit.rs +++ b/acts/src/scheduler/tree/visit.rs @@ -74,12 +74,11 @@ impl Visitor { .enumerate() .map(|(i, iter)| { let mut is_last = i == len - 1; - if iter.kind() == NodeKind::Step { - if let Some(next) = iter.next().upgrade() { - if self.root.visit_count(next.id()) == 0 { - is_last = false; - } - } + if iter.kind() == NodeKind::Step + && let Some(next) = iter.next().upgrade() + && self.root.visit_count(next.id()) == 0 + { + is_last = false; } Visitor::new(&self.root, iter, iter.level, i, is_last, &self.path) }) @@ -87,21 +86,21 @@ impl Visitor { } pub fn next_visit(&self) -> Option> { - if let Some(next) = self.node.next().upgrade() { - if self.root.visit_count(next.id()) == 0 { - let node = Visitor::new( - &self.root, - &next, - next.level, - self.index + 1, - next.next().upgrade().is_none(), - &self.path, - ); - return Some(node); - } + if let Some(next) = self.node.next().upgrade() + && self.root.visit_count(next.id()) == 0 + { + let node = Visitor::new( + &self.root, + &next, + next.level, + self.index + 1, + next.next().upgrade().is_none(), + &self.path, + ); + Some(node) + } else { + None } - - None } pub fn visit(&mut self) { diff --git a/acts/src/store/db/mem/collect.rs b/acts/src/store/db/mem/collect.rs index 4f9da69..4648196 100644 --- a/acts/src/store/db/mem/collect.rs +++ b/acts/src/store/db/mem/collect.rs @@ -60,7 +60,7 @@ where #[allow(unused_assignments)] let mut rows = vec![]; if !q.is_cond() { - rows = db.iter().map(|(_, v)| v).collect::>(); + rows = db.values().collect::>(); } else { let mut q = q.clone(); for cond in q.queries_mut() { diff --git a/acts/src/utils/convert.rs b/acts/src/utils/convert.rs index 2d6ff4b..4d8b079 100644 --- a/acts/src/utils/convert.rs +++ b/acts/src/utils/convert.rs @@ -99,20 +99,20 @@ pub fn fill_outputs(outputs: &Vars, ctx: &Context) -> Vars { // println!("fill_outputs: outputs={outputs}"); let mut ret = Vars::new(); for (ref k, ref v) in outputs { - if let JsonValue::String(string) = v { - if let Some(expr) = get_expr(string) { - let result = Context::scope(ctx.clone(), move || { - ctx.runtime.env().eval::(&expr) - }); - let new_value = result.unwrap_or_else(|err| { - eprintln!("fill_outputs: expr:{string}, err={err}"); - JsonValue::Null - }); - - // satisfies the rule 1 - ret.insert(k.to_string(), new_value); - continue; - } + if let JsonValue::String(string) = v + && let Some(expr) = get_expr(string) + { + let result = Context::scope(ctx.clone(), move || { + ctx.runtime.env().eval::(&expr) + }); + let new_value = result.unwrap_or_else(|err| { + eprintln!("fill_outputs: expr:{string}, err={err}"); + JsonValue::Null + }); + + // satisfies the rule 1 + ret.insert(k.to_string(), new_value); + continue; } // rule 2 @@ -134,17 +134,16 @@ pub fn fill_outputs(outputs: &Vars, ctx: &Context) -> Vars { pub fn fill_proc_vars(task: &Arc, values: &Vars, ctx: &Context) -> Vars { let mut ret = Vars::new(); for (ref k, ref v) in values { - if let JsonValue::String(string) = v { - if let Some(expr) = get_expr(string) { - let result = - Context::scope(ctx.clone(), || ctx.runtime.env().eval::(&expr)); - let new_value = result.unwrap_or(JsonValue::Null); + if let JsonValue::String(string) = v + && let Some(expr) = get_expr(string) + { + let result = Context::scope(ctx.clone(), || ctx.runtime.env().eval::(&expr)); + let new_value = result.unwrap_or(JsonValue::Null); - // satisfies the rule 1 - ret.insert(k.to_string(), new_value); + // satisfies the rule 1 + ret.insert(k.to_string(), new_value); - continue; - } + continue; } // rule 2 diff --git a/examples/timeout/client.rs b/examples/timeout/client.rs index 7464862..719c0cf 100644 --- a/examples/timeout/client.rs +++ b/examples/timeout/client.rs @@ -41,10 +41,10 @@ impl Client { } } - if e.is_msg() { - if let Some(action) = self.messages.get(&e.key) { - action(executor, e); - } + if e.is_msg() + && let Some(action) = self.messages.get(&e.key) + { + action(executor, e); } Ok(())