Skip to content

Commit 907f9a2

Browse files
author
黄潇
committed
rebase main and resolve comments
1 parent 5abe025 commit 907f9a2

9 files changed

Lines changed: 349 additions & 10 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.lake.watermark;
1919

20+
import org.apache.fluss.lake.batch.RecordBatch;
2021
import org.apache.fluss.row.InternalRow;
2122

2223
import javax.annotation.Nullable;
@@ -38,4 +39,14 @@ public interface WatermarkExtractor {
3839
*/
3940
@Nullable
4041
Long currentWatermark(InternalRow row);
42+
43+
/**
44+
* Extracts the maximum watermark for the given record batch.
45+
*
46+
* @param recordBatch the record batch to extract the watermark from
47+
* @return the maximum watermark in epoch milliseconds, or {@code null} if the batch does not
48+
* provide a watermark value
49+
*/
50+
@Nullable
51+
Long currentWatermark(RecordBatch recordBatch);
4152
}

fluss-common/src/main/java/org/apache/fluss/record/ArrowBatchData.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,15 @@
2121

2222
import org.apache.arrow.memory.ArrowBuf;
2323
import org.apache.arrow.vector.FieldVector;
24+
import org.apache.arrow.vector.TimeStampMicroVector;
25+
import org.apache.arrow.vector.TimeStampMilliVector;
26+
import org.apache.arrow.vector.TimeStampNanoVector;
27+
import org.apache.arrow.vector.TimeStampSecVector;
28+
import org.apache.arrow.vector.TimeStampVector;
2429
import org.apache.arrow.vector.VectorSchemaRoot;
2530

31+
import javax.annotation.Nullable;
32+
2633
import static org.apache.fluss.utils.Preconditions.checkArgument;
2734
import static org.apache.fluss.utils.Preconditions.checkNotNull;
2835

@@ -86,6 +93,53 @@ public long getSizeInBytes() {
8693
return size;
8794
}
8895

