Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public class MetricNames {
public static final String LOG_END_OFFSET = "endOffset";
public static final String REMOTE_LOG_SIZE = "size";
public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
// Name of the lake tiering gauge derived from LogTablet#getLakePendingTimestamp(): it reports
// the elapsed milliseconds since that timestamp (floored at 0), or the raw value when the
// timestamp is <= 0.
public static final String LOG_LAKE_PENDING_RECORD_LAG = "pendingRecordLag";

// for logic storage
public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public final class LogTablet {
private volatile long lakeLogStartOffset = Long.MAX_VALUE;
private volatile long lakeLogEndOffset = -1L;
private volatile long lakeMaxTimestamp = -1;
// Timestamp tracked for the first offset that lakeLogEndOffset does not cover yet.
// -1: unknown (initial value, or the read in refreshLakePendingTimestamp() failed).
//  0: nothing pending (the pending offset is at or beyond the high watermark).
// Otherwise: the commit timestamp read from the log, or the max timestamp of a leader append.
private volatile long lakePendingTimestamp = -1L;

private LogTablet(
File dataDir,
Expand Down Expand Up @@ -277,6 +278,10 @@ public long getLakeMaxTimestamp() {
return lakeMaxTimestamp;
}

/**
 * Returns the current value of {@code lakePendingTimestamp}.
 *
 * @return the stored pending timestamp; {@code -1} and {@code 0} are sentinel values written by
 *     {@link #refreshLakePendingTimestamp()}
 */
public long getLakePendingTimestamp() {
    final long pendingTimestamp = this.lakePendingTimestamp;
    return pendingTimestamp;
}

/**
 * Returns the writer id count as reported by the writer state manager.
 *
 * @return the value of {@code writerStateManager.writerIdCount()}
 */
public int getWriterIdCount() {
    final int writerIds = writerStateManager.writerIdCount();
    return writerIds;
}
Expand Down Expand Up @@ -599,6 +604,7 @@ public void updateLakeLogStartOffset(long lakeHouseLogStartOffset) {
/**
 * Advances the lake log end offset and recomputes the lake pending timestamp.
 *
 * <p>The stored offset only ever moves forward: a value that is not strictly greater than the
 * current one is ignored and no refresh takes place.
 *
 * @param lakeLogEndOffset the newly reported lake log end offset
 */
public void updateLakeLogEndOffset(long lakeLogEndOffset) {
    if (lakeLogEndOffset <= this.lakeLogEndOffset) {
        // Stale or duplicate report: keep the current state untouched.
        return;
    }
    this.lakeLogEndOffset = lakeLogEndOffset;
    refreshLakePendingTimestamp();
}

Expand All @@ -608,6 +614,49 @@ public void updateLakeMaxTimestamp(long lakeMaxTimestamp) {
}
}

/**
 * Recomputes {@code lakePendingTimestamp} from the current lake and local log offsets.
 *
 * <p>The pending offset is the local log start offset when no lake end offset is known
 * (negative), otherwise the larger of the lake end offset and the local log start offset. The
 * resulting value is:
 *
 * <ul>
 *   <li>{@code 0} when the pending offset is at or beyond the high watermark;
 *   <li>the commit timestamp of the batch read at the pending offset;
 *   <li>{@code -1} when that read fails for any reason (the failure is logged at debug level
 *       and not propagated).
 * </ul>
 */
public void refreshLakePendingTimestamp() {
    final long firstPendingOffset;
    if (lakeLogEndOffset < 0L) {
        firstPendingOffset = localLogStartOffset();
    } else {
        firstPendingOffset = Math.max(lakeLogEndOffset, localLogStartOffset());
    }

    if (firstPendingOffset >= getHighWatermark()) {
        // Nothing below the high watermark is waiting to be tiered.
        lakePendingTimestamp = 0L;
        return;
    }

    long resolvedTimestamp;
    try {
        resolvedTimestamp = readCommitTimestamp(firstPendingOffset);
    } catch (Exception e) {
        // Best effort: fall back to the "unknown" sentinel instead of failing the caller.
        LOG.debug(
                "Failed to read pending record timestamp at offset {} for bucket {}",
                firstPendingOffset,
                getTableBucket(),
                e);
        resolvedTimestamp = -1L;
    }
    lakePendingTimestamp = resolvedTimestamp;
}

/**
 * Reads the log at {@code offset} and returns the commit timestamp of the first batch returned.
 *
 * @param offset the offset to start reading from
 * @return the commit timestamp of the first batch in the fetched records
 * @throws IOException if the read fails or yields no batch at all
 */
private long readCommitTimestamp(long offset) throws IOException {
    FetchDataInfo fetched = read(offset, 1, FetchIsolation.LOG_END, true, null, null);

    // Only the first batch matters; stop iterating as soon as it is found.
    LogRecordBatch firstBatch = null;
    for (LogRecordBatch candidate : fetched.getRecords().batches()) {
        firstBatch = candidate;
        break;
    }

    if (firstBatch == null) {
        throw new IOException(
                String.format(
                        "Failed to read pending batch timestamp at offset %s for bucket %s",
                        offset, getTableBucket()));
    }
    return firstBatch.commitTimestamp();
}

/**
 * Called after a leader append. Sets {@link #lakePendingTimestamp} to the append's max timestamp
 * when the data lake is enabled and the lake log end offset had already reached the first offset
 * of this append, i.e. lake tiering was caught up before the append.
 *
 * <p>In every other case the field is left unchanged. This includes a still-negative lake log
 * end offset when the append's first offset is non-negative.
 *
 * @param appendInfo the result of the leader append that has just completed
 */
public void maybeUpdateLakePendingTimestamp(LogAppendInfo appendInfo) {
    if (!isDataLakeEnabled) {
        return;
    }
    if (lakeLogEndOffset < appendInfo.firstOffset()) {
        // Older records are still pending, so the tracked timestamp already points at them.
        return;
    }
    lakePendingTimestamp = appendInfo.maxTimestamp();
}

public void loadWriterSnapshot(long lastOffset) throws IOException {
synchronized (lock) {
rebuildWriterState(lastOffset, writerStateManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,12 @@ private void registerLakeTieringMetrics() {
logTablet.getLakeMaxTimestamp() < 0L
? -1
: logTablet.localMaxTimestamp() - logTablet.getLakeMaxTimestamp());
// Pending record lag: elapsed milliseconds since the lake pending timestamp, never negative.
// Non-positive timestamps are reported unchanged rather than converted to a lag.
lakeTieringMetricGroup.gauge(
        MetricNames.LOG_LAKE_PENDING_RECORD_LAG,
        () -> {
            long pendingTimestamp = logTablet.getLakePendingTimestamp();
            if (pendingTimestamp <= 0L) {
                return pendingTimestamp;
            }
            long elapsedMillis = clock.milliseconds() - pendingTimestamp;
            return Math.max(elapsedMillis, 0L);
        });
}

private void onBecomeNewFollower(int standbyReplica) {
Expand Down Expand Up @@ -1061,6 +1067,7 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in
"Error while appending records to " + tableBucket, e);
}
maybeIncrementLeaderHW(logTablet, clock.milliseconds());
logTablet.maybeUpdateLakePendingTimestamp(appendInfo);

return appendInfo;
});
Expand Down Expand Up @@ -1102,6 +1109,7 @@ public LogAppendInfo putRecordsToLeader(
}
// we may need to increment high watermark.
maybeIncrementLeaderHW(logTablet, clock.milliseconds());
logTablet.maybeUpdateLakePendingTimestamp(logAppendInfo);
return logAppendInfo;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;

import static org.apache.fluss.record.TestData.ANOTHER_DATA1;
import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
Expand Down Expand Up @@ -270,6 +271,38 @@ void testWriterExpireCheckAfterDelete() {
assertThat(scheduler.taskRunning(writerExpireCheck)).isFalse();
}

/**
 * Walks the lake pending timestamp through its states: unknown before any lake offset update,
 * pointing at the first un-tiered batch, zero once the lake has caught up, and set by a leader
 * append that arrives while the lake is caught up.
 */
@Test
void testLakePendingTimestamp() throws Exception {
    logTablet.updateIsDataLakeEnabled(true);

    // Nothing has been reported yet, so the value is the "unknown" sentinel.
    assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(-1L);

    // Write the first batch and expose it through the high watermark.
    LogAppendInfo batchOne = logTablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
    logTablet.updateHighWatermark(logTablet.localLogEndOffset());

    // Short pause before writing the second batch.
    Thread.sleep(2L);
    LogAppendInfo batchTwo =
            logTablet.appendAsLeader(genMemoryLogRecordsByObject(ANOTHER_DATA1));
    logTablet.updateHighWatermark(logTablet.localLogEndOffset());

    // The lake has tiered everything before the second batch, so that batch is now pending.
    logTablet.updateLakeLogEndOffset(batchTwo.firstOffset());
    assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(batchTwo.maxTimestamp());

    // The lake reaches the local log end: nothing is pending any more.
    logTablet.updateLakeLogEndOffset(logTablet.localLogEndOffset());
    assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(0L);

    // A fresh append on a caught-up lake makes the new batch the pending one.
    Thread.sleep(2L);
    LogAppendInfo batchThree = logTablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
    logTablet.updateHighWatermark(logTablet.localLogEndOffset());
    logTablet.maybeUpdateLakePendingTimestamp(batchThree);
    assertThat(logTablet.getLakePendingTimestamp()).isEqualTo(batchThree.maxTimestamp());
}

@Test
void testWriterStateOffsetUpdatedForNonIdempotentData() throws Exception {
MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
Expand Down