Skip to content

Commit

Permalink
Merge pull request #24997 from rockwotj/parquet-pages
Browse files Browse the repository at this point in the history
parquet: support multiple pages in a column per row group
  • Loading branch information
rockwotj authored Feb 4, 2025
2 parents 4c7633c + e5fe32a commit bcfa53f
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 156 deletions.
2 changes: 1 addition & 1 deletion src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/sync v0.8.0
golang.org/x/sys v0.28.0
golang.org/x/sys v0.29.0
golang.org/x/term v0.25.0
google.golang.org/protobuf v1.36.2
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
Expand Down
2 changes: 1 addition & 1 deletion src/transform-sdk/tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ require (
go.opentelemetry.io/otel/trace v1.32.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/sys v0.29.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 2 additions & 2 deletions src/transform-sdk/tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
2 changes: 2 additions & 0 deletions src/v/serde/parquet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ redpanda_cc_library(
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
"//src/v/compression",
"//src/v/container:fragmented_vector",
"//src/v/hashing:crc32",
"@abseil-cpp//absl/numeric:int128",
"@seastar",
Expand All @@ -145,6 +146,7 @@ redpanda_cc_library(
"//src/v/bytes:iobuf",
"//src/v/bytes:iostream",
"//src/v/container:contiguous_range_map",
"@abseil-cpp//absl/container:flat_hash_set",
],
include_prefix = "serde/parquet",
visibility = ["//visibility:public"],
Expand Down
42 changes: 21 additions & 21 deletions src/v/serde/parquet/column_stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,16 @@ fixed_byte_array_value copy(fixed_byte_array_value&);
// it in the metadata for query engine performance.
template<typename value_type, auto comparator>
class column_stats_collector {
public:
using ref_type = std::conditional_t<
std::is_trivially_copyable_v<value_type>,
value_type,
value_type&>;
using const_ref_type = std::conditional_t<
std::is_trivially_copyable_v<value_type>,
const value_type,
const value_type&>;
using bound_ref_type = std::conditional_t<
std::is_trivially_copyable_v<value_type>,
std::optional<value_type>,
const std::optional<value_type>&>;

public:
// Record a value in the collector
void record_value(ref_type v) {
if constexpr (std::is_floating_point_v<decltype(v.val)>) {
Expand All @@ -85,7 +81,7 @@ class column_stats_collector {
}

// Record a null in the collector
void record_null() { ++_null_count; }
void record_null(int64_t n = 1) { _null_count += n; }

// Merge another stats collector into this one.
void merge(column_stats_collector<value_type, comparator>& other) {
Expand All @@ -109,28 +105,32 @@ class column_stats_collector {
}

int64_t null_count() const { return _null_count; }
bound_ref_type min() const {
// According to the rules for stats, a zero floating point value is
// always written as negative.
if constexpr (std::is_floating_point_v<decltype(_min->val)>) {
if (_min && _min->val == 0.0) {
return std::make_optional<value_type>(-0.0);

bound_ref_type min() const { return view(_min, true); }
std::optional<value_type> take_min() { return take(_min, true); }
bound_ref_type max() const { return view(_max, false); }
std::optional<value_type> take_max() { return take(_max, false); }

private:
bound_ref_type view(const std::optional<value_type>& v, bool min) const {
if constexpr (std::is_floating_point_v<decltype(v->val)>) {
// min floats are always written as -0 and max as 0
if (v && v->val == 0.0) {
return std::make_optional<value_type>(min ? -0.0 : 0.0);
}
}
return _min;
return v;
}
bound_ref_type max() const {
// According to the rules for stats, a zero floating point value is
// always written as positive.
if constexpr (std::is_floating_point_v<decltype(_max->val)>) {
if (_max && _max->val == 0.0) {
return std::make_optional<value_type>(0.0);
std::optional<value_type> take(std::optional<value_type>& v, bool min) {
if constexpr (std::is_floating_point_v<decltype(v->val)>) {
// min floats are always written as -0 and max as 0
if (v && v->val == 0.0) {
return std::make_optional<value_type>(min ? -0.0 : 0.0);
}
}
return _max;
return std::exchange(v, {});
}

private:
std::optional<value_type> _min;
std::optional<value_type> _max;
int64_t _null_count = 0;
Expand Down
130 changes: 99 additions & 31 deletions src/v/serde/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

#include "serde/parquet/column_writer.h"

#include "column_stats_collector.h"
#include "compression/compression.h"
#include "container/fragmented_vector.h"
#include "hashing/crc32.h"
#include "serde/parquet/column_stats_collector.h"
#include "serde/parquet/encoding.h"

#include <seastar/util/variant_utils.hh>
Expand All @@ -39,8 +40,10 @@ class column_writer::impl {
impl& operator=(impl&&) noexcept = default;
virtual ~impl() noexcept = default;

virtual incremental_column_stats add(value, rep_level, def_level) = 0;
virtual ss::future<data_page> flush_page() = 0;
virtual ss::future<> add(value, rep_level, def_level) = 0;
virtual size_t memory_usage() const = 0;
virtual size_t total_rows() const = 0;
virtual ss::future<flushed_pages> flush_pages() = 0;
};

namespace {
Expand All @@ -66,16 +69,20 @@ class buffered_column_writer final : public column_writer::impl {
, _max_def_level(schema_element.max_definition_level)
, _opts(opts) {}

incremental_column_stats
add(value val, rep_level rl, def_level dl) override {
++_num_values;
ss::future<> add(value val, rep_level rl, def_level dl) override {
// A repetition level of zero means that it's the start of a new row and
// not a repeated value within the same row.
if (rl == rep_level(0)) {
// We can ONLY flush on row boundaries, so make sure this is the
// first thing we do.
if (_current_buffer_size > _opts.page_buffer_size) {
co_await flush_page();
}
++_num_rows;
}
++_num_values;

uint64_t value_memory_usage = 0;
int64_t value_memory_usage = 0;

ss::visit(
std::move(val),
Expand All @@ -85,13 +92,13 @@ class buffered_column_writer final : public column_writer::impl {
} else {
value_memory_usage = sizeof(value_type);
}
_stats.record_value(v);
_current_page_stats.record_value(v);
_value_buffer.push_back(std::move(v));
},
[this](null_value&) {
// null values are valid, but are not encoded in the actual data,
// they are encoded in the definition levels.
_stats.record_null();
_current_page_stats.record_null();
},
[](auto& v) {
throw std::runtime_error(fmt::format(
Expand All @@ -105,13 +112,28 @@ class buffered_column_writer final : public column_writer::impl {
// always use the full capacity in our value buffer, and eagerly
// accounting that usage might cause callers to overaggressively
// flush pages/row groups.
return {
.memory_usage = value_memory_usage + sizeof(rep_level)
+ sizeof(def_level),
};
_current_buffer_size += value_memory_usage
+ static_cast<int64_t>(
sizeof(rep_level) + sizeof(def_level));
}

size_t memory_usage() const override {
size_t size = _current_buffer_size;
for (const auto& page : _flushed_pages) {
size += page.serialized.size_bytes();
}
return size;
}

ss::future<data_page> flush_page() override {
size_t total_rows() const override {
size_t total = _num_rows;
for (const auto& page : _flushed_pages) {
total += std::get<data_page_header>(page.header.type).num_rows;
}
return total;
}

ss::future<> flush_page() {
iobuf encoded_def_levels;
// If the max level is 0 then we don't write levels at all.
if (_max_def_level > def_level(0)) {
Expand Down Expand Up @@ -145,29 +167,41 @@ class buffered_column_writer final : public column_writer::impl {
size_t compressed_page_size = encoded_def_levels.size_bytes()
+ encoded_rep_levels.size_bytes()
+ encoded_data.size_bytes();
std::optional<statistics::bound> max_bound;
if (auto max = _current_page_stats.take_max()) {
// TODO: consider truncating large values instead of writing them
// (is_exact=false)
max_bound.emplace(
/*value=*/encode_for_stats(*max),
/*is_exact=*/true);
_flushed_stats.record_value(*max);
}
std::optional<statistics::bound> min_bound;
if (auto min = _current_page_stats.take_min()) {
// TODO: consider truncating large values instead of writing them
// (is_exact=false)
min_bound.emplace(
/*value=*/encode_for_stats(*min),
/*is_exact=*/true);
_flushed_stats.record_value(*min);
}
_flushed_stats.record_null(_current_page_stats.null_count());
page_header header{
.uncompressed_page_size = static_cast<int32_t>(uncompressed_page_size),
.compressed_page_size = static_cast<int32_t>(compressed_page_size),
.crc = compute_crc32(encoded_rep_levels, encoded_def_levels, encoded_data),
.type = data_page_header{
.num_values = std::exchange(_num_values, 0),
.num_nulls = static_cast<int32_t>(_stats.null_count()),
.num_nulls = static_cast<int32_t>(_current_page_stats.null_count()),
.num_rows = std::exchange(_num_rows, 0),
.data_encoding = encoding::plain,
.definition_levels_byte_length = static_cast<int32_t>(encoded_def_levels.size_bytes()),
.repetition_levels_byte_length = static_cast<int32_t>(encoded_rep_levels.size_bytes()),
.is_compressed = _opts.compress,
.stats = statistics{
.null_count = _stats.null_count(),
// TODO: consider truncating large values instead of writing them (is_exact=false)
.max = _stats.max() ? std::make_optional<statistics::bound>(
/*value=*/encode_for_stats(*_stats.max()),
/*is_exact=*/true
) : std::nullopt,
.min = _stats.min() ? std::make_optional<statistics::bound>(
/*value=*/encode_for_stats(*_stats.min()),
/*is_exact=*/true
) : std::nullopt,
.null_count = _current_page_stats.null_count(),
.max = std::move(max_bound),
.min = std::move(min_bound),
},
},
};
Expand All @@ -176,19 +210,50 @@ class buffered_column_writer final : public column_writer::impl {
full_page_data.append(std::move(encoded_rep_levels));
full_page_data.append(std::move(encoded_def_levels));
full_page_data.append(std::move(encoded_data));
_stats.reset();
co_return data_page{
_current_page_stats.reset();
_current_buffer_size = 0;
_flushed_pages.push_back(data_page{
.header = std::move(header),
.serialized_header_size = header_size,
.serialized = std::move(full_page_data),
});
}

// Hands back every page produced so far together with statistics merged
// across those pages.
//
// Any values still buffered are first written out as one more page. The
// merged min/max bounds are encoded with encode_for_stats and marked exact.
// On return the accumulated statistics have been reset and the writer's
// list of flushed pages is empty.
ss::future<flushed_pages> flush_pages() override {
    // Buffered values must become a page before the pages are handed off.
    if (_current_buffer_size > 0) {
        co_await flush_page();
    }

    statistics column_stats{
      .null_count = _flushed_stats.null_count(),
      .max = {},
      .min = {},
    };
    if (auto upper = _flushed_stats.take_max(); upper.has_value()) {
        column_stats.max.emplace(
          /*value=*/encode_for_stats(*upper),
          /*is_exact=*/true);
    }
    if (auto lower = _flushed_stats.take_min(); lower.has_value()) {
        column_stats.min.emplace(
          /*value=*/encode_for_stats(*lower),
          /*is_exact=*/true);
    }
    _flushed_stats.reset();

    // Take ownership of the page list, leaving the member empty.
    auto finished = std::exchange(_flushed_pages, {});
    co_return flushed_pages{
      .pages = std::move(finished),
      .stats = std::move(column_stats),
    };
}

private:
column_stats_collector<value_type, comparator> _stats;
column_stats_collector<value_type, comparator> _current_page_stats;
column_stats_collector<value_type, comparator> _flushed_stats;
size_t _current_buffer_size = 0;
chunked_vector<value_type> _value_buffer;
chunked_vector<def_level> _def_levels;
chunked_vector<rep_level> _rep_levels;
chunked_vector<data_page> _flushed_pages;
int32_t _num_rows = 0;
int32_t _num_values = 0;
rep_level _max_rep_level;
Expand Down Expand Up @@ -279,13 +344,16 @@ column_writer::column_writer(column_writer&&) noexcept = default;
column_writer& column_writer::operator=(column_writer&&) noexcept = default;
column_writer::~column_writer() noexcept = default;

incremental_column_stats
ss::future<>
column_writer::add(value val, rep_level rep_level, def_level def_level) {
return _impl->add(std::move(val), rep_level, def_level);
}

ss::future<data_page> column_writer::flush_page() {
return _impl->flush_page();
size_t column_writer::memory_usage() const { return _impl->memory_usage(); }
size_t column_writer::total_rows() const { return _impl->total_rows(); }

ss::future<flushed_pages> column_writer::flush_pages() {
return _impl->flush_pages();
}

} // namespace serde::parquet
Loading

0 comments on commit bcfa53f

Please sign in to comment.