96+
/**
97+
* Returns the maximum non-null timestamp value of the given field in epoch milliseconds, or
98+
* {@code null} if the field is not a timestamp field or all values are null.
99+
*/
100+
@Nullable
101+
public Long getMaxTimestampMillis(int fieldIndex) {
102+
if (fieldIndex < 0 || fieldIndex >= vectorSchemaRoot.getFieldVectors().size()) {
103+
return null;
104+
}
105+
FieldVector vector = vectorSchemaRoot.getVector(fieldIndex);
106+
if (!(vector instanceof TimeStampVector)) {
107+
return null;
108+
}
109+
110+
Long maxTimestampMillis = null;
111+
int rowCount = getRecordCount();
112+
for (int rowId = 0; rowId < rowCount; rowId++) {
113+
if (vector.isNull(rowId)) {
114+
continue;
115+
}
116+
Long timestampMillis = getTimestampMillis((TimeStampVector) vector, rowId);
117+
if (timestampMillis == null) {
118+
return null;
119+
}
120+
maxTimestampMillis =
121+
maxTimestampMillis == null
122+
? timestampMillis
123+
: Math.max(maxTimestampMillis, timestampMillis);
124+
}
125+
return maxTimestampMillis;
126+
}
127+
128+
@Nullable
129+
private static Long getTimestampMillis(TimeStampVector vector, int rowId) {
130+
if (vector instanceof TimeStampSecVector) {
131+
return vector.get(rowId) * 1000;
132+
} else if (vector instanceof TimeStampMilliVector) {
133+
return vector.get(rowId);
134+
} else if (vector instanceof TimeStampMicroVector) {
135+
return vector.get(rowId) / 1000;
136+
} else if (vector instanceof TimeStampNanoVector) {
137+
return vector.get(rowId) / 1_000_000;
138+
} else {
139+
return null;
140+
}
141+
}
142+
89143
/**
90144
* Creates a new {@link ArrowBatchData} containing a contiguous slice of this batch's rows and
91145
* releases the original vector data.

fluss-flink/fluss-flink-common/pom.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,20 @@
185185
<type>test-jar</type>
186186
</dependency>
187187

188+
<dependency>
189+
<groupId>org.apache.arrow</groupId>
190+
<artifactId>arrow-memory-netty</artifactId>
191+
<version>${arrow.version}</version>
192+
<scope>test</scope>
193+
</dependency>
194+
195+
<dependency>
196+
<groupId>org.apache.arrow</groupId>
197+
<artifactId>arrow-vector</artifactId>
198+
<version>${arrow.version}</version>
199+
<scope>test</scope>
200+
</dependency>
201+
188202
<dependency>
189203
<groupId>org.apache.flink</groupId>
190204
<artifactId>flink-clients</artifactId>
@@ -225,4 +239,4 @@
225239
</plugins>
226240
</build>
227241

228-
</project>
242+
</project>

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ private CommitResult commitWriteResults(
249249
}
250250

251251
if (nonEmptyResults.size() < committableWriteResults.size()) {
252-
// Empty results means some splits has not been processed, possibly caused by force
252+
// Empty results means some splits has not been processed, possibly caused by forced
253253
// completion. Do not update watermark here.
254254
if (watermark != null) {
255255
LOG.warn(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
3333
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
3434
import org.apache.fluss.flink.tiering.source.watermark.SimpleWatermarkExtractor;
35-
import org.apache.fluss.lake.watermark.WatermarkExtractor;
3635
import org.apache.fluss.lake.batch.ArrowRecordBatch;
36+
import org.apache.fluss.lake.watermark.WatermarkExtractor;
3737
import org.apache.fluss.lake.writer.LakeTieringFactory;
3838
import org.apache.fluss.lake.writer.LakeWriteResult;
3939
import org.apache.fluss.lake.writer.LakeWriter;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.fluss.flink.tiering.source.watermark;
1919

20+
import org.apache.fluss.lake.batch.ArrowRecordBatch;
21+
import org.apache.fluss.lake.batch.RecordBatch;
2022
import org.apache.fluss.lake.watermark.WatermarkExtractor;
2123
import org.apache.fluss.metadata.TableInfo;
24+
import org.apache.fluss.record.ArrowBatchData;
2225
import org.apache.fluss.row.InternalRow;
2326
import org.apache.fluss.types.RowType;
2427

@@ -116,8 +119,8 @@ public static SimpleWatermarkExtractor create(TableInfo tableInfo) {
116119
LOG.warn(
117120
"Watermark rowtime column '{}' not found in row type for {}, "
118121
+ "computed column is not supported for watermark extraction.",
119-
tableInfo.getTablePath(),
120-
rowtimeColumn);
122+
rowtimeColumn,
123+
tableInfo.getTablePath());
121124
return null;
122125
}
123126

@@ -164,6 +167,18 @@ public Long currentWatermark(InternalRow row) {
164167
}
165168
}
166169

170+
/** Extracts the maximum epoch-millis watermark from the given record batch. */
171+
@Override
172+
@Nullable
173+
public Long currentWatermark(RecordBatch recordBatch) {
174+
if (!(recordBatch instanceof ArrowRecordBatch)) {
175+
return null;
176+
}
177+
ArrowBatchData arrowBatchData = ((ArrowRecordBatch) recordBatch).getArrowBatchData();
178+
Long maxTimestampMillis = arrowBatchData.getMaxTimestampMillis(fieldIndex);
179+
return maxTimestampMillis == null ? null : maxTimestampMillis - delayMillis;
180+
}
181+
167182
/**
168183
* Parses the watermark delay from the strategy expression. For example, {@code `col` - INTERVAL
169184
* '5' SECOND} yields 5000 milliseconds. Returns {@code null} if the expression is unsupported.

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,30 @@
1818
package org.apache.fluss.flink.tiering.source.watermark;
1919

2020
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.lake.batch.ArrowRecordBatch;
2122
import org.apache.fluss.metadata.Schema;
2223
import org.apache.fluss.metadata.TableInfo;
2324
import org.apache.fluss.metadata.TablePath;
25+
import org.apache.fluss.record.ArrowBatchData;
2426
import org.apache.fluss.row.GenericRow;
2527
import org.apache.fluss.row.TimestampLtz;
2628
import org.apache.fluss.row.TimestampNtz;
2729
import org.apache.fluss.types.DataTypes;
2830

31+
import org.apache.arrow.memory.BufferAllocator;
32+
import org.apache.arrow.memory.RootAllocator;
33+
import org.apache.arrow.vector.FieldVector;
34+
import org.apache.arrow.vector.IntVector;
35+
import org.apache.arrow.vector.TimeStampMilliVector;
36+
import org.apache.arrow.vector.VectorSchemaRoot;
2937
import org.junit.jupiter.api.Test;
3038
import org.junit.jupiter.params.ParameterizedTest;
3139
import org.junit.jupiter.params.provider.CsvSource;
3240

41+
import java.util.Arrays;
3342
import java.util.Collections;
3443
import java.util.HashMap;
44+
import java.util.List;
3545
import java.util.Map;
3646

3747
import static org.assertj.core.api.Assertions.assertThat;
@@ -186,6 +196,75 @@ void testCurrentWatermarkWithTimestampLtzSimpleColumn() {
186196
assertThat(extractor.currentWatermark(row)).isEqualTo(5000L);
187197
}
188198

199+
@Test
200+
void testCurrentWatermarkWithArrowRecordBatchUsesMaximumTimestamp() throws Exception {
201+
TableInfo tableInfo =
202+
createTableInfoWithWatermark("`ts` - INTERVAL '5' SECOND", "TIMESTAMP(3)");
203+
SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo);
204+
assertThat(extractor).isNotNull();
205+
206+
try (ArrowRecordBatch recordBatch =
207+
createArrowRecordBatch(
208+
new Object[] {1, TimestampNtz.fromMillis(10_000L)},
209+
new Object[] {2, TimestampNtz.fromMillis(30_000L)},
210+
new Object[] {3, TimestampNtz.fromMillis(20_000L)})) {
211+
assertThat(extractor.currentWatermark(recordBatch)).isEqualTo(25_000L);
212+
}
213+
}
214+
215+
@Test
216+
void testCurrentWatermarkWithArrowRecordBatchSkipsNullRowtime() throws Exception {
217+
TableInfo tableInfo = createTableInfoWithWatermark("`ts`", "TIMESTAMP(3)");
218+
SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo);
219+
assertThat(extractor).isNotNull();
220+
221+
try (ArrowRecordBatch recordBatch =
222+
createArrowRecordBatch(
223+
new Object[] {1, null},
224+
new Object[] {2, TimestampNtz.fromMillis(8_000L)},
225+
new Object[] {3, null})) {
226+
assertThat(extractor.currentWatermark(recordBatch)).isEqualTo(8_000L);
227+
}
228+
}
229+
230+
@Test
231+
void testCurrentWatermarkWithArrowRecordBatchReturnsNullForAllNullRowtime() throws Exception {
232+
TableInfo tableInfo = createTableInfoWithWatermark("`ts`", "TIMESTAMP(3)");
233+
SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo);
234+
assertThat(extractor).isNotNull();
235+
236+
try (ArrowRecordBatch recordBatch =
237+
createArrowRecordBatch(new Object[] {1, null}, new Object[] {2, null})) {
238+
assertThat(extractor.currentWatermark(recordBatch)).isNull();
239+
}
240+
}
241+
242+
@Test
243+
void testCurrentWatermarkWithArrowRecordBatchSupportsNegativeWatermark() throws Exception {
244+
TableInfo tableInfo =
245+
createTableInfoWithWatermark("`ts` - INTERVAL '5' SECOND", "TIMESTAMP(3)");
246+
SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo);
247+
assertThat(extractor).isNotNull();
248+
249+
try (ArrowRecordBatch recordBatch =
250+
createArrowRecordBatch(
251+
new Object[] {1, TimestampNtz.fromMillis(-10_000L)},
252+
new Object[] {2, TimestampNtz.fromMillis(-3_000L)})) {
253+
assertThat(extractor.currentWatermark(recordBatch)).isEqualTo(-8_000L);
254+
}
255+
}
256+
257+
@Test
258+
void testCurrentWatermarkWithEmptyArrowRecordBatchReturnsNull() throws Exception {
259+
TableInfo tableInfo = createTableInfoWithWatermark("`ts`", "TIMESTAMP(3)");
260+
SimpleWatermarkExtractor extractor = SimpleWatermarkExtractor.create(tableInfo);
261+
assertThat(extractor).isNotNull();
262+
263+
try (ArrowRecordBatch recordBatch = createArrowRecordBatch()) {
264+
assertThat(extractor.currentWatermark(recordBatch)).isNull();
265+
}
266+
}
267+
189268
@ParameterizedTest
190269
@CsvSource({
191270
"TIMESTAMP, 0",
@@ -233,6 +312,7 @@ private static TableInfo createTableInfoWithoutWatermark() {
233312

234313
private static TableInfo createTableInfoFromSchema(
235314
Schema schema, Map<String, String> customProps) {
315+
Configuration tableConfig = new Configuration();
236316
return new TableInfo(
237317
TablePath.of("test_db", "test_table"),
238318
1L,
@@ -241,11 +321,51 @@ private static TableInfo createTableInfoFromSchema(
241321
Collections.emptyList(),
242322
Collections.emptyList(),
243323
1,
244-
new Configuration(),
324+
tableConfig,
245325
Configuration.fromMap(customProps),
246326
null,
247327
null,
248328
System.currentTimeMillis(),
249329
System.currentTimeMillis());
250330
}
331+
332+
private static ArrowRecordBatch createArrowRecordBatch(Object[]... rows) {
333+
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
334+
IntVector idVector = new IntVector("id", allocator);
335+
TimeStampMilliVector timestampVector = new TimeStampMilliVector("ts", allocator);
336+
for (int rowId = 0; rowId < rows.length; rowId++) {
337+
idVector.setSafe(rowId, (Integer) rows[rowId][0]);
338+
Object timestamp = rows[rowId][1];
339+
if (timestamp == null) {
340+
timestampVector.setNull(rowId);
341+
} else {
342+
timestampVector.setSafe(rowId, ((TimestampNtz) timestamp).getMillisecond());
343+
}
344+
}
345+
idVector.setValueCount(rows.length);
346+
timestampVector.setValueCount(rows.length);
347+
List<FieldVector> vectors = Arrays.asList(idVector, timestampVector);
348+
VectorSchemaRoot vectorSchemaRoot =
349+
new VectorSchemaRoot(
350+
Arrays.asList(idVector.getField(), timestampVector.getField()),
351+
vectors,
352+
rows.length);
353+
return new TestingArrowRecordBatch(
354+
new ArrowBatchData(vectorSchemaRoot, 0L, 0L, rows.length), allocator);
355+
}
356+
357+
private static class TestingArrowRecordBatch extends ArrowRecordBatch {
358+
private final BufferAllocator allocator;
359+
360+
private TestingArrowRecordBatch(ArrowBatchData arrowBatchData, BufferAllocator allocator) {
361+
super(arrowBatchData);
362+
this.allocator = allocator;
363+
}
364+
365+
@Override
366+
public void close() {
367+
super.close();
368+
allocator.close();
369+
}
370+
}
251371
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,7 @@ public void write(LogRecord record) throws IOException {
8686
try {
8787
recordWriter.write(record);
8888
if (watermarkExtractor != null) {
89-
Long ts = watermarkExtractor.currentWatermark(record.getRow());
90-
if (ts != null) {
91-
maxWatermark = maxWatermark == null ? ts : Math.max(maxWatermark, ts);
92-
}
89+
updateMaxWatermark(watermarkExtractor.currentWatermark(record.getRow()));
9390
}
9491
} catch (Exception e) {
9592
throw new IOException("Failed to write Fluss record to Paimon.", e);
@@ -110,6 +107,9 @@ public void write(RecordBatch recordBatch) throws IOException {
110107
try {
111108
((AppendOnlyWriter) recordWriter)
112109
.writeArrowBatch(((ArrowRecordBatch) recordBatch).getArrowBatchData());
110+
if (watermarkExtractor != null) {
111+
updateMaxWatermark(watermarkExtractor.currentWatermark(recordBatch));
112+
}
113113
} catch (Exception e) {
114114
throw new IOException("Failed to write Arrow record batch to Paimon.", e);
115115
}
@@ -153,4 +153,10 @@ private FileStoreTable getTable(TablePath tablePath, boolean isAutoCompaction)
153153
throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
154154
}
155155
}
156+
157+
private void updateMaxWatermark(@Nullable Long watermark) {
158+
if (watermark != null) {
159+
maxWatermark = maxWatermark == null ? watermark : Math.max(maxWatermark, watermark);
160+
}
161+
}
156162
}

0 commit comments

Comments
 (0)