Skip to content

Commit 7055fc7

Browse files
[rpc][server][client] add ListRemoteLogManifests and ListKvSnapshots RPCs for orphan cleanup
1 parent b6717fe commit 7055fc7

16 files changed

Lines changed: 935 additions & 2 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
2122
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2223
import org.apache.fluss.client.metadata.KvSnapshots;
2324
import org.apache.fluss.client.metadata.LakeSnapshot;
25+
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
2426
import org.apache.fluss.cluster.ServerNode;
2527
import org.apache.fluss.cluster.rebalance.GoalType;
2628
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
@@ -796,4 +798,32 @@ CompletableFuture<RegisterResult> registerProducerOffsets(
796798
* @since 1.0
797799
*/
798800
CompletableFuture<ClusterHealth> getClusterHealth();
801+
802+
/**
803+
* List per-bucket remote log manifest entries for a table or partition scope.
804+
*
805+
* @param tableId the table to query
806+
* @param partitionId optional partition id (null for non-partitioned tables)
807+
* @return per-bucket manifest paths and end offsets
808+
*/
809+
@PublicEvolving
810+
default CompletableFuture<List<RemoteLogManifestInfo>> listRemoteLogManifests(
811+
long tableId, @Nullable Long partitionId) {
812+
throw new UnsupportedOperationException(
813+
"listRemoteLogManifests is not supported by this Admin implementation");
814+
}
815+
816+
/**
817+
* List per-bucket active KV snapshot ids for a table or partition scope.
818+
*
819+
* @param tableId the table to query
820+
* @param partitionId optional partition id (null for non-partitioned tables)
821+
* @return per-bucket active snapshot ids grouped by bucket
822+
*/
823+
@PublicEvolving
824+
default CompletableFuture<ActiveKvSnapshots> listKvSnapshots(
825+
long tableId, @Nullable Long partitionId) {
826+
throw new UnsupportedOperationException(
827+
"listKvSnapshots is not supported by this Admin implementation");
828+
}
799829
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
2122
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2223
import org.apache.fluss.client.metadata.KvSnapshots;
2324
import org.apache.fluss.client.metadata.LakeSnapshot;
2425
import org.apache.fluss.client.metadata.MetadataUpdater;
26+
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
2527
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2628
import org.apache.fluss.cluster.Cluster;
2729
import org.apache.fluss.cluster.ServerNode;
@@ -81,10 +83,12 @@
8183
import org.apache.fluss.rpc.messages.ListAclsRequest;
8284
import org.apache.fluss.rpc.messages.ListDatabasesRequest;
8385
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
86+
import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest;
8487
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
8588
import org.apache.fluss.rpc.messages.ListOffsetsResponse;
8689
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
8790
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
91+
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest;
8892
import org.apache.fluss.rpc.messages.ListTablesRequest;
8993
import org.apache.fluss.rpc.messages.ListTablesResponse;
9094
import org.apache.fluss.rpc.messages.PbAlterConfig;
@@ -392,6 +396,36 @@ public CompletableFuture<List<PartitionInfo>> listPartitionInfos(
392396
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
393397
}
394398

399+
/**
400+
* Returns per-bucket remote log manifest path for the given table or partition.
401+
*
402+
* <p>Used by the orphan cleanup action to construct the active manifest path set without
403+
* relying on FS LIST + mtime selection.
404+
*/
405+
@Override
406+
public CompletableFuture<List<RemoteLogManifestInfo>> listRemoteLogManifests(
407+
long tableId, @Nullable Long partitionId) {
408+
ListRemoteLogManifestsRequest request = new ListRemoteLogManifestsRequest();
409+
request.setTableId(tableId);
410+
if (partitionId != null) {
411+
request.setPartitionId(partitionId);
412+
}
413+
return gateway.listRemoteLogManifests(request)
414+
.thenApply(ClientRpcMessageUtils::toRemoteLogManifestInfos);
415+
}
416+
417+
@Override
418+
public CompletableFuture<ActiveKvSnapshots> listKvSnapshots(
419+
long tableId, @Nullable Long partitionId) {
420+
ListKvSnapshotsRequest request = new ListKvSnapshotsRequest();
421+
request.setTableId(tableId);
422+
if (partitionId != null) {
423+
request.setPartitionId(partitionId);
424+
}
425+
return gateway.listKvSnapshots(request)
426+
.thenApply(ClientRpcMessageUtils::toActiveKvSnapshots);
427+
}
428+
395429
@Override
396430
public CompletableFuture<Void> createPartition(
397431
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
/**
30+
* Per-bucket active KV snapshot ids for a table or partition scope. The active set is the union of
31+
* retained snapshots and lease-pinned (still-in-use) snapshots reported by the coordinator.
32+
*
33+
* @since 0.7
34+
*/
35+
@PublicEvolving
36+
public final class ActiveKvSnapshots {
37+
38+
private final long tableId;
39+
@Nullable private final Long partitionId;
40+
private final Map<Integer, Set<Long>> snapshotIdsByBucket;
41+
42+
public ActiveKvSnapshots(
43+
long tableId, @Nullable Long partitionId, Map<Integer, Set<Long>> snapshotIdsByBucket) {
44+
this.tableId = tableId;
45+
this.partitionId = partitionId;
46+
Map<Integer, Set<Long>> copy = new HashMap<>();
47+
for (Map.Entry<Integer, Set<Long>> entry : snapshotIdsByBucket.entrySet()) {
48+
copy.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
49+
}
50+
this.snapshotIdsByBucket = Collections.unmodifiableMap(copy);
51+
}
52+
53+
public long getTableId() {
54+
return tableId;
55+
}
56+
57+
@Nullable
58+
public Long getPartitionId() {
59+
return partitionId;
60+
}
61+
62+
public Map<Integer, Set<Long>> getSnapshotIdsByBucket() {
63+
return snapshotIdsByBucket;
64+
}
65+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
/**
24+
* A single remote log manifest entry returned by the coordinator. Each entry maps a {@link
25+
* TableBucket} to its current manifest file path and the end offset covered by that manifest.
26+
*
27+
* @since 0.7
28+
*/
29+
@PublicEvolving
30+
public final class RemoteLogManifestInfo {
31+
32+
private final TableBucket tableBucket;
33+
private final String remoteLogManifestPath;
34+
private final long remoteLogEndOffset;
35+
36+
public RemoteLogManifestInfo(
37+
TableBucket tableBucket, String remoteLogManifestPath, long remoteLogEndOffset) {
38+
this.tableBucket = tableBucket;
39+
this.remoteLogManifestPath = remoteLogManifestPath;
40+
this.remoteLogEndOffset = remoteLogEndOffset;
41+
}
42+
43+
public TableBucket getTableBucket() {
44+
return tableBucket;
45+
}
46+
47+
public String getRemoteLogManifestPath() {
48+
return remoteLogManifestPath;
49+
}
50+
51+
public long getRemoteLogEndOffset() {
52+
return remoteLogEndOffset;
53+
}
54+
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import org.apache.fluss.client.lookup.LookupBatch;
2525
import org.apache.fluss.client.lookup.PrefixLookupBatch;
2626
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
27+
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
2728
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2829
import org.apache.fluss.client.metadata.KvSnapshots;
2930
import org.apache.fluss.client.metadata.LakeSnapshot;
31+
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
3032
import org.apache.fluss.client.write.KvWriteBatch;
3133
import org.apache.fluss.client.write.ReadyWriteBatch;
3234
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
@@ -61,9 +63,11 @@
6163
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
6264
import org.apache.fluss.rpc.messages.GetTableStatsRequest;
6365
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
66+
import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse;
6467
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
6568
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
6669
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
70+
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse;
6771
import org.apache.fluss.rpc.messages.LookupRequest;
6872
import org.apache.fluss.rpc.messages.MetadataRequest;
6973
import org.apache.fluss.rpc.messages.PbAddColumn;
@@ -87,6 +91,7 @@
8791
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
8892
import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket;
8993
import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
94+
import org.apache.fluss.rpc.messages.PbRemoteLogManifestEntry;
9095
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
9196
import org.apache.fluss.rpc.messages.PbRenameColumn;
9297
import org.apache.fluss.rpc.messages.PbTableBucket;
@@ -106,6 +111,7 @@
106111
import java.util.Arrays;
107112
import java.util.Collection;
108113
import java.util.HashMap;
114+
import java.util.HashSet;
109115
import java.util.List;
110116
import java.util.Map;
111117
import java.util.Optional;
@@ -536,6 +542,34 @@ public static ReleaseKvSnapshotLeaseRequest makeReleaseKvSnapshotLeaseRequest(
536542
return request;
537543
}
538544

545+
public static List<RemoteLogManifestInfo> toRemoteLogManifestInfos(
546+
ListRemoteLogManifestsResponse response) {
547+
List<RemoteLogManifestInfo> result = new ArrayList<>(response.getManifestsCount());
548+
for (PbRemoteLogManifestEntry entry : response.getManifestsList()) {
549+
PbTableBucket pb = entry.getTableBucket();
550+
Long partitionId = pb.hasPartitionId() ? pb.getPartitionId() : null;
551+
TableBucket tableBucket =
552+
new TableBucket(pb.getTableId(), partitionId, pb.getBucketId());
553+
result.add(
554+
new RemoteLogManifestInfo(
555+
tableBucket,
556+
entry.getRemoteLogManifestPath(),
557+
entry.getRemoteLogEndOffset()));
558+
}
559+
return result;
560+
}
561+
562+
public static ActiveKvSnapshots toActiveKvSnapshots(ListKvSnapshotsResponse response) {
563+
Map<Integer, Set<Long>> snapshotIdsByBucket = new HashMap<>();
564+
for (PbKvSnapshot snapshot : response.getActiveSnapshotsList()) {
565+
snapshotIdsByBucket
566+
.computeIfAbsent(snapshot.getBucketId(), k -> new HashSet<>())
567+
.add(snapshot.getSnapshotId());
568+
}
569+
Long partitionId = response.hasPartitionId() ? response.getPartitionId() : null;
570+
return new ActiveKvSnapshots(response.getTableId(), partitionId, snapshotIdsByBucket);
571+
}
572+
539573
public static Optional<RebalanceProgress> toRebalanceProgress(
540574
ListRebalanceProgressResponse response) {
541575
if (!response.hasRebalanceId()) {

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@
5151
import org.apache.fluss.rpc.messages.DropTableResponse;
5252
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
5353
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
54+
import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest;
55+
import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse;
5456
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
5557
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
58+
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest;
59+
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse;
5660
import org.apache.fluss.rpc.messages.RebalanceRequest;
5761
import org.apache.fluss.rpc.messages.RebalanceResponse;
5862
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
@@ -216,4 +220,29 @@ CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease(
216220

217221
// todo: rename table & alter table
218222

223+
// ==================================================================================
224+
// Orphan Cleanup RPCs (coordinator-only, internal)
225+
// ==================================================================================
226+
227+
/**
228+
* List remote log manifest entries for all buckets of a table or single partition.
229+
*
230+
* @param request request with table_id and optional partition_id
231+
* @return per-bucket manifest path and end offset
232+
*/
233+
@RPC(api = ApiKeys.LIST_REMOTE_LOG_MANIFESTS)
234+
CompletableFuture<ListRemoteLogManifestsResponse> listRemoteLogManifests(
235+
ListRemoteLogManifestsRequest request);
236+
237+
/**
238+
* List active KV snapshot ids for a (tableId, partitionId) unit. The response is the union of
239+
* (a) snapshots currently held by the in-memory {@code CompletedSnapshotStore} for each bucket
240+
* and (b) snapshots still pinned by an active KV snapshot lease. No retention truncation is
241+
* applied — every snapshot the coordinator has not yet pruned from ZK is reported as active so
242+
* orphan cleanup never misdeletes a still-referenced snapshot. The server emits one entry per
243+
* active {@code (bucket_id, snapshot_id)} pair with no source discriminator; callers must treat
244+
* the entire response as the active set.
245+
*/
246+
@RPC(api = ApiKeys.LIST_KV_SNAPSHOTS)
247+
CompletableFuture<ListKvSnapshotsResponse> listKvSnapshots(ListKvSnapshotsRequest request);
219248
}

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ public enum ApiKeys {
104104
GET_TABLE_STATS(1059, 0, 0, PUBLIC),
105105
ALTER_DATABASE(1060, 0, 0, PUBLIC),
106106
SCAN_KV(1061, 0, 0, PUBLIC),
107-
GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC);
107+
GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC),
108+
LIST_REMOTE_LOG_MANIFESTS(1063, 0, 0, PUBLIC),
109+
LIST_KV_SNAPSHOTS(1064, 0, 0, PUBLIC);
108110

109111
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
110112
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,36 @@ message ListPartitionInfosResponse {
506506
repeated PbPartitionInfo partitions_info = 1;
507507
}
508508

509+
// list remote log manifest entries (one per bucket of a table or partition)
510+
message ListRemoteLogManifestsRequest {
511+
required int64 table_id = 1;
512+
optional int64 partition_id = 2; // required if table is partitioned
513+
}
514+
515+
message ListRemoteLogManifestsResponse {
516+
repeated PbRemoteLogManifestEntry manifests = 1;
517+
}
518+
519+
message PbRemoteLogManifestEntry {
520+
required PbTableBucket table_bucket = 1;
521+
required string remote_log_manifest_path = 2;
522+
required int64 remote_log_end_offset = 3;
523+
}
524+
525+
// list active KV snapshot dirs (retained_N + still-in-use) for a unit
526+
message ListKvSnapshotsRequest {
527+
required int64 table_id = 1;
528+
optional int64 partition_id = 2;
529+
}
530+
531+
message ListKvSnapshotsResponse {
532+
required int64 table_id = 1;
533+
// null if it is a non-partitioned table, otherwise, it must be not null
534+
optional int64 partition_id = 2;
535+
// Active snapshots = retained_N ∪ still-in-use; multiple entries per bucket allowed.
536+
repeated PbKvSnapshot active_snapshots = 3;
537+
}
538+
509539
// create partition request and response
510540
message CreatePartitionRequest {
511541
required PbTablePath table_path = 1;

0 commit comments

Comments
 (0)