Skip to content

Commit 39e679f

Browse files
duxiao1212facebook-github-bot
authored andcommitted
misc: Refactor TaskManager init and clean up QueryContextManager (#26711)
Summary: as title Differential Revision: D87955845
1 parent 08e141f commit 39e679f

File tree

4 files changed

+32
-27
lines changed

4 files changed

+32
-27
lines changed

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
1514
#include "presto_cpp/main/QueryContextManager.h"
16-
#include <folly/executors/IOThreadPoolExecutor.h>
1715
#include "presto_cpp/main/PrestoToVeloxQueryConfig.h"
1816
#include "presto_cpp/main/SessionProperties.h"
1917
#include "presto_cpp/main/common/Configs.h"
18+
2019
#include "velox/connectors/hive/HiveConfig.h"
2120
#include "velox/core/QueryConfig.h"
2221

22+
#include <folly/executors/IOThreadPoolExecutor.h>
23+
2324
using namespace facebook::velox;
2425

2526
using facebook::presto::protocol::QueryId;
@@ -60,8 +61,8 @@ std::shared_ptr<velox::core::QueryCtx> QueryContextCache::insert(
6061
evict();
6162
}
6263
queryIds_.push_front(queryId);
63-
queryCtxs_[queryId] = {
64-
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
64+
queryCtxs_.insert({queryId, {
65+
.queryCtx=folly::to_weak_ptr(queryCtx), .idListIterator=queryIds_.begin(), .hasStartedTasks=false}});
6566
return queryCtx;
6667
}
6768

@@ -83,11 +84,11 @@ void QueryContextCache::setTasksStarted(const protocol::QueryId& queryId) {
8384

8485
void QueryContextCache::evict() {
8586
// Evict least recently used queryCtx if it is not referenced elsewhere.
86-
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
87-
--victim;
88-
if (!queryCtxs_[*victim].queryCtx.lock()) {
89-
queryCtxs_.erase(*victim);
90-
queryIds_.erase(victim);
87+
for (auto victim = queryIds_.rbegin(); victim != queryIds_.rend(); ++victim) {
88+
auto iter = queryCtxs_.find(*victim);
89+
if (iter != queryCtxs_.end() && !iter->second.queryCtx.lock()) {
90+
queryCtxs_.erase(iter);
91+
queryIds_.erase(std::next(victim).base());
9192
return;
9293
}
9394
}

presto-native-execution/presto_cpp/main/QueryContextManager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class QueryContextCache {
3535
};
3636
using QueryCtxMap = std::unordered_map<protocol::QueryId, QueryCtxCacheValue>;
3737

38-
QueryContextCache(size_t initial_capacity = kInitialCapacity)
38+
explicit QueryContextCache(size_t initial_capacity = kInitialCapacity)
3939
: capacity_(initial_capacity) {}
4040

4141
size_t capacity() const {

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,12 @@ void enqueueTask(
354354
TaskManager::TaskManager(
355355
folly::Executor* driverExecutor,
356356
folly::Executor* httpSrvCpuExecutor,
357-
folly::Executor* spillerExecutor)
357+
folly::Executor* spillerExecutor,
358+
std::unique_ptr<QueryContextManager> queryContextManager)
358359
: queryContextManager_(
359-
std::make_unique<QueryContextManager>(
360-
driverExecutor,
361-
spillerExecutor)),
360+
queryContextManager == nullptr
361+
? std::make_unique<QueryContextManager>(driverExecutor, spillerExecutor)
362+
: std::move(queryContextManager)),
362363
bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
363364
httpSrvCpuExecutor_(httpSrvCpuExecutor),
364365
lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) {

presto-native-execution/presto_cpp/main/TaskManager.h

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,25 @@ class TaskManager {
3232
TaskManager(
3333
folly::Executor* driverExecutor,
3434
folly::Executor* httpSrvExecutor,
35-
folly::Executor* spillerExecutor);
35+
folly::Executor* spillerExecutor,
36+
std::unique_ptr<QueryContextManager> queryContextManager = nullptr);
3637

3738
virtual ~TaskManager() = default;
3839

39-
/// Invoked by Presto server shutdown to wait for all the tasks to complete
40+
/// Always returns tuple of non-empty string containing the spill directory
41+
/// and the date string directory, which is parent directory of task spill
42+
/// directory.
43+
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
44+
const std::string& baseSpillPath,
45+
const std::string& nodeIp,
46+
const std::string& nodeId,
47+
const std::string& queryId,
48+
const protocol::TaskId& taskId,
49+
bool includeNodeInSpillPath);
50+
51+
protected:
52+
53+
/// Invoked by Presto server shutdown to wait for all the tasks to complete
4054
/// and cleanup the completed tasks.
4155
void shutdown();
4256

@@ -166,17 +180,6 @@ class TaskManager {
166180
std::vector<std::string>& deadlockTasks,
167181
std::vector<velox::exec::Task::OpCallInfo>& stuckOpCalls) const;
168182

169-
/// Always returns tuple of non-empty string containing the spill directory
170-
/// and the date string directory, which is parent directory of task spill
171-
/// directory.
172-
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
173-
const std::string& baseSpillPath,
174-
const std::string& nodeIp,
175-
const std::string& nodeId,
176-
const std::string& queryId,
177-
const protocol::TaskId& taskId,
178-
bool includeNodeInSpillPath);
179-
180183
/// Presto Server can notify the Task Manager that the former is overloaded,
181184
/// so the Task Manager can optionally change Task admission algorithm.
182185
void setServerOverloaded(bool serverOverloaded);

0 commit comments

Comments
 (0)