Skip to content

Commit

Permalink
[rpc] Remove describeLakeStorage to get LakeStorageInfo from table pr…
Browse files Browse the repository at this point in the history
…operties
  • Loading branch information
SteNicholas committed Feb 27, 2025
1 parent 83aef68 commit 71497c2
Show file tree
Hide file tree
Showing 13 changed files with 16 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
Expand Down Expand Up @@ -325,7 +324,4 @@ ListOffsetsResult listOffsets(
PhysicalTablePath physicalTablePath,
Collection<Integer> buckets,
OffsetSpec offsetSpec);

/** Describe the lake used for lakehouse storage. */
CompletableFuture<LakeStorageInfo> describeLakeStorage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
import com.alibaba.fluss.cluster.Cluster;
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
Expand All @@ -42,7 +41,6 @@
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
Expand Down Expand Up @@ -341,12 +339,6 @@ public ListOffsetsResult listOffsets(
return new ListOffsetsResult(bucketToOffsetMap);
}

@Override
public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
return gateway.describeLakeStorage(new DescribeLakeStorageRequest())
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
Expand Down Expand Up @@ -387,10 +386,6 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
.collect(Collectors.toList());
}

public static LakeStorageInfo toLakeStorageInfo(DescribeLakeStorageResponse response) {
return toLakeStorageInfo(response.getLakehouseStorageInfo());
}

private static LakeStorageInfo toLakeStorageInfo(PbLakeStorageInfo pbLakeStorageInfo) {
Map<String, String> dataLakeCatalogConfig =
toKeyValueMap(pbLakeStorageInfo.getCatalogPropertiesList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.fluss.server.utils;
package com.alibaba.fluss.utils.lakehouse;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.IOUtils;
import com.alibaba.fluss.utils.lakehouse.LakeStorageUtils;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -536,7 +537,8 @@ private void mayInitLakeCatalogCatalog() {
if (lakeCatalog == null) {
try {
Map<String, String> catalogProperties =
admin.describeLakeStorage().get().getCatalogProperties();
LakeStorageUtils.getLakeStorageInfo(connection.getConfiguration())
.getCatalogProperties();
lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
} catch (Exception e) {
throw new FlussRuntimeException("Failed to init paimon catalog.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alibaba.fluss.connector.flink.lakehouse;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
Expand All @@ -27,6 +26,7 @@
import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.utils.lakehouse.LakeStorageUtils;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -120,18 +120,16 @@ private FileStoreTable getFileStoreTable() {
return fileStoreTable;
}

try (Admin admin = connection.getAdmin()) {
LakeStorageInfo dataLakeInfo = admin.describeLakeStorage().get();
try (Catalog paimonCatalog =
FlinkCatalogFactory.createPaimonCatalog(
Options.fromMap(dataLakeInfo.getCatalogProperties()))) {
fileStoreTable =
(FileStoreTable)
paimonCatalog.getTable(
Identifier.create(
tablePath.getDatabaseName(),
tablePath.getTableName()));
}
LakeStorageInfo dataLakeInfo =
LakeStorageUtils.getLakeStorageInfo(connection.getConfiguration());
try (Catalog paimonCatalog =
FlinkCatalogFactory.createPaimonCatalog(
Options.fromMap(dataLakeInfo.getCatalogProperties()))) {
fileStoreTable =
(FileStoreTable)
paimonCatalog.getTable(
Identifier.create(
tablePath.getDatabaseName(), tablePath.getTableName()));
return fileStoreTable;
} catch (Exception e) {
throw new FlinkRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.alibaba.fluss.rpc.RpcGateway;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
Expand Down Expand Up @@ -179,15 +177,6 @@ CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecurityToken
CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
ListPartitionInfosRequest request);

/**
* Describe the lake storage used for Fluss.
*
* @return a future returns lake storage info
*/
@RPC(api = ApiKeys.DESCRIBE_LAKE_STORAGE)
CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request);

/**
* Get the latest lake snapshot for the given table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public enum ApiKeys {
NOTIFY_KV_SNAPSHOT_OFFSET(1029, 0, 0, PRIVATE),
COMMIT_LAKE_TABLE_SNAPSHOT(1030, 0, 0, PRIVATE),
NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE),
DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC),
GET_LATEST_LAKE_SNAPSHOT(1033, 0, 0, PUBLIC),
LIMIT_SCAN(1034, 0, 0, PUBLIC),
PREFIX_LOOKUP(1035, 0, 0, PUBLIC),
Expand Down
7 changes: 0 additions & 7 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,6 @@ message GetFileSystemSecurityTokenResponse {
repeated PbKeyValue addition_info = 4;
}

message DescribeLakeStorageRequest {
}

message DescribeLakeStorageResponse {
required PbLakeStorageInfo lakehouse_storage_info = 1;
}

// init writer request and response
message InitWriterRequest {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
Expand Down Expand Up @@ -87,14 +85,14 @@
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
import com.alibaba.fluss.server.metadata.TableMetadataInfo;
import com.alibaba.fluss.server.tablet.TabletService;
import com.alibaba.fluss.server.utils.LakeStorageUtils;
import com.alibaba.fluss.server.utils.RpcMessageUtils;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
import com.alibaba.fluss.server.zk.data.BucketAssignment;
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import com.alibaba.fluss.server.zk.data.TableAssignment;
import com.alibaba.fluss.utils.lakehouse.LakeStorageUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -413,17 +411,6 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
RpcMessageUtils.toListPartitionInfosResponse(partitionNameAndIds));
}

@Override
public CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request) {
if (lakeStorageInfo == null) {
throw new LakeStorageNotConfiguredException("Lake storage is not configured.");
}

return CompletableFuture.completedFuture(
RpcMessageUtils.makeDescribeLakeStorageResponse(lakeStorageInfo));
}

@Override
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
GetLatestLakeSnapshotRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.messages.FetchLogResponse;
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
Expand Down Expand Up @@ -1227,13 +1226,6 @@ public static NotifyLakeTableOffsetData getNotifyLakeTableOffset(
notifyLakeTableOffsetRequest.getCoordinatorEpoch(), lakeBucketOffsetMap);
}

public static DescribeLakeStorageResponse makeDescribeLakeStorageResponse(
LakeStorageInfo lakeStorageInfo) {
DescribeLakeStorageResponse describeLakeStorageResponse = new DescribeLakeStorageResponse();
describeLakeStorageResponse.setLakehouseStorageInfo(toPbLakeStorageInfo(lakeStorageInfo));
return describeLakeStorageResponse;
}

public static GetLatestLakeSnapshotResponse makeGetLatestLakeSnapshotResponse(
long tableId, LakeStorageInfo lakeStorageInfo, LakeTableSnapshot lakeTableSnapshot) {
GetLatestLakeSnapshotResponse getLakeTableSnapshotResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import com.alibaba.fluss.rpc.messages.CreateTableResponse;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
import com.alibaba.fluss.rpc.messages.DropDatabaseResponse;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
Expand Down Expand Up @@ -124,12 +122,6 @@ public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request)
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
GetLatestLakeSnapshotRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.messages.FetchLogResponse;
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
Expand Down Expand Up @@ -146,12 +144,6 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
DescribeLakeStorageRequest request) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
GetLatestLakeSnapshotRequest request) {
Expand Down

0 comments on commit 71497c2

Please sign in to comment.