Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions acts/src/env/moudle/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ impl UserVars {
}

pub fn get_data(&self, key: &str) -> Option<Vars> {
if let Ok(ctx) = Context::current() {
if let Some(v) = ctx.task().find::<Vars>(key) {
return Some(v);
}
if let Ok(ctx) = Context::current()
&& let Some(v) = ctx.task().find::<Vars>(key)
{
Some(v)
} else {
None
}
None
}
}

Expand Down
1 change: 1 addition & 0 deletions acts/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum EventAction {
Error,
Push,
Remove,
SetVars,
SetProcessVars,
}

Expand Down
4 changes: 4 additions & 0 deletions acts/src/export/executor/act_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl ActExecutor {
self.do_action(pid, tid, EventAction::Remove, options)
}

pub fn set_task_vars(&self, pid: &str, tid: &str, options: &Vars) -> Result<()> {
self.do_action(pid, tid, EventAction::SetVars, options)
}

pub fn set_process_vars(&self, pid: &str, tid: &str, options: &Vars) -> Result<()> {
self.do_action(pid, tid, EventAction::SetProcessVars, options)
}
Expand Down
12 changes: 6 additions & 6 deletions acts/src/model/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ fn get<T>(name: &str, value: &Value) -> Option<T>
where
T: for<'de> Deserialize<'de> + Clone,
{
if let Some(v) = value.get(name) {
if let Ok(v) = serde_json::from_value::<T>(v.clone()) {
return Some(v);
}
if let Some(v) = value.get(name)
&& let Ok(v) = serde_json::from_value::<T>(v.clone())
{
Some(v)
} else {
None
}

None
}

impl fmt::Display for OutputType {
Expand Down
28 changes: 8 additions & 20 deletions acts/src/model/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ pub struct Vars {
inner: Map<String, Value>,
}

pub struct Iter<'a> {
iter: serde_json::map::Iter<'a>,
}

pub struct IterMut<'a> {
iter: serde_json::map::IterMut<'a>,
}
Expand Down Expand Up @@ -58,14 +54,6 @@ impl FromIterator<(String, Value)> for Vars {
}
}

