Conversation
Force-pushed from dedbdc9 to 5993b7b
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main       #2      +/-   ##
==========================================
+ Coverage   99.48%   99.82%   +0.33%
==========================================
  Files          13       18       +5
  Lines         584     1126     +542
  Branches       63      122      +59
==========================================
+ Hits          581     1124     +543
+ Misses          3        2       -1
```

Flags with carried forward coverage won't be shown.
Force-pushed from 1ebad76 to 69ca93c
Force-pushed from 69ca93c to f65e640
Pull Request Overview

This PR implements a hierarchical work distributor as an alternative to the naive distributor, introducing a tree-based task distribution pattern for improved scalability in MPI environments. The hierarchical approach organizes workers in a coordinator-worker hierarchy to reduce communication overhead with the root manager.

Key changes include:
- Implementation of `HierarchicalMPIWorkDistributor` with tree-based task distribution
- Addition of utility headers for assertions and printing support
- Updates to the test suite to handle both ordered and unordered result patterns
- Enhanced test coverage with additional MPI rank configurations
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| include/dynampi/impl/hierarchical_distributor.hpp | Core implementation of hierarchical work distribution with coordinator-worker tree structure |
| include/dynampi/utilities/assert.hpp | Custom assertion macros with MPI-aware error reporting and source location support |
| include/dynampi/utilities/printing.hpp | Stream operators for common container types to support debugging output |
| test/mpi/test_distributers.cpp | Updated tests to handle both distributors with conditional logic for ordered/unordered results |
| include/dynampi/impl/naive_distributor.hpp | Minor refactoring to use communicator probe method |
| include/dynampi/mpi/mpi_communicator.hpp | Added probe method for non-blocking message detection |
| test/lsan.supp | Additional leak suppression entries for MPI library components |
| test/CMakeLists.txt | Extended test matrix to include 16-rank configuration |
| include/dynampi/mpi/mpi_types.hpp | Added string header include |
| benchmark/asymptotic_distribution_throughput.cpp | Commented out worker task count statistics output |
Force-pushed from d05ab60 to ce38e0f
Force-pushed from 1ec6669 to 5c62ccb
Force-pushed from 2199113 to 7451805
Force-pushed from 7451805 to 82d0ba0
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.

📝 Walkthrough

Large refactor adding hierarchical and improved naive work distributors with batching and RunConfig, new MPIGroup/MPICommunicator features, assertion and printing utilities, many benchmarks and helper scripts, expanded unit and MPI tests, CI changes to limit/serialize MPI test ranks, and assorted build/config updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Root as Root Manager
    participant Leader as Leader/Coordinator
    participant Worker as Leaf Worker
    rect rgba(200,220,255,0.5)
        Root->>Leader: submit TASK / TASK_BATCH
    end
    loop workers request work
        Worker->>Leader: REQUEST or REQUEST_BATCH
        alt tasks available
            Leader->>Worker: TASK or TASK_BATCH
            Worker->>Worker: execute task(s)
            Worker-->>Leader: RESULT or RESULT_BATCH
        else no tasks
            Leader-->>Worker: DONE
        end
    end
    rect rgba(200,220,255,0.5)
        Leader-->>Root: aggregate RESULT_BATCH / DONE
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant A as Rank A (sender)
    participant B as Rank B (receiver)
    participant R as Rank 0 (collector)
    rect rgba(255,240,200,0.5)
        A->>B: send (SEND / ISEND / BSEND / SSEND)
        B-->>A: pong
        A->>A: measure RTT & timings
        A->>R: MPI_Gatherv(csv line)
        B->>R: MPI_Gatherv(csv line)
        R->>R: write CSV outfile
    end
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@benchmark/scripts/launch_aurora_naive_shutdown.sh`:
- Line 70: The script calls "${LAUNCHER}" with "${LAUNCHER_ARGS[@]}" which will
cause an error under set -u when LAUNCHER_ARGS is an empty array; update both
invocations that use LAUNCHER_ARGS to guard the expansion using the safe pattern
"${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"}" (or an equivalent that yields
nothing when the array is empty) so the command runs whether LAUNCHER_ARGS is
empty or not; ensure you update every place where LAUNCHER_ARGS is passed to
"${LAUNCHER}" (including the second occurrence mentioned).
- Around line 57-61: The rank-per-node branch incorrectly falls back to a
hardcoded default (CORES_PER_NODE:-102) when rpn=="core"; update the
ranks_per_node assignment so that when rpn is "core" or "cores" it uses the
computed ALLOC_CORES_PER_NODE (if set) instead of the hardcoded default, e.g.
prefer ALLOC_CORES_PER_NODE and only fall back to CORES_PER_NODE or a literal
default if ALLOC_CORES_PER_NODE is unset; change the block that sets
ranks_per_node (the rpn check) to reference ALLOC_CORES_PER_NODE so the later
allocation check against PBS_NCPUS uses the correct value.
In `@benchmark/scripts/launch_aurora_strong_scaling.sh`:
- Line 81: The launcher invocation uses the LAUNCHER_ARGS array directly which
will cause failures under set -u when the array is empty; update both
occurrences where LAUNCHER is invoked (the lines using "${LAUNCHER}"
"${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}") to use the
safe expansion form that guards against empty arrays (i.e., expand LAUNCHER_ARGS
only when non-empty) so the script works with set -u enabled while preserving
existing arguments.
In `@benchmark/scripts/launch_frontier_naive_shutdown.sh`:
- Line 50: The launcher invocation can fail under set -u when LAUNCHER_ARGS is
an empty array; update the two usages of the array expansion in the command that
calls "${LAUNCHER}" to use the safe expansion pattern
${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} so an empty LAUNCHER_ARGS won't
trigger an unbound variable error—replace the current "${LAUNCHER_ARGS[@]}"
occurrences with the safe pattern for both launcher invocations.
In `@benchmark/scripts/submit_aurora_naive_shutdown.sh`:
- Line 15: The expansion of the QSUB_ARGS array can fail under set -u when
QSUB_ARGS is empty; update any uses of "${QSUB_ARGS[@]}" (the QSUB_ARGS array
expansion) to the safe idiom ${QSUB_ARGS[@]+"${QSUB_ARGS[@]}"} so the expansion
is skipped when the array is unset/empty, or alternatively ensure QSUB_ARGS is
initialized to an empty array before use (the variable and the expansion sites
reference QSUB_ARGS and its "${QSUB_ARGS[@]}" expansion).
In `@benchmark/scripts/submit_frontier_naive_shutdown.sh`:
- Around line 31-37: The submit script expands ${SLURM_JOB_ID:-manual} at
submission time, so change the --wrap value to defer expansion to job runtime by
escaping the dollar sign (use \${SLURM_JOB_ID:-manual}) inside the --wrap
string; update the sbatch invocation that builds OUTPUT_DIR (where job_name,
nodes, ROOT_DIR, SCRIPT and submit_args are used) to use the escaped variable so
the job sees the real SLURM_JOB_ID when it runs.
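The submission-time vs. runtime expansion distinction behind this fix can be reproduced without Slurm; the 424242 job id below is invented for the demo.

```shell
#!/usr/bin/env bash
# Sketch of submission-time vs. runtime expansion, assuming the wrapped
# command string is eventually re-run by a shell (as sbatch --wrap does).
set -u

# Double quotes: ${SLURM_JOB_ID:-manual} is expanded NOW, while building the
# string. At submission time the job id is usually unset, so "manual" gets
# baked in permanently.
unset SLURM_JOB_ID
baked="echo ${SLURM_JOB_ID:-manual}"

# Escaped dollar sign: the literal text \${SLURM_JOB_ID:-manual} survives
# into the wrapped command and is expanded later, when the job runs.
deferred="echo \${SLURM_JOB_ID:-manual}"

bash -c "${baked}"                          # prints: manual
SLURM_JOB_ID=424242 bash -c "${deferred}"   # prints: 424242
```

The second invocation simulates the job actually running: only the deferred form ever sees the real job id.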
♻️ Duplicate comments (2)

include/dynampi/impl/naive_distributor.hpp (1)

251-254: Worker lacks exception handling for task execution.

If `m_worker_function` throws an exception, the worker will crash without notifying the manager, potentially causing the manager to hang indefinitely waiting for a result. Consider wrapping the call in a try-catch block, similar to the hierarchical distributor's approach.

🛠️ Suggested fix

```diff
-            ResultT result = m_worker_function(std::move(message));
-
-            m_communicator.send(result, m_config.manager_rank, Tag::RESULT);
+            try {
+                ResultT result = m_worker_function(std::move(message));
+                m_communicator.send(result, m_config.manager_rank, Tag::RESULT);
+            } catch (const std::exception& e) {
+                // Consider sending an error notification to the manager
+                // m_communicator.send(/* error info */, m_config.manager_rank, Tag::ERROR);
+                throw;  // Re-throw after notification
+            }
```

benchmark/scripts/launch_aurora_strong_scaling.sh (1)

61-62: Use `ALLOC_CORES_PER_NODE` for the `core` alias.

This issue was flagged in a previous review. When `rpn=core`, the script uses `CORES_PER_NODE:-102` instead of the dynamically computed `ALLOC_CORES_PER_NODE`, which can exceed the scheduler allocation and trigger the hard exit on lines 66-68.

🐛 Proposed fix

```diff
 if [[ "${rpn}" == "core" || "${rpn}" == "cores" ]]; then
-  ranks_per_node="${CORES_PER_NODE:-102}"
+  ranks_per_node="${ALLOC_CORES_PER_NODE}"
 else
```
🧹 Nitpick comments (4)

include/dynampi/impl/naive_distributor.hpp (1)

337-342: Verify invariant: `task_id >= m_front_result_idx`.

The relative indexing `vector_idx = task_id - m_front_result_idx` assumes the task's result slot hasn't already been collected and erased. If a result arrives for a task whose slot was already collected (i.e., `task_id < m_front_result_idx`), this would cause an underflow/wraparound in `vector_idx`.

This should be safe given the current flow (workers are tracked and results are collected only when contiguous), but consider adding a defensive assertion to catch any future regressions.

🔒 Defensive assertion

```diff
         // Store in vector (using relative indexing)
+        assert(task_id >= static_cast<int64_t>(m_front_result_idx) &&
+               "Result received for already-collected task");
         size_t vector_idx = task_id - m_front_result_idx;
```

benchmark/scripts/launch_aurora_strong_scaling.sh (1)

76-78: Fix indentation inconsistency.

The nested loop body has inconsistent indentation: line 78 is indented more than lines 77 and 79, making the structure harder to follow.

🧹 Suggested fix

```diff
 for mode in "${MODES[@]}"; do
-for expected_us in "${TASK_US_LIST[@]}"; do
-echo "Running ${SYSTEM} nodes=${nodes} ranks_per_node=${ranks_per_node} dist=${dist} mode=${mode} expected_us=${expected_us}"
+  for expected_us in "${TASK_US_LIST[@]}"; do
+    echo "Running ${SYSTEM} nodes=${nodes} ranks_per_node=${ranks_per_node} dist=${dist} mode=${mode} expected_us=${expected_us}"
```

benchmark/scripts/launch_frontier_naive_shutdown.sh (1)

36-45: Consider adding allocation validation.

Unlike the Aurora launcher, this script doesn't validate that `ranks_per_node` doesn't exceed the allocated cores. This could lead to oversubscription or runtime failures when the user specifies a custom value in `RANKS_PER_NODE_LIST`.

♻️ Suggested enhancement

```diff
+get_allocated_cores_per_node() {
+  if [[ -n "${SLURM_JOB_CPUS_PER_NODE:-}" ]]; then
+    local cpus="${SLURM_JOB_CPUS_PER_NODE%%(*}"
+    echo "${cpus%%,*}"
+    return
+  fi
+  echo "${CORES_PER_NODE:-56}"
+}
+
+ALLOC_CORES_PER_NODE="$(get_allocated_cores_per_node)"
+
 for nodes in "${NODE_LIST[@]}"; do
   for rpn in "${RANKS_PER_NODE_LIST[@]}"; do
     if [[ "${rpn}" == "core" || "${rpn}" == "cores" ]]; then
-      if [[ -n "${SLURM_JOB_CPUS_PER_NODE:-}" ]]; then
-        ranks_per_node="${SLURM_JOB_CPUS_PER_NODE%%(*}"
-        ranks_per_node="${ranks_per_node%%,*}"
-      else
-        ranks_per_node="${CORES_PER_NODE:-56}"
-      fi
+      ranks_per_node="${ALLOC_CORES_PER_NODE}"
     else
       ranks_per_node="${rpn}"
     fi
+    if [[ "${ranks_per_node}" -gt "${ALLOC_CORES_PER_NODE}" ]]; then
+      echo "Requested ranks_per_node=${ranks_per_node} exceeds allocation ${ALLOC_CORES_PER_NODE}" >&2
+      exit 1
+    fi
```

benchmark/naive_shutdown_time.cpp (1)

147-157: Consider adding an error check for file open failure.

If `std::ofstream` fails to open the file (e.g., invalid path, permissions), the write operations silently fail. Adding a check improves robustness for debugging benchmark runs.

Proposed fix

```diff
         std::ofstream out(opts.output_path, std::ios::app);
+        if (!out) {
+            std::cerr << "Error: Could not open output file: " << opts.output_path << std::endl;
+        } else {
         if (needs_header) {
             write_csv_header(out);
         }
         write_csv_row(out, opts, result);
+        }
```
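As an aside, the two-step parameter expansion in the suggested `get_allocated_cores_per_node` helper can be checked in isolation; the allocation strings below are invented sample values, not taken from a real job.

```shell
#!/usr/bin/env bash
# SLURM_JOB_CPUS_PER_NODE uses a compressed format such as "56(x4),32",
# meaning 4 nodes with 56 CPUs each plus one node with 32. Stripping from
# the first "(" and then from the first "," recovers the first group's count.
set -u

SLURM_JOB_CPUS_PER_NODE="56(x4),32"    # hypothetical allocation string

cpus="${SLURM_JOB_CPUS_PER_NODE%%(*}"  # strip from the first "(" -> "56"
cpus="${cpus%%,*}"                     # strip from the first "," -> "56"
echo "${cpus}"                         # prints: 56

# The plain comma-separated form works too:
SLURM_JOB_CPUS_PER_NODE="56,32"
cpus="${SLURM_JOB_CPUS_PER_NODE%%(*}"  # no "(" present -> unchanged
cpus="${cpus%%,*}"                     # -> "56"
echo "${cpus}"                         # prints: 56
```

Both prefix-strip expansions are no-ops when their pattern character is absent, which is what makes the same two lines handle either format.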
| if [[ "${rpn}" == "core" || "${rpn}" == "cores" ]]; then | ||
| ranks_per_node="${CORES_PER_NODE:-102}" | ||
| else | ||
| ranks_per_node="${rpn}" | ||
| fi |
Use `ALLOC_CORES_PER_NODE` for the `core` alias.

When `rpn=core`, the script falls back to `CORES_PER_NODE:-102` instead of using the computed `ALLOC_CORES_PER_NODE`. This can cause the subsequent allocation check (lines 62-65) to fail if `PBS_NCPUS` differs from the hardcoded default.

🐛 Proposed fix

```diff
 if [[ "${rpn}" == "core" || "${rpn}" == "cores" ]]; then
-  ranks_per_node="${CORES_PER_NODE:-102}"
+  ranks_per_node="${ALLOC_CORES_PER_NODE}"
 else
```

🤖 Prompt for AI Agents
In `@benchmark/scripts/launch_aurora_naive_shutdown.sh` around lines 57 - 61, The
rank-per-node branch incorrectly falls back to a hardcoded default
(CORES_PER_NODE:-102) when rpn=="core"; update the ranks_per_node assignment so
that when rpn is "core" or "cores" it uses the computed ALLOC_CORES_PER_NODE (if
set) instead of the hardcoded default, e.g. prefer ALLOC_CORES_PER_NODE and only
fall back to CORES_PER_NODE or a literal default if ALLOC_CORES_PER_NODE is
unset; change the block that sets ranks_per_node (the rpn check) to reference
ALLOC_CORES_PER_NODE so the later allocation check against PBS_NCPUS uses the
correct value.
| echo "Running ${SYSTEM} nodes=${nodes} ranks_per_node=${ranks_per_node}" | ||
| launcher_base="$(basename "${LAUNCHER}")" | ||
| if [[ "${launcher_base}" == mpiexec || "${launcher_base}" == mpirun ]]; then | ||
| "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}" \ |
Empty `LAUNCHER_ARGS` array may fail with `set -u`.

Unlike the local script, which uses the safe `${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"}` pattern, this script uses `"${LAUNCHER_ARGS[@]}"` directly, which will fail when the array is empty under `set -u`.

🐛 Proposed fix

```diff
-  "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}" \
+  "${LAUNCHER}" ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} -n "${total_ranks}" --ppn "${ranks_per_node}" \
 ...
-  "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -N "${nodes}" -n "${total_ranks}" \
+  "${LAUNCHER}" ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} -N "${nodes}" -n "${total_ranks}" \
```

Also applies to line 76.
🤖 Prompt for AI Agents
In `@benchmark/scripts/launch_aurora_naive_shutdown.sh` at line 70, The script
calls "${LAUNCHER}" with "${LAUNCHER_ARGS[@]}" which will cause an error under
set -u when LAUNCHER_ARGS is an empty array; update both invocations that use
LAUNCHER_ARGS to guard the expansion using the safe pattern
"${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"}" (or an equivalent that yields
nothing when the array is empty) so the command runs whether LAUNCHER_ARGS is
empty or not; ensure you update every place where LAUNCHER_ARGS is passed to
"${LAUNCHER}" (including the second occurrence mentioned).
| echo "Running ${SYSTEM} nodes=${nodes} ranks_per_node=${ranks_per_node} dist=${dist} mode=${mode} expected_us=${expected_us}" | ||
| launcher_base="$(basename "${LAUNCHER}")" | ||
| if [[ "${launcher_base}" == mpiexec || "${launcher_base}" == mpirun ]]; then | ||
| "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}" \ |
Empty LAUNCHER_ARGS array may fail with set -u.
Use the safe expansion pattern ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} to handle empty arrays under set -u.
Also applies to line 91.
🤖 Prompt for AI Agents
In `@benchmark/scripts/launch_aurora_strong_scaling.sh` at line 81, The launcher
invocation uses the LAUNCHER_ARGS array directly which will cause failures under
set -u when the array is empty; update both occurrences where LAUNCHER is
invoked (the lines using "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}"
--ppn "${ranks_per_node}") to use the safe expansion form that guards against
empty arrays (i.e., expand LAUNCHER_ARGS only when non-empty) so the script
works with set -u enabled while preserving existing arguments.
| echo "Running ${SYSTEM} nodes=${nodes} ranks_per_node=${ranks_per_node}" | ||
| launcher_base="$(basename "${LAUNCHER}")" | ||
| if [[ "${launcher_base}" == mpiexec || "${launcher_base}" == mpirun ]]; then | ||
| "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}" \ |
Empty LAUNCHER_ARGS array may fail with set -u.
Use the safe expansion pattern ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} to handle empty arrays under set -u.
🐛 Proposed fix

```diff
-  "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -n "${total_ranks}" --ppn "${ranks_per_node}" \
+  "${LAUNCHER}" ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} -n "${total_ranks}" --ppn "${ranks_per_node}" \
 ...
-  "${LAUNCHER}" "${LAUNCHER_ARGS[@]}" -N "${nodes}" -n "${total_ranks}" \
+  "${LAUNCHER}" ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} -N "${nodes}" -n "${total_ranks}" \
```

Also applies to line 56.
🤖 Prompt for AI Agents
In `@benchmark/scripts/launch_frontier_naive_shutdown.sh` at line 50, The launcher
invocation can fail under set -u when LAUNCHER_ARGS is an empty array; update
the two usages of the array expansion in the command that calls "${LAUNCHER}" to
use the safe expansion pattern ${LAUNCHER_ARGS[@]+"${LAUNCHER_ARGS[@]}"} so an
empty LAUNCHER_ARGS won't trigger an unbound variable error—replace the current
"${LAUNCHER_ARGS[@]}" occurrences with the safe pattern for both launcher
invocations.
| SCRIPT="${ROOT_DIR}/benchmark/scripts/launch_aurora_naive_shutdown.sh" | ||
|
|
||
| IFS=' ' read -r -a NODE_LIST <<< "${NODE_LIST:-1 2 4 8 16 32 64 128 256 512}" | ||
| IFS=' ' read -r -a QSUB_ARGS <<< "${QSUB_ARGS:-}" |
Empty array expansion may fail with set -u.
When QSUB_ARGS is empty (the default), "${QSUB_ARGS[@]}" on line 27 will trigger an unbound variable error under set -u. Use the ${arr[@]+"${arr[@]}"} idiom for safe expansion.
🐛 Proposed fix

```diff
-IFS=' ' read -r -a QSUB_ARGS <<< "${QSUB_ARGS:-}"
+QSUB_ARGS_RAW="${QSUB_ARGS:-}"
+QSUB_ARGS=()
+if [[ -n "${QSUB_ARGS_RAW}" ]]; then
+  IFS=' ' read -r -a QSUB_ARGS <<< "${QSUB_ARGS_RAW}"
+fi
 ...
-submit_args=("${QSUB_ARGS[@]}")
+submit_args=(${QSUB_ARGS[@]+"${QSUB_ARGS[@]}"})
```

Also applies to line 27.
🤖 Prompt for AI Agents
In `@benchmark/scripts/submit_aurora_naive_shutdown.sh` at line 15, The expansion
of the QSUB_ARGS array can fail under set -u when QSUB_ARGS is empty; update any
uses of "${QSUB_ARGS[@]}" (the QSUB_ARGS array expansion) to the safe idiom
${QSUB_ARGS[@]+"${QSUB_ARGS[@]}"} so the expansion is skipped when the array is
unset/empty, or alternatively ensure QSUB_ARGS is initialized to an empty array
before use (the variable and the expansion sites reference QSUB_ARGS and its
"${QSUB_ARGS[@]}" expansion).
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@benchmark/scripts/submit_aurora_strong_scaling.sh`:
- Line 41: The OUTPUT_DIR line currently uses undefined PBS_JOBID_SHORT causing
fallback to "manual"; update it to derive a short job id from PBS_JOBID using
shell parameter expansion and fall back to "manual" if PBS_JOBID is unset:
replace the use of PBS_JOBID_SHORT in the OUTPUT_DIR assignment with an
expression that extracts the portion of PBS_JOBID before the first dot (e.g.,
parameter expansion on PBS_JOBID) and use a default value of "manual" when
PBS_JOBID is empty so OUTPUT_DIR uses ${OUTPUT_BASE}, ${SYSTEM}, ${nodes},
${job_name} and the computed short ID reliably.
🧹 Nitpick comments (2)

benchmark/scripts/submit_aurora_naive_shutdown.sh (1)

10-23: Validate launcher script before submitting jobs.

Fail fast if the launcher script is missing or not executable; otherwise `qsub` will enqueue jobs that are guaranteed to fail at runtime.

♻️ Proposed fix

```diff
 SCRIPT="${ROOT_DIR}/benchmark/scripts/launch_aurora_naive_shutdown.sh"
+if [[ ! -x "${SCRIPT}" ]]; then
+  echo "ERROR: launcher script not found or not executable: ${SCRIPT}" >&2
+  exit 1
+fi
```

benchmark/scripts/submit_aurora_strong_scaling.sh (1)

23-23: Consider renaming to avoid confusion.

`OUTPUT_BASE` is assigned from `${OUTPUT_DIR:-...}`, but then a new `OUTPUT_DIR` is exported inside the heredoc (line 41). Users setting `OUTPUT_DIR` externally might expect it to be used directly, not as a base path.

Consider using a distinct external variable name like `OUTPUT_BASE_DIR` for the input, or documenting this behavior in the script header.
| export LAUNCHER="${LAUNCHER}" | ||
| export LAUNCHER_ARGS="${LAUNCHER_ARGS}" | ||
| export CORES_PER_NODE="${NCPUS_PER_NODE}" | ||
| export OUTPUT_DIR="${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${PBS_JOBID_SHORT:-manual}" |
🧩 Analysis chain
🌐 Web query:
PBS_JOBID_SHORT environment variable ALCF Aurora PBS Pro
💡 Result:
Short answer: There is no documented PBS_JOBID_SHORT provided by PBS Pro on ALCF Aurora. Use PBS_JOBID (and other standard PBS_* vars) and, if you want a “short” job id, create it yourself by stripping the server suffix.
Example (in your job script):
export PBS_JOBID_SHORT=${PBS_JOBID%%.*}
Sources: ALCF Aurora job docs (shows use of PBS_JOBID, PBS_O_WORKDIR, PBS_NODEFILE). [1] PBS/PBS Pro environment-variable reference (lists standard PBS_* variables). [2]
References
[1] ALCF Aurora — Running Jobs / Job scripts and environment.
[2] PBS Environment Variables / PBS Pro documentation.
Use PBS_JOBID with parameter expansion instead of undefined PBS_JOBID_SHORT.
PBS_JOBID_SHORT is not provided by PBS Pro on ALCF Aurora. According to ALCF Aurora documentation, only standard PBS environment variables are available (e.g., PBS_JOBID, PBS_O_WORKDIR, PBS_NODEFILE). Using an undefined variable causes the fallback to always produce manual, creating output collisions across multiple jobs.
Extract the short job ID from the standard PBS_JOBID variable using parameter expansion:
Fix

```diff
-export OUTPUT_DIR="${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${PBS_JOBID_SHORT:-manual}"
+export OUTPUT_DIR="${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${PBS_JOBID%%.*}"
```

🤖 Prompt for AI Agents
In `@benchmark/scripts/submit_aurora_strong_scaling.sh` at line 41, The OUTPUT_DIR
line currently uses undefined PBS_JOBID_SHORT causing fallback to "manual";
update it to derive a short job id from PBS_JOBID using shell parameter
expansion and fall back to "manual" if PBS_JOBID is unset: replace the use of
PBS_JOBID_SHORT in the OUTPUT_DIR assignment with an expression that extracts
the portion of PBS_JOBID before the first dot (e.g., parameter expansion on
PBS_JOBID) and use a default value of "manual" when PBS_JOBID is empty so
OUTPUT_DIR uses ${OUTPUT_BASE}, ${SYSTEM}, ${nodes}, ${job_name} and the
computed short ID reliably.
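The recommended expansion, with the `manual` fallback folded in as this prompt asks, behaves like this in a plain shell; the job id value is invented for the demo.

```shell
#!/usr/bin/env bash
# Derive a short job id from PBS_JOBID ("<seq>.<server>...") and fall back
# to "manual" when PBS_JOBID is unset, without tripping set -u.
set -u

PBS_JOBID="1234567.aurora-pbs-0001.example.gov"   # hypothetical value
short="${PBS_JOBID%%.*}"     # text before the first dot
echo "${short}"              # prints: 1234567

# Fallback path: ${PBS_JOBID:+word} expands to nothing when the variable is
# unset or empty (and is exempt from set -u), so the :- default kicks in.
unset PBS_JOBID
short="${PBS_JOBID:+${PBS_JOBID%%.*}}"
short="${short:-manual}"
echo "${short}"              # prints: manual
```

The `:+` guard matters because a bare `${PBS_JOBID%%.*}` on an unset variable would itself be an unbound-variable error under `set -u`.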
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@benchmark/naive_shutdown_time.cpp`:
- Around line 136-171: The MPI entrypoint is not guarded against exceptions from
run_benchmark or distributor construction; wrap the block that uses MPI (the
code that obtains MPI_Comm rank/size and calls run_benchmark) in a try-catch
that catches std::exception (and a catch-all), and in the catch call
MPI_Abort(MPI_COMM_WORLD, 1) to ensure all ranks terminate together, then
rethrow or exit; reference the existing MPI_Comm usage and the
run_benchmark(...) call so you wrap that region and invoke MPI_Abort on error.
♻️ Duplicate comments (1)

benchmark/strong_scaling_distribution_rate.cpp (1)

225-279: Guard MPI entrypoint against uncaught exceptions.

If `parse_distributor`, `parse_duration_mode`, or `run_benchmark` throws after `MPI_Init`, ranks may hang without `MPI_Finalize`. This was flagged before and still appears unhandled here.

🛠️ Suggested guard (wrap main body)

```diff
-    BenchmarkOptions opts;
-    opts.expected_us = args["expected_us"].as<uint64_t>();
-    opts.duration_s = args["duration_s"].as<double>();
-    opts.distributor = parse_distributor(args["distribution"].as<std::string>());
-    opts.duration_mode = parse_duration_mode(args["mode"].as<std::string>());
-    opts.nodes = args["nodes"].as<uint64_t>();
-    opts.system = args["system"].as<std::string>();
-    opts.output_path = args["output"].as<std::string>();
-
-    if (opts.expected_us == 0) {
-        if (world_rank == 0) {
-            std::cerr << "Expected task duration must be >= 1 microsecond." << std::endl;
-        }
-        MPI_Finalize();
-        return 1;
-    }
-
-    {
+    try {
+        BenchmarkOptions opts;
+        opts.expected_us = args["expected_us"].as<uint64_t>();
+        opts.duration_s = args["duration_s"].as<double>();
+        opts.distributor = parse_distributor(args["distribution"].as<std::string>());
+        opts.duration_mode = parse_duration_mode(args["mode"].as<std::string>());
+        opts.nodes = args["nodes"].as<uint64_t>();
+        opts.system = args["system"].as<std::string>();
+        opts.output_path = args["output"].as<std::string>();
+
+        if (opts.expected_us == 0) {
+            if (world_rank == 0) {
+                std::cerr << "Expected task duration must be >= 1 microsecond." << std::endl;
+            }
+            MPI_Finalize();
+            return 1;
+        }
+
         MPI_Comm comm = MPI_COMM_WORLD;
         int rank = 0;
         int size = 0;
@@
         if (rank == 0) {
@@
             if (!opts.output_path.empty()) {
@@
             }
         }
-    }
+    } catch (const std::exception& e) {
+        if (world_rank == 0) {
+            std::cerr << "Benchmark failed: " << e.what() << std::endl;
+        }
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
```
benchmark/naive_shutdown_time.cpp, lines 136-171:

```cpp
    BenchmarkOptions opts;
    opts.nodes = args["nodes"].as<uint64_t>();
    opts.system = args["system"].as<std::string>();
    opts.output_path = args["output"].as<std::string>();

    {
        MPI_Comm comm = MPI_COMM_WORLD;
        int rank = 0;
        int size = 0;
        MPI_Comm_rank(comm, &rank);
        MPI_Comm_size(comm, &size);
        if (opts.nodes == 0) {
            opts.nodes = static_cast<uint64_t>(size);
        }

        BenchmarkResult result = run_benchmark(opts, comm);

        if (rank == 0) {
            std::cout << "RESULT"
                      << " nodes=" << opts.nodes << " world_size=" << result.world_size
                      << " workers=" << result.workers
                      << " time_per_shutdown_us=" << result.time_per_shutdown_us
                      << " iterations=" << result.iterations << std::endl;
            if (!opts.output_path.empty()) {
                std::ifstream check(opts.output_path);
                const bool needs_header =
                    !check.good() || check.peek() == std::ifstream::traits_type::eof();
                check.close();
                std::ofstream out(opts.output_path, std::ios::app);
                if (needs_header) {
                    write_csv_header(out);
                }
                write_csv_row(out, opts, result);
            }
        }
    }
```
Guard MPI entrypoint against uncaught exceptions.
If run_benchmark (or distributor construction) throws after MPI_Init, ranks can hang without a coordinated abort. Consider wrapping the post-parse main body and calling MPI_Abort on failure.
🛠️ Suggested guard
```diff
-    BenchmarkOptions opts;
-    opts.nodes = args["nodes"].as<uint64_t>();
-    opts.system = args["system"].as<std::string>();
-    opts.output_path = args["output"].as<std::string>();
-
-    {
+    try {
+        BenchmarkOptions opts;
+        opts.nodes = args["nodes"].as<uint64_t>();
+        opts.system = args["system"].as<std::string>();
+        opts.output_path = args["output"].as<std::string>();
+
         MPI_Comm comm = MPI_COMM_WORLD;
         int rank = 0;
         int size = 0;
@@
         if (rank == 0) {
@@
             if (!opts.output_path.empty()) {
@@
             }
         }
-    }
+    } catch (const std::exception& e) {
+        if (world_rank == 0) {
+            std::cerr << "Benchmark failed: " << e.what() << std::endl;
+        }
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
```

🤖 Prompt for AI Agents
In `@benchmark/naive_shutdown_time.cpp` around lines 136 - 171, The MPI entrypoint
is not guarded against exceptions from run_benchmark or distributor
construction; wrap the block that uses MPI (the code that obtains MPI_Comm
rank/size and calls run_benchmark) in a try-catch that catches std::exception
(and a catch-all), and in the catch call MPI_Abort(MPI_COMM_WORLD, 1) to ensure
all ranks terminate together, then rethrow or exit; reference the existing
MPI_Comm usage and the run_benchmark(...) call so you wrap that region and
invoke MPI_Abort on error.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@benchmark/scripts/submit_frontier_strong_scaling.sh`:
- Around line 33-38: The --wrap argument builds a shell command using variables
ROOT_DIR and SCRIPT unquoted which will break with spaces; update the sbatch
invocation that constructs --wrap (the line using ROOT_DIR, OUTPUT_BASE, SYSTEM,
nodes, job_name, SLURM_JOB_ID and SCRIPT) to quote those path variables (e.g.,
wrap ${ROOT_DIR} and ${SCRIPT} in quotes and ensure the embedded OUTPUT_DIR
assignment is quoted) and escape quotes correctly inside the --wrap string so
the generated command preserves spaces in paths.
- Around line 14-19: The script currently clobbers any caller-provided
SBATCH_ARGS by unconditionally setting SBATCH_ARGS=() before parsing; remove
that reset so the existing environment value is preserved and then parse it into
an array. Concretely, delete the line "SBATCH_ARGS=()" and keep the existing
"IFS=' ' read -r -a SBATCH_ARGS <<< "${SBATCH_ARGS:-}"", and ensure NODE_LIST
default logic remains as-is (the NODE_LIST parsing lines and the NODE_LIST
default block should stay unchanged).
🧹 Nitpick comments (3)

benchmark/scripts/plot_shutdown_time.py (3)

11-18: `MARKER_SHAPES` constant is defined but never used.

The `scienceplots` import is correctly retained (it registers styles with matplotlib on import for `plt.style.context(['science', 'ieee'])`), but `MARKER_SHAPES` on line 18 is dead code.

♻️ Suggested fix

```diff
-# Hollow marker shapes for different series
-MARKER_SHAPES = ['o', 's', '^', 'v', 'D', 'p', '*', 'h', 'X', '<', '>', 'd']
```

45-54: Unused fields extracted: `world_size` and `workers`.

These fields are parsed and stored in each row dict but are never used in `group_rows` or `plot_system`. If they're intended for future use, consider adding a comment; otherwise, remove them to reduce noise.

♻️ Suggested fix (if not needed)

```diff
 def parse_rows(paths):
     rows = []
     for path in paths:
         file_mtime = os.path.getmtime(path)
         with open(path, "r", encoding="utf-8") as handle:
             reader = csv.DictReader(handle)
             for row in reader:
                 nodes = int(float(row.get("nodes", 0)))
-                world_size = int(float(row.get("world_size", 0)))
-                workers = int(float(row.get("workers", 0)))
                 time_per_shutdown_us = float(row.get("time_per_shutdown_us", 0.0))
                 rows.append(
                     {
                         "system": row.get("system", "").strip() or "unknown",
                         "nodes": nodes,
-                        "world_size": world_size,
-                        "workers": workers,
                         "time_per_shutdown_us": time_per_shutdown_us,
                         "file_mtime": file_mtime,
                     }
                 )
     return rows
```

145-147: Minor redundancy: systems can be derived from `grouped`.

Since `group_rows` already keys by system, you can simplify by using `grouped.keys()` directly instead of re-extracting from `rows`.

♻️ Suggested simplification

```diff
-    systems = sorted(set(row["system"] for row in rows))
-    for system in systems:
+    for system in sorted(grouped.keys()):
         plot_system(system, grouped, args.output_dir, args.format)
```
```bash
SBATCH_ARGS=()
if [[ -z "${NODE_LIST:-}" ]]; then
  NODE_LIST="1 2 4 8 16 32 64 128 256 512"
fi
IFS=' ' read -r -a NODE_LIST <<< "${NODE_LIST}"
IFS=' ' read -r -a SBATCH_ARGS <<< "${SBATCH_ARGS:-}"
```
Preserve SBATCH_ARGS from the environment.
Line 14 resets SBATCH_ARGS before parsing, so any SBATCH_ARGS provided by the caller are discarded. That makes the override ineffective.
🛠️ Proposed fix
```diff
-SBATCH_ARGS=()
 if [[ -z "${NODE_LIST:-}" ]]; then
   NODE_LIST="1 2 4 8 16 32 64 128 256 512"
 fi
 IFS=' ' read -r -a NODE_LIST <<< "${NODE_LIST}"
-IFS=' ' read -r -a SBATCH_ARGS <<< "${SBATCH_ARGS:-}"
+SBATCH_ARGS_RAW="${SBATCH_ARGS:-}"
+IFS=' ' read -r -a SBATCH_ARGS <<< "${SBATCH_ARGS_RAW}"
```

🤖 Prompt for AI Agents
In `@benchmark/scripts/submit_frontier_strong_scaling.sh` around lines 14 - 19,
The script currently clobbers any caller-provided SBATCH_ARGS by unconditionally
setting SBATCH_ARGS=() before parsing; remove that reset so the existing
environment value is preserved and then parse it into an array. Concretely,
delete the line "SBATCH_ARGS=()" and keep the existing "IFS=' ' read -r -a
SBATCH_ARGS <<< "${SBATCH_ARGS:-}"", and ensure NODE_LIST default logic remains
as-is (the NODE_LIST parsing lines and the NODE_LIST default block should stay
unchanged).
```bash
sbatch "${submit_args[@]}" \
  --job-name="${job_name}" \
  --nodes="${nodes}" \
  --time="${WALLTIME}" \
  --export=ALL,NODE_LIST="${nodes}",LAUNCHER="${LAUNCHER}",LAUNCHER_ARGS="${LAUNCHER_ARGS}" \
  --wrap="cd ${ROOT_DIR} && OUTPUT_DIR=\"${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${SLURM_JOB_ID:-manual}\" ${SCRIPT}"
```
Quote paths inside the wrapped command.
Line 38 leaves ${ROOT_DIR} and ${SCRIPT} unquoted inside --wrap, which breaks if the repo path contains spaces.
🛠️ Proposed fix
```diff
-    --wrap="cd ${ROOT_DIR} && OUTPUT_DIR=\"${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${SLURM_JOB_ID:-manual}\" ${SCRIPT}"
+    --wrap="cd \"${ROOT_DIR}\" && OUTPUT_DIR=\"${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${SLURM_JOB_ID:-manual}\" \"${SCRIPT}\""
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```bash
sbatch "${submit_args[@]}" \
  --job-name="${job_name}" \
  --nodes="${nodes}" \
  --time="${WALLTIME}" \
  --export=ALL,NODE_LIST="${nodes}",LAUNCHER="${LAUNCHER}",LAUNCHER_ARGS="${LAUNCHER_ARGS}" \
  --wrap="cd \"${ROOT_DIR}\" && OUTPUT_DIR=\"${OUTPUT_BASE}/${SYSTEM}/${nodes}-${job_name}-\${SLURM_JOB_ID:-manual}\" \"${SCRIPT}\""
```
🤖 Prompt for AI Agents
In `@benchmark/scripts/submit_frontier_strong_scaling.sh` around lines 33 - 38,
The --wrap argument builds a shell command using variables ROOT_DIR and SCRIPT
unquoted which will break with spaces; update the sbatch invocation that
constructs --wrap (the line using ROOT_DIR, OUTPUT_BASE, SYSTEM, nodes,
job_name, SLURM_JOB_ID and SCRIPT) to quote those path variables (e.g., wrap
${ROOT_DIR} and ${SCRIPT} in quotes and ensure the embedded OUTPUT_DIR
assignment is quoted) and escape quotes correctly inside the --wrap string so
the generated command preserves spaces in paths.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@benchmark/scripts/plot_shutdown_time.py`:
- Around line 51-57: When parsing CSV rows in the plotting script, skip any rows
with non‑positive node counts so the log‑scale x axis won't receive
zero/negative values: after computing nodes = int(float(row.get("nodes", 0)))
add a guard that continues the loop if nodes <= 0 (similar to the existing
time_per_shutdown_us check) so only rows with nodes > 0 are used for plotting;
ensure this check is placed before any downstream use of nodes in plotting logic
(e.g., where world_size, workers, and time_per_shutdown_us are used).
In `@benchmark/scripts/plot_strong_scaling.py`:
- Around line 60-80: The parsed row currently sets expected_ns = 0 when missing,
which later causes a division-by-zero in plot_distributor (ideal line uses 1e9 /
expected_ns); modify the parsing logic in plot_strong_scaling.py (the block that
computes expected_ns and appends to rows) to skip any row where expected_ns <= 0
(i.e., if expected_ns_raw/expected_us_raw are absent or computed expected_ns <=
0, do not append that row), or alternatively set a sentinel and ensure
plot_distributor checks expected_ns > 0 before computing 1e9/expected_ns; update
the code paths that use rows (e.g., plot_distributor) accordingly so only rows
with valid expected_ns are plotted.
♻️ Duplicate comments (2)
include/dynampi/impl/hierarchical_distributor.hpp (2)
702-709: Add validation for `request_count` to prevent pathological allocations.

A malformed or negative `request_count` received via MPI could cause undefined behavior when later used in `allocate_task_to_child()` for `tasks.reserve()`. Add a guard to ensure the count is valid.

🛠️ Proposed fix
```diff
 void receive_request_batch_from(MPI_Status status, MPICommunicator& source_comm, CommLayer layer) {
   int request_count;
   source_comm.recv(request_count, status.MPI_SOURCE, Tag::REQUEST_BATCH);
+  DYNAMPI_ASSERT_GT(request_count, 0, "REQUEST_BATCH count must be > 0");
   m_free_worker_indices.push(TaskRequest{.worker_rank = status.MPI_SOURCE,
                                          .source_layer = layer,
                                          .num_tasks_requested = request_count});
 }
```
425-445: Handle the empty batch case explicitly to avoid orphaning workers.

When `actual_num_tasks` is 0 (queue exhausted), an empty `TASK_BATCH` is sent. The receiving worker may not handle this gracefully and could wait indefinitely. Consider re-adding the worker to the free list or sending a specific signal.

🛠️ Proposed fix
```diff
 if (request.num_tasks_requested.has_value()) {
   std::vector<TaskT> tasks;
   int num_tasks = request.num_tasks_requested.value();
   const int actual_num_tasks =
       std::min<int>(num_tasks, static_cast<int>(m_unallocated_task_queue.size()));
+  if (actual_num_tasks == 0) {
+    // No tasks available, re-add worker to free list
+    m_free_worker_indices.push(request);
+    return;
+  }
   tasks.reserve(actual_num_tasks);
```
🧹 Nitpick comments (3)
include/dynampi/impl/hierarchical_distributor.hpp (2)
109-160: Consider extracting the coordinator_per_node branch into a helper.

The method has high cyclomatic complexity (10) due to handling both topology modes. While the caching mechanism is a good optimization, consider extracting the `coordinator_per_node` branch into a separate private method like `compute_parent_target_node_topology()` to improve readability.
725-750: Busy-polling loop may cause high CPU usage.

The `while (!found)` loop with `std::this_thread::yield()` will consume CPU cycles while waiting for messages. For long-running distributed computations, this could impact system resources. Consider using `MPI_Waitany` or introducing a small sleep after several unsuccessful probes.

♻️ Suggested improvement
```diff
+int probe_count = 0;  // declared before the loop so the counter persists across iterations
 while (!found) {
   if (m_local_comm) {
     auto opt_status = m_local_comm->iprobe();
     if (opt_status.has_value()) {
       status = opt_status.value();
       layer = CommLayer::Local;
       active_comm = &m_local_comm.value();
       found = true;
       break;
     }
   }
   if (m_leader_comm) {
     auto opt_status = m_leader_comm->iprobe();
     if (opt_status.has_value()) {
       status = opt_status.value();
       layer = CommLayer::Leader;
       active_comm = &m_leader_comm.value();
       found = true;
       break;
     }
   }
+  if (++probe_count % 100 == 0) {
+    std::this_thread::sleep_for(std::chrono::microseconds(100));
+  } else {
     std::this_thread::yield();
+  }
 }
```

test/mpi/test_distributers.cpp (1)
178-198: Consider adding individual size assertions for clarity.

The combined assertion on line 196 verifies total correctness but doesn't pinpoint which batch failed if the test fails. Consider adding individual expectations.
♻️ Suggested improvement
```diff
 config.max_seconds = 0.0;
 auto results = work_distributer.run_tasks(config);
-EXPECT_EQ(results.size(), 0u);
+EXPECT_EQ(results.size(), 0u) << "Expected 0 results with max_seconds=0.0";
 auto remaining_results = work_distributer.run_tasks();
-EXPECT_EQ(results.size() + remaining_results.size(), 5u);
+EXPECT_EQ(remaining_results.size(), 5u) << "Expected all 5 tasks in remaining results";
```
```python
nodes = int(float(row.get("nodes", 0)))
world_size = int(float(row.get("world_size", 0)))
workers = int(float(row.get("workers", 0)))
time_per_shutdown_us = float(row.get("time_per_shutdown_us", 0.0))
# Skip rows with zero or invalid shutdown times
if time_per_shutdown_us <= 0.0:
    continue
```
Guard against non‑positive node counts before log‑scale plotting.
The log‑scaled x‑axis can’t plot zero or negative nodes; those rows will trigger warnings or errors during plotting. Add a guard when parsing.
🧹 Proposed fix
```diff
 nodes = int(float(row.get("nodes", 0)))
 world_size = int(float(row.get("world_size", 0)))
 workers = int(float(row.get("workers", 0)))
 time_per_shutdown_us = float(row.get("time_per_shutdown_us", 0.0))
+if nodes <= 0:
+    continue
```

📝 Committable suggestion
```python
nodes = int(float(row.get("nodes", 0)))
world_size = int(float(row.get("world_size", 0)))
workers = int(float(row.get("workers", 0)))
time_per_shutdown_us = float(row.get("time_per_shutdown_us", 0.0))
if nodes <= 0:
    continue
# Skip rows with zero or invalid shutdown times
if time_per_shutdown_us <= 0.0:
    continue
```
🤖 Prompt for AI Agents
In `@benchmark/scripts/plot_shutdown_time.py` around lines 51 - 57, When parsing
CSV rows in the plotting script, skip any rows with non‑positive node counts so
the log‑scale x axis won't receive zero/negative values: after computing nodes =
int(float(row.get("nodes", 0))) add a guard that continues the loop if nodes <=
0 (similar to the existing time_per_shutdown_us check) so only rows with nodes >
0 are used for plotting; ensure this check is placed before any downstream use
of nodes in plotting logic (e.g., where world_size, workers, and
time_per_shutdown_us are used).
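The guard above generalizes to any log-scale plot: filter invalid rows once during parsing rather than at plot time. Here is a small standalone Python sketch of that filtering step (illustrative only, not the repository's parser; the field names simply mirror the CSV columns discussed above):

```python
def valid_shutdown_rows(rows):
    """Keep only rows that are safe to plot on log-scaled axes.

    A log-scale x axis cannot represent nodes <= 0, and a zero or
    negative shutdown time indicates a malformed benchmark record.
    """
    kept = []
    for row in rows:
        nodes = int(float(row.get("nodes", 0)))
        time_per_shutdown_us = float(row.get("time_per_shutdown_us", 0.0))
        if nodes <= 0 or time_per_shutdown_us <= 0.0:
            continue  # skip rows that would break log-scale plotting
        kept.append({"nodes": nodes, "time_per_shutdown_us": time_per_shutdown_us})
    return kept

rows = [
    {"nodes": "0", "time_per_shutdown_us": "12.5"},   # dropped: nodes <= 0
    {"nodes": "4", "time_per_shutdown_us": "0.0"},    # dropped: invalid time
    {"nodes": "8", "time_per_shutdown_us": "3.25"},   # kept
]
print(valid_shutdown_rows(rows))
```

Filtering at parse time keeps every downstream consumer (grouping, plotting, statistics) free of per-call guards.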
```python
expected_ns_raw = row.get("expected_ns", "").strip()
expected_us_raw = row.get("expected_us", "").strip()
if expected_ns_raw:
    expected_ns = int(float(expected_ns_raw))
elif expected_us_raw:
    expected_ns = int(float(expected_us_raw) * 1000)
else:
    expected_ns = 0
nodes = int(float(row.get("nodes", 0)))
world_size = int(float(row.get("world_size", 0)))
ranks_per_node = int(round(world_size / nodes)) if nodes else 0
rows.append(
    {
        "system": row.get("system", "").strip() or "unknown",
        "distributor": row.get("distributor", "").strip(),
        "mode": row.get("mode", "").strip(),
        "expected_ns": expected_ns,
        "nodes": nodes,
        "ranks_per_node": ranks_per_node,
        "throughput": float(row.get("throughput_tasks_per_s", 0.0)),
        "file_mtime": file_mtime,
```
Avoid division‑by‑zero when expected_ns is missing.
expected_ns falls back to 0, but plot_distributor computes ideal lines using 1e9 / expected_ns, which will crash. Consider skipping rows with expected_ns <= 0 (or guard before ideal‑line computation).
🛠️ Proposed fix (skip invalid rows)
```diff
 if expected_ns_raw:
     expected_ns = int(float(expected_ns_raw))
 elif expected_us_raw:
     expected_ns = int(float(expected_us_raw) * 1000)
 else:
     expected_ns = 0
+if expected_ns <= 0:
+    continue
```

🧰 Tools
🪛 Ruff (0.14.13)
70-70: Value being cast to int is already an integer
Remove unnecessary int call
(RUF046)
🤖 Prompt for AI Agents
In `@benchmark/scripts/plot_strong_scaling.py` around lines 60 - 80, The parsed
row currently sets expected_ns = 0 when missing, which later causes a
division-by-zero in plot_distributor (ideal line uses 1e9 / expected_ns); modify
the parsing logic in plot_strong_scaling.py (the block that computes expected_ns
and appends to rows) to skip any row where expected_ns <= 0 (i.e., if
expected_ns_raw/expected_us_raw are absent or computed expected_ns <= 0, do not
append that row), or alternatively set a sentinel and ensure plot_distributor
checks expected_ns > 0 before computing 1e9/expected_ns; update the code paths
that use rows (e.g., plot_distributor) accordingly so only rows with valid
expected_ns are plotted.
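To make the failure mode concrete, here is a standalone Python sketch (not the repository's script) of why skipping rows at parse time is safer than guarding at plot time: the ideal-throughput line is computed as `1e9 / expected_ns`, so any surviving zero raises `ZeroDivisionError`.

```python
def parse_expected_ns(row):
    """Resolve expected_ns from either an ns or a us column; None if absent."""
    ns_raw = row.get("expected_ns", "").strip()
    us_raw = row.get("expected_us", "").strip()
    if ns_raw:
        return int(float(ns_raw))
    if us_raw:
        return int(float(us_raw) * 1000)
    return None

def ideal_tasks_per_second(rows):
    """Ideal single-worker throughput per row; invalid rows are skipped."""
    out = []
    for row in rows:
        expected_ns = parse_expected_ns(row)
        if expected_ns is None or expected_ns <= 0:
            continue  # would otherwise divide by zero below
        out.append(1e9 / expected_ns)
    return out

print(ideal_tasks_per_second([
    {"expected_ns": "1000"},  # 1 us per task -> 1e6 tasks/s
    {"expected_us": "1"},     # same value, via the us column
    {},                       # missing -> skipped
]))  # [1000000.0, 1000000.0]
```

Skipping at parse time means the plotting code never needs to know the guard exists.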
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@include/dynampi/impl/hierarchical_distributor.hpp`:
- Around line 726-737: The received request_count in receive_request_batch_from
(obtained via m_communicator.recv with Tag::REQUEST_BATCH) must be validated
before creating a TaskRequest and pushing into m_free_worker_indices; add a
guard that checks if request_count is <= 0 (or above a reasonable max if
applicable), log or warn about the bad value (using the existing logging
mechanism), and skip pushing the TaskRequest (or normalize to a safe minimum) to
avoid downstream tasks.reserve() pathological allocations; ensure the check is
applied immediately after recv and that TaskRequest.worker_rank, source_layer,
and num_tasks_requested are only used when request_count is valid.
♻️ Duplicate comments (1)
include/dynampi/impl/hierarchical_distributor.hpp (1)
430-450: Handle the empty batch case to avoid a wasteful round-trip.

When `actual_num_tasks` is 0 (queue exhausted), the code still sends an empty `TASK_BATCH` to the worker. The worker will process zero tasks and return an empty result batch, wasting a round-trip. Consider re-queuing the worker request instead.

🛠️ Proposed fix
```diff
 if (request.num_tasks_requested.has_value()) {
   std::vector<TaskT> tasks;
   int num_tasks = request.num_tasks_requested.value();
   const int actual_num_tasks =
       std::min<int>(num_tasks, static_cast<int>(m_unallocated_task_queue.size()));
+  if (actual_num_tasks == 0) {
+    // No tasks available, re-add worker to free list
+    m_free_worker_indices.push(request);
+    return;
+  }
   tasks.reserve(actual_num_tasks);
```
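The re-queue pattern behind this fix can be modeled independently of MPI. The toy Python sketch below (hypothetical names such as `Dispatcher` and `request_batch`, not the library's API) shows a dispatcher that never emits an empty batch: when the task queue is exhausted, the requesting worker goes back on the free list instead of receiving a zero-length message.

```python
from collections import deque

class Dispatcher:
    """Toy model of batch allocation: never emit an empty batch."""

    def __init__(self, tasks):
        self.task_queue = deque(tasks)
        self.free_workers = deque()

    def request_batch(self, worker_rank, num_tasks_requested):
        n = min(num_tasks_requested, len(self.task_queue))
        if n == 0:
            # Queue exhausted: park the worker on the free list rather
            # than orphan it with an empty batch it may not handle.
            self.free_workers.append((worker_rank, num_tasks_requested))
            return None
        return [self.task_queue.popleft() for _ in range(n)]

d = Dispatcher([1, 2, 3])
print(d.request_batch(7, 2))  # [1, 2]
print(d.request_batch(8, 2))  # [3] (partial batch, still non-empty)
print(d.request_batch(9, 2))  # None: worker 9 parked on the free list
print(list(d.free_workers))   # [(9, 2)]
```

Parked workers are then served as soon as new tasks arrive, avoiding the empty round-trip entirely.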
🧹 Nitpick comments (2)
include/dynampi/impl/hierarchical_distributor.hpp (2)
337-359: Unused variable pattern suggests incomplete logic.
`num_tasks_should_be_received` is assigned at line 337 but only used at lines 358-359 with `(void)` to suppress warnings before an assertion. This pattern suggests either:
- The assertion is intended for debugging only and the variable serves no other purpose
- The logic is incomplete and the variable should be used elsewhere
Consider removing the variable if only needed for the assertion, or clarify its purpose with a comment.
```diff
-size_t num_tasks_should_be_received = m_unallocated_task_queue.size();
-
 // Process tasks: Give to workers or execute ourselves if needed
 while (!m_unallocated_task_queue.empty()) {
   // ... processing
 }
 // Wait for results from children
 while (m_tasks_sent_to_child > m_results_received_from_child) {
   receive_from_anyone();
 }
 if (m_done) break;
-(void)num_tasks_should_be_received;
-DYNAMPI_ASSERT_EQ(m_results.size(), num_tasks_should_be_received);
+DYNAMPI_ASSERT_EQ(m_results.size(), m_tasks_received_from_parent - m_results_sent_to_parent);
```
754-766: Busy-wait loop may consume excessive CPU.

The `while (!found)` loop with `std::this_thread::yield()` is a spin-wait pattern that can consume significant CPU cycles while waiting for messages. Consider using a blocking probe (like in the non-coordinator_per_node path at line 768) or adding a small sleep to reduce CPU usage when no messages are available.

```diff
 while (!found) {
   auto opt_status = m_communicator.iprobe();
   if (opt_status.has_value()) {
     status = opt_status.value();
     found = true;
     break;
   }
-  std::this_thread::yield();
+  std::this_thread::sleep_for(std::chrono::microseconds(10));
 }
```

Alternatively, if multi-layer probing is the goal, consider using `MPI_Probe` with `MPI_ANY_SOURCE`, which would block efficiently.
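The yield-versus-sleep tradeoff can be sketched without MPI at all. The generic polling loop below (illustrative Python; `probe` stands in for an iprobe-style non-blocking check) spins for the first attempts to keep latency low, then backs off to short sleeps to bound CPU use:

```python
import time

def poll_until(probe, max_attempts=10_000, spin_attempts=100, sleep_s=0.0001):
    """Poll a non-blocking probe: spin briefly, then back off to short sleeps.

    `probe` returns a message or None. Returns the message, or None if
    max_attempts is exhausted.
    """
    for attempt in range(max_attempts):
        msg = probe()
        if msg is not None:
            return msg
        if attempt >= spin_attempts:
            time.sleep(sleep_s)  # back off: stop burning a full core
        # else: stay hot for low-latency arrival of early messages
    return None

# Simulated probe: the message "arrives" on the 250th call.
calls = {"n": 0}
def probe():
    calls["n"] += 1
    return "result" if calls["n"] >= 250 else None

print(poll_until(probe))  # result
```

Messages arriving within the spin window are picked up with near-zero latency; slow arrivals cost only one short sleep per probe instead of a saturated core.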
```cpp
void receive_request_batch_from(MPI_Status status, [[maybe_unused]] MPICommunicator& source_comm,
                                CommLayer layer) {
  // With groups, always use global communicator and determine layer from source rank
  int world_source = status.MPI_SOURCE;
  if (m_config.coordinator_per_node) {
    layer = determine_layer_from_world_rank(world_source);
  }
  int request_count;
  m_communicator.recv(request_count, world_source, Tag::REQUEST_BATCH);
  m_free_worker_indices.push(TaskRequest{
      .worker_rank = world_source, .source_layer = layer, .num_tasks_requested = request_count});
}
```
Add validation for request_count to prevent pathological allocations.
The request_count received via MPI is used directly without validation. A negative or zero value (from protocol mismatch, corruption, or misconfiguration) would be passed to tasks.reserve() downstream, potentially causing undefined behavior or exceptions.
🛠️ Proposed fix
```diff
 void receive_request_batch_from(MPI_Status status, [[maybe_unused]] MPICommunicator& source_comm,
                                 CommLayer layer) {
   // With groups, always use global communicator and determine layer from source rank
   int world_source = status.MPI_SOURCE;
   if (m_config.coordinator_per_node) {
     layer = determine_layer_from_world_rank(world_source);
   }
   int request_count;
   m_communicator.recv(request_count, world_source, Tag::REQUEST_BATCH);
+  DYNAMPI_ASSERT_GT(request_count, 0, "REQUEST_BATCH count must be > 0");
   m_free_worker_indices.push(TaskRequest{
       .worker_rank = world_source, .source_layer = layer, .num_tasks_requested = request_count});
 }
```

🤖 Prompt for AI Agents
In `@include/dynampi/impl/hierarchical_distributor.hpp` around lines 726 - 737,
The received request_count in receive_request_batch_from (obtained via
m_communicator.recv with Tag::REQUEST_BATCH) must be validated before creating a
TaskRequest and pushing into m_free_worker_indices; add a guard that checks if
request_count is <= 0 (or above a reasonable max if applicable), log or warn
about the bad value (using the existing logging mechanism), and skip pushing the
TaskRequest (or normalize to a safe minimum) to avoid downstream tasks.reserve()
pathological allocations; ensure the check is applied immediately after recv and
that TaskRequest.worker_rank, source_layer, and num_tasks_requested are only
used when request_count is valid.
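The validate-then-allocate pattern is language-agnostic. Below is a plain Python sketch of the check (a hypothetical helper, not the library's API; the `MAX_REASONABLE_BATCH` bound is an assumption chosen purely for illustration): reject non-positive or implausibly large counts before using them to size an allocation.

```python
MAX_REASONABLE_BATCH = 1 << 20  # assumed protocol limit; purely illustrative

def validated_request_count(raw_count):
    """Return a safe batch size, or raise on protocol-corrupted values."""
    if not isinstance(raw_count, int):
        raise TypeError(f"request count must be int, got {type(raw_count).__name__}")
    if raw_count <= 0:
        raise ValueError(f"REQUEST_BATCH count must be > 0, got {raw_count}")
    if raw_count > MAX_REASONABLE_BATCH:
        raise ValueError(f"REQUEST_BATCH count {raw_count} exceeds sane limit")
    return raw_count

print(validated_request_count(16))  # 16
```

Failing loudly at the receive boundary localizes the fault to the sender, instead of letting a corrupted count surface later as a huge or failed allocation.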
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
test/mpi/test_distributers.cpp (1)
332-344: ⚠️ Potential issue | 🔴 Critical

Bug: the `AutoRunWorkers` test doesn't sort results for unordered distributors — a direct cause of the CI failures.

The pipeline failures show `DynamicDistribution/1.AutoRunWorkers` and `/2.AutoRunWorkers` (both `HierarchicalDistributerTypeWrapper` variants) returning results in non-deterministic order. Unlike `BasicFlow` (Line 123) and other tests, this test doesn't check the `ordered` flag before comparing.

🐛 Proposed fix
```diff
 dist.insert_tasks({1, 2, 3, 4, 5});
 auto results = dist.finish_remaining_tasks();
+if constexpr (!decltype(dist)::ordered) {
+  std::sort(results.begin(), results.end());
+}
 EXPECT_EQ(results, (std::vector<int>{1, 4, 9, 16, 25}));
```
Verify each finding against the current code and only fix it if needed. In `@test/mpi/test_distributers.cpp` around lines 332 - 344, The test DynamicDistribution::AutoRunWorkers assumes ordered results but fails for unordered distributors; update the test after calling dist.finish_remaining_tasks() to check the distributor's ordered flag (e.g., dist.ordered()) and, if false, sort the returned results (and the expected vector {1,4,9,16,25}) before calling EXPECT_EQ; locate this logic in the TYPED_TEST named AutoRunWorkers that uses make_distributor<int,int>, dist, and finish_remaining_tasks() and add the conditional sort so unordered distributors compare deterministically.
🧹 Nitpick comments (1)
include/dynampi/impl/naive_distributor.hpp (1)
350-376: O(n) front-erase on every `collect_available_results` call.

`m_pending_results.erase(begin, begin + n)` and `m_pending_results_valid.erase(...)` are O(n) in the remaining vector size, applied on every `run_tasks` return. For large task sets with many incremental calls, this can add up. A `std::deque` or an advancing-index approach (no erase, just bump `m_front_result_idx`) would be O(1).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/dynampi/impl/naive_distributor.hpp` around lines 350 - 376, collect_available_results currently does O(n) front-erase on m_pending_results and m_pending_results_valid which is expensive; change to an O(1) advancing-index or deque approach: either replace m_pending_results and m_pending_results_valid with std::deque and continue using pop_front/move into batch, or keep them as std::vector but stop erasing—compute an offset using m_front_result_idx (or introduce m_pending_offset) and move elements from m_pending_results.begin() + offset into batch, then increment the offset and m_front_result_idx and decrement m_known_contiguous_results; also add a rare compaction step to shrink/realign the vectors when offset grows large. Update any code that assumes 0-based vector indexing to use the offset/m_front_result_idx or the deque semantics (refer to collect_available_results, m_pending_results, m_pending_results_valid, m_front_result_idx, m_known_contiguous_results).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/dynampi/impl/naive_distributor.hpp`:
- Around line 314-315: The comment in process_incoming_message("blocking"
parameter) is misleading because MPICommunicator::iprobe() now exists; replace
the comment and either wire up non-blocking probing or explicitly state deferred
behavior. If wiring up, call MPICommunicator::iprobe() when blocking==false to
detect incoming messages (mirroring the blocking path logic), process any
available messages without blocking, and ensure you handle the iprobe return
(any flag/status or source/tag) consistently with the existing message-handling
code; otherwise update the comment to clearly state that non-blocking probing is
intentionally deferred and why.
- Around line 128-177: The Timer in run_tasks is never started so the
max_seconds check using timer.elapsed() is invalid; after constructing Timer
timer in naive_distributor::run_tasks, call timer.start() (or the equivalent
start method on Timer) before the loop so elapsed() returns real time for the
config.max_seconds check; ensure any existing logic that expects an
already-running timer still works with the new start() placement.
In `@test/mpi/test_distributers.cpp`:
- Around line 205-210: The inner declaration typename Distributer::RunConfig
config; shadows the outer auto config = get_distributer_config<TypeParam, Task,
Result>();—rename the inner variable (e.g., to run_config) and update all
subsequent references to use run_config to avoid confusion; locate the inner
declaration in the block where Distributer work_distributer(worker_task, config)
is created and adjust any uses of RunConfig members accordingly (symbols to
check: get_distributer_config, Distributer, RunConfig,
work_distributer.is_root_manager(), work_distributer.insert_tasks()).
---
Outside diff comments:
In `@test/mpi/test_distributers.cpp`:
- Around line 332-344: The test DynamicDistribution::AutoRunWorkers assumes
ordered results but fails for unordered distributors; update the test after
calling dist.finish_remaining_tasks() to check the distributor's ordered flag
(e.g., dist.ordered()) and, if false, sort the returned results (and the
expected vector {1,4,9,16,25}) before calling EXPECT_EQ; locate this logic in
the TYPED_TEST named AutoRunWorkers that uses make_distributor<int,int>, dist,
and finish_remaining_tasks() and add the conditional sort so unordered
distributors compare deterministically.
---
Duplicate comments:
In `@include/dynampi/impl/naive_distributor.hpp`:
- Around line 234-260: run_worker lacks exception handling around the call to
m_worker_function so thrown exceptions crash the worker and leave the manager
waiting; wrap the call to m_worker_function(std::move(message)) in a try/catch,
and in the catch send an error notification to the manager using the same
protocol as the hierarchical distributor (use m_communicator.send with
Tag::ERROR and m_config.manager_rank, include whatever error payload the manager
expects—e.g., an error code or message), then break/return from run_worker after
sending the error to avoid further processing. Ensure you reference run_worker,
m_worker_function, Tag::ERROR, m_communicator.send and m_config.manager_rank
when applying the fix.
In `@include/dynampi/mpi/mpi_communicator.hpp`:
- Around line 256-267: The gather implementation in
MPICommunicator::gather(const T&, std::vector<T>*, int) leaves the root's result
vector unresized, allowing MPI_Gather to write past the end; before calling
MPI_Gather, if root == rank() and result != nullptr resize result to hold
comm_size * mpi_type::count(data) elements (compute comm_size from m_comm via
MPI_Comm_size or a cached member) so the receive buffer is large enough, then
call MPI_Gather as before; keep the existing DYNAMPI_ASSERT_EQ check and
statistics increment.
---
Nitpick comments:
In `@include/dynampi/impl/naive_distributor.hpp`:
- Around line 350-376: collect_available_results currently does O(n) front-erase
on m_pending_results and m_pending_results_valid which is expensive; change to
an O(1) advancing-index or deque approach: either replace m_pending_results and
m_pending_results_valid with std::deque and continue using pop_front/move into
batch, or keep them as std::vector but stop erasing—compute an offset using
m_front_result_idx (or introduce m_pending_offset) and move elements from
m_pending_results.begin() + offset into batch, then increment the offset and
m_front_result_idx and decrement m_known_contiguous_results; also add a rare
compaction step to shrink/realign the vectors when offset grows large. Update
any code that assumes 0-based vector indexing to use the
offset/m_front_result_idx or the deque semantics (refer to
collect_available_results, m_pending_results, m_pending_results_valid,
m_front_result_idx, m_known_contiguous_results).
```diff
 [[nodiscard]] std::vector<ResultT> run_tasks(RunConfig config = RunConfig{}) {
   assert(is_root_manager() && "Only the manager can distribute tasks");

   Timer timer;

   // We loop until one of the exit conditions is met.
   while (true) {
-    MPI_Status status = _communicator.probe();
-    if (status.MPI_TAG == Tag::DONE) {
-      _communicator.recv_empty_message(_config.manager_rank, Tag::DONE);
     // --- 1. Check Exit Conditions ---

     // A. Have we collected enough contiguous results?
     if (m_known_contiguous_results >= config.target_num_tasks) {
       break;
     }
-    int count;
-    DYNAMPI_MPI_CHECK(MPI_Get_count, (&status, task_type::value, &count));
-    TaskT message;
-    task_type::resize(message, count);
-    _communicator.recv(message, _config.manager_rank, Tag::TASK);
-    _tasks_sent++;
-    ResultT result = _worker_function(message);
-    _communicator.send(result, _config.manager_rank, Tag::RESULT);
-    _results_received++;

     // B. Time limit check
     if (config.max_seconds && timer.elapsed().count() >= *config.max_seconds) {
       break;
     }

     // C. Total exhaustion check
     if (m_unallocated_task_queue.empty() && active_worker_count() == 0) {
       break;
     }

     // --- 2. Action Logic (Send vs Receive) ---

     // Priority: Keep workers busy
     if (!m_unallocated_task_queue.empty() && !m_free_worker_ranks.empty()) {
       send_next_task_to_worker(m_free_worker_ranks.top());
       m_free_worker_ranks.pop();
     } else {
       // Single process mode fallback
       if (num_workers() == 0 && !m_unallocated_task_queue.empty()) {
         run_task_locally();
       }
       // Standard MPI wait
       else if (active_worker_count() > 0) {
         process_incoming_message(true);  // Blocking wait
       }
     }
   }

   // --- 3. Return Logic ---
   size_t limit = std::numeric_limits<size_t>::max();
   if (!config.allow_more_than_target_tasks) {
     limit = config.target_num_tasks;
   }

   return collect_available_results(limit);
 }
```
Bug: Timer is never started, so max_seconds exit condition is broken.
Timer timer; at Line 131 constructs a timer but never calls timer.start(). Based on the Timer API (from timer.hpp), elapsed() returns a zero duration when the timer hasn't been started, so timer.elapsed().count() will always be 0.0. This means the time-limit check on Line 143 will either always trigger (when max_seconds == 0.0) or never trigger (for any positive value), making the feature non-functional.
🐛 Proposed fix
```diff
 [[nodiscard]] std::vector<ResultT> run_tasks(RunConfig config = RunConfig{}) {
   assert(is_root_manager() && "Only the manager can distribute tasks");
   Timer timer;
+  timer.start();
   // We loop until one of the exit conditions is met.
   while (true) {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@include/dynampi/impl/naive_distributor.hpp` around lines 128 - 177, The Timer
in run_tasks is never started so the max_seconds check using timer.elapsed() is
invalid; after constructing Timer timer in naive_distributor::run_tasks, call
timer.start() (or the equivalent start method on Timer) before the loop so
elapsed() returns real time for the config.max_seconds check; ensure any
existing logic that expects an already-running timer still works with the new
start() placement.
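The bug is easy to reproduce in any language: an unstarted clock reports zero elapsed time, so a `max_seconds` check either never fires or always fires. Here is a small Python sketch of the intended behavior using `time.monotonic` directly (illustrative only; the repository's `Timer` API is inferred from the review, and `run_until` is a hypothetical stand-in for the manager loop):

```python
import time

def run_until(deadline_seconds, work_items):
    """Process items until done or until deadline_seconds elapse.

    The monotonic start time is captured before the loop (the
    equivalent of calling timer.start()); otherwise elapsed time
    would always read as zero and the deadline check would be
    meaningless.
    """
    start = time.monotonic()  # the step the review found missing
    done = []
    for item in work_items:
        if deadline_seconds is not None and time.monotonic() - start >= deadline_seconds:
            break  # time limit reached; remaining items stay queued
        done.append(item * item)
    return done

# With a zero deadline nothing runs; with no deadline everything runs.
print(run_until(0.0, [1, 2, 3]))   # []
print(run_until(None, [1, 2, 3]))  # [1, 4, 9]
```

Using a monotonic clock matters here as well: wall-clock time can jump backwards (NTP adjustments), which would corrupt an elapsed-time deadline.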
```cpp
void process_incoming_message(bool blocking) {
  if (!blocking) return;  // iprobe not available in wrapper
```
Misleading comment: iprobe is now available.
Line 315 says "iprobe not available in wrapper" but iprobe() was just added to MPICommunicator. If non-blocking probing is intentionally deferred, update the comment. Otherwise, this is ready to be wired up.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@include/dynampi/impl/naive_distributor.hpp` around lines 314 - 315, The
comment in process_incoming_message("blocking" parameter) is misleading because
MPICommunicator::iprobe() now exists; replace the comment and either wire up
non-blocking probing or explicitly state deferred behavior. If wiring up, call
MPICommunicator::iprobe() when blocking==false to detect incoming messages
(mirroring the blocking path logic), process any available messages without
blocking, and ensure you handle the iprobe return (any flag/status or
source/tag) consistently with the existing message-handling code; otherwise
update the comment to clearly state that non-blocking probing is intentionally
deferred and why.
Summary by CodeRabbit