From 54f3829bb9cd82f8d459f58ac2a30a2c931a69f6 Mon Sep 17 00:00:00 2001
From: Junbo Wang
Date: Wed, 10 Jun 2026 20:14:27 +0800
Subject: [PATCH 1/2] [lake/tiering] Add pendingRecordLag metric for lake tiering

Track the timestamp of the first log batch that has not been tiered to the
lake yet and expose its age as the pendingRecordLag gauge. The gauge reports
-1 while the timestamp is unknown and 0 when nothing is waiting to be tiered.

A batch that is appended but not yet covered by the high watermark counts as
pending, so the value is refreshed against the log end offset.
---
 .../org/apache/fluss/metrics/MetricNames.java |  1 +
 .../apache/fluss/server/log/LogTablet.java    | 72 +++++++++++++++++++
 .../apache/fluss/server/replica/Replica.java  |  8 +++
 .../fluss/server/log/LogTabletTest.java       | 40 +++++++++++
 4 files changed, 121 insertions(+)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index fe34deba95..2a8450d15e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -237,6 +237,7 @@ public class MetricNames {
     public static final String LOG_END_OFFSET = "endOffset";
     public static final String REMOTE_LOG_SIZE = "size";
     public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
+    public static final String LAKE_PENDING_RECORD_LAG = "pendingRecordLag";
 
     // for logic storage
     public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 23c60c82e1..c2d2272eaf 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -132,6 +132,7 @@ public final class LogTablet {
     private volatile long lakeLogStartOffset = Long.MAX_VALUE;
     private volatile long lakeLogEndOffset = -1L;
     private volatile long lakeMaxTimestamp = -1;
+    private volatile long lakePendingTimestamp = -1L;
 
     private LogTablet(
             File dataDir,
@@ -277,6 +278,15 @@ public long getLakeMaxTimestamp() {
         return lakeMaxTimestamp;
     }
 
+    /**
+     * Returns the timestamp behind the pending record lag: {@code -1} while it is unknown,
+     * {@code 0} when nothing is waiting to be tiered, otherwise the timestamp recorded for the
+     * first batch that still has to be tiered.
+     */
+    public long getLakePendingTimestamp() {
+        return lakePendingTimestamp;
+    }
+
     public int getWriterIdCount() {
         return writerStateManager.writerIdCount();
     }
@@ -599,6 +609,7 @@ public void updateLakeLogStartOffset(long lakeHouseLogStartOffset) {
     public void updateLakeLogEndOffset(long lakeLogEndOffset) {
         if (lakeLogEndOffset > this.lakeLogEndOffset) {
             this.lakeLogEndOffset = lakeLogEndOffset;
+            refreshLakePendingTimestamp();
         }
     }
 
@@ -608,6 +619,67 @@ public void updateLakeMaxTimestamp(long lakeMaxTimestamp) {
         }
     }
 
+    /**
+     * Recomputes {@link #lakePendingTimestamp} from the first offset that has not been tiered to
+     * the lake yet.
+     *
+     * <p>The value becomes {@code 0} when no local record is waiting to be tiered, the commit
+     * timestamp of the first pending batch otherwise, or {@code -1} when that batch cannot be
+     * read.
+     */
+    public void refreshLakePendingTimestamp() {
+        // Snapshot both values once so the comparison and the selected offset stay consistent
+        // even if another thread advances them while this method runs.
+        long lakeEndOffset = lakeLogEndOffset;
+        long localStartOffset = localLogStartOffset();
+        long pendingOffset =
+                lakeEndOffset < 0L ? localStartOffset : Math.max(lakeEndOffset, localStartOffset);
+        // Compare against the log end offset rather than the high watermark: a batch that is
+        // appended but not committed yet is still waiting to be tiered, and nothing would
+        // correct a premature "caught up" value once the high watermark advances.
+        if (pendingOffset >= localLogEndOffset()) {
+            lakePendingTimestamp = 0L;
+            return;
+        }
+
+        try {
+            lakePendingTimestamp = readCommitTimestamp(pendingOffset);
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to read pending record timestamp at offset {} for bucket {}",
+                    pendingOffset,
+                    getTableBucket(),
+                    e);
+            lakePendingTimestamp = -1L;
+        }
+    }
+
+    /**
+     * Reads the log at {@code offset} and returns the commit timestamp of the first batch.
+     *
+     * @throws IOException if no batch is returned for that offset
+     */
+    private long readCommitTimestamp(long offset) throws IOException {
+        FetchDataInfo fetchDataInfo = read(offset, 1, FetchIsolation.LOG_END, true, null, null);
+        for (LogRecordBatch batch : fetchDataInfo.getRecords().batches()) {
+            return batch.commitTimestamp();
+        }
+        throw new IOException(
+                String.format(
+                        "Failed to read pending batch timestamp at offset %s for bucket %s",
+                        offset, getTableBucket()));
+    }
+
+    /**
+     * Called after a leader append to update {@link #lakePendingTimestamp} when lake tiering was
+     * caught up before this append.
+     */
+    public void maybeUpdateLakePendingTimestamp(LogAppendInfo appendInfo) {
+        if (isDataLakeEnabled && lakeLogEndOffset >= appendInfo.firstOffset()) {
+            lakePendingTimestamp = appendInfo.maxTimestamp();
+        }
+    }
+
     public void loadWriterSnapshot(long lastOffset) throws IOException {
         synchronized (lock) {
             rebuildWriterState(lastOffset, writerStateManager);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e11dd69c14..4b69a848a2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -603,6 +603,12 @@ private void registerLakeTieringMetrics() {
                         logTablet.getLakeMaxTimestamp() < 0L
                                 ? -1
                                 : logTablet.localMaxTimestamp() - logTablet.getLakeMaxTimestamp());
+        lakeTieringMetricGroup.gauge(
+                MetricNames.LAKE_PENDING_RECORD_LAG,
+                () -> {
+                    long ts = logTablet.getLakePendingTimestamp();
+                    return ts <= 0L ? ts : Math.max(clock.milliseconds() - ts, 0L);
+                });
     }
 
     private void onBecomeNewFollower(int standbyReplica) {
@@ -1061,6 +1067,7 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in
                                 "Error while appending records to " + tableBucket, e);
                     }
                     maybeIncrementLeaderHW(logTablet, clock.milliseconds());
+                    logTablet.maybeUpdateLakePendingTimestamp(appendInfo);
 
                     return appendInfo;
                 });
