Skip to content

Commit 5b29bb6

Browse files
bryndenZhclaude
authored andcommitted
[lake] Fix union read duplicating rows for non-String partition columns
For a partitioned table with table.datalake.enabled=true and a non-String partition column (DATE, INT, TIMESTAMP, etc.), default union read returned each tiered row twice because PaimonSplit#partition() read every partition field via BinaryRow.getString regardless of logical type. The lake-side partition name then never matched the Fluss-side name in LakeSplitGenerator, so the same partition was emitted as both a lake-only split and a Fluss-log split. Add PaimonPartitionUtils to render partition values in a logical-type-aware way matching PartitionUtils#convertValueOfType, compute them in PaimonSplitPlanner, and apply the same fix to DvTableReadableSnapshotRetriever. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 03dad88 commit 5b29bb6

8 files changed

Lines changed: 284 additions & 28 deletions

File tree

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,8 @@
2020

2121
import org.apache.fluss.lake.source.LakeSplit;
2222

23-
import org.apache.paimon.data.BinaryRow;
2423
import org.apache.paimon.table.source.DataSplit;
2524

26-
import java.util.ArrayList;
27-
import java.util.Collections;
2825
import java.util.List;
2926

3027
/** Split for paimon table. */
@@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit {
3633

3734
private final boolean isBucketUnAware;
3835

39-
public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) {
36+
// Partition values in Fluss partition-name format
37+
private final List<String> partition;
38+
39+
public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List<String> partition) {
4040
this.dataSplit = dataSplit;
4141
this.isBucketUnAware = isBucketUnAware;
42+
this.partition = partition;
4243
}
4344

4445
@Override
@@ -52,19 +53,7 @@ public int bucket() {
5253

5354
@Override
5455
public List<String> partition() {
55-
BinaryRow partition = dataSplit.partition();
56-
if (partition.getFieldCount() == 0) {
57-
return Collections.emptyList();
58-
}
59-
60-
List<String> partitions = new ArrayList<>();
61-
for (int i = 0; i < partition.getFieldCount(); i++) {
62-
// Todo Currently, partition column must be String datatype, so we can always use
63-
// consider it as string. Revisit here when
64-
// #489 is finished.
65-
partitions.add(partition.getString(i).toString());
66-
}
67-
return partitions;
56+
return partition;
6857
}
6958

7059
public DataSplit dataSplit() {

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.fluss.lake.paimon.source;
2020

2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.lake.paimon.utils.PaimonPartitionUtils;
2223
import org.apache.fluss.lake.source.Planner;
2324
import org.apache.fluss.metadata.TablePath;
2425

@@ -33,6 +34,7 @@
3334
import org.apache.paimon.table.source.DataSplit;
3435
import org.apache.paimon.table.source.InnerTableScan;
3536
import org.apache.paimon.table.source.Split;
37+
import org.apache.paimon.types.RowType;
3638

3739
import javax.annotation.Nullable;
3840

@@ -69,13 +71,19 @@ public List<PaimonSplit> plan() {
6971
FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
7072
InnerTableScan tableScan = fileStoreTable.newScan();
7173
boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE;
74+
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
7275

7376
if (predicate != null) {
7477
tableScan = tableScan.withFilter(predicate);
7578
}
7679
for (Split split : tableScan.plan().splits()) {
7780
DataSplit dataSplit = (DataSplit) split;
78-
splits.add(new PaimonSplit(dataSplit, isBucketUnAware));
81+
List<String> partition =
82+
dataSplit.partition().getFieldCount() == 0
83+
? Collections.emptyList()
84+
: PaimonPartitionUtils.partitionValues(
85+
dataSplit.partition(), partitionType);
86+
splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition));
7987
}
8088
}
8189
return splits;

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.io.ByteArrayOutputStream;
2929
import java.io.DataInputStream;
3030
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.List;
3133

