Conversation
> ⚠️ Caution: Review failed. The pull request is closed.

📝 Walkthrough

Added two public `Config` options to `NaiveMPIWorkDistributor` (`use_immediate_recv`, `max_result_size`), replaced direct `MPI_Probe`/`MPI_Recv` calls with `MPICommunicator` `probe`/`recv_any` helpers, split the receive flow into immediate and probe-based paths with `process_result_message`, updated tests, and made the Hierarchical distributor `Config` public.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Master as MasterDistributor
    participant Comm as MPICommunicator
    participant Worker as Worker
    Note left of Master: Receive loop (immediate or probe-based)
    Master->>Comm: recv_any(buffer) or probe()
    alt Immediate receive (use_immediate_recv = true)
        Comm-->>Master: STATUS (tag=RESULT / REQUEST)
        alt RESULT
            Master->>Master: process_result_message(status, buffer)
            Master->>Master: resize/store ResultT, update indices
            Master->>Master: push worker_id (status.source) to free queue
        else REQUEST
            Master->>Master: prepare task for worker
            Master->>Worker: send task (via Comm)
        end
    else Probe-based receive (use_immediate_recv = false)
        Master->>Comm: probe()
        Comm-->>Master: STATUS
        alt STATUS.tag == RESULT
            Master->>Comm: recv(buffer from status.source)
            Master->>Master: process_result_message(status, buffer)
            Master->>Master: push worker_id to free queue
        else STATUS.tag == REQUEST
            Master->>Comm: recv_empty<T>(status.source)
            Master->>Master: assign/send task to worker
        end
    end
```

Estimated code review effort: 🎯 4 (Complex), ⏱️ ~45 minutes
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##              main       #7      +/-  ##
===========================================
- Coverage   100.00%   99.48%   -0.52%
===========================================
  Files           13       13
  Lines          568      584      +16
  Branches        66       63       -3
===========================================
+ Hits           568      581      +13
- Misses           0        3       +3
```
Pull request overview
This pull request adds an immediate receive mode to the `NaiveMPIWorkDistributor` class. Instead of using the probe-then-receive pattern (which requires two MPI calls per message), immediate receive mode uses a single `recv_any` call with a predetermined buffer size, potentially improving performance by reducing MPI call overhead.
Changes:
- Added `use_immediate_recv` and `max_result_size` configuration options to `NaiveMPIWorkDistributor`
- Implemented immediate receive logic with separate handling for fixed-size and variable-size result types
- Added helper methods to `MPICommunicator`: `probe()`, `recv_any()`, `recv_empty_message_any()`, and `adjust_recv_bytes_received()`
- Added comprehensive test coverage for immediate receive mode with various result types and configurations
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| test/mpi/test_distributers.cpp | Added new test fixture and four test cases to verify immediate receive mode with different result types (primitives, vectors), priority queues, and statistics tracking |
| include/dynampi/mpi/mpi_communicator.hpp | Added MPI wrapper methods for probe, immediate receive, and statistics adjustment to support the new receive mode |
| include/dynampi/impl/naive_distributor.hpp | Implemented immediate receive mode in the worker message handling logic with separate code paths for fixed-size and resizable result types, including statistics correction logic |
```cpp
int max_result_size = 1024;  // Maximum expected size for RESULT messages when using immediate
                             // recv. Must be large enough to hold the largest expected RESULT
                             // message. If a message exceeds this size, behavior is undefined.
```
The comment states "If a message exceeds this size, behavior is undefined", but the actual behavior when a RESULT message exceeds `max_result_size` would be a buffer overflow or truncation, which is a critical safety issue. Consider adding runtime validation to detect when messages exceed this size and either throw an error or log a warning. The documentation should also be clearer about what actually happens (e.g., "message will be truncated" or "will cause undefined behavior/crash").
Suggested change:

```diff
-int max_result_size = 1024;  // Maximum expected size for RESULT messages when using immediate
-                             // recv. Must be large enough to hold the largest expected RESULT
-                             // message. If a message exceeds this size, behavior is undefined.
+int max_result_size = 1024;  // Maximum expected size (in bytes) for RESULT messages when using
+                             // immediate recv. Must be large enough to hold the largest possible
+                             // RESULT message produced by the worker function. This class does
+                             // not perform runtime validation of this limit; if a RESULT message
+                             // is larger than max_result_size, the underlying MPI calls may
+                             // truncate the message or cause memory corruption/crashes depending
+                             // on the MPI implementation and surrounding code.
```
```cpp
int count;
DYNAMPI_MPI_CHECK(MPI_Get_count, (&status, result_type::value, &count));
// Resize buffer to actual received count (may be less than max_result_size)
result_type::resize(buffer, count);
```
When receiving variable-size messages (`resize_required` types) in immediate receive mode, if the incoming RESULT message is larger than `max_result_size`, MPI will truncate the message to fit the buffer, potentially causing data loss or corruption. Consider adding validation after receiving: compare `count` (from `MPI_Get_count`) with `max_result_size` and throw an error or log a warning when truncation is detected. This would help catch configuration errors where `max_result_size` is set too small.
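A minimal sketch of the check suggested above, with the MPI specifics stripped away so it can be tested in isolation. The helper name `check_possible_truncation` is hypothetical; `count` and `max_result_size` follow the snippet above.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical guard: if MPI_Get_count reports that the received element count
// filled the entire buffer, the message may have been truncated to fit, so we
// fail loudly instead of silently storing a corrupted result.
inline void check_possible_truncation(int count, int max_result_size) {
    if (count >= max_result_size) {
        throw std::runtime_error(
            "RESULT message filled the receive buffer (count=" + std::to_string(count) +
            ", max_result_size=" + std::to_string(max_result_size) +
            "); it may have been truncated");
    }
}
```

The `>=` comparison is deliberately conservative: a message that exactly fills the buffer cannot be distinguished from a larger one that was cut off.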
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@include/dynampi/mpi/mpi_communicator.hpp`:
- Around lines 230-235: `adjust_recv_bytes_received` can underflow when `bytes_to_subtract > _statistics.bytes_received`. Update the function (inside the `statistics_mode != StatisticsMode::None` guard) to clamp or assert: if `bytes_to_subtract >= _statistics.bytes_received`, set `_statistics.bytes_received = 0` (or log/assert in debug builds); otherwise subtract `bytes_to_subtract` from `_statistics.bytes_received`. This prevents `size_t` wraparound.
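The clamping described above can be sketched in isolation. `RecvStatistics` here is a hypothetical stand-in for the communicator's real statistics struct; only the subtraction logic is the point.

```cpp
#include <cstddef>

// Hypothetical stand-in for the communicator's statistics record.
struct RecvStatistics {
    std::size_t bytes_received = 0;
};

// Clamped subtraction: never lets the unsigned counter wrap around when
// bytes_to_subtract exceeds the running total.
inline void adjust_recv_bytes_received(RecvStatistics& stats, std::size_t bytes_to_subtract) {
    if (bytes_to_subtract >= stats.bytes_received) {
        stats.bytes_received = 0;  // clamp to zero instead of wrapping
    } else {
        stats.bytes_received -= bytes_to_subtract;
    }
}
```

In a debug build one could additionally `assert(bytes_to_subtract <= stats.bytes_received)` so that over-subtraction is caught during testing while release builds stay safe.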
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@include/dynampi/impl/naive_distributor.hpp`:
- Around lines 271-300: the immediate receive path can call `recv_any(buffer)` before knowing the tag, causing an MPI datatype mismatch when a REQUEST (nullptr) message arrives. Change the `_config.use_immediate_recv` branch to first obtain the `MPI_Status`/tag (via `_communicator.recv_empty_message_any(status)` or otherwise probing), then:
  - if `status.MPI_TAG == Tag::RESULT`, allocate/resize the `ResultT` buffer (using `result_type::resize(_config.max_result_size)` when `resize_required`), call `_communicator.recv_any(buffer)`, obtain the element count with `MPI_Get_count`, and call `process_result_message(status, std::move(buffer), count)`;
  - if `status.MPI_TAG == Tag::REQUEST`, call `_communicator.recv_empty_message()` (or otherwise handle the empty receive) and proceed as the probe-based code does.

  Update both the `resize_required` and non-resize paths to follow this flow.
🧹 Nitpick comments (3)

include/dynampi/mpi/mpi_communicator.hpp (1)

**179-195: Consider extracting shared statistics tracking logic.** The statistics update block (lines 186-193) is nearly identical to the one in `recv()` (lines 162-169). Consider extracting a private helper to reduce duplication.

♻️ Example refactor:

```cpp
 private:
  template <typename mpi_type>
  void update_recv_statistics(const MPI_Status& status) {
    if constexpr (statistics_mode != StatisticsMode::None) {
      _statistics.recv_count++;
      int actual_count;
      DYNAMPI_MPI_CHECK(MPI_Get_count, (&status, mpi_type::value, &actual_count));
      int size;
      MPI_Type_size(mpi_type::value, &size);
      _statistics.bytes_received += actual_count * size;
    }
  }
```

include/dynampi/impl/naive_distributor.hpp (2)
**33-36: Clarify truncation behavior in documentation.** The comment states behavior is "undefined" if a message exceeds `max_result_size`, but MPI actually returns `MPI_ERR_TRUNCATE` on truncation, which `DYNAMPI_MPI_CHECK` would catch and likely abort. Consider clarifying that exceeding this size will cause an error, not silent corruption.

```diff
-  int max_result_size = 1024;  // Maximum expected size for RESULT messages when using immediate
-                               // recv. Must be large enough to hold the largest expected RESULT
-                               // message. If a message exceeds this size, behavior is undefined.
+  int max_result_size = 1024;  // Maximum expected size for RESULT messages when using immediate
+                               // recv. Must be large enough to hold the largest expected RESULT
+                               // message. If a message exceeds this size, MPI_Recv will fail
+                               // with MPI_ERR_TRUNCATE.
```
**247-261: Reuse `idx_for_worker()` and remove the redundant resize.**

- Line 249 duplicates the logic already in `idx_for_worker(status.MPI_SOURCE)`.
- Lines 256-258: resizing `_results[task_idx]` before move assignment is wasteful; the move will overwrite the destination entirely, discarding any pre-allocated storage.

♻️ Proposed fix:

```diff
 void process_result_message(const MPI_Status& status, ResultT&& result, int count) {
-  using result_type = MPI_Type<ResultT>;
-  int worker_idx = status.MPI_SOURCE - (status.MPI_SOURCE > _config.manager_rank);
+  int worker_idx = idx_for_worker(status.MPI_SOURCE);
   int64_t task_idx = _worker_current_task_indices[worker_idx];
   _worker_current_task_indices[worker_idx] = -1;
   assert(task_idx >= 0 && "Task index should be valid");
   if (static_cast<uint64_t>(task_idx) >= _results.size()) {
     _results.resize(task_idx + 1);
   }
-  if constexpr (result_type::resize_required) {
-    result_type::resize(_results[task_idx], count);
-  }
   _results[task_idx] = std::move(result);
   _results_received++;
 }
```
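The rank-to-index arithmetic being factored out above can be checked in isolation. The free function below is a hypothetical mirror of `idx_for_worker`, with the manager rank passed explicitly rather than read from `_config`:

```cpp
// Hypothetical mirror of idx_for_worker: ranks above the manager are shifted
// down by one so that worker indices stay contiguous once the manager's own
// rank is skipped.
inline int idx_for_worker(int source_rank, int manager_rank) {
    return source_rank - (source_rank > manager_rank ? 1 : 0);
}
```

For example, with the manager at rank 0, workers at ranks 1..N map to indices 0..N-1; with the manager at rank 2, ranks 0, 1, 3 map to indices 0, 1, 2.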
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>