Commit 91fc9fe

[Feature] support http query api (StarRocks#25760)
This is a new implementation of the HTTP query API, building on StarRocks#4464. Instead of transforming the result to JSON in the FE, this PR generates the result in JSON format in the BE, and the FE just sends that JSON to the client, which reduces the FE's burden. This PR also registers the ConnectContext with the ConnectScheduler, so query timeout and wait timeout apply to HTTP SQL as well. Right now SELECT, SHOW, EXPLAIN, and KILL statements are supported; under the current code framework, adding support for other statements is easy.

---------

Signed-off-by: mchades <[email protected]>
Signed-off-by: before-Sunrise <[email protected]>
Co-authored-by: mchades <[email protected]>
1 parent 1b02456 commit 91fc9fe

36 files changed: +1852 −75 lines
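Each result row is serialized on the BE as one JSON object terminated by a newline (see the new be/src/runtime/http_result_writer.cpp below), so the body the FE streams back to an HTTP client looks roughly like this for a three-column SELECT; the values here are illustrative only, not taken from the patch:

{"data":[1,"alice",3.5]}
{"data":[2,"bob",2.75]}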

be/src/column/type_traits.h  (+5)

@@ -300,6 +300,11 @@ struct ColumnTraits<int8_t> {
     using ColumnType = Int8Column;
 };
 
+template <>
+struct ColumnTraits<uint8_t> {
+    using ColumnType = UInt8Column;
+};
+
 template <>
 struct ColumnTraits<int16_t> {
     using ColumnType = Int16Column;
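The new specialization mirrors the int8_t mapping for unsigned bytes. A minimal compile-time check, not part of the patch (namespace qualification assumed):

#include <type_traits>
#include "column/type_traits.h"

// Verifies that trait lookup for uint8_t now resolves to UInt8Column.
static_assert(std::is_same_v<starrocks::ColumnTraits<uint8_t>::ColumnType, starrocks::UInt8Column>,
              "uint8_t columns should map to UInt8Column");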

be/src/exec/pipeline/fragment_executor.cpp  (+1)

@@ -762,6 +762,7 @@ Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_
                 result_sink->get_file_opts(), dop, fragment_ctx);
     } else {
         op = std::make_shared<ResultSinkOperatorFactory>(context->next_operator_id(), result_sink->get_sink_type(),
+                                                         result_sink->get_format_type(),
                                                          result_sink->get_output_exprs(), fragment_ctx);
     }
     // Add result sink operator to last pipeline

be/src/exec/pipeline/result_sink_operator.cpp  (+4)

@@ -17,6 +17,7 @@
 #include "column/chunk.h"
 #include "exprs/expr.h"
 #include "runtime/buffer_control_block.h"
+#include "runtime/http_result_writer.h"
 #include "runtime/mysql_result_writer.h"
 #include "runtime/query_statistics.h"
 #include "runtime/result_buffer_mgr.h"
@@ -42,6 +43,9 @@ Status ResultSinkOperator::prepare(RuntimeState* state) {
     case TResultSinkType::VARIABLE:
         _writer = std::make_shared<VariableResultWriter>(_sender.get(), _output_expr_ctxs, _profile.get());
         break;
+    case TResultSinkType::HTTP_PROTOCAL:
+        _writer = std::make_shared<HttpResultWriter>(_sender.get(), _output_expr_ctxs, _profile.get(), _format_type);
+        break;
     default:
         return Status::InternalError("Unknown result sink type");
     }

be/src/exec/pipeline/result_sink_operator.h  (+11 −6)

@@ -30,11 +30,13 @@ namespace pipeline {
 class ResultSinkOperator final : public Operator {
 public:
     ResultSinkOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
-                       TResultSinkType::type sink_type, std::vector<ExprContext*> output_expr_ctxs,
-                       const std::shared_ptr<BufferControlBlock>& sender, std::atomic<int32_t>& num_result_sinks,
-                       std::atomic<int64_t>& num_written_rows, FragmentContext* const fragment_ctx)
+                       TResultSinkType::type sink_type, TResultSinkFormatType::type format_type,
+                       std::vector<ExprContext*> output_expr_ctxs, const std::shared_ptr<BufferControlBlock>& sender,
+                       std::atomic<int32_t>& num_result_sinks, std::atomic<int64_t>& num_written_rows,
+                       FragmentContext* const fragment_ctx)
             : Operator(factory, id, "result_sink", plan_node_id, driver_sequence),
               _sink_type(sink_type),
+              _format_type(format_type),
               _output_expr_ctxs(std::move(output_expr_ctxs)),
               _sender(sender),
               _num_result_sinkers(num_result_sinks),
@@ -68,6 +70,7 @@ class ResultSinkOperator final : public Operator {
 
 private:
     TResultSinkType::type _sink_type;
+    TResultSinkFormatType::type _format_type;
     std::vector<ExprContext*> _output_expr_ctxs;
 
     /// The following three fields are shared by all the ResultSinkOperators
@@ -89,10 +92,11 @@ class ResultSinkOperator final : public Operator {
 
 class ResultSinkOperatorFactory final : public OperatorFactory {
 public:
-    ResultSinkOperatorFactory(int32_t id, TResultSinkType::type sink_type, std::vector<TExpr> t_output_expr,
-                              FragmentContext* const fragment_ctx)
+    ResultSinkOperatorFactory(int32_t id, TResultSinkType::type sink_type, TResultSinkFormatType::type format_type,
+                              std::vector<TExpr> t_output_expr, FragmentContext* const fragment_ctx)
             : OperatorFactory(id, "result_sink", Operator::s_pseudo_plan_node_id_for_result_sink),
               _sink_type(sink_type),
+              _format_type(format_type),
               _t_output_expr(std::move(t_output_expr)),
               _fragment_ctx(fragment_ctx) {}
 
@@ -104,7 +108,7 @@ class ResultSinkOperatorFactory final : public OperatorFactory {
         // of increasing _num_result_sinkers to ResultSinkOperator::close is guaranteed by pipeline driver queue,
         // so it doesn't need memory barrier here.
         _increment_num_result_sinkers_no_barrier();
-        return std::make_shared<ResultSinkOperator>(this, _id, _plan_node_id, driver_sequence, _sink_type,
+        return std::make_shared<ResultSinkOperator>(this, _id, _plan_node_id, driver_sequence, _sink_type, _format_type,
                                                     _output_expr_ctxs, _sender, _num_result_sinkers, _num_written_rows,
                                                     _fragment_ctx);
     }
@@ -117,6 +121,7 @@ class ResultSinkOperatorFactory final : public OperatorFactory {
     void _increment_num_result_sinkers_no_barrier() { _num_result_sinkers.fetch_add(1, std::memory_order_relaxed); }
 
     TResultSinkType::type _sink_type;
+    TResultSinkFormatType::type _format_type;
     std::vector<TExpr> _t_output_expr;
     std::vector<ExprContext*> _output_expr_ctxs;

be/src/exprs/cast_expr.h  (+3)

@@ -193,4 +193,7 @@ struct CastToString {
 
 StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column);
 
+// cast column[idx] to coresponding json type.
+StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx);
+
 } // namespace starrocks

be/src/exprs/cast_expr_json.cpp  (+10 −1)

@@ -59,7 +59,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVisitor> {
             auto value = col.get(_row).template get<T>();
             std::string str = CastToString::apply<T, std::string>(value);
             _add_element(std::move(str));
-        } else if constexpr (std::is_integral_v<T>) {
+        } else if constexpr (std::is_integral_v<T> || std::is_floating_point_v<T>) {
             auto value = col.get(_row).template get<T>();
             _add_element(std::move(value));
         } else {
@@ -208,4 +208,13 @@ StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column) {
     return column_builder.build(false);
 }
 
+StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx) {
+    vpack::Builder json_builder;
+    json_builder.clear();
+    RETURN_IF_ERROR(CastColumnItemVisitor::cast_datum_to_json(column, idx, "", &json_builder));
+    JsonValue json(json_builder.slice());
+
+    return json.to_string();
+}
+
 } // namespace starrocks
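For context, the new helper renders a single cell as JSON text, and HttpResultWriter (added below) concatenates these per row. A hedged usage sketch, not part of the patch; render_first_cell is a hypothetical wrapper and assumes the column holds at least one row:

#include "common/statusor.h"
#include "exprs/cast_expr.h"

// Render column[0] as JSON text: an INT cell becomes e.g. 42, a VARCHAR cell becomes a quoted string.
starrocks::StatusOr<std::string> render_first_cell(const starrocks::ColumnPtr& col) {
    return starrocks::cast_type_to_json_str(col, /*idx=*/0);
}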

be/src/runtime/CMakeLists.txt  (+1)

@@ -70,6 +70,7 @@ set(RUNTIME_FILES
     memory_scratch_sink.cpp
     external_scan_context_mgr.cpp
     mysql_result_writer.cpp
+    http_result_writer.cpp
    file_result_writer.cpp
    statistic_result_writer.cpp
    variable_result_writer.cpp

be/src/runtime/http_result_writer.cpp  (new file, +168)

@@ -0,0 +1,168 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "runtime/http_result_writer.h"
+
+#include <column/column_helper.h>
+
+#include "column/chunk.h"
+#include "column/const_column.h"
+#include "exprs/cast_expr.h"
+#include "exprs/expr.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/current_thread.h"
+#include "types/logical_type.h"
+
+namespace starrocks {
+
+HttpResultWriter::HttpResultWriter(BufferControlBlock* sinker, const std::vector<ExprContext*>& output_expr_ctxs,
+                                   RuntimeProfile* parent_profile, TResultSinkFormatType::type format_type)
+        : _sinker(sinker),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _format_type(format_type) {}
+
+Status HttpResultWriter::init(RuntimeState* state) {
+    _init_profile();
+    if (nullptr == _sinker) {
+        return Status::InternalError("sinker is NULL pointer.");
+    }
+
+    return Status::OK();
+}
+
+void HttpResultWriter::_init_profile() {
+    _append_chunk_timer = ADD_TIMER(_parent_profile, "AppendChunkTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendChunkTime");
+    _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultRendTime", "AppendChunkTime");
+    _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
+}
+
+// transform one row into json format
+void HttpResultWriter::_transform_row_to_json(const Columns& result_columns, int idx) {
+    int num_columns = result_columns.size();
+
+    row_str.append("{\"data\":[");
+    for (auto& result_column : result_columns) {
+        std::string row = cast_type_to_json_str(result_column, idx).value();
+        row_str.append(row);
+        if (result_column != result_columns[num_columns - 1]) {
+            row_str.append(",");
+        }
+    }
+    row_str.append("]}\n");
+}
+
+Status HttpResultWriter::append_chunk(Chunk* chunk) {
+    return Status::NotSupported("HttpResultWriter doesn't support non-pipeline engine");
+}
+
+Status HttpResultWriter::close() {
+    COUNTER_SET(_sent_rows_counter, _written_rows);
+    return Status::OK();
+}
+
+StatusOr<TFetchDataResultPtrs> HttpResultWriter::process_chunk(Chunk* chunk) {
+    SCOPED_TIMER(_append_chunk_timer);
+    int num_rows = chunk->num_rows();
+    std::vector<TFetchDataResultPtr> results;
+
+    Columns result_columns;
+    // Step 1: compute expr
+    int num_columns = _output_expr_ctxs.size();
+    result_columns.reserve(num_columns);
+
+    for (int i = 0; i < num_columns; ++i) {
+        ASSIGN_OR_RETURN(ColumnPtr column, _output_expr_ctxs[i]->evaluate(chunk));
+        column = _output_expr_ctxs[i]->root()->type().type == TYPE_TIME
+                         ? ColumnHelper::convert_time_column_from_double_to_str(column)
+                         : column;
+        result_columns.emplace_back(std::move(column));
+    }
+
+    // Step 2: convert chunk to http json row format row by row
+    {
+        TRY_CATCH_ALLOC_SCOPE_START()
+        row_str.reserve(128);
+        size_t current_bytes = 0;
+        int current_rows = 0;
+        SCOPED_TIMER(_convert_tuple_timer);
+        auto result = std::make_unique<TFetchDataResult>();
+        auto& result_rows = result->result_batch.rows;
+        result_rows.resize(num_rows);
+
+        for (int i = 0; i < num_rows; ++i) {
+            switch (_format_type) {
+            case TResultSinkFormatType::type::JSON:
+                _transform_row_to_json(result_columns, i);
+                break;
+            case TResultSinkFormatType::type::OTHERS:
+                return Status::NotSupported("HttpResultWriter only support json format right now");
+            }
+            size_t len = row_str.size();
+
+            if (UNLIKELY(current_bytes + len >= _max_row_buffer_size)) {
+                result_rows.resize(current_rows);
+                results.emplace_back(std::move(result));
+
+                result = std::make_unique<TFetchDataResult>();
+                result_rows = result->result_batch.rows;
+                result_rows.resize(num_rows - i);
+
+                current_bytes = 0;
+                current_rows = 0;
+            }
+
+            // VLOG_ROW << "written row:" << row_str;
+            result_rows[current_rows] = std::move(row_str);
+            row_str.clear();
+
+            row_str.reserve(len * 1.1);
+
+            current_bytes += len;
+            current_rows += 1;
+        }
+        if (current_rows > 0) {
+            result_rows.resize(current_rows);
+            results.emplace_back(std::move(result));
+        }
+        TRY_CATCH_ALLOC_SCOPE_END()
+    }
+    return results;
+}
+
+StatusOr<bool> HttpResultWriter::try_add_batch(TFetchDataResultPtrs& results) {
+    SCOPED_TIMER(_result_send_timer);
+    size_t num_rows = 0;
+    for (auto& result : results) {
+        num_rows += result->result_batch.rows.size();
+    }
+
+    auto status = _sinker->try_add_batch(results);
+    if (status.ok()) {
+        // success in add result to ResultQueue of _sinker
+        if (status.value()) {
+            _written_rows += num_rows;
+            results.clear();
+        }
+    } else {
+        results.clear();
+        LOG(WARNING) << "Append result batch to sink failed: status=" << status.status().to_string();
+    }
+    return status;
+}
+
+} // namespace starrocks
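The writer is wired into the pipeline engine only (append_chunk is explicitly NotSupported). A simplified sketch of how a pipeline result sink is expected to drive it, assuming the ResultWriter interface declared in the header below; this is not the actual ResultSinkOperator code:

// Serialize one chunk into newline-delimited JSON batches, then enqueue them on the BufferControlBlock.
starrocks::Status push_chunk_sketch(starrocks::ResultWriter* writer, starrocks::Chunk* chunk) {
    ASSIGN_OR_RETURN(auto batches, writer->process_chunk(chunk));   // rows -> {"data":[...]} strings
    ASSIGN_OR_RETURN(bool queued, writer->try_add_batch(batches));  // false means the result queue is full
    if (!queued) {
        // the operator would keep the batches and retry on a later schedule
    }
    return starrocks::Status::OK();
}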

be/src/runtime/http_result_writer.h  (new file, +90)

@@ -0,0 +1,90 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file is based on code available under the Apache license here:
+// https://github.com/apache/incubator-doris/blob/master/be/src/runtime/mysql_result_writer.h
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/statusor.h"
+#include "runtime/result_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/raw_container.h"
+
+namespace starrocks {
+
+class ExprContext;
+class BufferControlBlock;
+class RuntimeProfile;
+using TFetchDataResultPtr = std::unique_ptr<TFetchDataResult>;
+using TFetchDataResultPtrs = std::vector<TFetchDataResultPtr>;
+// convert the row batch to mysql protocol row
+class HttpResultWriter final : public ResultWriter {
+public:
+    HttpResultWriter(BufferControlBlock* sinker, const std::vector<ExprContext*>& output_expr_ctxs,
+                     RuntimeProfile* parent_profile, TResultSinkFormatType::type format_type);
+
+    Status init(RuntimeState* state) override;
+
+    Status append_chunk(Chunk* chunk) override;
+
+    Status close() override;
+
+    StatusOr<TFetchDataResultPtrs> process_chunk(Chunk* chunk) override;
+
+    StatusOr<bool> try_add_batch(TFetchDataResultPtrs& results) override;
+
+private:
+    void _init_profile();
+
+    void _transform_row_to_json(const Columns& column, int idx);
+
+    BufferControlBlock* _sinker;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    raw::RawString row_str;
+    RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
+    // total time cost on append chunk operation
+    RuntimeProfile::Counter* _append_chunk_timer = nullptr;
+    // tuple convert timer, child timer of _append_chunk_timer
+    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
+    // file write timer, child timer of _append_chunk_timer
+    RuntimeProfile::Counter* _result_send_timer = nullptr;
+    // number of sent rows
+    RuntimeProfile::Counter* _sent_rows_counter = nullptr;
+
+    const size_t _max_row_buffer_size = 1024 * 1024 * 1024;
+
+    // result's format, right now just support json format
+    TResultSinkFormatType::type _format_type;
+};
+
+} // namespace starrocks
