Skip to content

Commit 408e53c

Browse files
committed
support native format
1 parent bb92995 commit 408e53c

File tree

18 files changed

+1422
-16
lines changed

18 files changed

+1422
-16
lines changed

be/src/service/internal_service.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
#include "vec/exec/format/csv/csv_reader.h"
123123
#include "vec/exec/format/generic_reader.h"
124124
#include "vec/exec/format/json/new_json_reader.h"
125+
#include "vec/exec/format/native/native_reader.h"
125126
#include "vec/exec/format/orc/vorc_reader.h"
126127
#include "vec/exec/format/parquet/vparquet_reader.h"
127128
#include "vec/exec/format/text/text_reader.h"
@@ -856,6 +857,14 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
856857
reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx);
857858
break;
858859
}
860+
case TFileFormatType::FORMAT_NATIVE: {
861+
// Doris Native binary format reader for schema probing in S3/FILE TVF.
862+
// Use a reasonable default batch size; it only affects internal buffering.
863+
size_t batch_size = 4096;
864+
reader = vectorized::NativeReader::create_unique(profile.get(), params, range,
865+
batch_size, &io_ctx, nullptr);
866+
break;
867+
}
859868
case TFileFormatType::FORMAT_JSON: {
860869
reader = vectorized::NewJsonReader::create_unique(profile.get(), params, range,
861870
file_slots, &io_ctx);

be/src/vec/data_types/serde/data_type_serde.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class IColumn;
8989
class Arena;
9090
class IDataType;
9191
struct CastParameters;
92+
class BufferWritable;
93+
class ReadBuffer;
9294

9395
class DataTypeSerDe;
9496
using DataTypeSerDeSPtr = std::shared_ptr<DataTypeSerDe>;
@@ -458,6 +460,30 @@ class DataTypeSerDe {
458460
int64_t start, int64_t end,
459461
const cctz::time_zone& ctz) const = 0;
460462

463+
// Doris Native binary columnar format serializer and deserializer.
464+
// These APIs are designed to be the low-level building blocks for
465+
// NativeReader / VNativeTransformer.
466+
//
467+
// Contract:
468+
// - write_column_to_native writes [offset, offset + limit) rows of `column`
469+
// into `buf` in a compact binary form, including null map if necessary.
470+
// - read_column_from_native reads exactly `rows` values from `buf` and
471+
// appends them to `column`.
472+
//
473+
// Default implementation returns NotSupported to allow incremental adoption
474+
// by each concrete SerDe.
475+
virtual Status write_column_to_native(const IColumn& /*column*/, size_t /*offset*/,
476+
size_t /*limit*/, BufferWritable& /*buf*/) const {
477+
return Status::NotSupported("write_column_to_native is not implemented for type {}",
478+
get_name());
479+
}
480+
481+
virtual Status read_column_from_native(IColumn& /*column*/, ReadBuffer& /*buf*/,
482+
size_t /*rows*/) const {
483+
return Status::NotSupported("read_column_from_native is not implemented for type {}",
484+
get_name());
485+
}
486+
461487
// ORC serializer
462488
virtual Status write_column_to_orc(const std::string& timezone, const IColumn& column,
463489
const NullMap* null_map,

be/src/vec/data_types/serde/data_type_variant_serde.cpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "data_type_variant_serde.h"
1919

20+
#include <arrow/array/builder_binary.h>
21+
2022
#include <cstdint>
2123
#include <string>
2224

@@ -145,21 +147,34 @@ Status DataTypeVariantSerDe::write_column_to_arrow(const IColumn& column, const
145147
int64_t start, int64_t end,
146148
const cctz::time_zone& ctz) const {
147149
const auto* var = check_and_get_column<ColumnVariant>(column);
148-
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
149-
for (size_t i = start; i < end; ++i) {
150-
if (null_map && (*null_map)[i]) {
151-
RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), column.get_name(),
152-
array_builder->type()->name()));
153-
} else {
154-
std::string serialized_value;
155-
var->serialize_one_row_to_string(i, &serialized_value);
156-
RETURN_IF_ERROR(
157-
checkArrowStatus(builder.Append(serialized_value.data(),
158-
static_cast<int>(serialized_value.size())),
159-
column.get_name(), array_builder->type()->name()));
150+
151+
auto write_impl = [&](auto& builder) -> Status {
152+
for (size_t i = start; i < end; ++i) {
153+
if (null_map && (*null_map)[i]) {
154+
RETURN_IF_ERROR(checkArrowStatus(builder.AppendNull(), column.get_name(),
155+
array_builder->type()->name()));
156+
} else {
157+
std::string serialized_value;
158+
var->serialize_one_row_to_string(i, &serialized_value);
159+
RETURN_IF_ERROR(
160+
checkArrowStatus(builder.Append(serialized_value.data(),
161+
static_cast<int>(serialized_value.size())),
162+
column.get_name(), array_builder->type()->name()));
163+
}
160164
}
165+
return Status::OK();
166+
};
167+
168+
if (array_builder->type()->id() == arrow::Type::LARGE_STRING) {
169+
auto& builder = assert_cast<arrow::LargeStringBuilder&>(*array_builder);
170+
return write_impl(builder);
171+
} else if (array_builder->type()->id() == arrow::Type::STRING) {
172+
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
173+
return write_impl(builder);
161174
}
162-
return Status::OK();
175+
176+
return Status::InvalidArgument("Unsupported arrow type for variant column: {}",
177+
array_builder->type()->name());
163178
}
164179

165180
void DataTypeVariantSerDe::to_string(const IColumn& column, size_t row_num,

0 commit comments

Comments
 (0)