Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ void PrestoServer::run() {
}
if (spillerExecutor_ != nullptr) {
PRESTO_STARTUP_LOG(INFO)
<< "Spiller CPU executor '" << spillerExecutor_->getName() << "', has "
<< spillerExecutor_->numThreads() << " threads.";
<< "Spiller CPU executor '" << spillerCpuExecutor_->getName()
<< "', has " << spillerCpuExecutor_->numThreads() << " threads.";
} else {
PRESTO_STARTUP_LOG(INFO) << "Spill executor was not configured.";
}
Expand All @@ -560,7 +560,7 @@ void PrestoServer::run() {
auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance();
periodicTaskManager_ = std::make_unique<PeriodicTaskManager>(
driverCpuExecutor_,
spillerExecutor_.get(),
spillerCpuExecutor_,
httpSrvIoExecutor_.get(),
httpSrvCpuExecutor_.get(),
exchangeHttpIoExecutor_.get(),
Expand Down Expand Up @@ -836,10 +836,11 @@ void PrestoServer::initializeThreadPools() {
threadFactory = std::make_shared<folly::NamedThreadFactory>("Driver");
}

auto driverExecutor = std::make_unique<folly::CPUThreadPoolExecutor>(
driverExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numDriverCpuThreads, threadFactory);
driverCpuExecutor_ = driverExecutor.get();
driverExecutor_ = std::move(driverExecutor);
driverCpuExecutor_ =
velox::checked_pointer_cast<folly::CPUThreadPoolExecutor>(
driverExecutor_.get());

const auto numIoThreads = std::max<size_t>(
systemConfig->httpServerNumIoThreadsHwMultiplier() * hwConcurrency, 1);
Expand All @@ -857,8 +858,10 @@ void PrestoServer::initializeThreadPools() {
spillerExecutor_ = std::make_unique<folly::CPUThreadPoolExecutor>(
numSpillerCpuThreads,
std::make_shared<folly::NamedThreadFactory>("Spiller"));
spillerCpuExecutor_ =
velox::checked_pointer_cast<folly::CPUThreadPoolExecutor>(
spillerExecutor_.get());
}

const auto numExchangeHttpClientIoThreads = std::max<size_t>(
systemConfig->exchangeHttpClientNumIoThreadsHwMultiplier() *
std::thread::hardware_concurrency(),
Expand Down
10 changes: 8 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,14 @@ class PrestoServer {
// 'driverExecutor_'.
folly::CPUThreadPoolExecutor* driverCpuExecutor_;

// Executor for spilling.
std::unique_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;
// Executor for spilling. The underlying thread pool executor is a
// folly::CPUThreadPoolExecutor. The executor is stored as abstract type to
// provide flexibility of thread pool monitoring. The underlying
// folly::CPUThreadPoolExecutor can be obtained through 'spillerCpuExecutor_'.
std::unique_ptr<folly::Executor> spillerExecutor_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @duxiao1212 for this code. I'm not sure I follow the motivation for this change. Could you please help explain?

Copy link
Contributor Author

@duxiao1212 duxiao1212 Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right @aditi-pandit - the original type worked. However, we need to change it to std::unique_ptr<folly::Executor> because the build system allows wrapping executors with monitoring capabilities.

Background

While investigating stuck driver issues, we identified the need for thread monitoring capability on the spiller thread executor. This helps detect and diagnose thread stalls during spill operations.

Implementation

Storing the executor as the base type folly::Executor allows it to be wrapped with monitoring,
while keeping the typed spillerCpuExecutor_ pointer lets PeriodicTaskManager access the monitoring methods.

This follows the same pattern as driverExecutor_ and enables consistent thread stall detection across execution contexts.

// Non-owning raw pointer to the underlying folly::CPUThreadPoolExecutor of
// 'spillerExecutor_'. Defaults to nullptr so that it has a well-defined value
// when no spiller executor is created; it is only assigned when the spiller
// thread pool is set up, yet it is handed to consumers unconditionally.
folly::CPUThreadPoolExecutor* spillerCpuExecutor_{nullptr};

std::unique_ptr<VeloxPlanValidator> planValidator_;

Expand Down
Loading