Skip to content

Commit 9d58da3

Browse files
committed
[client][server] Support adding aggregation columns
Support carrying aggregation functions through alter table add column requests. Validate aggregation functions during schema alter and reject them on non-aggregation tables.
1 parent 6d3ba38 commit 9d58da3

8 files changed

Lines changed: 169 additions & 11 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import org.apache.fluss.config.cluster.AlterConfigOpType;
3535
import org.apache.fluss.config.cluster.ColumnPositionType;
3636
import org.apache.fluss.config.cluster.ConfigEntry;
37+
import org.apache.fluss.exception.FlussRuntimeException;
3738
import org.apache.fluss.fs.FsPath;
3839
import org.apache.fluss.fs.FsPathAndFileName;
3940
import org.apache.fluss.fs.token.ObtainedSecurityToken;
41+
import org.apache.fluss.metadata.AggFunction;
4042
import org.apache.fluss.metadata.DatabaseChange;
4143
import org.apache.fluss.metadata.DatabaseSummary;
4244
import org.apache.fluss.metadata.PartitionInfo;
@@ -94,11 +96,13 @@
9496
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
9597
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
9698
import org.apache.fluss.rpc.protocol.MergeMode;
99+
import org.apache.fluss.utils.InstantiationUtils;
97100
import org.apache.fluss.utils.json.DataTypeJsonSerde;
98101
import org.apache.fluss.utils.json.JsonSerdeUtils;
99102

100103
import javax.annotation.Nullable;
101104

105+
import java.io.IOException;
102106
import java.util.ArrayList;
103107
import java.util.Arrays;
104108
import java.util.Collection;
@@ -653,10 +657,22 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) {
653657
if (addColumn.getComment() != null) {
654658
pbAddColumn.setComment(addColumn.getComment());
655659
}
660+
if (addColumn.getAggFunction().isPresent()) {
661+
pbAddColumn.setSerializedAggFunction(
662+
serializeAggFunction(addColumn.getAggFunction().get()));
663+
}
656664

657665
return pbAddColumn;
658666
}
659667

