Skip to content

Commit b450fb8

Browse files
duxiao1212facebook-github-bot
authored andcommitted
misc: Refactor TaskManager init and clean up QueryContextManager (#26711)
Summary: as title Differential Revision: D87955845
1 parent d5c8633 commit b450fb8

File tree

4 files changed

+35
-26
lines changed

4 files changed

+35
-26
lines changed

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
1514
#include "presto_cpp/main/QueryContextManager.h"
16-
#include <folly/executors/IOThreadPoolExecutor.h>
1715
#include "presto_cpp/main/PrestoToVeloxQueryConfig.h"
1816
#include "presto_cpp/main/SessionProperties.h"
1917
#include "presto_cpp/main/common/Configs.h"
18+
2019
#include "velox/connectors/hive/HiveConfig.h"
2120
#include "velox/core/QueryConfig.h"
2221

22+
#include <folly/executors/IOThreadPoolExecutor.h>
23+
2324
using namespace facebook::velox;
2425

2526
using facebook::presto::protocol::QueryId;
@@ -60,8 +61,11 @@ std::shared_ptr<velox::core::QueryCtx> QueryContextCache::insert(
6061
evict();
6162
}
6263
queryIds_.push_front(queryId);
63-
queryCtxs_[queryId] = {
64-
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
64+
queryCtxs_.insert(
65+
{queryId,
66+
{.queryCtx = folly::to_weak_ptr(queryCtx),
67+
.idListIterator = queryIds_.begin(),
68+
.hasStartedTasks = false}});
6569
return queryCtx;
6670
}
6771

@@ -83,11 +87,11 @@ void QueryContextCache::setTasksStarted(const protocol::QueryId& queryId) {
8387

8488
void QueryContextCache::evict() {
8589
// Evict least recently used queryCtx if it is not referenced elsewhere.
86-
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
87-
--victim;
88-
if (!queryCtxs_[*victim].queryCtx.lock()) {
89-
queryCtxs_.erase(*victim);
90-
queryIds_.erase(victim);
90+
for (auto victim = queryIds_.rbegin(); victim != queryIds_.rend(); ++victim) {
91+
auto iter = queryCtxs_.find(*victim);
92+
if (iter != queryCtxs_.end() && !iter->second.queryCtx.lock()) {
93+
queryCtxs_.erase(iter);
94+
queryIds_.erase(std::next(victim).base());
9195
return;
9296
}
9397
}

presto-native-execution/presto_cpp/main/QueryContextManager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class QueryContextCache {
3535
};
3636
using QueryCtxMap = std::unordered_map<protocol::QueryId, QueryCtxCacheValue>;
3737

38-
QueryContextCache(size_t initial_capacity = kInitialCapacity)
38+
explicit QueryContextCache(size_t initial_capacity = kInitialCapacity)
3939
: capacity_(initial_capacity) {}
4040

4141
size_t capacity() const {

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,14 @@ void enqueueTask(
354354
TaskManager::TaskManager(
355355
folly::Executor* driverExecutor,
356356
folly::Executor* httpSrvCpuExecutor,
357-
folly::Executor* spillerExecutor)
357+
folly::Executor* spillerExecutor,
358+
std::unique_ptr<QueryContextManager> queryContextManager)
358359
: queryContextManager_(
359-
std::make_unique<QueryContextManager>(
360-
driverExecutor,
361-
spillerExecutor)),
360+
queryContextManager == nullptr
361+
? std::make_unique<QueryContextManager>(
362+
driverExecutor,
363+
spillerExecutor)
364+
: std::move(queryContextManager)),
362365
bufferManager_(velox::exec::OutputBufferManager::getInstanceRef()),
363366
httpSrvCpuExecutor_(httpSrvCpuExecutor),
364367
lastNotOverloadedTimeInSecs_(velox::getCurrentTimeSec()) {

presto-native-execution/presto_cpp/main/TaskManager.h

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,23 @@ class TaskManager {
3232
TaskManager(
3333
folly::Executor* driverExecutor,
3434
folly::Executor* httpSrvExecutor,
35-
folly::Executor* spillerExecutor);
35+
folly::Executor* spillerExecutor,
36+
std::unique_ptr<QueryContextManager> queryContextManager = nullptr);
3637

3738
virtual ~TaskManager() = default;
3839

40+
/// Always returns tuple of non-empty string containing the spill directory
41+
/// and the date string directory, which is parent directory of task spill
42+
/// directory.
43+
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
44+
const std::string& baseSpillPath,
45+
const std::string& nodeIp,
46+
const std::string& nodeId,
47+
const std::string& queryId,
48+
const protocol::TaskId& taskId,
49+
bool includeNodeInSpillPath);
50+
51+
protected:
3952
/// Invoked by Presto server shutdown to wait for all the tasks to complete
4053
/// and cleanup the completed tasks.
4154
void shutdown();
@@ -166,17 +179,6 @@ class TaskManager {
166179
std::vector<std::string>& deadlockTasks,
167180
std::vector<velox::exec::Task::OpCallInfo>& stuckOpCalls) const;
168181

169-
/// Always returns tuple of non-empty string containing the spill directory
170-
/// and the date string directory, which is parent directory of task spill
171-
/// directory.
172-
static std::tuple<std::string, std::string> buildTaskSpillDirectoryPath(
173-
const std::string& baseSpillPath,
174-
const std::string& nodeIp,
175-
const std::string& nodeId,
176-
const std::string& queryId,
177-
const protocol::TaskId& taskId,
178-
bool includeNodeInSpillPath);
179-
180182
/// Presto Server can notify the Task Manager that the former is overloaded,
181183
/// so the Task Manager can optionally change Task admission algorithm.
182184
void setServerOverloaded(bool serverOverloaded);

0 commit comments

Comments
 (0)