[rpc] Remove describeLakeStorage to get LakeStorageInfo from table properties
SteNicholas committed Feb 28, 2025
1 parent 83aef68 commit f4e2326
Showing 16 changed files with 100 additions and 100 deletions.
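In short: the dedicated describeLakeStorage RPC is removed from the admin API, and clients now derive the LakeStorageInfo from the table's own properties. A minimal sketch (not part of the commit) of the client-side call pattern after this change, assuming an Admin client and a TablePath already in scope:

// Old pattern, removed by this commit:
//   LakeStorageInfo lakeInfo = admin.describeLakeStorage().get();
// New pattern: read the lake storage configuration from the table's
// properties via the LakeStorageInfoUtils helper added in this commit.
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
LakeStorageInfo lakeInfo = LakeStorageInfoUtils.getLakeStorageInfo(tableInfo);
Map<String, String> catalogProperties = lakeInfo.getCatalogProperties();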
@@ -34,7 +34,6 @@
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -325,7 +324,4 @@ ListOffsetsResult listOffsets(
PhysicalTablePath physicalTablePath,
Collection<Integer> buckets,
OffsetSpec offsetSpec);

/** Describe the lake used for lakehouse storage. */
CompletableFuture<LakeStorageInfo> describeLakeStorage();
}
@@ -23,7 +23,6 @@
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
import com.alibaba.fluss.cluster.Cluster;
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -42,7 +41,6 @@
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -341,12 +339,6 @@ public ListOffsetsResult listOffsets(
return new ListOffsetsResult(bucketToOffsetMap);
}

@Override
public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
return gateway.describeLakeStorage(new DescribeLakeStorageRequest())
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
}

@Override
public void close() {
// nothing to do yet
@@ -37,7 +37,6 @@
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
@@ -387,10 +386,6 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
.collect(Collectors.toList());
}

public static LakeStorageInfo toLakeStorageInfo(DescribeLakeStorageResponse response) {
return toLakeStorageInfo(response.getLakehouseStorageInfo());
}

private static LakeStorageInfo toLakeStorageInfo(PbLakeStorageInfo pbLakeStorageInfo) {
Map<String, String> dataLakeCatalogConfig =
toKeyValueMap(pbLakeStorageInfo.getCatalogPropertiesList());
@@ -24,6 +24,7 @@
import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.LakeStorageInfoUtils;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -262,7 +263,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
objectPath.getDatabaseName(),
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
}
return getLakeTable(objectPath.getDatabaseName(), tableName);
return getLakeTable(objectPath.getDatabaseName(), tableName, tableInfo);
} else {
tableInfo = admin.getTableInfo(tablePath).get();
}
@@ -284,9 +285,10 @@
}
}

protected CatalogBaseTable getLakeTable(String databaseName, String tableName)
protected CatalogBaseTable getLakeTable(
String databaseName, String tableName, TableInfo tableInfo)
throws TableNotExistException, CatalogException {
mayInitLakeCatalogCatalog();
mayInitLakeCatalogCatalog(tableInfo);
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
// should be pattern like table_name$lake
@@ -530,13 +532,14 @@ protected TablePath toTablePath(ObjectPath objectPath) {
return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName());
}

private void mayInitLakeCatalogCatalog() {
private void mayInitLakeCatalogCatalog(TableInfo tableInfo) {
if (lakeCatalog == null) {
synchronized (this) {
if (lakeCatalog == null) {
try {
Map<String, String> catalogProperties =
admin.describeLakeStorage().get().getCatalogProperties();
LakeStorageInfoUtils.getLakeStorageInfo(tableInfo)
.getCatalogProperties();
lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
} catch (Exception e) {
throw new FlussRuntimeException("Failed to init paimon catalog.", e);
@@ -17,15 +17,14 @@
package com.alibaba.fluss.connector.flink.lakehouse;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
import com.alibaba.fluss.connector.flink.source.reader.BoundedSplitReader;
import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.connector.flink.utils.LakeStorageInfoUtils;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.util.ExceptionUtils;
@@ -120,18 +119,16 @@ private FileStoreTable getFileStoreTable() {
return fileStoreTable;
}

try (Admin admin = connection.getAdmin()) {
LakeStorageInfo dataLakeInfo = admin.describeLakeStorage().get();
try (Catalog paimonCatalog =
FlinkCatalogFactory.createPaimonCatalog(
Options.fromMap(dataLakeInfo.getCatalogProperties()))) {
fileStoreTable =
(FileStoreTable)
paimonCatalog.getTable(
Identifier.create(
tablePath.getDatabaseName(),
tablePath.getTableName()));
}
try (Catalog paimonCatalog =
FlinkCatalogFactory.createPaimonCatalog(
Options.fromMap(
LakeStorageInfoUtils.getLakeStorageInfo(table.getTableInfo())
.getCatalogProperties()))) {
fileStoreTable =
(FileStoreTable)
paimonCatalog.getTable(
Identifier.create(
tablePath.getDatabaseName(), tablePath.getTableName()));
return fileStoreTable;
} catch (Exception e) {
throw new FlinkRuntimeException(
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.utils;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.TableInfo;

import java.util.HashMap;
import java.util.Map;

/** Utility class for {@link LakeStorageInfo}. */
public class LakeStorageInfoUtils {

private static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon.";

public static LakeStorageInfo getLakeStorageInfo(TableInfo tableInfo) {
DataLakeFormat datalakeFormat =
tableInfo.getProperties().get(ConfigOptions.TABLE_DATALAKE_FORMAT);
if (datalakeFormat == null) {
throw new IllegalArgumentException(
String.format(
"The lakehouse storage is not set, please set it by %s",
ConfigOptions.TABLE_DATALAKE_FORMAT.key()));
}

if (datalakeFormat != DataLakeFormat.PAIMON) {
throw new UnsupportedOperationException(
String.format(
"The lakehouse storage %s "
+ " is not supported. Only %s is supported.",
datalakeFormat, DataLakeFormat.PAIMON));
}

// currently, extract catalog config
Map<String, String> datalakeConfig = new HashMap<>();
Map<String, String> flussConfig = tableInfo.getCustomProperties().toMap();
for (Map.Entry<String, String> configEntry : flussConfig.entrySet()) {
String configKey = configEntry.getKey();
String configValue = configEntry.getValue();
if (configKey.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
datalakeConfig.put(
configKey.substring(TABLE_DATALAKE_PAIMON_PREFIX.length()), configValue);
}
}
return new LakeStorageInfo(datalakeFormat.toString(), datalakeConfig);
}
}
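For illustration, a hypothetical example of the mapping this new utility performs (the property values are examples, not taken from the commit): custom properties carrying the table.datalake.paimon. prefix come back as Paimon catalog properties with the prefix stripped.

// Assuming a TableInfo for a lake-enabled table created with:
//   table.datalake.paimon.metastore = filesystem
//   table.datalake.paimon.warehouse = /tmp/paimon   (example value)
LakeStorageInfo info = LakeStorageInfoUtils.getLakeStorageInfo(tableInfo);
info.getCatalogProperties().get("metastore"); // -> "filesystem"
info.getCatalogProperties().get("warehouse"); // -> "/tmp/paimon"
// If TABLE_DATALAKE_FORMAT is unset the utility throws
// IllegalArgumentException; for any format other than PAIMON it throws
// UnsupportedOperationException.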
@@ -19,10 +19,7 @@
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.lakehouse.paimon.sink.PaimonDataBaseSyncSinkBuilder;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.metadata.*;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.server.replica.Replica;
import com.alibaba.fluss.types.DataTypes;
@@ -148,7 +145,10 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception {
batchTEnv.executeSql(
String.format("select * from %s$lake$options", tableName)));
assertThat(paimonOptionsRows.toString())
.isEqualTo("[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input]]");
.isEqualTo(
String.format(
"[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input], +I[table.datalake.paimon.metastore, filesystem], +I[table.datalake.paimon.warehouse, %s]]",
warehousePath));

// stop sync database job
jobClient.cancel().get();
@@ -306,7 +306,10 @@ protected long createPkTable(TablePath tablePath, int bucketNum, boolean isParti
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
.customProperty("table.datalake.paimon.metastore", "filesystem")
.customProperty("table.datalake.paimon.warehouse", warehousePath);

if (isPartitioned) {
schemaBuilder.column("c", DataTypes.STRING());
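The test updates in this file and the ones below share one pattern: lake-enabled tables now declare the lake format and the Paimon catalog settings on the table itself. A consolidated sketch of that declaration, with schema and warehousePath assumed from the surrounding test base:

TableDescriptor descriptor =
        TableDescriptor.builder()
                .schema(schema) // assumed to be built elsewhere in the test
                .distributedBy(3)
                .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                .property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
                .customProperty("table.datalake.paimon.metastore", "filesystem")
                .customProperty("table.datalake.paimon.warehouse", warehousePath)
                .build();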
@@ -66,7 +66,7 @@ public class FlinkPaimonTestBase {
protected static Connection conn;
protected static Admin admin;
protected static Configuration clientConf;
private static String warehousePath;
protected static String warehousePath;

private static Configuration initConfig() {
Configuration conf = new Configuration();
@@ -23,6 +23,7 @@
import com.alibaba.fluss.lakehouse.paimon.sink.NewTablesAddedPaimonListener;
import com.alibaba.fluss.lakehouse.paimon.sink.PaimonDataBaseSyncSinkBuilder;
import com.alibaba.fluss.lakehouse.paimon.source.FlussDatabaseSyncSource;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -122,7 +123,10 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
.distributedBy(bucketNum, "a")
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
.customProperty("table.datalake.paimon.metastore", "filesystem")
.customProperty("table.datalake.paimon.warehouse", warehousePath);

if (isPartitioned) {
schemaBuilder.column("c", DataTypes.STRING());
@@ -150,6 +154,9 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exceptio
.build())
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
.customProperty("table.datalake.paimon.metastore", "filesystem")
.customProperty("table.datalake.paimon.warehouse", warehousePath)
.build();
return createTable(tablePath, table1Descriptor);
}
@@ -19,8 +19,6 @@
import com.alibaba.fluss.rpc.RpcGateway;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -179,15 +177,6 @@ CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecurityToken
CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
ListPartitionInfosRequest request);

/**
* Describe the lake storage used for Fluss.
*
* @return a future returns lake storage info
*/
@RPC(api = ApiKeys.DESCRIBE_LAKE_STORAGE)
CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request);

/**
* Get the latest lake snapshot for the given table.
*
@@ -59,7 +59,6 @@ public enum ApiKeys {
NOTIFY_KV_SNAPSHOT_OFFSET(1029, 0, 0, PRIVATE),
COMMIT_LAKE_TABLE_SNAPSHOT(1030, 0, 0, PRIVATE),
NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE),
DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC),
GET_LATEST_LAKE_SNAPSHOT(1033, 0, 0, PUBLIC),
LIMIT_SCAN(1034, 0, 0, PUBLIC),
PREFIX_LOOKUP(1035, 0, 0, PUBLIC),
7 changes: 0 additions & 7 deletions fluss-rpc/src/main/proto/FlussApi.proto
@@ -359,13 +359,6 @@ message GetFileSystemSecurityTokenResponse {
repeated PbKeyValue addition_info = 4;
}

message DescribeLakeStorageRequest {
}

message DescribeLakeStorageResponse {
required PbLakeStorageInfo lakehouse_storage_info = 1;
}

// init writer request and response
message InitWriterRequest {
}
@@ -46,8 +46,6 @@
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -413,17 +411,6 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
RpcMessageUtils.toListPartitionInfosResponse(partitionNameAndIds));
}

@Override
public CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request) {
if (lakeStorageInfo == null) {
throw new LakeStorageNotConfiguredException("Lake storage is not configured.");
}

return CompletableFuture.completedFuture(
RpcMessageUtils.makeDescribeLakeStorageResponse(lakeStorageInfo));
}

@Override
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
GetLatestLakeSnapshotRequest request) {
@@ -46,7 +46,6 @@
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.messages.FetchLogResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
@@ -1227,13 +1226,6 @@ public static NotifyLakeTableOffsetData getNotifyLakeTableOffset(
notifyLakeTableOffsetRequest.getCoordinatorEpoch(), lakeBucketOffsetMap);
}

public static DescribeLakeStorageResponse makeDescribeLakeStorageResponse(
LakeStorageInfo lakeStorageInfo) {
DescribeLakeStorageResponse describeLakeStorageResponse = new DescribeLakeStorageResponse();
describeLakeStorageResponse.setLakehouseStorageInfo(toPbLakeStorageInfo(lakeStorageInfo));
return describeLakeStorageResponse;
}

public static GetLatestLakeSnapshotResponse makeGetLatestLakeSnapshotResponse(
long tableId, LakeStorageInfo lakeStorageInfo, LakeTableSnapshot lakeTableSnapshot) {
GetLatestLakeSnapshotResponse getLakeTableSnapshotResponse =