668+
private static byte[] serializeAggFunction(AggFunction aggFunction) {
669+
try {
670+
return InstantiationUtils.serializeObject(aggFunction);
671+
} catch (IOException e) {
672+
throw new FlussRuntimeException("Failed to serialize aggregation function.", e);
673+
}
674+
}
675+
660676
public static PbDropColumn toPbDropColumn(TableChange.DropColumn dropColumn) {
661677
return new PbDropColumn().setColumnName(dropColumn.getName());
662678
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,62 @@ void testAlterTableColumn() throws Exception {
568568
.hasMessageContaining("Column nested_row already exists");
569569
}
570570

571+
@Test
572+
void testAlterAggregationTableColumnWithAggFunction() throws Exception {
573+
TablePath tablePath = TablePath.of("test_db", "alter_aggregation_table_column");
574+
Map<String, String> properties = new HashMap<>();
575+
properties.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
576+
TableDescriptor tableDescriptor =
577+
TableDescriptor.builder()
578+
.schema(
579+
Schema.newBuilder()
580+
.column("id", DataTypes.INT())
581+
.column("value", DataTypes.BIGINT(), AggFunctions.SUM())
582+
.primaryKey("id")
583+
.build())
584+
.distributedBy(3, "id")
585+
.properties(properties)
586+
.build();
587+
admin.createTable(tablePath, tableDescriptor, false).get();
588+
589+
admin.alterTable(
590+
tablePath,
591+
Collections.singletonList(
592+
TableChange.addColumn(
593+
"new_value",
594+
DataTypes.BIGINT(),
595+
"new aggregate column",
596+
TableChange.ColumnPosition.last(),
597+
AggFunctions.SUM())),
598+
false)
599+
.get();
600+
601+
SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
602+
assertThat(schemaInfo.getSchema().getAggFunction("new_value")).hasValue(AggFunctions.SUM());
603+
}
604+
605+
@Test
606+
void testAlterNonAggregationTableColumnWithAggFunction() throws Exception {
607+
TablePath tablePath = TablePath.of("test_db", "alter_non_aggregation_table_column");
608+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
609+
610+
assertThatThrownBy(
611+
() ->
612+
admin.alterTable(
613+
tablePath,
614+
Collections.singletonList(
615+
TableChange.addColumn(
616+
"new_value",
617+
DataTypes.BIGINT(),
618+
"new aggregate column",
619+
TableChange.ColumnPosition.last(),
620+
AggFunctions.SUM())),
621+
false)
622+
.get())
623+
.hasMessageContaining(
624+
"Aggregation function is only supported for aggregation merge engine table");
625+
}
626+
571627
@Test
572628
void testCreateInvalidDatabaseAndTable() throws Exception {
573629
assertThatThrownBy(

fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import javax.annotation.Nullable;
2323

2424
import java.util.Objects;
25+
import java.util.Optional;
2526

2627
/** {@link TableChange} represents the modification of the Fluss Table. */
2728
public interface TableChange {
@@ -42,7 +43,21 @@ static AddColumn addColumn(
4243
DataType dataType,
4344
@Nullable String comment,
4445
ColumnPosition position) {
45-
return new AddColumn(columnName, dataType, comment, position);
46+
return new AddColumn(columnName, dataType, comment, position, null);
47+
}
48+
49+
/**
50+
* A table change to add the column with specified position and aggregation function.
51+
*
52+
* @return a TableChange represents the modification.
53+
*/
54+
static AddColumn addColumn(
55+
String columnName,
56+
DataType dataType,
57+
@Nullable String comment,
58+
ColumnPosition position,
59+
@Nullable AggFunction aggFunction) {
60+
return new AddColumn(columnName, dataType, comment, position, aggFunction);
4661
}
4762

4863
/**
@@ -230,15 +245,21 @@ class AddColumn implements SchemaChange {
230245
private final String name;
231246
private final DataType dataType;
232247
private final @Nullable String comment;
248+
private final @Nullable AggFunction aggFunction;
233249

234250
private final ColumnPosition position;
235251

236252
private AddColumn(
237-
String name, DataType dataType, @Nullable String comment, ColumnPosition position) {
253+
String name,
254+
DataType dataType,
255+
@Nullable String comment,
256+
ColumnPosition position,
257+
@Nullable AggFunction aggFunction) {
238258
this.name = name;
239259
this.dataType = dataType;
240260
this.comment = comment;
241261
this.position = position;
262+
this.aggFunction = aggFunction;
242263
}
243264

244265
public String getName() {
@@ -258,6 +279,10 @@ public ColumnPosition getPosition() {
258279
return position;
259280
}
260281

282+
public Optional<AggFunction> getAggFunction() {
283+
return Optional.ofNullable(aggFunction);
284+
}
285+
261286
@Override
262287
public String toString() {
263288
return "AddColumn{"
@@ -269,6 +294,8 @@ public String toString() {
269294
+ ", comment='"
270295
+ comment
271296
+ '\''
297+
+ ", aggFunction="
298+
+ aggFunction
272299
+ ", position="
273300
+ position
274301
+ '}';

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,7 @@ message PbAddColumn {
11541154
required bytes data_type_json = 2;
11551155
optional string comment = 3;
11561156
required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
1157+
optional bytes serialized_agg_function = 5;
11571158
}
11581159

11591160
message PbDropColumn {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.concurrent.Callable;
7777

7878
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
79+
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableSchema;
7980

8081
/** A manager for metadata. */
8182
public class MetadataManager {
@@ -440,6 +441,7 @@ public void alterTableSchema(
440441
if (!schemaChanges.isEmpty()) {
441442
Schema newSchema =
442443
SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges);
444+
validateAlterTableSchema(table, newSchema);
443445
LakeCatalog.Context lakeCatalogContext =
444446
new CoordinatorService.DefaultLakeCatalogContext(
445447
false,

fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
8282
}
8383

8484
// Delegate the actual addition to the builder
85-
builder.column(addColumn.getName(), addColumn.getDataType());
85+
if (addColumn.getAggFunction().isPresent()) {
86+
builder.column(
87+
addColumn.getName(), addColumn.getDataType(), addColumn.getAggFunction().get());
88+
} else {
89+
builder.column(addColumn.getName(), addColumn.getDataType());
90+
}
8691

8792
// Fixed: Use null check for the String comment
8893
String comment = addColumn.getComment();

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import org.apache.fluss.config.cluster.AlterConfigOpType;
2828
import org.apache.fluss.config.cluster.ColumnPositionType;
2929
import org.apache.fluss.config.cluster.ConfigEntry;
30+
import org.apache.fluss.exception.FlussRuntimeException;
3031
import org.apache.fluss.fs.FsPath;
3132
import org.apache.fluss.fs.token.ObtainedSecurityToken;
3233
import org.apache.fluss.lake.committer.LakeCommitResult;
34+
import org.apache.fluss.metadata.AggFunction;
3335
import org.apache.fluss.metadata.DatabaseChange;
3436
import org.apache.fluss.metadata.DatabaseSummary;
3537
import org.apache.fluss.metadata.PartitionSpec;
@@ -197,6 +199,7 @@
197199
import org.apache.fluss.server.zk.data.PartitionRegistration;
198200
import org.apache.fluss.server.zk.data.lake.LakeTable;
199201
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
202+
import org.apache.fluss.utils.InstantiationUtils;
200203
import org.apache.fluss.utils.json.DataTypeJsonSerde;
201204
import org.apache.fluss.utils.json.JsonSerdeUtils;
202205
import org.apache.fluss.utils.json.TableBucketOffsets;
@@ -206,6 +209,7 @@
206209

207210
import javax.annotation.Nullable;
208211

212+
import java.io.IOException;
209213
import java.nio.ByteBuffer;
210214
import java.util.ArrayList;
211215
import java.util.Arrays;
@@ -355,17 +359,33 @@ public static List<TableChange> toAddColumns(List<PbAddColumn> addColumns) {
355359
return addColumns.stream()
356360
.filter(Objects::nonNull)
357361
.map(
358-
pbAddColumn ->
359-
TableChange.addColumn(
360-
pbAddColumn.getColumnName(),
361-
JsonSerdeUtils.readValue(
362-
pbAddColumn.getDataTypeJson(),
363-
DataTypeJsonSerde.INSTANCE),
364-
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
365-
toColumnPosition(pbAddColumn.getColumnPositionType())))
362+
pbAddColumn -> {
363+
AggFunction aggFunction =
364+
pbAddColumn.hasSerializedAggFunction()
365+
? deserializeAggFunction(
366+
pbAddColumn.getSerializedAggFunction())
367+
: null;
368+
return TableChange.addColumn(
369+
pbAddColumn.getColumnName(),
370+
JsonSerdeUtils.readValue(
371+
pbAddColumn.getDataTypeJson(),
372+
DataTypeJsonSerde.INSTANCE),
373+
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
374+
toColumnPosition(pbAddColumn.getColumnPositionType()),
375+
aggFunction);
376+
})
366377
.collect(Collectors.toList());
367378
}
368379

380+
private static AggFunction deserializeAggFunction(byte[] serializedAggFunction) {
381+
try {
382+
return InstantiationUtils.deserializeObject(
383+
serializedAggFunction, AggFunction.class.getClassLoader());
384+
} catch (IOException | ClassNotFoundException e) {
385+
throw new FlussRuntimeException("Failed to deserialize aggregation function.", e);
386+
}
387+
}
388+
369389
public static List<TableChange.SchemaChange> toDropColumns(List<PbDropColumn> dropColumns) {
370390
return dropColumns.stream()
371391
.filter(Objects::nonNull)

fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.server.utils;
1919

20+
import org.apache.fluss.annotation.Internal;
2021
import org.apache.fluss.config.ConfigOption;
2122
import org.apache.fluss.config.ConfigOptions;
2223
import org.apache.fluss.config.Configuration;
@@ -129,6 +130,19 @@ public static void validateTableDescriptor(
129130
checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat);
130131
}
131132

133+
/** Validates the schema after altering table columns. */
134+
@Internal
135+
public static void validateAlterTableSchema(TableInfo table, Schema newSchema) {
136+
if (table.getTableConfig()
137+
.getMergeEngineType()
138+
.map(MergeEngineType.AGGREGATION::equals)
139+
.orElse(false)) {
140+
validateAggregationFunctionParameters(newSchema);
141+
} else {
142+
validateNoAggregationFunctions(newSchema);
143+
}
144+
}
145+
132146
private static void checkTableLakeFormatMatchesCluster(
133147
Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) {
134148
if (clusterDataLakeFormat == null) {
@@ -323,6 +337,9 @@ private static void checkArrowCompression(Configuration tableConf) {
323337
private static void checkMergeEngine(
324338
Configuration tableConf, boolean hasPrimaryKey, Schema schema) {
325339
MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
340+
if (mergeEngine != MergeEngineType.AGGREGATION) {
341+
validateNoAggregationFunctions(schema);
342+
}
326343
if (mergeEngine != null) {
327344
if (!hasPrimaryKey) {
328345
throw new InvalidConfigException(
@@ -377,6 +394,20 @@ private static void checkMergeEngine(
377394
}
378395
}
379396

397+
/** Validates that the schema doesn't contain any aggregation functions. */
398+
private static void validateNoAggregationFunctions(Schema schema) {
399+
for (Schema.Column column : schema.getColumns()) {
400+
Optional<AggFunction> aggFunction = column.getAggFunction();
401+
if (aggFunction.isPresent()) {
402+
throw new InvalidConfigException(
403+
String.format(
404+
"Aggregation function is only supported for aggregation merge engine table, "
405+
+ "but column '%s' has aggregation function '%s'.",
406+
column.getName(), aggFunction.get()));
407+
}
408+
}
409+
}
410+
380411
/**
381412
* Validates aggregation function parameters in the schema.
382413
*

0 commit comments

Comments
 (0)