Skip to content

Commit 9dfad70

Browse files
committed
Address feedback
1 parent aa8f010 commit 9dfad70

5 files changed

Lines changed: 45 additions & 27 deletions

File tree

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ void testAlterTableLogTtl() throws Exception {
453453
assertThat(tableInfo.getTableConfig().getLogTTLMs())
454454
.isEqualTo(Duration.ofDays(30).toMillis());
455455

456-
// reset to remove the property; value should fall back to the default (7 days).
456+
// reset to remove the property; value should fall back to the configured default.
457457
tableChanges =
458458
Collections.singletonList(TableChange.reset(ConfigOptions.TABLE_LOG_TTL.key()));
459459
admin.alterTable(tablePath, tableChanges, false).get();
@@ -462,7 +462,7 @@ void testAlterTableLogTtl() throws Exception {
462462
assertThat(tableInfo.toTableDescriptor().getProperties())
463463
.doesNotContainKey(ConfigOptions.TABLE_LOG_TTL.key());
464464
assertThat(tableInfo.getTableConfig().getLogTTLMs())
465-
.isEqualTo(Duration.ofDays(7).toMillis());
465+
.isEqualTo(ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis());
466466
}
467467

468468
@Test

fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,33 @@ public List<RemoteLogSegment> relevantRemoteLogSegments(TableBucket tableBucket,
275275
return remoteLogTablet(tableBucket).relevantRemoteLogSegments(offset);
276276
}
277277

278+
/**
279+
* Updates the ttl of the {@link RemoteLogTablet} for the given bucket and returns its previous
280+
* ttl, or {@link Optional#empty()} if no tablet is currently registered (which happens in two
281+
* normal scenarios):
282+
*
283+
* <ul>
284+
* <li>remote logging is globally disabled, so no tablet is ever registered;
285+
* <li>the replica is still being created. The eventually-constructed tablet will read its
286+
* initial ttl from {@code Replica.getLogTTLMs()}; whether that reflects the latest ALTER
287+
* depends on the order in which the metadata update arrives at the owning {@code
288+
* ReplicaManager}.
289+
* </ul>
290+
*
291+
* @return the previous ttl in milliseconds, or {@link Optional#empty()} if absent.
292+
*/
293+
public Optional<Long> updateLogTtlMs(TableBucket tableBucket, long newTtlMs) {
294+
RemoteLogTablet remoteLogTablet = remoteLogs.get(tableBucket);
295+
if (remoteLogTablet == null) {
296+
return Optional.empty();
297+
}
298+
long oldTtlMs = remoteLogTablet.getTtlMs();
299+
if (oldTtlMs != newTtlMs) {
300+
remoteLogTablet.updateTtlMs(newTtlMs);
301+
}
302+
return Optional.of(oldTtlMs);
303+
}
304+
278305
private boolean remoteDisabled() {
279306
return taskInterval <= 0L;
280307
}

fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,12 @@ public List<RemoteLogSegment> allRemoteLogSegments() {
169169
*/
170170
public List<RemoteLogSegment> expiredRemoteLogSegments(
171171
long currentTimeMs, Long lakeLogEndOffset) {
172-
if (!logExpireEnable()) {
172+
// Snapshot the volatile ttlMs once so that a concurrent updateTtlMs() cannot change the
173+
// comparison base mid-iteration. Otherwise an in-flight change to a non-positive value
174+
// (which disables expiration) could make currentTimeMs - ts > ttlMs hold for ALL segments
175+
// and result in wrongly deleting every remote segment.
176+
final long ttlSnapshotMs = ttlMs;
177+
if (ttlSnapshotMs <= 0) {
173178
return Collections.emptyList();
174179
}
175180
return inReadLock(
@@ -179,7 +184,7 @@ public List<RemoteLogSegment> expiredRemoteLogSegments(
179184
for (Map.Entry<Long, Set<UUID>> entry :
180185
timestampToRemoteLogSegmentId.entrySet()) {
181186
long ts = entry.getKey();
182-
if (currentTimeMs - ts > ttlMs) {
187+
if (currentTimeMs - ts > ttlSnapshotMs) {
183188
for (UUID uuid : entry.getValue()) {
184189
RemoteLogSegment segment = idToRemoteLogSegment.get(uuid);
185190
if (lakeLogEndOffset != null) {
@@ -361,20 +366,13 @@ public void addAndDeleteLogSegments(
361366
});
362367
}
363368

364-
private boolean logExpireEnable() {
365-
return ttlMs > 0;
366-
}
367-
368369
/** Returns the current ttl in milliseconds for remote log segments. */
369-
@VisibleForTesting
370370
public long getTtlMs() {
371371
return ttlMs;
372372
}
373373

374374
/**
375-
* Update the ttl in milliseconds for remote log segments. This is invoked when the user alters
376-
* the table option {@code table.log.ttl}. The new value takes effect on the next round of
377-
* expired-segment evaluation.
375+
* Update the ttl in milliseconds for remote log segments.
378376
*
379377
* @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
380378
*/

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
import org.apache.fluss.server.log.LogTablet;
8989
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
9090
import org.apache.fluss.server.log.remote.RemoteLogManager;
91-
import org.apache.fluss.server.log.remote.RemoteLogTablet;
9291
import org.apache.fluss.server.metadata.ServerMetadataCache;
9392
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
9493
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
@@ -694,27 +693,21 @@ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
694693
* @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
695694
*/
696695
public void updateLogTtlMs(long newTtlMs) {
697-
final RemoteLogTablet remoteLogTablet;
698-
try {
699-
remoteLogTablet = remoteLogManager.remoteLogTablet(tableBucket);
700-
} catch (IllegalStateException e) {
701-
// RemoteLogTablet may not be registered yet during the early stage of replica
702-
// creation. In that case, the alter is a no-op here because the Replica will read the
703-
// up-to-date ttl from the persisted TableInfo when constructing the RemoteLogTablet.
704-
LOG.warn(
705-
"RemoteLogTablet for {} is not registered yet, skip applying new logTtlMs={}.",
696+
Optional<Long> oldValueOpt = remoteLogManager.updateLogTtlMs(tableBucket, newTtlMs);
697+
if (!oldValueOpt.isPresent()) {
698+
LOG.debug(
699+
"RemoteLogTablet for {} is unavailable; skip applying new logTtlMs={} "
700+
+ "(remote logging may be disabled or the replica is still initializing).",
706701
tableBucket,
707702
newTtlMs);
708703
return;
709704
}
710705

711-
long oldValue = remoteLogTablet.getTtlMs();
706+
long oldValue = oldValueOpt.get();
712707
if (oldValue == newTtlMs) {
713708
return;
714709
}
715710

716-
remoteLogTablet.updateTtlMs(newTtlMs);
717-
718711
LOG.info("Replica for {} logTtlMs changed from {} to {}", tableBucket, oldValue, newTtlMs);
719712
}
720713

fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -709,8 +709,8 @@ void testAlterTableLogTtl(boolean partitionedTable) throws Exception {
709709
Replica replica = replicaManager.getReplicaOrException(tb);
710710
RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
711711

712-
// Verify initial ttl is 7 days
713-
long defaultTtlMs = java.time.Duration.ofDays(7).toMillis();
712+
// Verify initial ttl matches the configured default table.log.ttl
713+
long defaultTtlMs = ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis();
714714
assertThat(remoteLog.getTtlMs()).isEqualTo(defaultTtlMs);
715715

716716
// 2. Update ttl via Replica.updateLogTtlMs (simulating metadata propagation)

0 commit comments

Comments
 (0)