GH-28: Add Job Manager #31
base: main
Walkthrough
Adds a new workspace crate
Changes
Sequence Diagram(s)
sequenceDiagram
participant Worker
participant Storage
participant JobRegistry
participant TaskExecutor
participant Metrics
Worker->>Storage: find_job_meta(job_code)
Storage-->>Worker: JobMeta
opt cache miss / stale
Worker->>Storage: get_job_by_meta(meta)
Storage-->>Worker: Job
end
Worker->>Worker: pick_task_to_execute()
Worker->>Worker: start_task(task, worker_id)
Worker->>JobRegistry: get_task_executor(job_code, task_code)
JobRegistry-->>Worker: TaskExecutorFn
Worker->>TaskExecutor: executor(immutable_task, job_manager, cancel_token)
TaskExecutor-->>Worker: Result<(), Error>
alt Success
Worker->>Worker: complete_task(task_id, output)
Worker->>Metrics: record_task_processed(Completed)
else Failure
Worker->>Worker: fail_task(task_id, error_msg)
Worker->>Metrics: record_task_processed(Failed)
end
Worker->>Storage: save_job(updated_job, cancel_token)
alt Conflict (ConcurrentModification)
Storage-->>Worker: ConcurrentModification
Worker->>Storage: get_job_by_meta(updated_meta)
Storage-->>Worker: Job
Worker->>Worker: merge_with_processed_task()
Worker->>Storage: save_job(merged_job)
else Saved
Storage-->>Worker: OK
Worker->>Metrics: record_job_iteration_complete()
end
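The conflict branch of the diagram above (save, observe `ConcurrentModification`, reload, merge, save again) could be sketched roughly as follows. This is a minimal in-memory illustration that assumes version-based compare-and-swap; `InMemoryStorage`, the `Job` fields, `save_with_merge`, and the body of `merge_with_processed_task` are hypothetical stand-ins, not the crate's actual types or S3 logic. The bounded retry count is also an assumption; the diagram shows a single merged save.

```rust
use std::collections::HashMap;

/// Simplified job: task id -> status string, plus a storage version.
#[derive(Clone, Debug, PartialEq)]
struct Job {
    version: u64,
    tasks: HashMap<String, String>,
}

#[derive(Debug, PartialEq)]
enum SaveError {
    ConcurrentModification,
}

/// Hypothetical in-memory stand-in for the Storage participant.
struct InMemoryStorage {
    current: Job,
}

impl InMemoryStorage {
    /// Accept the write only if the caller saw the latest version.
    fn save_job(&mut self, job: &Job) -> Result<u64, SaveError> {
        if job.version != self.current.version {
            return Err(SaveError::ConcurrentModification);
        }
        self.current = Job { version: job.version + 1, tasks: job.tasks.clone() };
        Ok(self.current.version)
    }

    fn get_job(&self) -> Job {
        self.current.clone()
    }
}

/// Re-apply only the task this worker processed onto the freshly loaded job.
fn merge_with_processed_task(mut fresh: Job, task_id: &str, status: &str) -> Job {
    fresh.tasks.insert(task_id.to_string(), status.to_string());
    fresh
}

/// Save; on conflict reload, merge, and try again, up to `max_attempts` saves.
fn save_with_merge(
    storage: &mut InMemoryStorage,
    mut job: Job,
    task_id: &str,
    status: &str,
    max_attempts: u32,
) -> Result<u64, SaveError> {
    for _ in 0..max_attempts {
        match storage.save_job(&job) {
            Ok(version) => return Ok(version),
            Err(SaveError::ConcurrentModification) => {
                job = merge_with_processed_task(storage.get_job(), task_id, status);
            }
        }
    }
    Err(SaveError::ConcurrentModification)
}
```

Merging only the processed task, rather than overwriting the whole job, is what lets two workers finish different tasks of the same job without losing each other's updates.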
sequenceDiagram
participant JobsManager
participant Worker1
participant Worker2
participant Cache
participant Storage
JobsManager->>JobsManager: start()
JobsManager->>Worker1: spawn(...)
JobsManager->>Worker2: spawn(...)
par Worker1 loop
Worker1->>Cache: get_job(job_code)
alt Cache miss
Worker1->>Storage: get_job_by_meta(meta)
Storage-->>Worker1: Job
Worker1->>Cache: update_cache(job)
else Cache hit
Cache-->>Worker1: job
end
Worker1->>Worker1: execute_task()
Worker1->>Storage: save_job()
and Worker2 loop
Worker2->>Cache: get_job(job_code)
alt Cache stale / conflict observed
Worker2->>Storage: get_job_by_meta(meta)
Storage-->>Worker2: Job
Worker2->>Cache: invalidate & update
else Cache valid
Cache-->>Worker2: job
end
Worker2->>Worker2: execute_task()
Worker2->>Storage: save_job()
end
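The cache-hit versus miss/stale paths in the diagram above might be sketched like this. The sketch assumes staleness is detected by comparing a cheap metadata lookup (`find_job_meta`) against the cached copy, as the first diagram suggests; `Backend`, the struct fields, and the `full_reads` counter are hypothetical and only loosely mirror the crate's `CachedStorage`.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct JobMeta {
    code: String,
    version: u64,
}

#[derive(Clone, Debug, PartialEq)]
struct Job {
    meta: JobMeta,
    payload: String,
}

/// Hypothetical backing store that counts expensive full reads.
struct Backend {
    jobs: HashMap<String, Job>,
    full_reads: u32,
}

impl Backend {
    /// Cheap lookup: only the metadata (code + version).
    fn find_job_meta(&self, code: &str) -> Option<JobMeta> {
        self.jobs.get(code).map(|j| j.meta.clone())
    }

    /// Expensive lookup: the full job body for an exact version.
    fn get_job_by_meta(&mut self, meta: &JobMeta) -> Option<Job> {
        self.full_reads += 1;
        self.jobs.get(&meta.code).filter(|j| j.meta == *meta).cloned()
    }
}

struct CachedStorage {
    backend: Backend,
    cache: HashMap<String, Job>,
}

impl CachedStorage {
    fn get_job(&mut self, code: &str) -> Option<Job> {
        let meta = self.backend.find_job_meta(code)?;
        if let Some(cached) = self.cache.get(code) {
            if cached.meta == meta {
                return Some(cached.clone()); // cache hit: versions match
            }
        }
        // Cache miss or stale entry: fetch the full job and refresh the cache.
        let job = self.backend.get_job_by_meta(&meta)?;
        self.cache.insert(code.to_string(), job.clone());
        Some(job)
    }
}
```

With this shape, a second worker that observes a newer version in storage refreshes its cached copy on the next read without needing an explicit invalidation message.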
Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
49aafd0 to ac6204e
Actionable comments posted: 9
🧹 Nitpick comments (18)
crates/icegate-jobmanager/examples/docker-compose.yml (1)
18-18: Consider pinning the `minio/mc` image version.
The MinIO server image is pinned to a specific release for reproducibility, but `minio/mc:latest` may drift over time. Consider pinning to a compatible release for consistent behavior across development environments.
crates/icegate-jobmanager/src/infra/retrier.rs (1)
90-105: Potential panic if `delays` is empty.
While `Default` provides a non-empty `delays` vector, if a user constructs `RetrierConfig` with an empty `delays`, line 94's `unwrap_or` would return the fallback, but line 92's index access could still be an issue if `attempt` is 0 and `delays` is empty (though this wouldn't happen since `attempt` starts at 1 after increment).
However, consider adding validation in `RetrierConfig` construction or documenting that `delays` must not be empty.
crates/icegate-jobmanager/src/tests/shutdown_test.rs (1)
25-30: Consider renaming shadowed variable for clarity.
Line 30 shadows `started_tx` from line 26. While this works correctly, using a distinct name (e.g., `started_tx_clone`) would improve readability and make the ownership flow clearer.
🔎 Suggested rename
  let (started_tx, started_rx) = oneshot::channel();
  let started_tx = Arc::new(Mutex::new(Some(started_tx)));
  let cancelled = Arc::new(AtomicBool::new(false));
  let cancelled_flag = Arc::clone(&cancelled);
- let started_tx = Arc::clone(&started_tx);
+ let started_tx_clone = Arc::clone(&started_tx);
  let executor: TaskExecutorFn = Arc::new(move |task, _manager, cancel_token| {
      let cancelled_flag = Arc::clone(&cancelled_flag);
-     let started_tx = Arc::clone(&started_tx);
+     let started_tx = Arc::clone(&started_tx_clone);
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs (1)
28-28: Consider using `usize` for `dynamic_task_count`.
The variable is used as a loop bound and for length comparisons (`tasks.len()`), which return `usize`. Using `usize` would eliminate the need for conversions at lines 125 and 141.
🔎 Suggested change
- let dynamic_task_count = 5;
+ let dynamic_task_count: usize = 5;
Then adjust the assertion:
  assert_eq!(
      dynamic_tasks_executed.load(Ordering::SeqCst),
-     i32::try_from(dynamic_task_count)?,
+     dynamic_task_count as i32,
      "all dynamic tasks should be executed"
  );
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
40-48: TCP readiness check may be insufficient.
The TCP connection check confirms the port is open but doesn't guarantee MinIO's S3 API is fully initialized. Consider adding an HTTP health check to `/minio/health/live` for more reliable readiness detection, especially under slow CI environments.
That said, the current approach works in practice since `WaitFor::seconds(1)` provides initial delay and subsequent S3 operations will retry on failure.
crates/icegate-jobmanager/examples/json_model_job.rs (1)
107-112: Remove leftover comments.
Lines 107 and 112 contain cleanup notes that should be removed before merging.
🔎 Proposed fix
  let job_registry = Arc::new(JobRegistry::new(vec![job_def.clone()])?);
- // retrier was unused.
  let s3_storage = Arc::new(S3Storage::new(s3_config, job_registry.clone(), Metrics::new_disabled()).await?);
  let cached_storage = Arc::new(CachedStorage::new(s3_storage, Metrics::new_disabled()));
- // job_code was unused.
- // 2. Start Manager
crates/icegate-jobmanager/src/infra/metrics.rs (1)
77-108: Consider using `Display` instead of `Debug` for status labels.
Lines 85 and 105 use `format!("{status:?}")` which produces Debug output (e.g., `Started` vs a lowercase `started`). Since `JobStatus` and `TaskStatus` implement `Display` (per the relevant code snippets), using `format!("{status}")` would produce more consistent, human-readable metric labels.
🔎 Proposed fix
  self.job_duration.record(
      duration.as_secs_f64(),
      &[
          KeyValue::new("code", code.to_string()),
-         KeyValue::new("status", format!("{status:?}")),
+         KeyValue::new("status", status.to_string()),
      ],
  );
  self.task_duration.record(
      duration.as_secs_f64(),
      &[
          KeyValue::new("job_code", job_code.to_string()),
          KeyValue::new("task_code", task_code.to_string()),
-         KeyValue::new("status", format!("{status:?}")),
+         KeyValue::new("status", status.to_string()),
      ],
  );
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs (1)
43-46: Timing-sensitive test logic.
The test relies on specific timing (500ms sleep vs 100ms deadline) to trigger deadline expiry. While the margin is reasonable (5x), consider adding a comment explaining this relationship for maintainability.
crates/icegate-jobmanager/src/storage/cached.rs (1)
11-16: Acknowledged: TODO for cache eviction.
The TODO on line 15 about TTL or LRU cache is important for production use to prevent unbounded memory growth. Consider creating an issue to track this.
Would you like me to open an issue to track implementing TTL or LRU eviction for the cache?
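For reference, the LRU option mentioned above could look something like the following. This is a deliberately small sketch, not a proposal for the crate's actual cache: `LruCache` and its methods are hypothetical names, keys are plain strings, and the recency update is O(n) to keep the code short. A production cache would more likely use an existing crate or a linked-list-backed structure.

```rust
use std::collections::{HashMap, VecDeque};

/// Minimal LRU cache sketch with a fixed capacity.
struct LruCache<V> {
    capacity: usize,
    entries: HashMap<String, V>,
    order: VecDeque<String>, // front = least recently used
}

impl<V: Clone> LruCache<V> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, entries: HashMap::new(), order: VecDeque::new() }
    }

    /// Mark `key` as the most recently used entry.
    fn touch(&mut self, key: &str) {
        self.order.retain(|k| k.as_str() != key);
        self.order.push_back(key.to_string());
    }

    /// Return a clone of the value and refresh its recency.
    fn get(&mut self, key: &str) -> Option<V> {
        let value = self.entries.get(key).cloned()?;
        self.touch(key);
        Some(value)
    }

    /// Insert or overwrite, evicting least recently used entries if over capacity.
    fn put(&mut self, key: &str, value: V) {
        self.entries.insert(key.to_string(), value);
        self.touch(key);
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some(oldest) => {
                    self.entries.remove(&oldest);
                }
                None => break,
            }
        }
    }

    fn len(&self) -> usize {
        self.entries.len()
    }
}
```

The capacity bound is what addresses the unbounded-growth concern; a TTL variant would instead store an insertion timestamp per entry and drop entries older than a configured duration on read.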
crates/icegate-jobmanager/src/tests/common/manager_env.rs (1)
36-80: CancellationToken is local-only and cannot be externally triggered.
The `cancel_token` created at line 38 is only used to pass to `get_job()` calls but is never exposed for external cancellation. If you want to support cancelling the wait from outside (e.g., on test failure), consider accepting a `CancellationToken` parameter or checking `manager_handle`'s cancellation status.
For a test helper, this is likely acceptable, but be aware that the only way to break out of this loop early is via timeout.
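A wait loop that can be stopped from outside as well as by timeout might be shaped like this. The sketch is std-only and synchronous: it substitutes an `AtomicBool` for the `CancellationToken` used in the crate, and `wait_until` and `WaitOutcome` are hypothetical names, so treat it as an illustration of the control flow rather than a drop-in change to the helper.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Ready,
    Cancelled,
    TimedOut,
}

/// Poll `is_ready` until it returns true, the caller-owned `cancel` flag is
/// set, or `timeout` elapses, whichever happens first.
fn wait_until(
    mut is_ready: impl FnMut() -> bool,
    cancel: &AtomicBool,
    timeout: Duration,
    poll_interval: Duration,
) -> WaitOutcome {
    let deadline = Instant::now() + timeout;
    loop {
        // The flag is owned by the caller, so it can be set externally.
        if cancel.load(Ordering::SeqCst) {
            return WaitOutcome::Cancelled;
        }
        if is_ready() {
            return WaitOutcome::Ready;
        }
        if Instant::now() >= deadline {
            return WaitOutcome::TimedOut;
        }
        thread::sleep(poll_interval);
    }
}
```

Because the flag is passed in rather than created inside the helper, a failing test could set it to end the wait immediately instead of waiting for the timeout.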
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs (1)
190-194: Consider clarifying the PUT count formula in the comment.
The formula `(((secondary_task_count + 1) * 2) + 1)` is correct (1 job creation + 2 PUTs per task), but the comment "1 PUT for create job, 2 PUT for each task" could explicitly mention this includes both task creation and completion PUTs. Minor clarity improvement.
crates/icegate-jobmanager/src/lib.rs (1)
1-3: Temporary lint allows noted.
The `#![allow(missing_docs)]`, `#![allow(dead_code)]`, and `#![allow(clippy::redundant_pub_crate)]` are reasonable for early development. Consider adding a TODO comment to track removal of these allows once the crate stabilizes.
crates/icegate-jobmanager/src/core/registry.rs (1)
80-82: Task executor key format could theoretically collide.
The key format `"{job_code}:{task_code}"` could have collisions if job or task codes contain the `:` delimiter. For example, job `"a:b"` with task `"c"` produces the same key as job `"a"` with task `"b:c"`.
Consider using a delimiter unlikely to appear in codes, or validate that codes don't contain the delimiter.
🔎 Alternative delimiter approach
  fn task_executor_key(job_code: &JobCode, task_code: &TaskCode) -> String {
-     format!("{job_code}:{task_code}")
+     format!("{job_code}\0{task_code}")
  }
Or validate codes during registration to reject `:` characters.
crates/icegate-jobmanager/src/execution/worker.rs (1)
462-484: Inconsistent handling of failed task save result.
When a task fails (line 462), the result of `save_processed_task` is partially discarded with `_ = ...`. The `?` operator at line 484 propagates errors, but the returned `Job` is discarded. In contrast, the success path (lines 494-514) uses the returned job for further processing.
This asymmetry may be intentional (failed task state doesn't need further processing), but consider adding a brief comment explaining why the returned job is unused here.
crates/icegate-jobmanager/src/core/task.rs (2)
249-255: Good defensive logic in `can_be_picked_up`, but note the TODO.
The method correctly allows picking up Todo, Failed, or expired Started tasks. The TODO at line 251 about limiting retry attempts is important for preventing infinite retry loops on consistently failing tasks.
Would you like me to open an issue to track implementing the attempt limit for task retries?
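To make the suggestion concrete, a pickup check with an attempt cap might look like the sketch below. It follows the rule described above (Todo, Failed, or expired Started tasks are eligible) and adds the limit; the `attempts` field, the `max_attempts` parameter, and the simplified `Task` struct are assumptions for illustration and do not reflect the crate's real `Task` type.

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy, Debug, PartialEq)]
enum TaskStatus {
    Todo,
    Started,
    Completed,
    Failed,
}

/// Simplified task; `attempts` is a hypothetical counter of past pickups.
struct Task {
    status: TaskStatus,
    attempts: u32,
    deadline: Option<Instant>,
}

impl Task {
    /// Eligible when Todo, Failed, or Started with an expired deadline,
    /// and only while the attempt budget is not exhausted.
    fn can_be_picked_up(&self, now: Instant, max_attempts: u32) -> bool {
        if self.attempts >= max_attempts {
            return false; // stop retrying consistently failing tasks
        }
        match self.status {
            TaskStatus::Todo | TaskStatus::Failed => true,
            TaskStatus::Started => self.deadline.map_or(false, |d| now >= d),
            TaskStatus::Completed => false,
        }
    }
}

/// Convenience constructor for a Started task whose deadline is `ttl` from `now`.
fn started_task(now: Instant, ttl: Duration, attempts: u32) -> Task {
    Task { status: TaskStatus::Started, attempts, deadline: Some(now + ttl) }
}
```

Passing `now` in explicitly keeps the check deterministic in tests, which also helps with the timing sensitivity noted for the deadline expiry test.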
142-173: Consider a builder pattern for `restore`.
The `restore` function has 13 parameters, which is acknowledged with `#[allow(clippy::too_many_arguments)]`. For better maintainability, consider a builder pattern or a `TaskSnapshot` struct that captures persisted state.
crates/icegate-jobmanager/src/core/job.rs (1)
246-261: The `next_iteration` method's self-replacement pattern is unusual but functional.
The approach of calling `Self::new(...)` and then overwriting `self` works, but it's slightly confusing because `id` and `iter_num` are immediately overwritten after construction. Consider extracting the initialization logic into a helper or using a more explicit approach.
🔎 Alternative approach
  pub(crate) fn next_iteration(
      &mut self,
      tasks: Vec<Task>,
      worker_id: String,
      max_iterations: u64,
  ) -> Result<(), JobError> {
      if !self.is_ready_to_next_iteration() {
          return Err(JobError::Other("job is not ready to next iteration".into()));
      }
      self.status.transition_to(JobStatus::Started)?;
-     let old_id = self.id.clone();
-     let old_iter_num = self.iter_num;
-     let old_metadata = self.metadata.clone();
-
-     *self = Self::new(self.code.clone(), tasks, old_metadata, worker_id, max_iterations);
-     self.id = old_id;
-     // TODO(low): ...
-     self.iter_num = old_iter_num + 1;
-     self.started_at = Utc::now();
+     // Reset task state for new iteration
+     self.tasks_by_id.clear();
+     for task in tasks {
+         self.tasks_by_id.insert(task.id().to_string(), Arc::new(task));
+     }
+     self.iter_num += 1;
+     self.updated_by_worker_id = worker_id;
+     self.started_at = Utc::now();
+     self.running_at = None;
+     self.completed_at = None;
+     self.max_iterations = max_iterations;
      Ok(())
  }
crates/icegate-jobmanager/Cargo.toml (1)
44-49: Consider workspace-level dependency definitions for potential reuse.
The direct dependencies (`futures-util`, `parking_lot`, `async-trait`, `dashmap`) are reasonable choices for job orchestration. If these utilities are expected to be used by other workspace crates in the future, consider moving them to workspace dependencies for version consistency.
For now, crate-specific definitions are acceptable.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (40)
.gitignore
Cargo.toml
Makefile
crates/icegate-jobmanager/Cargo.toml
crates/icegate-jobmanager/Makefile
crates/icegate-jobmanager/examples/docker-compose.yml
crates/icegate-jobmanager/examples/json_model_job.rs
crates/icegate-jobmanager/examples/simple_job.rs
crates/icegate-jobmanager/examples/simple_sequence_job.rs
crates/icegate-jobmanager/src/core/error.rs
crates/icegate-jobmanager/src/core/job.rs
crates/icegate-jobmanager/src/core/mod.rs
crates/icegate-jobmanager/src/core/registry.rs
crates/icegate-jobmanager/src/core/task.rs
crates/icegate-jobmanager/src/execution/job_manager.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
crates/icegate-jobmanager/src/execution/mod.rs
crates/icegate-jobmanager/src/execution/worker.rs
crates/icegate-jobmanager/src/infra/metrics.rs
crates/icegate-jobmanager/src/infra/mod.rs
crates/icegate-jobmanager/src/infra/retrier.rs
crates/icegate-jobmanager/src/lib.rs
crates/icegate-jobmanager/src/storage/cached.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/src/storage/s3.rs
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs
crates/icegate-jobmanager/src/tests/common/manager_env.rs
crates/icegate-jobmanager/src/tests/common/minio_env.rs
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
crates/icegate-jobmanager/src/tests/job_iterations_test.rs
crates/icegate-jobmanager/src/tests/mod.rs
crates/icegate-jobmanager/src/tests/shutdown_test.rs
crates/icegate-jobmanager/src/tests/simple_job_test.rs
crates/icegate-jobmanager/src/tests/task_failure_test.rs
crates/icegate-jobmanager/src/tests/two_jobs_test.rs
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{rs,toml}
📄 CodeRabbit inference engine (AGENTS.md)
Use `cargo build` for debug builds, `cargo build --release` for release builds, and specific binary builds with `cargo build --bin <name>`
Files:
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
crates/icegate-jobmanager/src/infra/mod.rs
crates/icegate-jobmanager/src/core/mod.rs
crates/icegate-jobmanager/src/tests/mod.rs
crates/icegate-jobmanager/src/tests/shutdown_test.rs
crates/icegate-jobmanager/src/tests/task_failure_test.rs
crates/icegate-jobmanager/examples/simple_job.rs
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/src/tests/common/manager_env.rs
crates/icegate-jobmanager/src/tests/simple_job_test.rs
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
crates/icegate-jobmanager/src/storage/cached.rs
crates/icegate-jobmanager/examples/json_model_job.rs
crates/icegate-jobmanager/src/tests/common/minio_env.rs
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
crates/icegate-jobmanager/src/tests/two_jobs_test.rs
crates/icegate-jobmanager/Cargo.toml
crates/icegate-jobmanager/examples/simple_sequence_job.rs
crates/icegate-jobmanager/src/execution/mod.rs
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
crates/icegate-jobmanager/src/infra/metrics.rs
crates/icegate-jobmanager/src/execution/job_manager.rs
crates/icegate-jobmanager/src/core/registry.rs
crates/icegate-jobmanager/src/tests/job_iterations_test.rs
Cargo.toml
crates/icegate-jobmanager/src/core/job.rs
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs
crates/icegate-jobmanager/src/execution/worker.rs
crates/icegate-jobmanager/src/core/error.rs
crates/icegate-jobmanager/src/core/task.rs
crates/icegate-jobmanager/src/lib.rs
crates/icegate-jobmanager/src/storage/s3.rs
crates/icegate-jobmanager/src/infra/retrier.rs
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Run all tests with `cargo test`, specific tests with `cargo test test_name`, and use `--nocapture` flag to show test output
Use `make fmt` to check code format; DO NOT run via rustup because it doesn't respect rustfmt.toml
Use `make clippy` to run the linter with warnings as errors
Run `make audit` to perform security audits and use `make install` to install cargo-audit
Run `make ci` to execute all CI checks (check, fmt, clippy, test, audit)
Use rustfmt for code formatting with configuration in rustfmt.toml
Files:
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
crates/icegate-jobmanager/src/infra/mod.rs
crates/icegate-jobmanager/src/core/mod.rs
crates/icegate-jobmanager/src/tests/mod.rs
crates/icegate-jobmanager/src/tests/shutdown_test.rs
crates/icegate-jobmanager/src/tests/task_failure_test.rs
crates/icegate-jobmanager/examples/simple_job.rs
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/src/tests/common/manager_env.rs
crates/icegate-jobmanager/src/tests/simple_job_test.rs
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
crates/icegate-jobmanager/src/storage/cached.rs
crates/icegate-jobmanager/examples/json_model_job.rs
crates/icegate-jobmanager/src/tests/common/minio_env.rs
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
crates/icegate-jobmanager/src/tests/two_jobs_test.rs
crates/icegate-jobmanager/examples/simple_sequence_job.rs
crates/icegate-jobmanager/src/execution/mod.rs
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
crates/icegate-jobmanager/src/infra/metrics.rs
crates/icegate-jobmanager/src/execution/job_manager.rs
crates/icegate-jobmanager/src/core/registry.rs
crates/icegate-jobmanager/src/tests/job_iterations_test.rs
crates/icegate-jobmanager/src/core/job.rs
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs
crates/icegate-jobmanager/src/execution/worker.rs
crates/icegate-jobmanager/src/core/error.rs
crates/icegate-jobmanager/src/core/task.rs
crates/icegate-jobmanager/src/lib.rs
crates/icegate-jobmanager/src/storage/s3.rs
crates/icegate-jobmanager/src/infra/retrier.rs
**/*.{rs,toml,md}
📄 CodeRabbit inference engine (AGENTS.md)
Ensure each file ends with a newline; do not duplicate if it already exists
Files:
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
crates/icegate-jobmanager/src/infra/mod.rs
crates/icegate-jobmanager/src/core/mod.rs
crates/icegate-jobmanager/src/tests/mod.rs
crates/icegate-jobmanager/src/tests/shutdown_test.rs
crates/icegate-jobmanager/src/tests/task_failure_test.rs
crates/icegate-jobmanager/examples/simple_job.rs
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/src/tests/common/manager_env.rs
crates/icegate-jobmanager/src/tests/simple_job_test.rs
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
crates/icegate-jobmanager/src/storage/cached.rs
crates/icegate-jobmanager/examples/json_model_job.rs
crates/icegate-jobmanager/src/tests/common/minio_env.rs
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
crates/icegate-jobmanager/src/tests/two_jobs_test.rs
crates/icegate-jobmanager/Cargo.toml
crates/icegate-jobmanager/examples/simple_sequence_job.rs
crates/icegate-jobmanager/src/execution/mod.rs
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
crates/icegate-jobmanager/src/infra/metrics.rs
crates/icegate-jobmanager/src/execution/job_manager.rs
crates/icegate-jobmanager/src/core/registry.rs
crates/icegate-jobmanager/src/tests/job_iterations_test.rs
Cargo.toml
crates/icegate-jobmanager/src/core/job.rs
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs
crates/icegate-jobmanager/src/execution/worker.rs
crates/icegate-jobmanager/src/core/error.rs
crates/icegate-jobmanager/src/core/task.rs
crates/icegate-jobmanager/src/lib.rs
crates/icegate-jobmanager/src/storage/s3.rs
crates/icegate-jobmanager/src/infra/retrier.rs
Cargo.toml
📄 CodeRabbit inference engine (AGENTS.md)
Configure strict clippy and rustc lints: forbid unsafe_code, deny missing_docs and dead_code, and enable clippy pedantic/nursery
Files:
Cargo.toml
🧠 Learnings (11)
📓 Common learnings
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Organize code in Cargo workspace with 4 crates: icegate-common, icegate-query, icegate-ingest, and icegate-maintain
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Organize code in Cargo workspace with 4 crates: icegate-common, icegate-query, icegate-ingest, and icegate-maintain
Applied to files:
crates/icegate-jobmanager/src/core/mod.rs
crates/icegate-jobmanager/src/tests/mod.rs
crates/icegate-jobmanager/Makefile
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/examples/json_model_job.rs
crates/icegate-jobmanager/Cargo.toml
crates/icegate-jobmanager/src/execution/mod.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
Cargo.toml
crates/icegate-jobmanager/src/execution/worker.rs
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Use `make clippy` to run the linter with warnings as errors
Applied to files:
Makefile
Cargo.toml
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Run `make ci` to execute all CI checks (check, fmt, clippy, test, audit)
Applied to files:
Makefile
crates/icegate-jobmanager/Makefile
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Run `make audit` to perform security audits and use `make install` to install cargo-audit
Applied to files:
Makefile
crates/icegate-jobmanager/Makefile
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Use `make fmt` to check code format; DO NOT run via rustup because it doesn't respect rustfmt.toml
Applied to files:
Makefile
Cargo.toml
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to Cargo.toml : Configure strict clippy and rustc lints: forbid unsafe_code, deny missing_docs and dead_code, and enable clippy pedantic/nursery
Applied to files:
Makefile
Cargo.toml
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to clippy.toml : Set clippy thresholds in clippy.toml: cognitive-complexity=30, too-many-arguments=8, too-many-lines=150
Applied to files:
Makefile
Cargo.toml
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.{rs,toml} : Use `cargo build` for debug builds, `cargo build --release` for release builds, and specific binary builds with `cargo build --bin <name>`
Applied to files:
Makefile
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Use rustfmt for code formatting with configuration in rustfmt.toml
Applied to files:
Makefile
Cargo.toml
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Run all tests with `cargo test`, specific tests with `cargo test test_name`, and use `--nocapture` flag to show test output
Applied to files:
Makefile
crates/icegate-jobmanager/Makefile
🧬 Code graph analysis (17)
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs (3)
crates/icegate-jobmanager/src/infra/metrics.rs (2)
new(36-75), new_disabled(22-34)
crates/icegate-jobmanager/src/storage/cached.rs (1)
new(26-32)
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (4)
new(16-23), version(25-27), find_meta_calls(29-31), get_by_meta_calls(33-35)
crates/icegate-jobmanager/src/tests/task_failure_test.rs (3)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(99-162)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (2)
new(16-33), storage(90-92)
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
new(20-56)
crates/icegate-jobmanager/examples/simple_job.rs (3)
crates/icegate-jobmanager/src/core/job.rs (4)
new(15-17), new(103-131), new(168-195), task_executors(141-143)
crates/icegate-jobmanager/src/core/task.rs (3)
new(13-15), new(71-76), new(124-140)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(99-162)
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs (5)
crates/icegate-jobmanager/src/core/job.rs (3)
new(15-17), new(103-131), new(168-195)
crates/icegate-jobmanager/src/core/task.rs (3)
new(13-15), new(71-76), new(124-140)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(99-162)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (2)
new(16-33), storage(90-92)
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
new(20-56)
crates/icegate-jobmanager/src/storage/mod.rs (2)
crates/icegate-jobmanager/src/core/error.rs (2)
cancelled(85-87), max_attempts(89-91)
crates/icegate-jobmanager/src/infra/retrier.rs (2)
cancelled(44-44), max_attempts(45-45)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (6)
crates/icegate-jobmanager/src/execution/worker.rs (2)
new(98-115), start(121-155)
crates/icegate-jobmanager/src/core/job.rs (3)
new(15-17), new(103-131), new(168-195)
crates/icegate-jobmanager/src/execution/job_manager.rs (1)
new(26-28)
crates/icegate-jobmanager/src/execution/jobs_manager.rs (2)
new(78-90), start(93-124)
crates/icegate-jobmanager/src/infra/metrics.rs (2)
new(36-75), new_disabled(22-34)
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
new(20-56)
crates/icegate-jobmanager/src/storage/cached.rs (4)
crates/icegate-jobmanager/src/core/job.rs (9)
new(15-17), new(103-131), new(168-195), code(133-135), code(330-332), id(326-328), iter_num(334-336), version(342-344), tasks_as_string(505-524)
crates/icegate-jobmanager/src/infra/metrics.rs (3)
new(36-75), record_cache_hit(123-128), record_cache_miss(130-135)
crates/icegate-jobmanager/src/storage/s3.rs (5)
new(99-162), get_job(399-426), get_job_by_meta(428-466), find_job_meta(468-508), save_job(511-583)
crates/icegate-jobmanager/src/storage/mod.rs (5)
get_job(87-87), get_job(102-102), get_job_by_meta(90-90), find_job_meta(93-93), save_job(97-97)
crates/icegate-jobmanager/examples/json_model_job.rs (9)
crates/icegate-jobmanager/src/core/job.rs (5)
fmt(25-27), fmt(55-62), new(15-17), new(103-131), new(168-195)
crates/icegate-jobmanager/src/core/task.rs (7)
fmt(23-25), fmt(51-58), new(13-15), new(71-76), new(124-140), input(83-85), input(216-218)
crates/icegate-jobmanager/src/execution/jobs_manager.rs (2)
default(16-21), new(78-90)
crates/icegate-jobmanager/src/infra/retrier.rs (2)
default(19-36), new(49-51)
crates/icegate-jobmanager/src/infra/metrics.rs (2)
new(36-75), new_disabled(22-34)
crates/icegate-jobmanager/src/storage/cached.rs (1)
new(26-32)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(99-162)
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (1)
new(16-23)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (1)
new(16-33)
crates/icegate-jobmanager/src/tests/two_jobs_test.rs (6)
crates/icegate-jobmanager/src/core/job.rs (6)
max_iterations(145-147), new(15-17), new(103-131), new(168-195), from(31-33), from(37-39)
crates/icegate-jobmanager/src/execution/worker.rs (2)
new(98-115), default(33-40)
crates/icegate-jobmanager/src/infra/metrics.rs (2)
new(36-75), new_disabled(22-34)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(99-162)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (2)
new(16-33), storage(90-92)
crates/icegate-jobmanager/src/core/registry.rs (1)
new(29-58)
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (1)
crates/icegate-jobmanager/src/storage/mod.rs (5)
get_job(87-87), get_job(102-102), get_job_by_meta(90-90), find_job_meta(93-93), save_job(97-97)
crates/icegate-jobmanager/src/execution/jobs_manager.rs (3)
crates/icegate-jobmanager/src/execution/worker.rs (2)
new(98-115), start(121-155)
crates/icegate-jobmanager/src/infra/metrics.rs (1)
new(36-75)
crates/icegate-jobmanager/src/core/registry.rs (1)
new(29-58)
crates/icegate-jobmanager/src/infra/metrics.rs (3)
crates/icegate-jobmanager/src/core/job.rs (5)
new(15-17), new(103-131), new(168-195), code(133-135), code(330-332)
crates/icegate-jobmanager/src/core/task.rs (7)
new(13-15), new(71-76), new(124-140), code(78-80), code(95-95), code(180-182), code(315-317)
crates/icegate-jobmanager/src/storage/cached.rs (3)
new(26-32), record_cache_hit(46-48), record_cache_miss(50-52)
crates/icegate-jobmanager/src/execution/job_manager.rs (2)
crates/icegate-jobmanager/src/core/job.rs (10)
add_task(263-275), complete_task(287-292), fail_task(294-299), get_task(492-495), get_tasks_by_code(497-503), code(133-135), code(330-332), new(15-17), new(103-131), new(168-195)
crates/icegate-jobmanager/src/core/task.rs (9)
output(220-222), error_msg(224-226), code(78-80), code(95-95), code(180-182), code(315-317), new(13-15), new(71-76), new(124-140)
crates/icegate-jobmanager/src/core/job.rs (3)
crates/icegate-jobmanager/src/core/task.rs (18)
new(13-15), new(71-76), new(124-140), code(78-80), code(95-95), code(180-182), code(315-317), as_str(17-19), from(29-31), from(35-37), id(94-94), id(176-178), id(311-313), status(184-186), started_at(204-206), completed_at(208-210), output(220-222), error_msg(224-226)
crates/icegate-jobmanager/src/core/registry.rs (1)
new(29-58)
crates/icegate-jobmanager/src/core/error.rs (3)
from(60-62), from(66-71), from(75-81)
crates/icegate-jobmanager/src/core/error.rs (2)
crates/icegate-jobmanager/src/infra/retrier.rs (2)
cancelled(44-44), max_attempts(45-45)
crates/icegate-jobmanager/src/storage/mod.rs (2)
cancelled(63-65), max_attempts(67-69)
crates/icegate-jobmanager/src/core/task.rs (1)
crates/icegate-jobmanager/src/core/error.rs (3)
from(60-62), from(66-71), from(75-81)
crates/icegate-jobmanager/src/infra/retrier.rs (2)
crates/icegate-jobmanager/src/core/error.rs (2)
cancelled(85-87), max_attempts(89-91)
crates/icegate-jobmanager/src/storage/mod.rs (2)
cancelled(63-65), max_attempts(67-69)
🪛 checkmake (0.2.2)
crates/icegate-jobmanager/Makefile
[warning] 1-1: Missing required phony target "all"
(minphony)
[warning] 1-1: Missing required phony target "clean"
(minphony)
[warning] 1-1: Missing required phony target "test"
(minphony)
🔇 Additional comments (64)
.gitignore (1)
5-6: LGTM!
The additions appropriately ignore temporary development data (`.tmp` for MinIO storage) and IDE-specific files (`*.iml` for IntelliJ IDEA).
crates/icegate-jobmanager/examples/simple_job.rs (1)
1-101: Well-structured example demonstrating the job manager workflow.
The example clearly illustrates the complete lifecycle: task definition → executor mapping → job definition → registry → storage → manager → graceful shutdown. The hardcoded MinIO credentials are appropriate for a local development example.
crates/icegate-jobmanager/src/infra/retrier.rs (2)
53-88: Well-designed retry mechanism with proper cancellation support.
The implementation correctly handles the retry loop with:
- Cancellation checks before and during sleep
- Proper attempt counting and max attempts enforcement
- Clean separation between retryable results and hard errors
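The three properties listed above can be illustrated with a small synchronous sketch. It is not the code in retrier.rs: `retry`, `Attempt`, and `RetryOutcome` are hypothetical names, an `AtomicBool` stands in for the `CancellationToken`, and `std::thread::sleep` stands in for the async sleep, so cancellation is checked before each attempt and again after the sleep rather than during it.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// What a single attempt reports back to the retry loop (hypothetical type).
enum Attempt<T, E> {
    Done(T),  // finished, stop retrying
    Retry,    // transient condition, try again
    Fatal(E), // hard error, stop immediately
}

/// Why the retry loop ended (hypothetical type).
#[derive(Debug, PartialEq)]
enum RetryOutcome<T, E> {
    Ok(T),
    Cancelled,
    MaxAttempts,
    Failed(E),
}

fn retry<T, E>(
    max_attempts: u32,
    delay: Duration,
    cancelled: &AtomicBool,
    mut op: impl FnMut(u32) -> Attempt<T, E>,
) -> RetryOutcome<T, E> {
    for attempt in 1..=max_attempts {
        // Check cancellation before starting an attempt.
        if cancelled.load(Ordering::SeqCst) {
            return RetryOutcome::Cancelled;
        }
        match op(attempt) {
            Attempt::Done(value) => return RetryOutcome::Ok(value),
            Attempt::Fatal(err) => return RetryOutcome::Failed(err),
            Attempt::Retry => {}
        }
        // Sleep only when another attempt will follow, then re-check cancellation.
        if attempt < max_attempts {
            thread::sleep(delay);
            if cancelled.load(Ordering::SeqCst) {
                return RetryOutcome::Cancelled;
            }
        }
    }
    RetryOutcome::MaxAttempts
}
```

Keeping `Retry` and `Fatal` as separate variants is what lets the loop distinguish a retryable result from a hard error without inspecting error contents.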
102-102: The rand API usage is compatible.
The code at line 102 correctly uses rand::rng().random_range(0..max_jitter), which is the proper API for rand 0.9.2 as specified in the workspace Cargo.toml.
crates/icegate-jobmanager/src/infra/mod.rs (1)
1-2: LGTM!
Clean module structure exposing the infrastructure components.
crates/icegate-jobmanager/src/execution/mod.rs (1)
1-3: LGTM!
Clear module organization for the execution layer with well-named components. Note: This PR introduces icegate-jobmanager as a new crate beyond the original 4-crate workspace structure mentioned in learnings, which appears to be an intentional expansion.
crates/icegate-jobmanager/src/tests/common/mod.rs (1)
1-6: LGTM!
Well-organized test utilities with focused modules for different testing concerns: in-memory storage, manager lifecycle, MinIO environment, and storage instrumentation.
crates/icegate-jobmanager/src/tests/mod.rs (1)
1-14: Good test organization with comprehensive coverage.
The decision to place integration tests in src/ to access pub(crate) types is well-documented and justified. The test modules cover a good range of scenarios including concurrency, lifecycle, failure handling, and edge cases.
crates/icegate-jobmanager/examples/docker-compose.yml (1)
16-16: Fix the volume path typo.
The path contains a double slash (.//.tmp/minio_data) which appears to be a typo. While Docker may resolve this correctly, it should be cleaned up for clarity.
🔎 Proposed fix
-      - ./.tmp/minio_data:/data
+      - ./.tmp/minio_data:/data
Note: If the original has .//.tmp, change to ./.tmp.
Likely an incorrect or invalid review comment.
crates/icegate-jobmanager/src/tests/task_failure_test.rs (1)
1-115: LGTM!
The test correctly validates the retry mechanism for flaky tasks. The atomic counter properly tracks attempts across async boundaries with SeqCst ordering, the executor logic correctly fails on the first attempt and succeeds on subsequent attempts, and the assertions verify both the retry count and final job state.
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs (1)
1-63: LGTM!
The test effectively validates cache invalidation behavior by tracking storage call counts. The sequence of operations correctly demonstrates:
- Cache miss on first access
- Cache hit when data is unchanged
- Cache invalidation when underlying storage is modified directly
The assertions on find_meta_calls and get_by_meta_calls counters properly verify that the cache avoids redundant S3 calls when versions match.
crates/icegate-jobmanager/src/tests/shutdown_test.rs (1)
22-83: LGTM!
The test correctly validates that shutdown cancellation propagates to running executors. The use of tokio::select! to race cancellation against timeout is idiomatic, and the AtomicBool flag properly captures whether cancellation was observed.
crates/icegate-jobmanager/src/core/error.rs (1)
1-92: LGTM!
The error hierarchy is well-structured with clean separation between public (Error) and internal (InternalError, JobError) types. The From implementations correctly propagate errors while preserving the Cancelled variant for control flow, and the RetryError trait implementation enables the retry infrastructure.
crates/icegate-jobmanager/src/core/mod.rs (1)
1-4: LGTM!
The module structure cleanly organizes the core domain logic into logical submodules. Note: Based on learnings, the original workspace plan mentioned 4 crates (icegate-common, icegate-query, icegate-ingest, icegate-maintain), and this PR adds icegate-jobmanager as an additional crate, which appears to be an intentional expansion.
crates/icegate-jobmanager/examples/simple_sequence_job.rs (2)
1-144: Well-structured example demonstrating sequential job workflow.
The example effectively demonstrates:
- Multi-step job definition with dynamic task creation
- Flaky task simulation for retry testing
- S3 storage configuration
- Graceful shutdown handling via Ctrl+C
The code is well-commented and serves as a good reference for users of the library.
46-46: No issues found with the rand API usage.
The rand::rng().random_bool(0.3) call is valid. The random_bool method exists on the Rng trait and accepts a probability parameter. The code correctly simulates a 30% failure chance.
crates/icegate-jobmanager/src/tests/dynamic_task_test.rs (1)
21-146: LGTM!
The test effectively validates dynamic task creation by having an initial task spawn multiple dynamic tasks at runtime. The assertions properly verify that all tasks executed and the job completed successfully.
crates/icegate-jobmanager/src/tests/two_jobs_test.rs (1)
21-185: LGTM!
Comprehensive integration test for concurrent job execution. The test correctly:
- Runs two distinct jobs with separate execution counters
- Uses payload-based routing in a shared executor
- Validates execution counts match expected iterations × tasks
- Verifies final job states, iteration counts, and zero timeouts
The multi-worker configuration with 10 tokio threads properly exercises concurrency.
crates/icegate-jobmanager/src/tests/job_iterations_test.rs (1)
21-113: LGTM! Well-structured iteration test.
The test correctly validates the job iteration mechanism by:
- Tracking iterations with an atomic counter
- Completing tasks to trigger automatic restarts
- Verifying both the iteration count and final job state
The use of SeqCst ordering is appropriate for test verification where strong consistency is needed.
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
58-74: LGTM!
Clean accessor methods and appropriate documentation. The automatic container cleanup via Drop is correctly noted.
crates/icegate-jobmanager/examples/json_model_job.rs (1)
21-140: Good example demonstrating JSON workflow.
The example effectively showcases:
- Task data serialization/deserialization with serde
- Two-step sequential task execution
- Storage layer composition (S3 + cache)
- Manager lifecycle management
The unique job code using UUID (line 100) is a good pattern for examples that may be run multiple times.
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs (2)
55-61: Inconsistent counting for get_job_by_meta.
get_job increments list_and_get_successes on success, but get_job_by_meta does not. If this is intentional (perhaps to avoid double-counting when get_job internally calls get_job_by_meta), consider adding a comment. Otherwise, add the counter increment for consistency.
45-92: LGTM! Useful test instrumentation.
The CountingStorage wrapper provides good visibility into storage operations for test assertions. The logging with job code, iteration, and version is helpful for debugging concurrency scenarios.
crates/icegate-jobmanager/src/infra/metrics.rs (1)
21-34: LGTM! Clean no-op metrics pattern.
The new_disabled() constructor creates valid instrument handles without exporting. The enabled flag ensures no overhead during recording operations.
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs (1)
21-129: LGTM! Good coverage of deadline expiry behavior.
The test effectively validates that:
- Tasks exceeding their deadline are re-picked by other workers
- The attempt counter tracks multiple executions
- Final job state reflects successful completion after retries
The multi-thread runtime and multiple workers ensure realistic concurrency.
crates/icegate-jobmanager/src/tests/simple_job_test.rs (2)
21-121: LGTM! Clean basic execution test.
The test properly validates:
- Task execution triggers the executor
- Input data flows correctly to the task
- Job completes with expected state
Using parking_lot::Mutex for the captured input is appropriate for the simple synchronization needs.
123-236: LGTM! Good test of dynamic task creation.
The multi-task sequence test demonstrates:
- Dynamic task injection via manager.add_task()
- Data flow between tasks through task input
- Sequential task execution ordering
The assertion inside the async block (line 163) will cause a panic on failure, which is acceptable for tests.
crates/icegate-jobmanager/src/storage/cached.rs (2)
34-44: Cache update logic is correct.
The update_cache_if_newer function correctly handles the "equal or newer iteration" case, which allows cache updates for version changes within the same iteration.
55-105: Well-structured caching implementation.
The Storage implementation correctly:
- Validates cache against latest metadata before serving
- Holds the cache lock during fetch to prevent races
- Invalidates cache on conflicts
- Records metrics for cache hits/misses
The use of Arc<Mutex<CachedJob>> per entry allows concurrent access to different jobs while serializing access to the same job.
Also applies to: 107-148, 150-190
crates/icegate-jobmanager/src/tests/common/manager_env.rs (1)
96-101: LGTM!
The Drop implementation correctly ensures the manager is aborted if the test doesn't explicitly call stop(), preventing resource leaks in test scenarios.
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs (2)
53-73: Primary executor closure captures executed_primary_tasks_counter correctly.
The executor properly clones the Arc before the async block and tracks execution. The pattern of creating secondary tasks and then completing is well-structured.
37-50: LGTM!
The test setup properly initializes tracking structures and the MinIO environment. Using DashMap for concurrent task execution tracking is appropriate given the multi-threaded test context.
crates/icegate-jobmanager/src/execution/jobs_manager.rs (2)
37-66: LGTM!
The JobsManagerHandle provides clean lifecycle management with graceful shutdown() and forced abort() paths. The wait() method properly drains the JoinSet and handles panics appropriately.
68-72: LGTM!
The Drop implementation ensures workers are aborted if the handle is dropped without explicit shutdown, preventing resource leaks.
crates/icegate-jobmanager/src/lib.rs (1)
21-44: LGTM!
The re-export structure is well-organized with clear separation between public API (pub use) and crate-internal types (pub(crate) use). The compatibility modules provide clean namespacing for related types.
crates/icegate-jobmanager/src/storage/mod.rs (2)
48-60: LGTM!
The StorageError helper methods (is_retryable, is_conflict, is_cancelled) provide clean APIs for error classification. The is_retryable() method correctly identifies transient errors that should trigger retry logic.
82-103: LGTM!
The Storage and JobDefinitionRegistry traits are well-designed. All async methods properly accept CancellationToken for cooperative cancellation, enabling graceful shutdown behavior.
crates/icegate-jobmanager/src/execution/job_manager.rs (2)
31-62: LGTM!
The JobManagerImpl correctly uses write() locks for mutating operations (add_task, complete_task, fail_task) and read() locks for read-only operations (get_task, get_tasks_by_code). The error mapping to Error::Other provides a simplified interface for executors.
10-17: LGTM!
The JobManager trait provides a clean, thread-safe interface (Send + Sync) for task executors. The method signatures are consistent and appropriate for the use case.
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (2)
80-88: save_job doesn't implement optimistic locking.
Unlike what the Storage trait doc suggests ("Save job with optimistic locking"), this implementation always succeeds without checking if the stored job's version matches the incoming job's version. This may be intentional for a simplified test helper, but could mask concurrency issues in tests that rely on version conflict detection.
If this is intentional for simplicity, consider adding a brief comment noting it's a simplified implementation.
38-78: LGTM!
The Storage implementation correctly handles cancellation checks first in all methods and properly implements atomic counter increments for tracking method calls. The version matching logic in get_job_by_meta is correct.
crates/icegate-jobmanager/src/core/registry.rs (2)
29-58: LGTM!
The JobRegistry::new() constructor has comprehensive validation: non-empty jobs list, non-empty job codes, and duplicate detection. The error messages are descriptive.
7-19: LGTM!
The TaskExecutorFn type alias is well-designed with appropriate lifetime handling via HRTB (for<'a>). The doc comments clearly explain the design decisions: borrowing JobManager prevents moving it into background tasks, and CancellationToken enables early shutdown.
crates/icegate-jobmanager/src/execution/worker.rs (1)
538-596: The save_job_state method is well-structured with proper retry semantics.
The retry logic correctly handles:
- Job ownership transfer via Arc<Mutex<Option<Job>>>
- Conflict detection with merge handler callback
- Lock scope minimization (not held across await points)
The pattern of taking ownership before await and restoring after is a good approach for the retry loop.
crates/icegate-jobmanager/src/core/job.rs (3)
65-89: Well-designed status transition validation.
The can_transition_to method provides clear, explicit transition rules with proper error reporting. The transition graph is intuitive:
- Started → Running | Failed
- Running → Running | Completed | Failed
- Completed/Failed → Started (for next iteration)
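The transition graph above can be written down as a small table-driven check. This is a minimal sketch and not the code in job.rs: the `JobStatus` enum, the `String` error type, and the error message are assumptions made for illustration; only the allowed transitions come from the review comment.

```rust
/// Job lifecycle states named in the review comment (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus {
    Started,
    Running,
    Completed,
    Failed,
}

impl JobStatus {
    /// Returns Ok(()) when `self -> next` is allowed, otherwise a descriptive error.
    fn can_transition_to(self, next: JobStatus) -> Result<(), String> {
        use JobStatus::{Completed, Failed, Running, Started};
        let allowed = matches!(
            (self, next),
            (Started, Running | Failed)
                | (Running, Running | Completed | Failed)
                | (Completed | Failed, Started)
        );
        if allowed {
            Ok(())
        } else {
            Err(format!("invalid job status transition: {self:?} -> {next:?}"))
        }
    }
}
```

Listing the allowed pairs in one `matches!` keeps every rule visible in a single place, so a missing or extra transition is easy to spot in review.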
102-131: JobDefinition validation is thorough.
Good defensive validation ensuring:
- Non-empty initial tasks
- Non-empty executors
- Every initial task has a corresponding executor
477-481: Boundary condition is correct and intentional.
The condition iter_num >= max_iterations with iter_num starting at 1 means max_iterations directly specifies the iteration count. With max_iterations=1, the job runs exactly 1 iteration (checked after completion, preventing a second iteration). This is confirmed by the test case in job_iterations_test.rs, which expects exactly 3 iterations with max_iterations=3 and verifies iter_num==3 at completion.
crates/icegate-jobmanager/src/storage/s3.rs (3)
206-217: Clever use of inverted iteration number for S3 listing optimization.
Using u64::MAX - iter_num ensures that the most recent iteration always appears first in lexicographic order (provided the number is rendered at a fixed width), making the max_keys(1) LIST operation efficient. Well-documented with the comment explaining the rationale.
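The ordering argument behind the inverted iteration number can be checked in a few lines. This is a sketch under stated assumptions: `inverted_iter_key` and `iter_from_key` are hypothetical helpers, and the zero-padding to 20 digits is an assumption about how such a key would be rendered, not a description of what s3.rs does.

```rust
/// Renders an iteration number so that newer iterations sort first
/// lexicographically (hypothetical helper).
/// u64::MAX has 20 decimal digits, so zero-padding to width 20 makes
/// string order agree with numeric order of the inverted value.
fn inverted_iter_key(iter_num: u64) -> String {
    format!("{:020}", u64::MAX - iter_num)
}

/// Recovers the iteration number from a key produced by `inverted_iter_key`.
fn iter_from_key(key: &str) -> Option<u64> {
    key.parse::<u64>().ok().map(|inverted| u64::MAX - inverted)
}
```

Without the fixed width, a shorter decimal string could sort before a longer one regardless of magnitude, which is why the padding matters for a listing that returns only the first key.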
510-583: The save_job implementation handles both new and existing iterations correctly.
The retry logic properly distinguishes between retryable errors and conflicts, and the ETag is correctly propagated back to the job for subsequent operations.
337-393: Atomic write semantics correctly implemented with conditional requests.
The code properly uses:
- if_none_match("*") for new iterations (create-if-not-exists)
- if_match(version) for current iteration updates (compare-and-swap)
This provides the optimistic concurrency control needed for distributed workers. The TODO at lines 369–370 about multipart uploads and ETag is a valid concern: S3 multipart ETags are composite values (an MD5 of the concatenated part MD5s followed by a "-<part count>" suffix) and differ from single-part ETags. If the SDK auto-enables multipart for larger objects, the ETag format could change between writes, potentially affecting conditional request matching. Worth investigating whether multipart should be explicitly disabled to ensure ETag stability.
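The two preconditions can be modelled in memory to show why they give create-if-not-exists and compare-and-swap behaviour. This is a sketch, not the S3 client code in s3.rs: `ObjectStore`, `Precondition`, and `PutError` are hypothetical types, and the counter-based ETag is a stand-in for whatever the real store returns.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum PutError {
    PreconditionFailed,
}

/// Precondition attached to a write, mirroring `If-None-Match: *`
/// and `If-Match: <etag>` (hypothetical type).
enum Precondition<'a> {
    IfNoneMatch,
    IfMatch(&'a str),
}

/// In-memory stand-in for a bucket: key -> (etag, body).
#[derive(Default)]
struct ObjectStore {
    objects: HashMap<String, (String, Vec<u8>)>,
    next_etag: u64,
}

impl ObjectStore {
    /// Writes `body` only if the precondition holds; returns the new ETag.
    fn put(&mut self, key: &str, body: &[u8], cond: Precondition<'_>) -> Result<String, PutError> {
        let current_etag = self.objects.get(key).map(|(etag, _)| etag.as_str());
        let ok = match (cond, current_etag) {
            // Create-if-not-exists: succeeds only when the key is absent.
            (Precondition::IfNoneMatch, None) => true,
            // Compare-and-swap: succeeds only when the stored ETag matches.
            (Precondition::IfMatch(expected), Some(actual)) => expected == actual,
            _ => false,
        };
        if !ok {
            return Err(PutError::PreconditionFailed);
        }
        self.next_etag += 1;
        let etag = format!("etag-{}", self.next_etag);
        self.objects.insert(key.to_owned(), (etag.clone(), body.to_vec()));
        Ok(etag)
    }
}
```

A writer that loses the race gets `PreconditionFailed`, which corresponds to the conflict case the review describes: reload, merge, and retry with the fresh ETag.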
crates/icegate-jobmanager/src/core/task.rs (1)
245-247: This is valid Rust code and compiles successfully.
The is_processed method is correctly marked as const fn and can call other const fn methods (is_completed() and is_failed()) through self. Stable Rust allows a const fn to call other const fn methods, so the compilation concern raised is unfounded.
Likely an incorrect or invalid review comment.
Cargo.toml (6)
9-9: LGTM! New workspace member added correctly.
The icegate-jobmanager crate is properly added to the workspace members, aligning with the PR objective to introduce job orchestration functionality.
23-24: LGTM! Stricter Rust lints improve code quality.
Setting unused_imports and unused_variables to deny helps maintain a cleaner codebase by preventing dead code accumulation.
28-45: LGTM! Significantly stricter clippy configuration enforces best practices.
The upgraded lint levels (pedantic, nursery, perf, cargo to deny) and style lints (dbg_macro, todo, unimplemented, print_stdout, print_stderr to deny) align with the coding guidelines for strict linting. The single_match_else = "allow" is a pragmatic exception to avoid overly verbose code.
Note that these changes apply workspace-wide and require all existing crates to be compliant.
128-128: No action required. Cargo.toml already ends with a newline character and complies with the coding guidelines.
61-61: UUID version 1.19 with features v7, v4, and serde is valid and supported by the crate.
126-127: Both AWS SDK versions exist and are compatible. The specifications aws-config = "1.1" and aws-sdk-s3 = "1.11" are valid semver caret requirements (>=1.1.0, <2.0.0 and >=1.11.0, <2.0.0 respectively) that resolve to available releases and are compatible with each other.
crates/icegate-jobmanager/Cargo.toml (3)
1-14: LGTM! Package metadata properly configured.
The package metadata correctly uses workspace inheritance for version, edition, authors, license, and repository. The readme path pointing to the root README and workspace lint inheritance ensure consistency across the workspace.
16-50: LGTM! Workspace dependencies correctly utilized.
The crate properly leverages workspace dependencies for error handling, logging, async runtime, AWS S3, serialization, and utilities, ensuring version consistency across the workspace.
53-55: Dev dependencies properly configured.
The testcontainers dependency correctly uses workspace inheritance with the blocking feature, appropriate for synchronous test setup. File ends with a newline as required.
Makefile (2)
22-22: LGTM! Workspace-wide clippy ensures comprehensive linting.
The addition of --workspace to the clippy command ensures that all workspace members, including the new icegate-jobmanager crate, are linted consistently with the strict lint configuration.
25-25: LGTM! Consistent workspace-wide clippy fix capability.
The --workspace flag ensures automatic fixes can be applied across all workspace members, maintaining consistency with the clippy check target.
crates/icegate-jobmanager/Makefile (2)
3-7: LGTM! Example infrastructure targets properly configured.
The targets provide convenient commands to spin up and tear down example infrastructure (likely MinIO for S3 testing). The --detach flag for the up command is appropriate for background operation.
9-16: LGTM! Standard Cargo targets properly implemented.
The test, clean, and clippy targets are correctly implemented. The clippy target appropriately scopes to the icegate-jobmanager package with strict warning-as-error enforcement (-D warnings), consistent with workspace linting policy.
Actionable comments posted: 1
♻️ Duplicate comments (6)
crates/icegate-jobmanager/src/storage/mod.rs (1)
95-96: Doc comment issue resolved.
The doc comment for save_job now correctly states it "Returns ConcurrentModification if version mismatch." This addresses the previous review feedback.
crates/icegate-jobmanager/src/execution/jobs_manager.rs (1)
84-87: Worker count validation now present.
The validation for worker_count == 0 has been added in new(), returning an appropriate error. This addresses the previous review concern about an empty JobsManagerHandle being returned silently.
crates/icegate-jobmanager/src/execution/worker.rs (2)
169-169: Typo fixed: "skipped" is now correct.
The previous review flagged "skiped" → "skipped". This has been corrected.
145-155: Jitter logic appears inverted when randomization is disabled.
When poll_interval_randomization.is_zero(), the code sets jitter_ms = self.config.poll_interval (line 147), then wait_duration = jitter_ms + poll_interval (line 155). This means the worker waits 2 * poll_interval when randomization is disabled, which seems unintentional.
If randomization is disabled, jitter should be zero:
🔎 Proposed fix
 // Reduce strong concurrency between workers
 let jitter_ms = if self.config.poll_interval_randomization.is_zero() {
-    self.config.poll_interval
+    Duration::ZERO
 } else {
     #[allow(clippy::cast_possible_truncation)]
     Duration::from_millis(
         rand::rng().random_range(0..self.config.poll_interval_randomization.as_millis() as u64),
     )
 };
 wait_duration = jitter_ms + poll_interval;
crates/icegate-jobmanager/src/storage/s3.rs (2)
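For the jitter fix proposed for worker.rs (145-155), the intended behaviour can be pinned down as a pure function that is easy to unit-test. This is a sketch, not the worker code: `wait_duration` is a hypothetical helper, and the `sample` parameter stands in for the random value that the PR obtains from rand.

```rust
use std::time::Duration;

/// Computes how long a worker waits before the next poll (hypothetical helper).
/// `sample` stands in for a random u64; the PR draws it from rand::rng().
fn wait_duration(poll_interval: Duration, randomization: Duration, sample: u64) -> Duration {
    let jitter = if randomization.is_zero() {
        // Randomization disabled: add no jitter at all.
        Duration::ZERO
    } else {
        // Map the sample into [0, randomization) in milliseconds.
        // The divisor is clamped to at least 1 so sub-millisecond values cannot divide by zero.
        let max_ms = u64::try_from(randomization.as_millis())
            .unwrap_or(u64::MAX)
            .max(1);
        Duration::from_millis(sample % max_ms)
    };
    poll_interval + jitter
}
```

With randomization disabled the result equals `poll_interval` exactly, which is the property the original branch violated by adding `poll_interval` a second time.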
138-150: Bucket creation error handling now distinguishes 404 from other errors.
The code now uses pattern matching on SdkError::ServiceError with status code 404 check before attempting bucket creation. Other errors (auth failures, etc.) are properly propagated. This addresses the previous review concern.
24-47: created_by_worker is now persisted correctly.
The TaskJson struct includes created_by_worker (line 29), and both task_to_json (line 266) and task_from_json (line 306) handle this field. This addresses the previous review concern about losing creator information during S3 round-trips.
Also applies to: 260-276, 300-316
🧹 Nitpick comments (12)
crates/icegate-jobmanager/Makefile (1)
9-10: Consider scoping the test target to this package for consistency.
The clippy target is scoped to icegate-jobmanager with the -p flag, but test has no package flag, so which packages it runs depends on the directory cargo is invoked from. For consistency and faster local development feedback, consider scoping the test target similarly.
🔎 Proposed change for package-scoped tests
 test:
-    cargo test
+    cargo test -p icegate-jobmanager
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs (1)
31-62: Consider adding comments to clarify the cache invalidation strategy.
The test exercises a specific cache invalidation pattern where:
- find_meta is called on every get_job to check for version changes
- get_by_meta is only called when the cache is stale or empty
Adding comments before each test phase would improve maintainability and help future readers understand the expected behavior.
💡 Suggested comment structure
+    // Phase 1: Direct storage save (bypasses cache)
     storage.save_job(&mut job, &cancel_token).await?;
     assert_eq!(storage.version(), 1);
+    // Phase 2: First cached get (cache miss, populates cache)
     let job_from_cache = cached_storage.get_job(&job_code, &cancel_token).await?;
     assert_eq!(storage.find_meta_calls(), 1);
     assert_eq!(storage.get_by_meta_calls(), 1);
crates/icegate-jobmanager/src/tests/two_jobs_test.rs (1)
22-22: Reduce excessive worker threads allocation.
The test configures 10 tokio worker threads but only uses 2 manager workers. This wastes resources and could slow down the test suite.
🔎 Proposed fix
-#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
 async fn test_two_jobs_concurrent() -> Result<(), Box<dyn std::error::Error>> {
Using 4 threads provides sufficient parallelism (2× the number of manager workers) while being more resource-efficient.
crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs (2)
23-23: Reduce excessive worker threads allocation.
The test configures 10 tokio worker threads but only uses 3 manager workers. This wastes resources and could slow down the test suite.
🔎 Proposed fix
-#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
 async fn test_task_deadline_expiry() -> Result<(), Box<dyn std::error::Error>> {
Using 6 threads provides sufficient parallelism (2× the number of manager workers) while being more resource-efficient.
45-48: Clarify the deadline expiry strategy.
The test sleeps for 500ms (5× the deadline) on the first 3 attempts. While this works, a shorter sleep (e.g., 150ms or 2× the deadline) would make the test faster without sacrificing reliability.
💡 Alternative approach
 if attempt <= 3 {
-    // First attempt: exceed deadline so another worker can re-pick.
-    tokio::time::sleep(Duration::from_millis(500)).await;
+    // First 3 attempts: exceed deadline so another worker can re-pick
+    tokio::time::sleep(Duration::from_millis(150)).await;
 }
This reduces the total sleep time from 1500ms to 450ms, making the test ~1 second faster.
crates/icegate-jobmanager/src/storage/cached.rs (2)
11-16: TODO acknowledged: consider TTL/LRU cache to bound memory.
The TODO on line 15 is a valid concern. Without eviction, the cache will grow without bound with each unique JobCode. For production use, consider using a crate like moka or quick_cache that provides TTL/LRU semantics out of the box.
Would you like me to open an issue to track implementing TTL/LRU eviction for the job cache?
36-44: Consider checking version for staleness, not just iteration.
update_cache_if_newer only compares iter_num. If two updates occur within the same iteration (e.g., task state changes), the cache might hold a stale job with an older version. Consider also comparing versions when iter_num is equal.
🔎 Proposed refinement
 fn update_cache_if_newer(storage_job: &Job, cached_job: &mut CachedJob) {
     if let Some(ref current_job) = cached_job.job {
-        if storage_job.iter_num() >= current_job.iter_num() {
+        if storage_job.iter_num() > current_job.iter_num()
+            || (storage_job.iter_num() == current_job.iter_num()
+                && storage_job.version() != current_job.version())
+        {
             cached_job.job = Some(storage_job.clone());
         }
     } else {
         cached_job.job = Some(storage_job.clone());
     }
 }
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs (2)
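The refinement proposed for update_cache_if_newer in cached.rs (36-44) reduces to a predicate over the iteration number and the version. The sketch below isolates that predicate; `JobSnapshot` and `should_replace` are hypothetical names, and the `String` version is an assumption standing in for whatever version type the crate uses. Compared with the existing `>=` check, the only behavioural difference is that an incoming job with the same iteration and the same version is not copied again.

```rust
/// The two fields the cache decision depends on (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct JobSnapshot {
    iter_num: u64,
    version: String,
}

/// Decides whether the cached entry should be replaced by the incoming job.
fn should_replace(cached: Option<&JobSnapshot>, incoming: &JobSnapshot) -> bool {
    match cached {
        // Empty cache: always store.
        None => true,
        Some(current) => {
            // Newer iteration, or same iteration with a different version.
            incoming.iter_num > current.iter_num
                || (incoming.iter_num == current.iter_num
                    && incoming.version != current.version)
        }
    }
}
```

An older iteration never replaces the cached entry, which keeps a late response from overwriting fresher state.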
194-198: Clarify the PUT count formula in comments.
The formula (((secondary_task_count + 1) * 2) + 1) + timeouts accounts for:
- 1 PUT for job creation
- 2 PUTs per task (start + complete)
- Additional PUTs for timeout retries
Consider adding a more detailed comment explaining each component for future maintainability.
🔎 Suggested comment improvement
 assert_eq!(
     counting_storage.put_successes(),
-    (((secondary_task_count + 1) * 2) + 1) as u64 + timeouts, // 1 PUT for create job, 2 PUT for each task
+    // Expected PUTs:
+    // - 1 for initial job creation
+    // - 2 per task (1 to start, 1 to complete) × (primary + secondary tasks)
+    // - additional PUTs for task timeout retries
+    (((secondary_task_count + 1) * 2) + 1) as u64 + timeouts,
     "all tasks must be executed"
 );
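The same arithmetic can also be named instead of commented. The helper below is a hypothetical sketch, not part of the test file: it spells out each term of the formula, assuming (as the review states) one PUT for job creation, two per task, and one extra per timeout.

```rust
/// Expected number of successful PUTs for one job iteration (hypothetical helper).
fn expected_puts(secondary_task_count: u64, timeouts: u64) -> u64 {
    let job_creation = 1; // one PUT to create the job
    let total_tasks = secondary_task_count + 1; // secondary tasks plus the primary task
    let puts_per_task = 2; // one PUT to start, one to complete
    job_creation + total_tasks * puts_per_task + timeouts
}
```

With 10 secondary tasks and no timeouts this gives 1 + 11 × 2 = 23 PUTs; each timeout adds one more.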
41-44: Magic numbers could be extracted as constants.
The test configuration uses several magic numbers (10, 30, 100, 20). Consider extracting key values as named constants at the top of the function for clarity.
🔎 Suggested refactor
 async fn run_concurrent_workers_test(use_cached_storage: bool) -> Result<(), Box<dyn std::error::Error>> {
-    let secondary_task_count = 10;
-    let max_iterations = 1u64;
-    let workers_cnt = 10;
+    const SECONDARY_TASK_COUNT: usize = 10;
+    const MAX_ITERATIONS: u64 = 1;
+    const WORKER_COUNT: usize = 10;
+    const COMPLETION_TIMEOUT: Duration = Duration::from_secs(30);
+
+    let secondary_task_count = SECONDARY_TASK_COUNT;
+    let max_iterations = MAX_ITERATIONS;
+    let workers_cnt = WORKER_COUNT;
crates/icegate-jobmanager/src/execution/worker.rs (2)
66-75: Consider using std::panic::catch_unwind helper.
The panic_payload_to_string function is straightforward. Consider placing it in a shared utility module if panic handling is needed elsewhere.
91-91: TODO: Task timeout cancellation is not implemented.
Line 91 notes TODO(med): cancel task if timeout. Currently, tasks that exceed their deadline are logged (line 457) but not cancelled mid-execution. This could lead to resource exhaustion if tasks hang.
Would you like me to open an issue to track implementing task timeout cancellation?
crates/icegate-jobmanager/src/storage/s3.rs (1)
18-19: TODOs noted for cleanup and network testing.
The TODOs on lines 18-19 for old job state cleanup and Toxiproxy testing are valid future work items. Consider tracking these as issues.
Would you like me to open issues to track:
- Mechanism to clean up old job states
- Toxiproxy-based network failure testing
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
- crates/icegate-jobmanager/Makefile
- crates/icegate-jobmanager/src/execution/jobs_manager.rs
- crates/icegate-jobmanager/src/execution/worker.rs
- crates/icegate-jobmanager/src/storage/cached.rs
- crates/icegate-jobmanager/src/storage/mod.rs
- crates/icegate-jobmanager/src/storage/s3.rs
- crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
- crates/icegate-jobmanager/src/tests/common/mod.rs
- crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
- crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
- crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
- crates/icegate-jobmanager/src/tests/job_iterations_test.rs
- crates/icegate-jobmanager/src/tests/shutdown_test.rs
- crates/icegate-jobmanager/src/tests/simple_job_test.rs
- crates/icegate-jobmanager/src/tests/task_failure_test.rs
- crates/icegate-jobmanager/src/tests/two_jobs_test.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- crates/icegate-jobmanager/src/tests/shutdown_test.rs
- crates/icegate-jobmanager/src/tests/task_failure_test.rs
- crates/icegate-jobmanager/src/tests/simple_job_test.rs
- crates/icegate-jobmanager/src/tests/job_iterations_test.rs
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{rs,toml}
📄 CodeRabbit inference engine (AGENTS.md)
Use cargo build for debug builds, cargo build --release for release builds, and specific binary builds with cargo build --bin <name>
Files:
- crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
- crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
- crates/icegate-jobmanager/src/tests/common/mod.rs
- crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
- crates/icegate-jobmanager/src/tests/two_jobs_test.rs
- crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
- crates/icegate-jobmanager/src/execution/worker.rs
- crates/icegate-jobmanager/src/storage/cached.rs
- crates/icegate-jobmanager/src/storage/mod.rs
- crates/icegate-jobmanager/src/execution/jobs_manager.rs
- crates/icegate-jobmanager/src/storage/s3.rs
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Run all tests with cargo test, specific tests with cargo test test_name, and use --nocapture flag to show test output
Use make fmt to check code format; DO NOT run via rustup because it doesn't respect rustfmt.toml
Use make clippy to run the linter with warnings as errors
Run make audit to perform security audits and use make install to install cargo-audit
Run make ci to execute all CI checks (check, fmt, clippy, test, audit)
Use rustfmt for code formatting with configuration in rustfmt.toml
Files:
- crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
- crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
- crates/icegate-jobmanager/src/tests/common/mod.rs
- crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
- crates/icegate-jobmanager/src/tests/two_jobs_test.rs
- crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
- crates/icegate-jobmanager/src/execution/worker.rs
- crates/icegate-jobmanager/src/storage/cached.rs
- crates/icegate-jobmanager/src/storage/mod.rs
- crates/icegate-jobmanager/src/execution/jobs_manager.rs
- crates/icegate-jobmanager/src/storage/s3.rs
**/*.{rs,toml,md}
📄 CodeRabbit inference engine (AGENTS.md)
Ensure each file ends with a newline; do not duplicate if it already exists
Files:
- crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
- crates/icegate-jobmanager/src/tests/dynamic_task_test.rs
- crates/icegate-jobmanager/src/tests/common/mod.rs
- crates/icegate-jobmanager/src/tests/deadline_expiry_test.rs
- crates/icegate-jobmanager/src/tests/two_jobs_test.rs
- crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs
- crates/icegate-jobmanager/src/execution/worker.rs
- crates/icegate-jobmanager/src/storage/cached.rs
- crates/icegate-jobmanager/src/storage/mod.rs
- crates/icegate-jobmanager/src/execution/jobs_manager.rs
- crates/icegate-jobmanager/src/storage/s3.rs
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Organize code in Cargo workspace with 4 crates: icegate-common, icegate-query, icegate-ingest, and icegate-maintain
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Organize code in Cargo workspace with 4 crates: icegate-common, icegate-query, icegate-ingest, and icegate-maintain
Applied to files:
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs
crates/icegate-jobmanager/src/tests/common/mod.rs
crates/icegate-jobmanager/src/execution/worker.rs
crates/icegate-jobmanager/src/storage/mod.rs
crates/icegate-jobmanager/src/execution/jobs_manager.rs
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Use `make clippy` to run the linter with warnings as errors
Applied to files:
crates/icegate-jobmanager/Makefile
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Run `make ci` to execute all CI checks (check, fmt, clippy, test, audit)
Applied to files:
crates/icegate-jobmanager/Makefile
📚 Learning: 2025-12-27T13:38:58.955Z
Learnt from: CR
Repo: icegatetech/icegate PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-27T13:38:58.955Z
Learning: Applies to **/*.rs : Run `make audit` to perform security audits and use `make install` to install cargo-audit
Applied to files:
crates/icegate-jobmanager/Makefile
🧬 Code graph analysis (7)
crates/icegate-jobmanager/src/tests/cache_invalidation_test.rs (3)
crates/icegate-jobmanager/src/tests/common/mod.rs (1)
init_tracing(12-19)
crates/icegate-jobmanager/src/storage/cached.rs (1)
new(26-32)
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (4)
new(16-23), version(25-27), find_meta_calls(29-31), get_by_meta_calls(33-35)

crates/icegate-jobmanager/src/tests/dynamic_task_test.rs (2)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (2)
new(16-33), storage(90-92)
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
new(20-56)

crates/icegate-jobmanager/src/tests/two_jobs_test.rs (6)
crates/icegate-jobmanager/src/core/job.rs (7)
max_iterations(145-147), new(15-17), new(103-131), new(168-195), status(338-340), from(31-33), from(37-39)
crates/icegate-jobmanager/src/execution/worker.rs (2)
new(98-115), default(33-40)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(100-162)
crates/icegate-jobmanager/src/core/task.rs (6)
new(13-15), new(71-76), new(124-140), status(184-186), from(29-31), from(35-37)
crates/icegate-jobmanager/src/tests/common/manager_env.rs (2)
new(16-33), storage(90-92)
crates/icegate-jobmanager/src/core/registry.rs (1)
new(29-58)

crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs (3)
crates/icegate-jobmanager/src/tests/common/mod.rs (1)
init_tracing(12-19)
crates/icegate-jobmanager/src/tests/common/minio_env.rs (1)
new(20-56)
crates/icegate-jobmanager/src/tests/common/storage_wrapper.rs (3)
put_attempts(32-34), put_successes(36-38), list_and_get_successes(40-42)

crates/icegate-jobmanager/src/execution/worker.rs (3)
crates/icegate-jobmanager/src/core/job.rs (11)
id(326-328), new(15-17), new(103-131), new(168-195), code(133-135), code(330-332), updated_by_worker_id(366-368), iter_num(334-336), from(31-33), from(37-39), version(342-344)
crates/icegate-jobmanager/src/storage/cached.rs (1)
new(26-32)
crates/icegate-jobmanager/src/storage/s3.rs (1)
new(100-162)

crates/icegate-jobmanager/src/storage/cached.rs (4)
crates/icegate-jobmanager/src/storage/s3.rs (5)
new(100-162), get_job(400-427), get_job_by_meta(429-467), find_job_meta(469-509), save_job(512-584)
crates/icegate-jobmanager/src/infra/metrics.rs (3)
new(36-75), record_cache_hit(123-128), record_cache_miss(130-135)
crates/icegate-jobmanager/src/tests/common/in_memory_storage.rs (6)
new(16-23), get_job(40-49), version(25-27), get_job_by_meta(51-62), find_job_meta(64-78), save_job(80-88)
crates/icegate-jobmanager/src/storage/mod.rs (5)
get_job(87-87), get_job(101-101), get_job_by_meta(90-90), find_job_meta(93-93), save_job(96-96)

crates/icegate-jobmanager/src/execution/jobs_manager.rs (5)
crates/icegate-jobmanager/src/execution/worker.rs (3)
default(33-40), new(98-115), start(121-157)
crates/icegate-jobmanager/src/infra/retrier.rs (2)
default(19-36), new(49-51)
crates/icegate-jobmanager/src/storage/cached.rs (1)
new(26-32)
crates/icegate-jobmanager/src/infra/metrics.rs (1)
new(36-75)
crates/icegate-jobmanager/src/core/registry.rs (1)
new(29-58)
🪛 checkmake (0.2.2)
crates/icegate-jobmanager/Makefile
[warning] 1-1: Missing required phony target "all"
(minphony)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test beta on ubuntu-latest
- GitHub Check: Test stable on ubuntu-latest
- GitHub Check: Benchmark
- GitHub Check: Security Audit
- GitHub Check: Build Release
🔇 Additional comments (16)
crates/icegate-jobmanager/Makefile (2)
1-7: LGTM! Infrastructure targets are well-defined.
The `.PHONY` declaration correctly lists all targets, and the docker compose commands appropriately manage example infrastructure for local development and testing.
15-16: LGTM! Clippy target follows project conventions.
The clippy target correctly scopes to the icegate-jobmanager package and treats warnings as errors, aligning with the project's linting standards.
crates/icegate-jobmanager/src/tests/common/mod.rs (1)
12-19: LGTM!
The `init_tracing` function correctly uses `OnceLock` to ensure single initialization across multiple test runs, and the fallback filter level is appropriate for test debugging.
crates/icegate-jobmanager/src/tests/two_jobs_test.rs (1)
168-169: The timeout calculation logic is correct.
Lines 168-169 and 183-184 calculate timeouts by summing `(attempt - 1)` for all tasks. The `attempt` field starts at 0 when a task is created, increments to 1 on the first `start()` call (first execution attempt), and increments further for each retry due to timeout. This means `(attempt - 1)` correctly counts the number of retries/timeouts: a fresh task with no timeouts has `attempt = 1`, yielding `(1 - 1) = 0`. The test expectation is valid.
crates/icegate-jobmanager/src/storage/cached.rs (1)
93-107: Retry loop is now bounded: good fix.
The previous unbounded `loop` has been replaced with a bounded `for _ in 0..2` loop (max 2 attempts). This addresses the earlier concern about indefinite spinning under contention. The fallback to `StorageError::ConcurrentModification` on exhaustion is appropriate.
crates/icegate-jobmanager/src/tests/concurrent_workers_test.rs (1)
1-201: LGTM: Comprehensive concurrent worker test.
The test thoroughly validates concurrent task processing with proper tracking, cleanup, and assertions. Good coverage for both S3 and cached storage variants.
crates/icegate-jobmanager/src/storage/mod.rs (2)
crates/icegate-jobmanager/src/storage/mod.rs (2)
11-46: Well-structured error enum with appropriate variants.
`StorageError` covers the essential failure modes (not found, conflicts, timeouts, auth, rate limiting, etc.) with clear helper predicates. The `thiserror` derivation provides good error messages.
82-97: Storage trait design is clean and async-friendly.
The trait methods accept `CancellationToken` for cooperative cancellation and return `StorageResult`. The separation of `get_job` vs `get_job_by_meta` vs `find_job_meta` provides flexibility for callers to optimize storage access patterns.
crates/icegate-jobmanager/src/execution/jobs_manager.rs (3)
68-72: Drop implementation provides safety net.
The `Drop` impl calling `abort()` ensures workers are cancelled even if the handle is dropped without explicit shutdown. This prevents resource leaks and orphaned tasks.
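As a rough illustration of the safety-net pattern this comment describes, the sketch below uses std threads and an `AtomicBool` stop flag rather than the crate's async runtime. `ManagerHandle`, `start`, and `stop_flag` are hypothetical names, not the PR's actual API, and joining the workers inside `drop` is a choice of this sketch only.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a manager handle: owns worker threads and a shared stop flag.
pub struct ManagerHandle {
    stop: Arc<AtomicBool>,
    workers: Vec<JoinHandle<()>>,
}

impl ManagerHandle {
    /// Spawns `n` workers that poll the stop flag between units of (simulated) work.
    pub fn start(n: usize) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let workers = (0..n)
            .map(|_| {
                let stop = Arc::clone(&stop);
                thread::spawn(move || {
                    while !stop.load(Ordering::Acquire) {
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();
        Self { stop, workers }
    }

    /// Signals every worker to stop; calling it more than once is harmless.
    pub fn abort(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Exposes the stop flag so callers can observe that an abort was requested.
    pub fn stop_flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.stop)
    }
}

impl Drop for ManagerHandle {
    fn drop(&mut self) {
        // Safety net: dropping the handle without an explicit shutdown still stops the workers.
        self.abort();
        for worker in self.workers.drain(..) {
            // A panicked worker yields Err here; this sketch simply ignores it.
            let _ = worker.join();
        }
    }
}
```

Dropping a `ManagerHandle` without calling `abort()` first still sets the stop flag, so no worker thread outlives the handle.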
117-125: Worker panic handling needs attention (per TODO).
The TODO on line 118 notes that worker panics are not handled gracefully: the worker dies but the manager continues. Consider whether panics should trigger a restart or escalate to the manager level.
Is the current behavior (worker dies silently, others continue) acceptable for production, or should there be a restart mechanism or escalation policy?
37-66: JobsManagerHandle lifecycle API is well-designed.
The separation of `shutdown()` (graceful) vs `abort()` (forceful) with proper `wait()` semantics provides good control over worker lifecycle. The error logging for panics in `wait()` is appropriate.
crates/icegate-jobmanager/src/execution/worker.rs (2)
527-599: Complex but well-structured retry logic in `save_job_state`.
The `save_job_state` method handles optimistic locking with a merge callback pattern. The use of `Arc<Mutex<Option<Job>>>` to manage job ownership across async retry boundaries is sound. The `MergeDecision` enum cleanly separates retry vs done outcomes.
431-440: Good panic handling for task executors.
Wrapping the executor in `AssertUnwindSafe` and catching panics prevents a single misbehaving task from crashing the worker. The panic payload is converted to a string for logging.
crates/icegate-jobmanager/src/storage/s3.rs (3)
206-217: Inverted iteration numbering for S3 listing is clever.
Using `u64::MAX - iter_num` ensures newer iterations appear first in S3's lexicographic ordering, making `max_keys(1)` efficient for fetching the latest state. Good design choice documented with comments.
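The inverted-numbering idea can be sketched in a few lines. This is a minimal illustration, not the PR's code: the helper names `inverted_iter_key` and `iter_num_from_key` are hypothetical, and zero-padding to 20 digits (the decimal width of `u64::MAX`) is an assumption needed so that string order matches numeric order.

```rust
/// Builds a sort key so that larger `iter_num` values sort first lexicographically.
/// Zero-padding to 20 digits (the decimal width of u64::MAX) is an assumption of this sketch.
pub fn inverted_iter_key(iter_num: u64) -> String {
    format!("{:020}", u64::MAX - iter_num)
}

/// Recovers the iteration number from a key produced by `inverted_iter_key`.
/// Returns `None` if the key is not a valid unsigned integer.
pub fn iter_num_from_key(key: &str) -> Option<u64> {
    key.parse::<u64>().ok().map(|inverted| u64::MAX - inverted)
}
```

With keys built this way, an ascending lexicographic listing returns the newest iteration first, so a listing limited to one key is enough to find the latest state.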
370-371: TODO: Verify multipart upload behavior with ETag.
The comment on lines 370-371 notes potential issues with multipart uploads affecting ETag atomicity. This is worth investigating for large job states.
Consider testing with job states large enough to trigger multipart uploads to verify ETag behavior remains correct for optimistic locking.
99-162: S3 client initialization is thorough.
The setup includes proper timeout configuration, path-style access forcing, and bucket existence check with creation fallback. The credentials are passed explicitly which is appropriate for non-AWS S3-compatible services.
serde_json = { workspace = true }

# Metrics
opentelemetry = { version = "0.31.0", features = ["metrics"] }
Is there any reason to pin a specific version in this Cargo.toml instead of using the workspace one?
Version 0.27 was released in 2024-11
Feel free to update the workspace version!
TaskWorkerMismatch,

#[error("invalid job status transition from {from} to {to}")]
InvalidStatusTransition { from: String, to: String },
NIT: Is it possible to use an enum here?
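One way this suggestion could look is sketched below, using only the standard library instead of `thiserror` so the snippet stands alone. The `JobStatus` variants and the `JobError` type name are assumptions for illustration; the crate's real status set and error type may differ.

```rust
use std::fmt;

/// Hypothetical status enum; the variant names are assumptions, not the crate's actual set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    Started,
    Running,
    Completed,
    Failed,
}

impl fmt::Display for JobStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            JobStatus::Started => "started",
            JobStatus::Running => "running",
            JobStatus::Completed => "completed",
            JobStatus::Failed => "failed",
        };
        f.write_str(name)
    }
}

/// Error carrying typed statuses instead of free-form strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobError {
    InvalidStatusTransition { from: JobStatus, to: JobStatus },
}

impl fmt::Display for JobError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            JobError::InvalidStatusTransition { from, to } => {
                // Same message shape as the string-based variant quoted above.
                write!(f, "invalid job status transition from {} to {}", from, to)
            }
        }
    }
}

impl std::error::Error for JobError {}
```

With typed fields, callers can match on the exact transition that was rejected rather than comparing strings, while the rendered message stays the same.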
/// Job identifier used to select a job definition and persisted state.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct JobCode(String);
From my POV, "Code" does not read as an ID; maybe use JobID?
JobID and TaskId already exist; these are technical identifiers. "Name" usually does not imply uniqueness, and "type" is better suited to a predefined classification. "Code" seems the most suitable term here for a unique user-defined key.
Summary by CodeRabbit