Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,49 @@ void testAlterTableConfig() throws Exception {
.get();
}

@Test
void testAlterTableLogTtl() throws Exception {
// Verify that altering 'table.log.ttl' is supported: the new value should be persisted in
// TableInfo so that subsequent reads observe the updated retention.
TablePath tablePath = TablePath.of("test_db", "alter_table_log_ttl");
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

// verify initial value matches the property set in DEFAULT_TABLE_DESCRIPTOR (1 day)
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.getTableConfig().getLogTTLMs())
.isEqualTo(Duration.ofDays(1).toMillis());

// alter to 3d and verify metadata
List<TableChange> tableChanges =
Collections.singletonList(TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "3d"));
admin.alterTable(tablePath, tableChanges, false).get();

tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.getTableConfig().getLogTTLMs())
.isEqualTo(Duration.ofDays(3).toMillis());

// alter to another value (30d) to verify multiple updates work.
tableChanges =
Collections.singletonList(
TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "30d"));
admin.alterTable(tablePath, tableChanges, false).get();

tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.getTableConfig().getLogTTLMs())
.isEqualTo(Duration.ofDays(30).toMillis());

// reset to remove the property; value should fall back to the configured default.
tableChanges =
Collections.singletonList(TableChange.reset(ConfigOptions.TABLE_LOG_TTL.key()));
admin.alterTable(tablePath, tableChanges, false).get();

tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.toTableDescriptor().getProperties())
.doesNotContainKey(ConfigOptions.TABLE_LOG_TTL.key());
assertThat(tableInfo.getTableConfig().getLogTTLMs())
.isEqualTo(ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis());
}

