Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-3.0 pick 49325 fix query statistics leak in BE #49401

Merged
merged 1 commit into from
Mar 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
TReportExecStatusResult& res) {
Status client_status;
FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
&client_status);
config::thrift_rpc_timeout_ms, &client_status);
if (!client_status.ok()) {
LOG_WARNING(
"Could not get client rpc client of {} when reporting profiles, reason is {}, "
Expand Down Expand Up @@ -349,26 +349,36 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
int64_t current_time = MonotonicMillis();
int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL) {
continue;
}
if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
std::map<std::string, TQueryStatistics> tmp_map;
fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
}

TQueryStatistics ret_t_qs;
qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;

for (auto iter = _query_statistics_ctx_map.begin();
iter != _query_statistics_ctx_map.end();) {
std::string query_id = iter->first;
auto& qs_ctx_ptr = iter->second;
bool is_query_finished = qs_ctx_ptr->_is_query_finished;
bool is_timeout_after_finish = false;
if (is_query_finished) {
is_timeout_after_finish =
(current_time - qs_ctx_ptr->_query_finish_time) > conf_qs_timeout;
}
qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish);

// external query not need to report to FE, so we can remove it directly.
if (qs_ctx_ptr->_query_type == TQueryType::EXTERNAL && is_query_finished) {
iter = _query_statistics_ctx_map.erase(iter);
} else {
if (qs_ctx_ptr->_query_type != TQueryType::EXTERNAL) {
if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) {
std::map<std::string, TQueryStatistics> tmp_map;
fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map);
}

TQueryStatistics ret_t_qs;
qs_ctx_ptr->collect_query_statistics(&ret_t_qs);
fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs;
qs_status[query_id] =
std::make_pair(is_query_finished, is_timeout_after_finish);
}

iter++;
}
}
}

Expand All @@ -379,7 +389,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
// 2.1 get client
Status coord_status;
FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
&coord_status);
config::thrift_rpc_timeout_ms, &coord_status);
std::string add_str = PrintThriftNetworkAddress(addr);
if (!coord_status.ok()) {
std::stringstream ss;
Expand Down Expand Up @@ -409,7 +419,7 @@ void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "[report_query_statistics]report workload runtime statistics to "
<< add_str << " failed, reason: " << e.what();
rpc_status = coord.reopen();
rpc_status = coord.reopen(config::thrift_rpc_timeout_ms);
if (!rpc_status.ok()) {
LOG(WARNING) << "[report_query_statistics]reopen thrift client failed when report "
"workload runtime statistics to"
Expand Down
Loading