impl<'a> Iterator for Iter<'a> {
type Item = (&'a String, &'a Value);

fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

impl<'a> Iterator for IterMut<'a> {
type Item = (&'a String, &'a mut Value);

Expand Down Expand Up @@ -163,13 +151,13 @@ impl Vars {
where
T: for<'de> Deserialize<'de> + Clone,
{
if let Some(value) = self.inner.get(name) {
if let Ok(value) = serde_json::from_value::<T>(value.clone()) {
return Some(value);
}
if let Some(value) = self.inner.get(name)
&& let Ok(value) = serde_json::from_value::<T>(value.clone())
{
Some(value)
} else {
None
}

None
}

pub fn get_value(&self, name: &str) -> Option<&Value> {
Expand Down Expand Up @@ -252,8 +240,8 @@ fn from_json_number(n: &serde_json::Number) -> Value {
if n.is_i64() {
Value::Number(serde_json::Number::from(n.as_i64().unwrap()))
} else if n.is_u64() {
return Value::Number(serde_json::Number::from(n.as_u64().unwrap()));
Value::Number(serde_json::Number::from(n.as_u64().unwrap()))
} else {
return Value::Number(serde_json::Number::from_f64(n.as_f64().unwrap()).unwrap());
Value::Number(serde_json::Number::from_f64(n.as_f64().unwrap()).unwrap())
}
}
49 changes: 24 additions & 25 deletions acts/src/scheduler/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ impl Context {

/// redo the task and dispatch directly
pub fn redo_task(&self, task: &Arc<Task>) -> Result<()> {
if let Some(prev) = task.prev() {
if let Some(prev_task) = self.proc.task(&prev) {
let task = self.proc.create_task(task.node(), Some(prev_task));
self.runtime.push(&task);
}
if let Some(prev) = task.prev()
&& let Some(prev_task) = self.proc.task(&prev)
{
let task = self.proc.create_task(task.node(), Some(prev_task));
self.runtime.push(&task);
}

Ok(())
Expand Down Expand Up @@ -398,13 +398,12 @@ impl Context {
self.emit_task(&task)?;

// after emitting, re-check the task state
if task.state().is_error() {
if let Some(err) = task.err() {
if let Some(parent) = task.parent() {
parent.set_err(&err);
return parent.error(self);
}
}
if task.state().is_error()
&& let Some(err) = task.err()
&& let Some(parent) = task.parent()
{
parent.set_err(&err);
return parent.error(self);
}
}

Expand All @@ -415,26 +414,26 @@ impl Context {
debug!("ctx::emit_task, task={:?}", task);

// on workflow start
if let NodeContent::Workflow(_) = &task.node().content {
if task.state().is_created() {
if self.proc.state().is_none() {
self.proc.set_state(TaskState::Running);
}
self.runtime.scher().emit_proc_event(&self.proc);
if let NodeContent::Workflow(_) = &task.node().content
&& task.state().is_created()
{
if self.proc.state().is_none() {
self.proc.set_state(TaskState::Running);
}
self.runtime.scher().emit_proc_event(&self.proc);
}

self.runtime.scher().emit_task_event(task)?;

// on workflow complete
if let NodeContent::Workflow(_) = &task.node().content {
if task.state().is_completed() {
self.proc.set_state(task.state());
if let Some(err) = task.err() {
self.proc.set_err(&err);
}
self.runtime.scher().emit_proc_event(&self.proc);
if let NodeContent::Workflow(_) = &task.node().content
&& task.state().is_completed()
{
self.proc.set_state(task.state());
if let Some(err) = task.err() {
self.proc.set_err(&err);
}
self.runtime.scher().emit_proc_event(&self.proc);
}

Ok(())
Expand Down
22 changes: 16 additions & 6 deletions acts/src/scheduler/process/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ impl Task {
pub fn inputs(self: &Arc<Self>) -> Vars {
let ctx = self.create_context();
let mut vars = Vars::new();
if let Some(prev) = self.prev() {
if let Some(prev_task) = self.proc.task(&prev) {
// set the prev task's outputs as current inputs
for (ref k, v) in &prev_task.outputs() {
vars.set(k, v.clone());
}
if let Some(prev) = self.prev()
&& let Some(prev_task) = self.proc.task(&prev)
{
// set the prev task's outputs as current inputs
for (ref k, v) in &prev_task.outputs() {
vars.set(k, v.clone());
}
}

Expand Down Expand Up @@ -569,6 +569,16 @@ impl Task {
task.set_data(&ctx.vars());
task.error(ctx)?;
}
EventAction::SetVars => {
if self.state().is_completed() {
return Err(ActError::Action(format!(
"task '{}:{}' is already completed",
self.pid, self.id
)));
}

self.set_data(&ctx.vars());
}
EventAction::SetProcessVars => {
if self.state().is_completed() {
return Err(ActError::Action(format!(
Expand Down
10 changes: 5 additions & 5 deletions acts/src/scheduler/process/task/act.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ impl ActTask for Act {
return Ok(true);
}
}
} else if state.is_skip() || state.is_success() {
if let Some(next) = &task.node.next().upgrade() {
ctx.sched_task(next);
return Ok(true);
}
} else if (state.is_skip() || state.is_success())
&& let Some(next) = &task.node.next().upgrade()
{
ctx.sched_task(next);
return Ok(true);
}
Ok(is_next)
}
Expand Down
2 changes: 1 addition & 1 deletion acts/src/scheduler/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum TaskState {
/// task is aborted by abort action
Aborted,

/// task is skippted by exteral action or internal conditions
/// task is skipped by external action or internal conditions
Skipped,

/// task is removed
Expand Down
39 changes: 19 additions & 20 deletions acts/src/scheduler/tree/visit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,34 +74,33 @@ impl Visitor {
.enumerate()
.map(|(i, iter)| {
let mut is_last = i == len - 1;
if iter.kind() == NodeKind::Step {
if let Some(next) = iter.next().upgrade() {
if self.root.visit_count(next.id()) == 0 {
is_last = false;
}
}
if iter.kind() == NodeKind::Step
&& let Some(next) = iter.next().upgrade()
&& self.root.visit_count(next.id()) == 0
{
is_last = false;
}
Visitor::new(&self.root, iter, iter.level, i, is_last, &self.path)
})
.collect::<Vec<_>>()
}

pub fn next_visit(&self) -> Option<Box<Self>> {
if let Some(next) = self.node.next().upgrade() {
if self.root.visit_count(next.id()) == 0 {
let node = Visitor::new(
&self.root,
&next,
next.level,
self.index + 1,
next.next().upgrade().is_none(),
&self.path,
);
return Some(node);
}
if let Some(next) = self.node.next().upgrade()
&& self.root.visit_count(next.id()) == 0
{
let node = Visitor::new(
&self.root,
&next,
next.level,
self.index + 1,
next.next().upgrade().is_none(),
&self.path,
);
Some(node)
} else {
None
}

None
}

pub fn visit(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion acts/src/store/db/mem/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
#[allow(unused_assignments)]
let mut rows = vec![];
if !q.is_cond() {
rows = db.iter().map(|(_, v)| v).collect::<Vec<_>>();
rows = db.values().collect::<Vec<_>>();
} else {
let mut q = q.clone();
for cond in q.queries_mut() {
Expand Down
45 changes: 22 additions & 23 deletions acts/src/utils/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,20 @@ pub fn fill_outputs(outputs: &Vars, ctx: &Context) -> Vars {
// println!("fill_outputs: outputs={outputs}");
let mut ret = Vars::new();
for (ref k, ref v) in outputs {
if let JsonValue::String(string) = v {
if let Some(expr) = get_expr(string) {
let result = Context::scope(ctx.clone(), move || {
ctx.runtime.env().eval::<JsonValue>(&expr)
});
let new_value = result.unwrap_or_else(|err| {
eprintln!("fill_outputs: expr:{string}, err={err}");
JsonValue::Null
});

// satisfies the rule 1
ret.insert(k.to_string(), new_value);
continue;
}
if let JsonValue::String(string) = v
&& let Some(expr) = get_expr(string)
{
let result = Context::scope(ctx.clone(), move || {
ctx.runtime.env().eval::<JsonValue>(&expr)
});
let new_value = result.unwrap_or_else(|err| {
eprintln!("fill_outputs: expr:{string}, err={err}");
JsonValue::Null
});

// satisfies the rule 1
ret.insert(k.to_string(), new_value);
continue;
}

// rule 2
Expand All @@ -134,17 +134,16 @@ pub fn fill_outputs(outputs: &Vars, ctx: &Context) -> Vars {
pub fn fill_proc_vars(task: &Arc<Task>, values: &Vars, ctx: &Context) -> Vars {
let mut ret = Vars::new();
for (ref k, ref v) in values {
if let JsonValue::String(string) = v {
if let Some(expr) = get_expr(string) {
let result =
Context::scope(ctx.clone(), || ctx.runtime.env().eval::<JsonValue>(&expr));
let new_value = result.unwrap_or(JsonValue::Null);
if let JsonValue::String(string) = v
&& let Some(expr) = get_expr(string)
{
let result = Context::scope(ctx.clone(), || ctx.runtime.env().eval::<JsonValue>(&expr));
let new_value = result.unwrap_or(JsonValue::Null);

// satisfies the rule 1
ret.insert(k.to_string(), new_value);
// satisfies the rule 1
ret.insert(k.to_string(), new_value);

continue;
}
continue;
}

// rule 2
Expand Down
8 changes: 4 additions & 4 deletions examples/timeout/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ impl Client {
}
}

if e.is_msg() {
if let Some(action) = self.messages.get(&e.key) {
action(executor, e);
}
if e.is_msg()
&& let Some(action) = self.messages.get(&e.key)
{
action(executor, e);
}

Ok(())
Expand Down
Loading