Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 5 additions & 13 deletions crates/mofa-cli/src/state/agent_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ pub struct AgentMetadata {
impl AgentMetadata {
/// Create new agent metadata
pub fn new(id: String, name: String) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

Self {
id,
Expand Down Expand Up @@ -106,10 +103,7 @@ impl AgentMetadata {

/// Mark as started
pub fn mark_started(&mut self, pid: u32) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();
self.last_started = Some(now);
self.process_id = Some(pid);
self.last_state = AgentProcessState::Running;
Expand All @@ -118,10 +112,7 @@ impl AgentMetadata {

/// Mark as stopped
pub fn mark_stopped(&mut self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();
self.last_stopped = Some(now);
self.process_id = None;
self.last_state = AgentProcessState::Stopped;
Expand Down Expand Up @@ -342,7 +333,8 @@ mod tests {
"tags": []
}"#;

let metadata: AgentMetadata = serde_json::from_str(legacy).expect("legacy metadata should deserialize");
let metadata: AgentMetadata =
serde_json::from_str(legacy).expect("legacy metadata should deserialize");
assert_eq!(metadata.last_state, AgentProcessState::Stopped);
assert_eq!(metadata.id, "agent-legacy");
}
Expand Down
4 changes: 2 additions & 2 deletions crates/mofa-extra/src/rhai/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ impl RhaiScriptEngine {
// Execute the script
let result = self.engine.eval_with_scope::<Dynamic>(&mut scope, source);

let execution_time_ms = start_time.elapsed().as_millis() as u64;
let execution_time_ms = u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
let logs = self.logs.read().await.clone();

match result {
Expand Down Expand Up @@ -607,7 +607,7 @@ impl RhaiScriptEngine {
.engine
.eval_ast_with_scope::<Dynamic>(&mut scope, &compiled.ast);

let execution_time_ms = start_time.elapsed().as_millis() as u64;
let execution_time_ms = u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
let logs = self.logs.read().await.clone();

match result {
Expand Down
30 changes: 19 additions & 11 deletions crates/mofa-extra/src/rhai/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ impl RuleEngine {
success: false,
result: serde_json::Value::Null,
error: result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand All @@ -455,7 +456,8 @@ impl RuleEngine {
"Invalid function name: '{}'. Must be a valid identifier.",
function
)),
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand All @@ -473,7 +475,8 @@ impl RuleEngine {
success: false,
result: serde_json::Value::Null,
error: result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand Down Expand Up @@ -530,7 +533,8 @@ impl RuleEngine {
success: true,
result,
error: None,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
})
Expand Down Expand Up @@ -558,7 +562,8 @@ impl RuleEngine {
success: false,
result: serde_json::Value::Null,
error: result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand All @@ -577,7 +582,8 @@ impl RuleEngine {
"Invalid function name: '{}'. Must be a valid identifier.",
function
)),
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand All @@ -595,7 +601,8 @@ impl RuleEngine {
success: false,
result: serde_json::Value::Null,
error: result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
variable_updates,
triggered_events,
});
Expand Down Expand Up @@ -641,7 +648,7 @@ impl RuleEngine {
success: true,
result,
error: None,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX),
variable_updates,
triggered_events,
})
Expand Down Expand Up @@ -694,7 +701,7 @@ impl RuleEngine {
final_result: None,
any_matched: false,
used_default: false,
total_time_ms: start_time.elapsed().as_millis() as u64,
total_time_ms: u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX),
});
}

Expand Down Expand Up @@ -722,7 +729,8 @@ impl RuleEngine {
match_results.push(RuleMatchResult {
rule_id: rule.id.clone(),
matched,
evaluation_time_ms: eval_start.elapsed().as_millis() as u64,
evaluation_time_ms: u64::try_from(eval_start.elapsed().as_millis())
.unwrap_or(u64::MAX),
});

if !matched {
Expand Down Expand Up @@ -792,7 +800,7 @@ impl RuleEngine {
final_result,
any_matched,
used_default,
total_time_ms: start_time.elapsed().as_millis() as u64,
total_time_ms: u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX),
})
}

Expand Down
12 changes: 8 additions & 4 deletions crates/mofa-extra/src/rhai/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,8 @@ impl ScriptToolRegistry {
success: true,
result: value,
error: None,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
logs: Vec::new(),
}),
Err(_e) => {
Expand All @@ -641,7 +642,8 @@ impl ScriptToolRegistry {
success: true,
result: script_result.value,
error: None,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
logs: script_result.logs,
})
} else {
Expand All @@ -650,7 +652,8 @@ impl ScriptToolRegistry {
success: false,
result: serde_json::Value::Null,
error: script_result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
logs: script_result.logs,
})
}
Expand All @@ -663,7 +666,8 @@ impl ScriptToolRegistry {
success: script_result.success,
result: script_result.value,
error: script_result.error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
logs: script_result.logs,
})
}
Expand Down
5 changes: 3 additions & 2 deletions crates/mofa-extra/src/rhai/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ impl ScriptWorkflowNode {
success: true,
output: script_result.value,
error: None,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis())
.unwrap_or(u64::MAX),
retry_count,
logs: script_result.logs,
});
Expand All @@ -285,7 +286,7 @@ impl ScriptWorkflowNode {
success: false,
output: serde_json::Value::Null,
error: last_error,
execution_time_ms: start_time.elapsed().as_millis() as u64,
execution_time_ms: u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX),
retry_count: retry_count.saturating_sub(1),
logs: Vec::new(),
})
Expand Down
2 changes: 1 addition & 1 deletion crates/mofa-foundation/src/adapter/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl DeferredRequest {

/// Get wait time in milliseconds
pub fn wait_time_ms(&self) -> u64 {
self.enqueued_at.elapsed().as_millis() as u64
u64::try_from(self.enqueued_at.elapsed().as_millis()).unwrap_or(u64::MAX)
}

/// Increment retry count
Expand Down
2 changes: 1 addition & 1 deletion crates/mofa-foundation/src/agent/components/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl SimpleToolRegistry {
steps: vec![ExecutionStep {
step_id: format!("tool:{}", tool_name),
step_type: "tool_call".to_string(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
timestamp_ms: mofa_kernel::utils::chrono_now_ms(),
input: Some(args_json.clone()),
output: None,
metadata: HashMap::new(),
Expand Down
20 changes: 4 additions & 16 deletions crates/mofa-foundation/src/agent/context/rich.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ impl ExecutionMetrics {
/// 创建新的指标
/// Create new metrics
pub fn new() -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

Self {
start_time_ms: now,
Expand All @@ -64,10 +61,7 @@ impl ExecutionMetrics {
/// 获取执行时长 (毫秒)
/// Get execution duration (ms)
pub fn duration_ms(&self) -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

self.end_time_ms.unwrap_or(now) - self.start_time_ms
}
Expand Down Expand Up @@ -131,10 +125,7 @@ impl RichAgentContext {
/// 记录组件输出
/// Record component output
pub async fn record_output(&self, component: impl Into<String>, output: serde_json::Value) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

let mut outputs = self.outputs.write().await;
outputs.push(ComponentOutput {
Expand Down Expand Up @@ -185,10 +176,7 @@ impl RichAgentContext {
/// 结束执行 (记录结束时间)
/// Finish execution (record end time)
pub async fn finish(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

let mut metrics = self.metrics.write().await;
metrics.end_time_ms = Some(now);
Expand Down
12 changes: 6 additions & 6 deletions crates/mofa-foundation/src/collaboration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl CollaborationProtocol for RequestResponseProtocol {
// Received and processed request from {}
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::RequestResponse)
Expand Down Expand Up @@ -383,7 +383,7 @@ impl CollaborationProtocol for PublishSubscribeProtocol {
// Message published to topic {:?}
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::PublishSubscribe)
Expand Down Expand Up @@ -510,7 +510,7 @@ impl CollaborationProtocol for ConsensusProtocol {
// Participated in consensus decision, threshold: {}
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::Consensus)
Expand Down Expand Up @@ -633,7 +633,7 @@ impl CollaborationProtocol for DebateProtocol {
// Participated in debate, max rounds: {}
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::Debate)
Expand Down Expand Up @@ -759,7 +759,7 @@ impl CollaborationProtocol for ParallelProtocol {
))
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::Parallel)
Expand Down Expand Up @@ -882,7 +882,7 @@ impl CollaborationProtocol for SequentialProtocol {
// Sequential task chain executed
};

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

Ok(
CollaborationResult::success(content, duration, CollaborationMode::Sequential)
Expand Down
7 changes: 2 additions & 5 deletions crates/mofa-foundation/src/collaboration/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,7 @@ impl CollaborationMessage {
content: impl Into<CollaborationContent>,
mode: CollaborationMode,
) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let now = mofa_kernel::utils::now_ms();

Self {
id: Uuid::now_v7().to_string(),
Expand Down Expand Up @@ -785,7 +782,7 @@ impl LLMDrivenCollaborationManager {
// Process message
let result = protocol.process_message(msg).await;

let duration = start.elapsed().as_millis() as u64;
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

match result {
Ok(mut result) => {
Expand Down
Loading
Loading