Commit 5f2ed50

kwen2501 authored and pytorchmergebot committed
[PGNCCL] Watchdog prints call-time traceback when reporting timeout (pytorch#139659)
### Motivation

Today, watchdog only reports that it found a collective timeout:

```
[rank1]:[E1104 14:02:18.767594328 ProcessGroupNCCL.cpp:688] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=1, OpType=ALLREDUCE, NumelIn=200, NumelOut=200, Timeout(ms)=5000) ran for 5096 milliseconds before timing out.
```

While this is useful, it is hard to associate the error with the user's program or library stack.

### This PR

This PR gives watchdog the ability to report the call-time stack of the collective, so that it is easier to trace the error back to the program's behavior.

The call-time stack is recorded by Flight Recorder with minimal overhead (for details, please read this [doc](https://dev-discuss.pytorch.org/t/fast-combined-c-python-torchscript-inductor-tracebacks/1158) written by @zdevito). In `ProcessGroupNCCL`, we only track / report the Python part, as that fits most PyTorch users.

### Demo

[stack_demo.py](https://gist.github.com/kwen2501/6758e18d305d67fc6f3f926217825c09).

```
TORCH_NCCL_TRACE_BUFFER_SIZE=100 torchrun --nproc-per-node 2 stack_demo.py
```

`TORCH_NCCL_TRACE_BUFFER_SIZE` turns on the Flight Recorder.

Output:

```
[rank0]:[E1104 14:19:27.591610653 ProcessGroupNCCL.cpp:695] Stack trace of the timedout collective operation:
#0 all_reduce from /data/users/kw2501/pytorch/torch/distributed/distributed_c10d.py:2696
#1 wrapper from /data/users/kw2501/pytorch/torch/distributed/c10d_logger.py:83
#2 bar from /data/users/kw2501/sync_async/repro.py:15
#3 foo from /data/users/kw2501/sync_async/repro.py:24
#4 main from /data/users/kw2501/sync_async/repro.py:34
#5 <module> from /data/users/kw2501/sync_async/repro.py:40

[rank1]:[E1104 14:19:27.771430164 ProcessGroupNCCL.cpp:695] Stack trace of the timedout collective operation:
#0 all_gather_into_tensor from /data/users/kw2501/pytorch/torch/distributed/distributed_c10d.py:3630
#1 wrapper from /data/users/kw2501/pytorch/torch/distributed/c10d_logger.py:83
#2 baz from /data/users/kw2501/sync_async/repro.py:20
#3 foo from /data/users/kw2501/sync_async/repro.py:26
#4 main from /data/users/kw2501/sync_async/repro.py:34
#5 <module> from /data/users/kw2501/sync_async/repro.py:40
```

From the log above, we can tell that `bar()` and `baz()` are the places where the two ranks diverge.

Pull Request resolved: pytorch#139659
Approved by: https://github.com/wconstab, https://github.com/fduwjj
1 parent ee42a99 commit 5f2ed50

File tree

3 files changed: +79 −0 lines changed


torch/csrc/distributed/c10d/NCCLUtils.cpp

Lines changed: 44 additions & 0 deletions
```diff
@@ -376,6 +376,33 @@ void DebugInfoWriter::registerWriter(std::unique_ptr<DebugInfoWriter> writer) {
   writer_ = std::move(writer);
 }
 
+// Returns the traceback of current entry, in string form.
+// Note: `getTraceback` invokes `torch::symbolize`, which may need to acquire
+// the GIL. If you don't want to block the current thread or take the risk of a
+// GIL deadlock, you can use an asynchronous calling mechanism like std::async.
+std::string NCCLTraceBuffer::Entry::getTraceback() {
+  torch::CapturedTraceback* traceback = traceback_.get();
+  torch::SymbolizedTracebacks s_tbs = torch::symbolize({traceback});
+  // We use 0 because we only have one traceback here.
+  const auto& s_tb = s_tbs.tracebacks.at(0);
+  std::stringstream oss;
+  for (auto idx : c10::irange(s_tb.size())) {
+    auto frame_id = s_tb[idx];
+    const auto& frame = s_tbs.all_frames.at(frame_id);
+    oss << "#" << idx << " " << frame.funcname << " from " << frame.filename
+        << ":" << frame.lineno << '\n';
+  }
+  /* Resulting format is like:
+  #0 all_reduce from pytorch/torch/distributed/distributed_c10d.py:2696
+  #1 wrapper from pytorch/torch/distributed/c10d_logger.py:83
+  #2 bar from /home/user/repro.py:15
+  #3 foo from /home/user/repro.py:24
+  #4 main from /home/user/repro.py:34
+  #5 <module> from /home/user/repro.py:40
+  */
+  return oss.str();
+}
+
 std::optional<size_t> NCCLTraceBuffer::record(
     size_t pg_id,
     const std::tuple<std::string, std::string>& pg_name,
@@ -495,6 +522,23 @@ std::vector<NCCLTraceBuffer::Entry> NCCLTraceBuffer::dump_entries() {
   return result;
 }
 
+// Returns the entry with the given id, if it exists. Otherwise, returns
+// std::nullopt.
+std::optional<NCCLTraceBuffer::Entry> NCCLTraceBuffer::getEntry(
+    std::optional<size_t> id) {
+  if (!enabled_ || !id) {
+    return std::nullopt;
+  }
+
+  std::unique_lock<std::mutex> guard(mutex_);
+  Entry entry = entries_.at(*id % max_entries_);
+  if (entry.id_ == *id) {
+    return entry;
+  } else {
+    return std::nullopt;
+  }
+}
+
 void NCCLTraceBuffer::retire_id(
     std::optional<size_t> id,
     bool compute_duration) {
```

torch/csrc/distributed/c10d/NCCLUtils.hpp

Lines changed: 7 additions & 0 deletions
```diff
@@ -663,6 +663,9 @@ struct NCCLTraceBuffer {
     c10::SmallVector<int64_t, 8> sizes_; // flattened from inputs, outputs
     bool retired_ = false; // is this work entry no longer in the workMetaList_?
                            // a retired but not completed event has timed out
+
+    // Returns the traceback of current entry, in string form.
+    std::string getTraceback();
   };
 
   bool enabled_ = false;
@@ -699,6 +702,10 @@ struct NCCLTraceBuffer {
 
   std::vector<Entry> dump_entries();
 
+  // Returns the entry with the given id, if it exists. Otherwise, returns
+  // std::nullopt.
+  std::optional<Entry> getEntry(std::optional<size_t> id);
+
   /*
   Mark an Event as completed and free its events.
   This is called by the watchdog thread, and is asynchronous from the
```

torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp

Lines changed: 28 additions & 0 deletions
```diff
@@ -690,6 +690,34 @@ bool ProcessGroupNCCL::WorkNCCL::checkTimeout(
       " milliseconds before timing out.");
 
   LOG(ERROR) << exceptionMsg;
+
+  // Get the stack trace of the work at call time
+  // First step we get the corresponding record entry from FR, based on work's
+  // trace_id_
+  std::optional<NCCLTraceBuffer::Entry> entry =
+      NCCLTraceBuffer::get()->getEntry(trace_id_);
+  if (entry.has_value()) {
+    auto entryVal = entry.value();
+    // Get stack trace from FR entry, in string format
+    // Note: `getTraceback` call below invokes `torch::symbolize`, which may
+    // need to acquire the GIL. In order for watchdog to be block-free, we make
+    // the call with std::async.
+    auto future = std::async(
+        std::launch::async, [&entryVal]() { return entryVal.getTraceback(); });
+    // Wait for the future to complete or timeout
+    auto status = future.wait_for(std::chrono::seconds(8));
+    if (status == std::future_status::ready) {
+      std::string tracebackStr = future.get();
+      LOG(ERROR) << "Stack trace of the timedout collective operation: \n"
+                 << tracebackStr;
+    } // else, symbolizer probably timed out, we skip logging the stack trace.
+  } else {
+    LOG(ERROR)
+        << "Stack trace of the timedout collective not found, "
+        << "potentially because FlightRecorder is disabled. "
+        << "You can enable it by setting TORCH_NCCL_TRACE_BUFFER_SIZE to a non-zero value.";
+  }
+
   std::exception_ptr exception_ptr =
       std::make_exception_ptr(C10_BUILD_ERROR(DistBackendError, exceptionMsg));
   setException(exception_ptr);
```