Skip to content

Commit 3b0afc0

Browse files
authored
[server] Fix tableCount metric after dropping partitioned tables (#3482)
1 parent b6717fe commit 3b0afc0

3 files changed

Lines changed: 50 additions & 1 deletion

File tree

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -959,6 +959,17 @@ private void processDropTable(DropTableEvent dropTableEvent) {
959959

960960
// remove table metrics.
961961
coordinatorMetricGroup.removeTableMetricGroup(dropTableInfo.getTablePath(), tableId);
962+
963+
// For partitioned tables, the dropped table has no table-level replicas
964+
// (all buckets live under partitionAssignments), so getAllReplicasForTable
965+
// returns empty and areAllReplicasInState(.., ReplicaDeletionSuccessful)
966+
// is vacuously true. Without this explicit trigger, the table would linger
967+
// in tablesToBeDeleted (and the tableCount metric) until some unrelated
968+
// replica-deletion response happens to invoke resumeDeletions(), which
969+
// may never happen if no other table is being dropped.
970+
if (dropTableInfo.isPartitioned()) {
971+
tableManager.resumeDeletions();
972+
}
962973
}
963974

964975
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
import java.util.Set;
3536
import java.util.concurrent.LinkedBlockingQueue;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.locks.Lock;
@@ -119,7 +120,19 @@ private void updateMetricsViaAccessContext() {
119120
context -> {
120121
int coordinatorServerCount = context.getLiveCoordinatorServers().size();
121122
int tabletServerCount = context.getLiveTabletServers().size();
122-
int tableCount = context.allTables().size();
123+
// Exclude tables that have been queued for deletion (DropTable RPC
124+
// already acked, but completeDeleteTable has not yet removed them
125+
// from tablePathById). We dedup by tableId rather than subtracting
126+
// sizes so the result is robust even if tablesToBeDeleted ever
127+
// drifts out of sync with tablePathById -- it can never go negative
128+
// and only counts ids that are truly still in tablePathById.
129+
Set<Long> allTables = context.allTables().keySet();
130+
int tableCount = allTables.size();
131+
for (Long toDelete : context.getTablesToBeDeleted()) {
132+
if (allTables.contains(toDelete)) {
133+
tableCount--;
134+
}
135+
}
123136
int lakeTableCount = context.getLakeTableCount();
124137
int bucketCount = context.bucketLeaderAndIsr().size();
125138
int partitionCount = context.getTotalPartitionCount();

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,31 @@ void testDropPartitionedTableRoutedThroughCleanupManager() throws Exception {
891891
});
892892
}
893893

894+
@Test
895+
void testDropPartitionedTableWithNoPartitionsClearsTableState() throws Exception {
896+
TablePath tablePath =
897+
TablePath.of(defaultDatabase, "test_drop_partitioned_table_no_partitions");
898+
initCoordinatorChannel();
899+
900+
long tableId =
901+
metadataManager.createTable(
902+
tablePath, remoteDataDir, getPartitionedTable(), null, false);
903+
retryVerifyContext(ctx -> assertThat(ctx.getTablePathById(tableId)).isNotNull());
904+
905+
metadataManager.dropTable(tablePath, false);
906+
907+
// The fix at the tail of processDropTable explicitly calls resumeDeletions(), which
908+
// sees a vacuously true condition (getAllReplicasForTable returns empty) and runs
909+
// completeDeleteTable -> removeTable, evicting the table from both tablesToBeDeleted
910+
// and tablePathById.
911+
retryVerifyContext(
912+
ctx -> {
913+
assertThat(ctx.getTablePathById(tableId)).isNull();
914+
assertThat(ctx.getTablesToBeDeleted()).doesNotContain(tableId);
915+
assertThat(ctx.allTables()).doesNotContainKey(tableId);
916+
});
917+
}
918+
894919
@Test
895920
void testStartupResumesDropPartitionThroughCleanupManager() throws Exception {
896921
TablePath tablePath = TablePath.of(defaultDatabase, "test_startup_resume_via_cleanup");

0 commit comments

Comments
 (0)