Skip to content

Commit 9805fae

Browse files
authored
feat: implement struct like for partition values (#354)
- Added PartitionValues to extend StructLike - Implemented PartitionMap - Implemented PartitionSet
1 parent 913b335 commit 9805fae

19 files changed

+1140
-28
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ set(ICEBERG_SOURCES
4646
partition_summary.cc
4747
row/arrow_array_wrapper.cc
4848
row/manifest_wrapper.cc
49+
row/partition_values.cc
4950
row/struct_like.cc
5051
schema.cc
5152
schema_field.cc

src/iceberg/manifest_adapter.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,17 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
165165

166166
Status ManifestEntryAdapter::AppendPartitionValues(
167167
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
168-
const std::vector<Literal>& partition_values) {
168+
const PartitionValues& partition_values) {
169169
if (array->n_children != partition_type->fields().size()) [[unlikely]] {
170170
return InvalidArrowData("Arrow array of partition does not match partition type.");
171171
}
172-
if (partition_values.size() != partition_type->fields().size()) [[unlikely]] {
172+
if (partition_values.num_fields() != partition_type->fields().size()) [[unlikely]] {
173173
return InvalidArrowData("Literal list of partition does not match partition type.");
174174
}
175175
auto fields = partition_type->fields();
176176

177177
for (size_t i = 0; i < fields.size(); i++) {
178-
const auto& partition_value = partition_values[i];
178+
const auto& partition_value = partition_values.ValueAt(i)->get();
179179
const auto& partition_field = fields[i];
180180
auto child_array = array->children[i];
181181
if (partition_value.IsNull()) {

src/iceberg/manifest_adapter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
8585
const DataFile& file);
8686
static Status AppendPartitionValues(ArrowArray* array,
8787
const std::shared_ptr<StructType>& partition_type,
88-
const std::vector<Literal>& partition_values);
88+
const PartitionValues& partition_values);
8989

9090
virtual Result<std::optional<int64_t>> GetSequenceNumber(
9191
const ManifestEntry& entry) const;

src/iceberg/manifest_entry.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
#include <string>
2727
#include <vector>
2828

29-
#include "iceberg/expression/literal.h"
3029
#include "iceberg/file_format.h"
3130
#include "iceberg/iceberg_export.h"
3231
#include "iceberg/partition_spec.h"
3332
#include "iceberg/result.h"
33+
#include "iceberg/row/partition_values.h"
3434
#include "iceberg/schema_field.h"
3535
#include "iceberg/type.h"
3636

@@ -79,7 +79,7 @@ struct ICEBERG_EXPORT DataFile {
7979
/// Field id: 102
8080
/// Partition data tuple, schema based on the partition spec output using partition
8181
/// field ids
82-
std::vector<Literal> partition;
82+
PartitionValues partition;
8383
/// Field id: 103
8484
/// Number of records in this file, or the cardinality of a deletion vector
8585
int64_t record_count = 0;

src/iceberg/manifest_reader_internal.cc

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,27 +297,26 @@ Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
297297
std::vector<ManifestEntry>& manifest_entries) {
298298
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
299299
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
300-
manifest_entries[row_idx].data_file->partition.emplace_back(
301-
Literal::Boolean(value != 0));
300+
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Boolean(value != 0));
302301
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT32) {
303302
auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
304-
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value));
303+
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
305304
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT64) {
306305
auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
307-
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value));
306+
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
308307
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_FLOAT) {
309308
auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
310-
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value));
309+
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
311310
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_DOUBLE) {
312311
auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
313-
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value));
312+
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
314313
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_STRING) {
315314
auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
316-
manifest_entries[row_idx].data_file->partition.emplace_back(
315+
manifest_entries[row_idx].data_file->partition.AddValue(
317316
Literal::String(std::string(value.data, value.size_bytes)));
318317
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BINARY) {
319318
auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
320-
manifest_entries[row_idx].data_file->partition.emplace_back(
319+
manifest_entries[row_idx].data_file->partition.AddValue(
321320
Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
322321
buffer.data.as_char + buffer.size_bytes)));
323322
} else {

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ iceberg_sources = files(
6868
'partition_summary.cc',
6969
'row/arrow_array_wrapper.cc',
7070
'row/manifest_wrapper.cc',
71+
'row/partition_values.cc',
7172
'row/struct_like.cc',
7273
'schema.cc',
7374
'schema_field.cc',

src/iceberg/partition_summary.cc

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "iceberg/manifest_list.h"
2424
#include "iceberg/partition_summary_internal.h"
2525
#include "iceberg/result.h"
26+
#include "iceberg/row/partition_values.h"
2627
#include "iceberg/util/checked_cast.h"
2728
#include "iceberg/util/formatter.h" // IWYU pragma: keep
2829
#include "iceberg/util/macros.h"
@@ -74,18 +75,18 @@ PartitionSummary::PartitionSummary(const StructType& partition_type) {
7475
}
7576
}
7677

77-
Status PartitionSummary::Update(const std::vector<Literal>& partition_values) {
78-
if (partition_values.size() != field_stats_.size()) [[unlikely]] {
78+
Status PartitionSummary::Update(const PartitionValues& partition_values) {
79+
if (partition_values.num_fields() != field_stats_.size()) [[unlikely]] {
7980
return InvalidArgument("partition values size {} does not match field stats size {}",
80-
partition_values.size(), field_stats_.size());
81+
partition_values.num_fields(), field_stats_.size());
8182
}
8283

83-
for (size_t i = 0; i < partition_values.size(); i++) {
84+
for (size_t i = 0; i < partition_values.num_fields(); i++) {
85+
ICEBERG_ASSIGN_OR_RAISE(auto val, partition_values.ValueAt(i));
8486
ICEBERG_ASSIGN_OR_RAISE(
85-
auto literal,
86-
partition_values[i].CastTo(
87-
internal::checked_pointer_cast<PrimitiveType>(field_stats_[i].type())));
88-
ICEBERG_RETURN_UNEXPECTED(field_stats_[i].Update(literal));
87+
auto lit, val.get().CastTo(internal::checked_pointer_cast<PrimitiveType>(
88+
field_stats_[i].type())));
89+
ICEBERG_RETURN_UNEXPECTED(field_stats_[i].Update(lit));
8990
}
9091
return {};
9192
}

src/iceberg/partition_summary_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class PartitionSummary {
5858
explicit PartitionSummary(const StructType& partition_type);
5959

6060
/// \brief Update the partition summary with partition values.
61-
Status Update(const std::vector<Literal>& partition_values);
61+
Status Update(const PartitionValues& partition_values);
6262

6363
/// \brief Get the list of partition field summaries.
6464
Result<std::vector<PartitionFieldSummary>> Summaries() const;

src/iceberg/row/meson.build

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
# under the License.
1717

1818
install_headers(
19-
['arrow_array_wrapper.h', 'manifest_wrapper.h', 'struct_like.h'],
19+
[
20+
'arrow_array_wrapper.h',
21+
'manifest_wrapper.h',
22+
'partition_values.h',
23+
'struct_like.h',
24+
],
2025
subdir: 'iceberg/row',
2126
)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/row/partition_values.h"
21+
22+
namespace iceberg {
23+
24+
PartitionValues& PartitionValues::operator=(const PartitionValues& other) {
25+
if (this != &other) {
26+
values_ = other.values_;
27+
}
28+
return *this;
29+
}
30+
31+
bool PartitionValues::operator==(const PartitionValues& other) const {
32+
return values_ == other.values_;
33+
}
34+
35+
Result<Scalar> PartitionValues::GetField(size_t pos) const {
36+
if (pos >= values_.size()) {
37+
return InvalidArgument(
38+
"Position {} is out of bounds for PartitionValues with {} fields", pos,
39+
values_.size());
40+
}
41+
42+
const auto& literal = values_[pos];
43+
44+
// Handle null values
45+
if (literal.IsNull()) {
46+
return Scalar{std::monostate{}};
47+
}
48+
49+
// Convert Literal to Scalar based on type
50+
switch (literal.type()->type_id()) {
51+
case TypeId::kBoolean:
52+
return Scalar{std::get<bool>(literal.value())};
53+
case TypeId::kInt:
54+
case TypeId::kDate:
55+
return Scalar{std::get<int32_t>(literal.value())};
56+
case TypeId::kLong:
57+
case TypeId::kTime:
58+
case TypeId::kTimestamp:
59+
case TypeId::kTimestampTz:
60+
return Scalar{std::get<int64_t>(literal.value())};
61+
case TypeId::kFloat:
62+
return Scalar{std::get<float>(literal.value())};
63+
case TypeId::kDouble:
64+
return Scalar{std::get<double>(literal.value())};
65+
case TypeId::kString: {
66+
const auto& str = std::get<std::string>(literal.value());
67+
return Scalar{std::string_view(str)};
68+
}
69+
case TypeId::kBinary:
70+
case TypeId::kFixed: {
71+
const auto& bytes = std::get<std::vector<uint8_t>>(literal.value());
72+
return Scalar{
73+
std::string_view(reinterpret_cast<const char*>(bytes.data()), bytes.size())};
74+
}
75+
case TypeId::kDecimal:
76+
return Scalar{std::get<Decimal>(literal.value())};
77+
default:
78+
return NotSupported("Cannot convert literal of type {} to Scalar",
79+
literal.type()->ToString());
80+
}
81+
}
82+
83+
Result<std::reference_wrapper<const Literal>> PartitionValues::ValueAt(size_t pos) const {
84+
if (pos >= values_.size()) {
85+
return InvalidArgument("Cannot get partition value at {} from {} fields", pos,
86+
values_.size());
87+
}
88+
return std::cref(values_[pos]);
89+
}
90+
91+
} // namespace iceberg

0 commit comments

Comments
 (0)