Skip to content

Commit b6717fe

Browse files
authored
[server] Support altering auto partition enabled option (#3453)
1 parent 296a0a2 commit b6717fe

6 files changed

Lines changed: 203 additions & 12 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class FlussConfigUtils {
5050
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
5151
ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(),
5252
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
53+
ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(),
5354
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(),
5455
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key(),
5556
ConfigOptions.TABLE_STATISTICS_COLUMNS.key(),

fluss-common/src/main/java/org/apache/fluss/utils/AutoPartitionStrategy.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,27 @@
2222
import org.apache.fluss.config.Configuration;
2323

2424
import java.util.Map;
25+
import java.util.Objects;
2526
import java.util.TimeZone;
2627

2728
/** A class wrapping the strategy for auto partition. */
2829
public class AutoPartitionStrategy {
2930

30-
private final boolean autoPartitionEnable;
31+
private final boolean autoPartitionEnabled;
3132
private final String key;
3233
private final AutoPartitionTimeUnit timeUnit;
3334
private final int numPreCreate;
3435
private final int numToRetain;
3536
private final TimeZone timeZone;
3637

3738
private AutoPartitionStrategy(
38-
boolean autoPartitionEnable,
39+
boolean autoPartitionEnabled,
3940
String key,
4041
AutoPartitionTimeUnit autoPartitionTimeUnit,
4142
int numPreCreate,
4243
int numToRetain,
4344
TimeZone timeZone) {
44-
this.autoPartitionEnable = autoPartitionEnable;
45+
this.autoPartitionEnabled = autoPartitionEnabled;
4546
this.key = key;
4647
this.timeUnit = autoPartitionTimeUnit;
4748
this.numPreCreate = numPreCreate;
@@ -64,7 +65,7 @@ public static AutoPartitionStrategy from(Configuration conf) {
6465
}
6566

6667
public boolean isAutoPartitionEnabled() {
67-
return autoPartitionEnable;
68+
return autoPartitionEnabled;
6869
}
6970

7071
public String key() {
@@ -86,4 +87,46 @@ public int numToRetain() {
8687
public TimeZone timeZone() {
8788
return timeZone;
8889
}
90+
91+
@Override
92+
public String toString() {
93+
return "AutoPartitionStrategy{"
94+
+ "autoPartitionEnabled="
95+
+ autoPartitionEnabled
96+
+ ", key='"
97+
+ key
98+
+ '\''
99+
+ ", timeUnit="
100+
+ timeUnit
101+
+ ", numPreCreate="
102+
+ numPreCreate
103+
+ ", numToRetain="
104+
+ numToRetain
105+
+ ", timeZone="
106+
+ timeZone
107+
+ '}';
108+
}
109+
110+
@Override
111+
public boolean equals(Object o) {
112+
if (this == o) {
113+
return true;
114+
}
115+
if (o == null || getClass() != o.getClass()) {
116+
return false;
117+
}
118+
AutoPartitionStrategy that = (AutoPartitionStrategy) o;
119+
return autoPartitionEnabled == that.autoPartitionEnabled
120+
&& numPreCreate == that.numPreCreate
121+
&& numToRetain == that.numToRetain
122+
&& Objects.equals(key, that.key)
123+
&& timeUnit == that.timeUnit
124+
&& Objects.equals(timeZone, that.timeZone);
125+
}
126+
127+
@Override
128+
public int hashCode() {
129+
return Objects.hash(
130+
autoPartitionEnabled, key, timeUnit, numPreCreate, numToRetain, timeZone);
131+
}
89132
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,22 @@ void testAlterTableConfig() throws Exception {
251251

252252
// alter table set an unsupported modification option should throw exception
253253
String unSupportedDml1 =
254-
"alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true', 'table.kv.format' = 'indexed')";
254+
"alter table test_alter_table_append_only set ('table.kv.format' = 'indexed')";
255255

256256
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml1))
257257
.rootCause()
258258
.isInstanceOf(InvalidAlterTableException.class)
259259
.hasMessageContaining("The following options are not supported to alter yet:")
260-
.hasMessageContaining("table.kv.format")
261-
.hasMessageContaining("table.auto-partition.enabled");
260+
.hasMessageContaining("table.kv.format");
261+
262+
// alter table to enable auto partition for a non-partitioned table should fail validation
263+
String invalidAutoPartitionDml =
264+
"alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true')";
265+
assertThatThrownBy(() -> tEnv.executeSql(invalidAutoPartitionDml))
266+
.rootCause()
267+
.isInstanceOf(InvalidConfigException.class)
268+
.hasMessage(
269+
"Currently, auto partition is only supported for partitioned table, please set table property 'table.auto-partition.enabled' to false.");
262270

263271
String unSupportedDml2 =
264272
"alter table test_alter_table_append_only set ('bucket.num' = '1000')";
@@ -666,6 +674,36 @@ void testAlterAutoPartitionNumPrecreate() throws Exception {
666674
.doesNotContainKey(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key());
667675
}
668676

677+
@Test
678+
void testAlterAutoPartitionEnabled() throws Exception {
679+
String tblName = "test_alter_auto_partition_enabled";
680+
ObjectPath objectPath = new ObjectPath(DEFAULT_DB, tblName);
681+
TablePath tablePath = new TablePath(DEFAULT_DB, tblName);
682+
683+
tEnv.executeSql(
684+
"create table "
685+
+ tblName
686+
+ " (a int, dt string) partitioned by (dt) "
687+
+ "with ('table.auto-partition.enabled' = 'false',"
688+
+ " 'table.auto-partition.key' = 'dt',"
689+
+ " 'table.auto-partition.time-unit' = 'hour',"
690+
+ " 'table.auto-partition.num-precreate' = '2')");
691+
692+
tEnv.executeSql(
693+
"alter table " + tblName + " set ('table.auto-partition.enabled' = 'true')");
694+
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 2);
695+
696+
CatalogTable table = (CatalogTable) catalog.getTable(objectPath);
697+
assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key()))
698+
.isEqualTo("true");
699+
700+
tEnv.executeSql(
701+
"alter table " + tblName + " set ('table.auto-partition.enabled' = 'false')");
702+
table = (CatalogTable) catalog.getTable(objectPath);
703+
assertThat(table.getOptions().get(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key()))
704+
.isEqualTo("false");
705+
}
706+
669707
@Test
670708
void testTableWithExpression() throws Exception {
671709
// create a table with watermark and computed column

fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,34 @@ public void removeAutoPartitionTable(long tableId) {
194194
}
195195
}
196196

197+
/**
198+
* Handles a table's auto-partition strategy change after table properties are updated.
199+
*
200+
* @param newTableInfo the updated table information
201+
* @param oldStrategy the old auto partition strategy
202+
* @param newStrategy the updated auto partition strategy
203+
*/
204+
public void handleAutoPartitionStrategyChange(
205+
TableInfo newTableInfo,
206+
AutoPartitionStrategy oldStrategy,
207+
AutoPartitionStrategy newStrategy) {
208+
checkNotClosed();
209+
long tableId = newTableInfo.getTableId();
210+
boolean oldAutoPartitionEnabled = oldStrategy.isAutoPartitionEnabled();
211+
boolean newAutoPartitionEnabled = newStrategy.isAutoPartitionEnabled();
212+
213+
if (!oldAutoPartitionEnabled && newAutoPartitionEnabled) {
214+
LOG.info("Table {} auto partition enabled from false to true.", tableId);
215+
addAutoPartitionTable(newTableInfo, true);
216+
} else if (oldAutoPartitionEnabled && !newAutoPartitionEnabled) {
217+
LOG.info("Table {} auto partition enabled from true to false.", tableId);
218+
removeAutoPartitionTable(tableId);
219+
} else if (newAutoPartitionEnabled) {
220+
LOG.info("Table {} auto partition strategy changed.", tableId);
221+
updateAutoPartitionTables(newTableInfo);
222+
}
223+
}
224+
197225
/** Must be called while holding {@link #lock}. */
198226
@Nullable
199227
private TableInfo removeAutoPartitionTableLocked(long tableId) {
@@ -319,6 +347,12 @@ private void doAutoPartition(Instant now, Set<Long> tableIds, boolean forceDoAut
319347
}
320348

321349
TableInfo tableInfo = autoPartitionTables.get(tableId);
350+
if (tableInfo == null) {
351+
LOG.debug(
352+
"Skipping auto partitioning for table id {} as it is not registered.",
353+
tableId);
354+
continue;
355+
}
322356
TablePath tablePath = tableInfo.getTablePath();
323357
TreeMap<String, Set<String>> currentPartitions =
324358
partitionsByTable.computeIfAbsent(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -853,11 +853,14 @@ private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTable
853853
oldTableInfo.getTableConfig().getAutoPartitionStrategy();
854854
AutoPartitionStrategy newAutoPartitionStrategy =
855855
newTableInfo.getTableConfig().getAutoPartitionStrategy();
856-
if (newAutoPartitionStrategy.isAutoPartitionEnabled()
857-
&& (newAutoPartitionStrategy.numToRetain() != oldAutoPartitionStrategy.numToRetain()
858-
|| newAutoPartitionStrategy.numPreCreate()
859-
!= oldAutoPartitionStrategy.numPreCreate())) {
860-
autoPartitionManager.updateAutoPartitionTables(newTableInfo);
856+
if (!Objects.equals(oldAutoPartitionStrategy, newAutoPartitionStrategy)) {
857+
LOG.info(
858+
"Table {} auto partition strategy changed from {} to {}.",
859+
oldTableInfo.getTableId(),
860+
oldAutoPartitionStrategy,
861+
newAutoPartitionStrategy);
862+
autoPartitionManager.handleAutoPartitionStrategyChange(
863+
newTableInfo, oldAutoPartitionStrategy, newAutoPartitionStrategy);
861864
}
862865

863866
// If standby replica config changed, trigger re-election for all online buckets

fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,67 @@ void testUpdateAutoPartitionNumPrecreate() throws Exception {
746746
.containsExactlyInAnyOrder("2024091000", "2024091001", "2024091002", "2024091003");
747747
}
748748

749+
@Test
750+
void testUpdateAutoPartitionEnabled() throws Exception {
751+
ZonedDateTime startTime =
752+
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
753+
long startMs = startTime.toInstant().toEpochMilli();
754+
ManualClock clock = new ManualClock(startMs);
755+
ManuallyTriggeredScheduledExecutorService periodicExecutor =
756+
new ManuallyTriggeredScheduledExecutorService();
757+
758+
AutoPartitionManager autoPartitionManager =
759+
new AutoPartitionManager(
760+
new TestingServerMetadataCache(3),
761+
metadataManager,
762+
remoteDirDynamicLoader,
763+
new Configuration(),
764+
clock,
765+
periodicExecutor);
766+
autoPartitionManager.start();
767+
768+
TableInfo table = createPartitionedTable(-1, 4, AutoPartitionTimeUnit.HOUR);
769+
TablePath tablePath = table.getTablePath();
770+
autoPartitionManager.addAutoPartitionTable(table, true);
771+
periodicExecutor.triggerNonPeriodicScheduledTask();
772+
773+
Map<String, PartitionRegistration> partitions =
774+
zookeeperClient.getPartitionRegistrations(tablePath);
775+
assertThat(partitions.keySet())
776+
.containsExactlyInAnyOrder("2024091000", "2024091001", "2024091002", "2024091003");
777+
778+
TableInfo disabledTable = createUpdatedAutoPartitionEnabledTableInfo(table, false);
779+
autoPartitionManager.handleAutoPartitionStrategyChange(
780+
disabledTable,
781+
table.getTableConfig().getAutoPartitionStrategy(),
782+
disabledTable.getTableConfig().getAutoPartitionStrategy());
783+
784+
clock.advanceTime(Duration.ofHours(4));
785+
periodicExecutor.triggerPeriodicScheduledTasks();
786+
partitions = zookeeperClient.getPartitionRegistrations(tablePath);
787+
assertThat(partitions.keySet())
788+
.containsExactlyInAnyOrder("2024091000", "2024091001", "2024091002", "2024091003");
789+
790+
TableInfo reEnabledTable = createUpdatedAutoPartitionEnabledTableInfo(disabledTable, true);
791+
autoPartitionManager.handleAutoPartitionStrategyChange(
792+
reEnabledTable,
793+
disabledTable.getTableConfig().getAutoPartitionStrategy(),
794+
reEnabledTable.getTableConfig().getAutoPartitionStrategy());
795+
periodicExecutor.triggerNonPeriodicScheduledTask();
796+
797+
partitions = zookeeperClient.getPartitionRegistrations(tablePath);
798+
assertThat(partitions.keySet())
799+
.containsExactlyInAnyOrder(
800+
"2024091000",
801+
"2024091001",
802+
"2024091002",
803+
"2024091003",
804+
"2024091004",
805+
"2024091005",
806+
"2024091006",
807+
"2024091007");
808+
}
809+
749810
// ---------------------------------------------------------------------------------------
750811
// Batch / inflight tests previously housed here have moved to TableLifecycleThrottlerTest.
751812
// The AutoPartitionManager now drops expired partitions synchronously and the asynchronous
@@ -1033,6 +1094,17 @@ private TableInfo createUpdatedTableInfo(
10331094
Configuration newProperties = new Configuration(original.getProperties());
10341095
newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, newNumRetention);
10351096
newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, newNumPreCreate);
1097+
return createUpdatedTableInfo(original, newProperties);
1098+
}
1099+
1100+
private TableInfo createUpdatedAutoPartitionEnabledTableInfo(
1101+
TableInfo original, boolean autoPartitionEnabled) {
1102+
Configuration newProperties = new Configuration(original.getProperties());
1103+
newProperties.set(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, autoPartitionEnabled);
1104+
return createUpdatedTableInfo(original, newProperties);
1105+
}
1106+
1107+
private TableInfo createUpdatedTableInfo(TableInfo original, Configuration newProperties) {
10361108
return new TableInfo(
10371109
original.getTablePath(),
10381110
original.getTableId(),

0 commit comments

Comments
 (0)