@Test
void testAlterTableColumn() throws Exception {
// create table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class FlussConfigUtils {
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
ConfigOptions.TABLE_LOG_TTL.key(),
ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(),
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,33 @@ public List<RemoteLogSegment> relevantRemoteLogSegments(TableBucket tableBucket,
return remoteLogTablet(tableBucket).relevantRemoteLogSegments(offset);
}

/**
* Updates the ttl of the {@link RemoteLogTablet} for the given bucket and returns its previous
* ttl, or {@link Optional#empty()} if no tablet is currently registered (which happens in two
* normal scenarios):
*
* <ul>
* <li>remote logging is globally disabled, so no tablet is ever registered;
* <li>the replica is still being created. The eventually-constructed tablet will read its
* initial ttl from {@code Replica.getLogTTLMs()}; whether that reflects the latest ALTER
* depends on the order in which the metadata update arrives at the owning {@code
* ReplicaManager}.
* </ul>
*
* @return the previous ttl in milliseconds, or {@link Optional#empty()} if absent.
*/
public Optional<Long> updateLogTtlMs(TableBucket tableBucket, long newTtlMs) {
RemoteLogTablet remoteLogTablet = remoteLogs.get(tableBucket);
if (remoteLogTablet == null) {
return Optional.empty();
}
long oldTtlMs = remoteLogTablet.getTtlMs();
if (oldTtlMs != newTtlMs) {
remoteLogTablet.updateTtlMs(newTtlMs);
}
return Optional.of(oldTtlMs);
}

private boolean remoteDisabled() {
return taskInterval <= 0L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class RemoteLogTablet {
/** The lock to protect the remote log segment list. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final long ttlMs;
private volatile long ttlMs;

/** The registered metrics for remote log. */
private volatile MetricGroup remoteLogMetrics;
Expand Down Expand Up @@ -169,7 +169,12 @@ public List<RemoteLogSegment> allRemoteLogSegments() {
*/
public List<RemoteLogSegment> expiredRemoteLogSegments(
long currentTimeMs, Long lakeLogEndOffset) {
if (!logExpireEnable()) {
// Snapshot the volatile ttlMs once so that a concurrent updateTtlMs() cannot change the
// comparison base mid-iteration. Otherwise an in-flight change to a non-positive value
// (which disables expiration) could make currentTimeMs - ts > ttlMs hold for ALL segments
// and result in wrongly deleting every remote segment.
final long ttlSnapshotMs = ttlMs;
if (ttlSnapshotMs <= 0) {
return Collections.emptyList();
}
return inReadLock(
Expand All @@ -179,7 +184,7 @@ public List<RemoteLogSegment> expiredRemoteLogSegments(
for (Map.Entry<Long, Set<UUID>> entry :
timestampToRemoteLogSegmentId.entrySet()) {
long ts = entry.getKey();
if (currentTimeMs - ts > ttlMs) {
if (currentTimeMs - ts > ttlSnapshotMs) {
for (UUID uuid : entry.getValue()) {
RemoteLogSegment segment = idToRemoteLogSegment.get(uuid);
if (lakeLogEndOffset != null) {
Expand Down Expand Up @@ -361,8 +366,18 @@ public void addAndDeleteLogSegments(
});
}

private boolean logExpireEnable() {
return ttlMs > 0;
/** Returns the current ttl in milliseconds for remote log segments. */
public long getTtlMs() {
return ttlMs;
}

/**
* Update the ttl in milliseconds for remote log segments.
*
* @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
*/
public void updateTtlMs(long newTtlMs) {
this.ttlMs = newTtlMs;
}

private void reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,33 @@ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
tieredLogLocalSegments);
}

/**
* Update the ttl in milliseconds for remote log segments. This method is called when the table
* configuration {@code table.log.ttl} is altered, so that the new ttl takes effect immediately
* for the next round of expired-segment evaluation, without requiring a server restart or table
* re-creation.
*
* @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
*/
public void updateLogTtlMs(long newTtlMs) {
Optional<Long> oldValueOpt = remoteLogManager.updateLogTtlMs(tableBucket, newTtlMs);
if (!oldValueOpt.isPresent()) {
LOG.debug(
"RemoteLogTablet for {} is unavailable; skip applying new logTtlMs={} "
+ "(remote logging may be disabled or the replica is still initializing).",
tableBucket,
newTtlMs);
return;
}

long oldValue = oldValueOpt.get();
if (oldValue == newTtlMs) {
return;
}

LOG.info("Replica for {} logTtlMs changed from {} to {}", tableBucket, oldValue, newTtlMs);
}

private void createKv() {
try {
// create a closeable registry for the closable related to kv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust
private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>();
Map<Long, Long> tableIdToLogTtlMs = new HashMap<>();

for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
TableInfo tableInfo = tableMetadata.getTableInfo();
Expand All @@ -578,9 +579,15 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
// Collect tiered log local segments configuration
int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments();
tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments);

// Collect log ttl configuration
long logTtlMs = tableInfo.getTableConfig().getLogTTLMs();
tableIdToLogTtlMs.put(tableId, logTtlMs);
}

if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) {
if (tableIdToLakeFlag.isEmpty()
&& tableIdToTieredLogLocalSegments.isEmpty()
&& tableIdToLogTtlMs.isEmpty()) {
return;
}

Expand All @@ -600,6 +607,11 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
replica.updateTieredLogLocalSegments(
tableIdToTieredLogLocalSegments.get(tableId));
}

// Update log ttl configuration
if (tableIdToLogTtlMs.containsKey(tableId)) {
replica.updateLogTtlMs(tableIdToLogTtlMs.get(tableId));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,43 @@ void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Excep
assertThat(logTablet.getSegments()).hasSize(3);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testAlterTableLogTtl(boolean partitionedTable) throws Exception {
// 1. Create table with default table.log.ttl (= 7 days)
long tableId =
registerTableInZkClient(
DATA1_TABLE_PATH,
DATA1_SCHEMA,
201L,
Collections.emptyList(),
Collections.emptyMap());
TableBucket tb = makeTableBucket(tableId, partitionedTable);
makeLogTableAsLeader(tb, partitionedTable);

Replica replica = replicaManager.getReplicaOrException(tb);
RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);

// Verify initial ttl matches the configured default table.log.ttl
long defaultTtlMs = ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis();
assertThat(remoteLog.getTtlMs()).isEqualTo(defaultTtlMs);

// 2. Update ttl via Replica.updateLogTtlMs (simulating metadata propagation)
long newTtlMs = java.time.Duration.ofDays(1).toMillis();
replica.updateLogTtlMs(newTtlMs);

// Verify RemoteLogTablet internal state has been updated
assertThat(remoteLog.getTtlMs()).isEqualTo(newTtlMs);

// 3. Update with the same value should be a no-op
replica.updateLogTtlMs(newTtlMs);
assertThat(remoteLog.getTtlMs()).isEqualTo(newTtlMs);

// 4. Disabling expiration via a non-positive ttl should propagate as-is.
replica.updateLogTtlMs(-1L);
assertThat(remoteLog.getTtlMs()).isEqualTo(-1L);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCopySegmentPartialFailureCommitsSuccessfulOnes(boolean partitionTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.server.log.remote;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.remote.RemoteLogSegment;
import org.apache.fluss.server.log.LogTablet;

Expand Down Expand Up @@ -181,6 +182,37 @@ void testFindRemoteLogSegmentByTimestamp(boolean partitionTable) throws Exceptio
assertThat(remoteLogTablet.findSegmentByTimestamp(51L)).isNull();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testUpdateTtlMs(boolean partitionTable) throws Exception {
LogTablet logTablet = makeLogTabletAndAddSegments(partitionTable);
RemoteLogTablet remoteLogTablet = buildRemoteLogTablet(logTablet);

// initial ttl follows the default in ConfigOptions.TABLE_LOG_TTL (7 days)
long defaultTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis();
assertThat(remoteLogTablet.getTtlMs()).isEqualTo(defaultTtlMs);

// add 1 segment with maxTimestamp = 0
RemoteLogSegment segment = createLogSegmentWithMaxTimestamp(logTablet, 0L, 0L, 10L);
remoteLogTablet.addAndDeleteLogSegments(
Collections.singletonList(segment), Collections.emptyList());

// currentTime = 1 hour. (1h - 0) < 7d, so the segment is NOT expired.
long oneHourMs = java.time.Duration.ofHours(1).toMillis();
assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null)).isEmpty();

// shrink ttl to 1 ms via updateTtlMs, the same segment should now be expired.
remoteLogTablet.updateTtlMs(1L);
assertThat(remoteLogTablet.getTtlMs()).isEqualTo(1L);
assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null))
.containsExactly(segment);

// disable expiration by setting ttl to a non-positive value.
remoteLogTablet.updateTtlMs(-1L);
assertThat(remoteLogTablet.getTtlMs()).isEqualTo(-1L);
assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null)).isEmpty();
}

RemoteLogSegment createLogSegmentWithMaxTimestamp(
LogTablet logTablet,
long timestamp,
Expand Down