3234
/** Serializer for paimon split. */
3335
public class PaimonSplitSerializer implements SimpleVersionedSerializer<PaimonSplit> {
@@ -46,6 +48,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
4648
DataSplit dataSplit = paimonSplit.dataSplit();
4749
InstantiationUtil.serializeObject(view, dataSplit);
4850
view.writeBoolean(paimonSplit.isBucketUnAware());
51+
List<String> partition = paimonSplit.partition();
52+
view.writeInt(partition.size());
53+
for (String value : partition) {
54+
view.writeUTF(value);
55+
}
4956
return out.toByteArray();
5057
}
5158

@@ -59,7 +66,12 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio
5966
if (version == VERSION_1) {
6067
DataInputStream dis = new DataInputStream(in);
6168
boolean isBucketUnAware = dis.readBoolean();
62-
return new PaimonSplit(dataSplit, isBucketUnAware);
69+
int size = dis.readInt();
70+
List<String> partition = new ArrayList<>(size);
71+
for (int i = 0; i < size; i++) {
72+
partition.add(dis.readUTF());
73+
}
74+
return new PaimonSplit(dataSplit, isBucketUnAware, partition);
6375
} else {
6476
throw new IOException("Unsupported PaimonSplit serialization version: " + version);
6577
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.paimon.CoreOptions;
3636
import org.apache.paimon.Snapshot;
3737
import org.apache.paimon.data.BinaryRow;
38-
import org.apache.paimon.data.BinaryString;
3938
import org.apache.paimon.manifest.FileKind;
4039
import org.apache.paimon.manifest.ManifestEntry;
4140
import org.apache.paimon.operation.FileStoreScan;
@@ -48,7 +47,6 @@
4847
import javax.annotation.Nullable;
4948

5049
import java.io.IOException;
51-
import java.util.ArrayList;
5250
import java.util.HashMap;
5351
import java.util.HashSet;
5452
import java.util.List;
@@ -601,13 +599,10 @@ private Map<String, Long> getPartitionNameToIdMapping() throws IOException {
601599
* @return partition name string
602600
*/
603601
private String getPartitionNameFromBinaryRow(BinaryRow partition) {
604-
List<String> partitionValues = new ArrayList<>();
605-
for (int i = 0; i < partition.getFieldCount(); i++) {
606-
// todo: consider other partition type
607-
BinaryString binaryString = partition.getString(i);
608-
partitionValues.add(binaryString.toString());
609-
}
610-
return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues);
602+
return String.join(
603+
ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR,
604+
PaimonPartitionUtils.partitionValues(
605+
partition, fileStoreTable.schema().logicalPartitionType()));
611606
}
612607

613608
@Override
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.paimon.utils;
20+
21+
import org.apache.fluss.row.TimestampLtz;
22+
import org.apache.fluss.row.TimestampNtz;
23+
import org.apache.fluss.utils.PartitionNameConverters;
24+
25+
import org.apache.paimon.data.BinaryRow;
26+
import org.apache.paimon.data.Timestamp;
27+
import org.apache.paimon.types.DataType;
28+
import org.apache.paimon.types.DataTypeChecks;
29+
import org.apache.paimon.types.RowType;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
/** Extracts partition values from a Paimon partition row as Fluss partition-name strings. */
35+
public class PaimonPartitionUtils {
36+
37+
private PaimonPartitionUtils() {}
38+
39+
/**
40+
* Converts a Paimon partition row into Fluss partition value strings, in partition-key order.
41+
* The output must match {@code PartitionUtils#convertValueOfType} (the Fluss-side format),
42+
* otherwise lake-side and Fluss-side partition names won't match during union read split
43+
* generation.
44+
*/
45+
public static List<String> partitionValues(BinaryRow partition, RowType partitionType) {
46+
List<String> values = new ArrayList<>(partition.getFieldCount());
47+
for (int i = 0; i < partition.getFieldCount(); i++) {
48+
values.add(toFlussPartitionString(partition, i, partitionType.getTypeAt(i)));
49+
}
50+
return values;
51+
}
52+
53+
private static String toFlussPartitionString(BinaryRow partition, int pos, DataType type) {
54+
switch (type.getTypeRoot()) {
55+
case CHAR:
56+
case VARCHAR:
57+
return partition.getString(pos).toString();
58+
case BOOLEAN:
59+
return Boolean.toString(partition.getBoolean(pos));
60+
case TINYINT:
61+
return Byte.toString(partition.getByte(pos));
62+
case SMALLINT:
63+
return Short.toString(partition.getShort(pos));
64+
case INTEGER:
65+
return Integer.toString(partition.getInt(pos));
66+
case BIGINT:
67+
return Long.toString(partition.getLong(pos));
68+
case FLOAT:
69+
return PartitionNameConverters.reformatFloat(partition.getFloat(pos));
70+
case DOUBLE:
71+
return PartitionNameConverters.reformatDouble(partition.getDouble(pos));
72+
case DATE:
73+
return PartitionNameConverters.dayToString(partition.getInt(pos));
74+
case TIME_WITHOUT_TIME_ZONE:
75+
return PartitionNameConverters.milliToString(partition.getInt(pos));
76+
case BINARY:
77+
case VARBINARY:
78+
return PartitionNameConverters.hexString(partition.getBinary(pos));
79+
case TIMESTAMP_WITHOUT_TIME_ZONE:
80+
{
81+
Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
82+
return PartitionNameConverters.timestampToString(
83+
TimestampNtz.fromMillis(
84+
ts.getMillisecond(), ts.getNanoOfMillisecond()));
85+
}
86+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
87+
{
88+
Timestamp ts = partition.getTimestamp(pos, DataTypeChecks.getPrecision(type));
89+
return PartitionNameConverters.timestampToString(
90+
TimestampLtz.fromEpochMillis(
91+
ts.getMillisecond(), ts.getNanoOfMillisecond()));
92+
}
93+
default:
94+
throw new IllegalArgumentException(
95+
"Unsupported partition column type: " + type.getTypeRoot());
96+
}
97+
}
98+
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ void testSerializeAndDeserialize() throws Exception {
7777

7878
assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
7979
assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware());
80+
assertThat(deserialized.partition()).isEqualTo(originalPaimonSplit.partition());
8081
}
8182

8283
@Test

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.paimon.types.DataTypes;
3636
import org.junit.jupiter.api.Test;
3737

38+
import java.time.LocalDate;
3839
import java.util.Collections;
3940
import java.util.List;
4041

@@ -77,4 +78,33 @@ void testPaimonSplit() throws Exception {
7778
assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit());
7879
assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket());
7980
}
81+
82+
@Test
83+
void testPaimonSplitWithDatePartition() throws Exception {
84+
int bucketNum = 1;
85+
TablePath tablePath = TablePath.of(DEFAULT_DB, "non_string_partition_table");
86+
Schema.Builder builder =
87+
Schema.newBuilder()
88+
.column("c1", DataTypes.INT())
89+
.column("c2", DataTypes.STRING())
90+
.column("dt", DataTypes.DATE());
91+
builder.partitionKeys("dt");
92+
builder.primaryKey("c1", "dt");
93+
builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
94+
createTable(tablePath, builder.build());
95+
Table table = getTable(tablePath);
96+
97+
int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
98+
GenericRow record1 = GenericRow.of(12, BinaryString.fromString("a"), epochDay);
99+
writeRecord(tablePath, Collections.singletonList(record1));
100+
Snapshot snapshot = table.latestSnapshot().get();
101+
102+
LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
103+
List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
104+
105+
// The DATE partition must be rendered in Fluss partition-name format ("2024-03-01"),
106+
// not read blindly via BinaryRow.getString which yields garbage for non-String columns.
107+
PaimonSplit paimonSplit = paimonSplits.get(0);
108+
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("2024-03-01"));
109+
}
80110
}

0 commit comments

Comments
 (0)