@@ -1102,6 +1109,7 @@ public LogAppendInfo putRecordsToLeader(
                     }
                     // we may need to increment high watermark.
                     maybeIncrementLeaderHW(logTablet, clock.milliseconds());
+                    logTablet.maybeUpdateLakePendingTimestamp(logAppendInfo);
                     return logAppendInfo;
                 });
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
index 46685cfe05..1ccc63c033 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
@@ -53,6 +53,7 @@
 import java.util.Optional;
 import java.util.concurrent.ScheduledFuture;
 
+import static org.apache.fluss.record.TestData.ANOTHER_DATA1;
 import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
@@ -270,6 +271,45 @@ void testWriterExpireCheckAfterDelete() {
         assertThat(scheduler.taskRunning(writerExpireCheck)).isFalse();
     }
 
+    @Test
+    void testLakePendingTimestamp() throws Exception {
+        logTablet.updateIsDataLakeEnabled(true);
+
+        // Initial state: unknown.
+        assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(-1L);
+
+        // Append two batches with multiple records each.
+        LogAppendInfo firstAppend = logTablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
+        logTablet.updateHighWatermark(logTablet.localLogEndOffset());
+
+        Thread.sleep(2L);
+        LogAppendInfo secondAppend =
+                logTablet.appendAsLeader(genMemoryLogRecordsByObject(ANOTHER_DATA1));
+        logTablet.updateHighWatermark(logTablet.localLogEndOffset());
+
+        // Lake consumes the first batch — pending timestamp points to the second.
+        logTablet.updateLakeLogEndOffset(secondAppend.firstOffset());
+        assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(secondAppend.maxTimestamp());
+
+        // Lake catches up completely — no pending.
+        logTablet.updateLakeLogEndOffset(logTablet.localLogEndOffset());
+        assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(0L);
+
+        // New data arrives while lake is caught up.
+        Thread.sleep(2L);
+        LogAppendInfo thirdAppend = logTablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
+        logTablet.updateHighWatermark(logTablet.localLogEndOffset());
+        logTablet.maybeUpdateLakePendingTimestamp(thirdAppend);
+        assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(thirdAppend.maxTimestamp());
+
+        // Lake catches up to the high watermark while a newer batch is appended but not yet
+        // committed: that batch is still pending.
+        LogAppendInfo fourthAppend =
+                logTablet.appendAsLeader(genMemoryLogRecordsByObject(ANOTHER_DATA1));
+        logTablet.updateLakeLogEndOffset(fourthAppend.firstOffset());
+        assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(fourthAppend.maxTimestamp());
+    }
+
     @Test
     void testWriterStateOffsetUpdatedForNonIdempotentData() throws Exception {
         MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);

From 75794257da691b4d6dc09935e83b7b730c68d4c4 Mon Sep 17 00:00:00 2001
From: Junbo Wang
Date: Wed, 10 Jun 2026 20:53:21 +0800
Subject: [PATCH 2/2] rename metrics name

---
 .../src/main/java/org/apache/fluss/metrics/MetricNames.java     | 2 +-
 .../src/main/java/org/apache/fluss/server/replica/Replica.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 2a8450d15e..6a916d5e2e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -237,7 +237,7 @@ public class MetricNames {
     public static final String LOG_END_OFFSET = "endOffset";
     public static final String REMOTE_LOG_SIZE = "size";
     public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
-    public static final String LAKE_PENDING_RECORD_LAG = "pendingRecordLag";
+    public static final String LOG_LAKE_PENDING_RECORD_LAG = "pendingRecordLag";
 
     // for logic storage
     public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 4b69a848a2..3d075c2b0b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -604,7 +604,7 @@ private void registerLakeTieringMetrics() {
                                 ? -1
                                 : logTablet.localMaxTimestamp() - logTablet.getLakeMaxTimestamp());
         lakeTieringMetricGroup.gauge(
-                MetricNames.LAKE_PENDING_RECORD_LAG,
+                MetricNames.LOG_LAKE_PENDING_RECORD_LAG,
                 () -> {
                     long ts = logTablet.getLakePendingTimestamp();
                     return ts <= 0L ? ts : Math.max(clock.milliseconds() - ts, 0L);