[rpc] Remove describeLakeStorage to get LakeStorageInfo from table properties
SteNicholas committed Mar 2, 2025
1 parent 76faabc commit f14578e
Showing 18 changed files with 100 additions and 118 deletions.
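
In short, the commit drops the describeLakeStorage RPC and stops shipping a LakeStorageInfo inside the getLatestLakeSnapshot response; clients now derive the storage descriptor from the table's own properties. A minimal before/after sketch assembled from the methods touched in this diff (the admin and tablePath setup is assumed, not shown):

    // Before: the storage info rode along on the snapshot RPC result.
    LakeSnapshot snapshot = admin.getLatestLakeSnapshot(tablePath).get();
    Map<String, String> oldProps = snapshot.getLakeStorageInfo().getCatalogProperties();

    // After: it is computed client-side from the table's properties.
    TableInfo tableInfo = admin.getTableInfo(tablePath).get();
    Map<String, String> newProps =
            LakeStorageInfoUtils.getLakeStorageInfo(tableInfo.getProperties())
                    .getCatalogProperties();
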
@@ -17,33 +17,25 @@
package com.alibaba.fluss.client.metadata;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.TableBucket;

import java.util.Map;

/**
* A class representing the lake snapshot information of a table. It contains:
* <li>The lake storage info
* <li>The snapshot id and the log offset for each bucket.
*
* @since 0.3
*/
@PublicEvolving
public class LakeSnapshot {

private final LakeStorageInfo lakeStorageInfo;

private final long snapshotId;

// the specific log offset of the snapshot
private final Map<TableBucket, Long> tableBucketsOffset;

public LakeSnapshot(
LakeStorageInfo lakeStorageInfo,
long snapshotId,
Map<TableBucket, Long> tableBucketsOffset) {
this.lakeStorageInfo = lakeStorageInfo;
public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
this.snapshotId = snapshotId;
this.tableBucketsOffset = tableBucketsOffset;
}
@@ -52,10 +44,6 @@ public long getSnapshotId() {
return snapshotId;
}

public LakeStorageInfo getLakeStorageInfo() {
return lakeStorageInfo;
}

public Map<TableBucket, Long> getTableBucketsOffset() {
return tableBucketsOffset;
}
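
With the storage info gone, building a snapshot needs only the id and the per-bucket log offsets. A short usage sketch (all ids and offsets are made-up values; the nullable partition id is an assumption about TableBucket for non-partitioned tables):

    Map<TableBucket, Long> offsets = new HashMap<>();
    offsets.put(new TableBucket(/* tableId */ 1L, /* partitionId */ null, /* bucket */ 0), 100L);
    LakeSnapshot snapshot = new LakeSnapshot(/* snapshotId */ 5L, offsets);
    long snapshotId = snapshot.getSnapshotId(); // 5
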
@@ -297,7 +297,7 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
}
return new LakeSnapshot(lakeStorageInfo, snapshotId, tableBucketsOffset);
return new LakeSnapshot(snapshotId, tableBucketsOffset);
}

public static List<FsPathAndFileName> toFsPathAndFileName(
@@ -271,7 +271,8 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
objectPath.getDatabaseName(),
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
}
return getLakeTable(objectPath.getDatabaseName(), tableName, tableInfo);
return getLakeTable(
objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
} else {
tableInfo = admin.getTableInfo(tablePath).get();
}
@@ -294,9 +295,9 @@
}

protected CatalogBaseTable getLakeTable(
String databaseName, String tableName, TableInfo tableInfo)
String databaseName, String tableName, Configuration properties)
throws TableNotExistException, CatalogException {
mayInitLakeCatalogCatalog(tableInfo);
mayInitLakeCatalogCatalog(properties);
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
// should be pattern like table_name$lake
@@ -631,13 +632,13 @@ protected TablePath toTablePath(ObjectPath objectPath) {
return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName());
}

private void mayInitLakeCatalogCatalog(TableInfo tableInfo) {
private void mayInitLakeCatalogCatalog(Configuration configuration) {
if (lakeCatalog == null) {
synchronized (this) {
if (lakeCatalog == null) {
try {
Map<String, String> catalogProperties =
LakeStorageInfoUtils.getLakeStorageInfo(tableInfo)
LakeStorageInfoUtils.getLakeStorageInfo(configuration)
.getCatalogProperties();
lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
} catch (Exception e) {
@@ -23,9 +23,10 @@
import com.alibaba.fluss.connector.flink.source.enumerator.initializer.OffsetsInitializer;
import com.alibaba.fluss.connector.flink.source.split.LogSplit;
import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.connector.flink.utils.LakeStorageInfoUtils;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.metadata.TableInfo;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
@@ -57,22 +58,19 @@
*/
public class LakeSplitGenerator {

private final long tableId;
private final TablePath tablePath;
private final TableInfo tableInfo;
private final Admin flussAdmin;
private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
private final OffsetsInitializer stoppingOffsetInitializer;
private final int bucketCount;

public LakeSplitGenerator(
long tableId,
TablePath tablePath,
TableInfo tableInfo,
Admin flussAdmin,
OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
OffsetsInitializer stoppingOffsetInitializer,
int bucketCount) {
this.tableId = tableId;
this.tablePath = tablePath;
this.tableInfo = tableInfo;
this.flussAdmin = flussAdmin;
this.bucketOffsetsRetriever = bucketOffsetsRetriever;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -81,16 +79,19 @@ public LakeSplitGenerator(

public List<SourceSplitBase> generateLakeSplits() throws Exception {
// get the file store
LakeSnapshot lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tablePath).get();
LakeSnapshot lakeSnapshotInfo =
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
FileStoreTable fileStoreTable =
getTable(
lakeSnapshotInfo.getSnapshotId(),
lakeSnapshotInfo.getLakeStorageInfo().getCatalogProperties());
LakeStorageInfoUtils.getLakeStorageInfo(tableInfo.getProperties())
.getCatalogProperties());
boolean isLogTable = fileStoreTable.schema().primaryKeys().isEmpty();
boolean isPartitioned = !fileStoreTable.schema().partitionKeys().isEmpty();

if (isPartitioned) {
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
List<PartitionInfo> partitionInfos =
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
Map<Long, String> partitionNameById =
partitionInfos.stream()
.collect(
@@ -152,7 +153,8 @@ private List<SourceSplitBase> generateSplit(
generateSplitForLogSnapshot(
fileStoreTable, splitGenerator, partitionId, partitionName));
for (int bucket = 0; bucket < bucketCount; bucket++) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
long stoppingOffset = bucketEndOffset.get(bucket);
if (snapshotLogOffset == null) {
@@ -175,7 +177,8 @@ private List<SourceSplitBase> generateSplit(
} else {
// it's primary key table
for (int bucket = 0; bucket < bucketCount; bucket++) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
long stoppingOffset = bucketEndOffset.get(bucket);
splits.add(
@@ -205,7 +208,7 @@ private List<SourceSplitBase> generateSplitForLogSnapshot(
}
// for snapshot splits, we always use bucket = -1 as the bucket since we can't get bucket in
// paimon's log table
TableBucket tableBucket = new TableBucket(tableId, partitionId, -1);
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, -1);
// snapshot splits + one log split
for (FileStoreSourceSplit fileStoreSourceSplit : splitGenerator.createSplits(scan.plan())) {
splits.add(new PaimonSnapshotSplit(tableBucket, partitionName, fileStoreSourceSplit));
@@ -219,7 +222,7 @@ private Map<String, String> getPartitionSpec(
checkState(
partitionKeys.size() == 1,
"Must only one partition key for paimon table %, but got %s, the partition keys are: ",
tablePath,
tableInfo.getTablePath(),
partitionKeys.size(),
partitionKeys.size());
return Collections.singletonMap(partitionKeys.get(0), partitionName);
@@ -297,7 +300,8 @@ private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProp
return (FileStoreTable)
catalog.getTable(
Identifier.create(
tablePath.getDatabaseName(), tablePath.getTableName()))
tableInfo.getTablePath().getDatabaseName(),
tableInfo.getTablePath().getTableName()))
.copy(
Collections.singletonMap(
CoreOptions.SCAN_SNAPSHOT_ID.key(),
@@ -122,7 +122,8 @@ private FileStoreTable getFileStoreTable() {
try (Catalog paimonCatalog =
FlinkCatalogFactory.createPaimonCatalog(
Options.fromMap(
LakeStorageInfoUtils.getLakeStorageInfo(table.getTableInfo())
LakeStorageInfoUtils.getLakeStorageInfo(
table.getTableInfo().getProperties())
.getCatalogProperties()))) {
fileStoreTable =
(FileStoreTable)
@@ -115,8 +115,7 @@ public class FlinkSourceEnumerator
private Connection connection;
private Admin flussAdmin;
private BucketOffsetsRetriever bucketOffsetsRetriever;
private long tableId;
private int bucketCount;
private TableInfo tableInfo;

// This flag will be marked as true if periodically partition discovery is disabled AND the
// split initializing has finished.
@@ -181,10 +180,8 @@ public void start() {
flussAdmin = connection.getAdmin();
bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(flussAdmin, tablePath);
try {
TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
tableId = tableInfo.getTableId();
tableInfo = flussAdmin.getTableInfo(tablePath).get();
lakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
bucketCount = tableInfo.getNumBuckets();
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Failed to get table info for %s", tablePath),
@@ -419,8 +416,9 @@ private List<SourceSplitBase> getLogSplit(
// always assume the bucket is from 0 to bucket num
List<SourceSplitBase> splits = new ArrayList<>();
List<Integer> bucketsNeedInitOffset = new ArrayList<>();
for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
for (int bucketId = 0; bucketId < tableInfo.getNumBuckets(); bucketId++) {
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
if (ignoreTableBucket(tableBucket)) {
continue;
}
@@ -434,7 +432,10 @@
(bucketId, startingOffset) ->
splits.add(
new LogSplit(
new TableBucket(tableId, partitionId, bucketId),
new TableBucket(
tableInfo.getTableId(),
partitionId,
bucketId),
partitionName,
startingOffset)));
}
Expand All @@ -444,12 +445,11 @@ private List<SourceSplitBase> getLogSplit(
private List<SourceSplitBase> getLakeSplit() throws Exception {
LakeSplitGenerator lakeSplitGenerator =
new LakeSplitGenerator(
tableId,
tablePath,
tableInfo,
flussAdmin,
bucketOffsetsRetriever,
stoppingOffsetsInitializer,
bucketCount);
tableInfo.getNumBuckets());
return lakeSplitGenerator.generateLakeSplits();
}

@@ -17,45 +17,41 @@
package com.alibaba.fluss.connector.flink.utils;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.TableInfo;

import java.util.HashMap;
import java.util.Map;

/** Utility class for {@link LakeStorageInfo}. */
public class LakeStorageInfoUtils {

private static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon.";

public static LakeStorageInfo getLakeStorageInfo(TableInfo tableInfo) {
DataLakeFormat datalakeFormat =
tableInfo.getProperties().get(ConfigOptions.TABLE_DATALAKE_FORMAT);
public static LakeStorageInfo getLakeStorageInfo(Configuration configuration) {
DataLakeFormat datalakeFormat = configuration.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
if (datalakeFormat == null) {
throw new IllegalArgumentException(
String.format(
"The lakehouse storage is not set, please set it by %s",
"The datalake format is not set, please set it by %s",
ConfigOptions.TABLE_DATALAKE_FORMAT.key()));
}

if (datalakeFormat != DataLakeFormat.PAIMON) {
throw new UnsupportedOperationException(
String.format(
"The lakehouse storage %s "
+ " is not supported. Only %s is supported.",
"The datalake format %s " + " is not supported. Only %s is supported.",
datalakeFormat, DataLakeFormat.PAIMON));
}

// currently, extract catalog config
Map<String, String> datalakeConfig = new HashMap<>();
Map<String, String> flussConfig = tableInfo.getCustomProperties().toMap();
Map<String, String> flussConfig = configuration.toMap();
String dataLakePrefix = "table.datalake." + datalakeFormat + ".";
for (Map.Entry<String, String> configEntry : flussConfig.entrySet()) {
String configKey = configEntry.getKey();
String configValue = configEntry.getValue();
if (configKey.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
datalakeConfig.put(
configKey.substring(TABLE_DATALAKE_PAIMON_PREFIX.length()), configValue);
if (configKey.startsWith(dataLakePrefix)) {
datalakeConfig.put(configKey.substring(dataLakePrefix.length()), configValue);
}
}
return new LakeStorageInfo(datalakeFormat.toString(), datalakeConfig);
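
After this change the utility works off a plain Configuration: it reads table.datalake.format and collects every key under the matching "table.datalake.<format>." prefix into the catalog properties. A sketch of the intended call (the warehouse path is a made-up value, and Configuration's set/setString setters are assumed to follow the Flink-style API):

    Configuration conf = new Configuration();
    conf.set(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON);
    conf.setString("table.datalake.paimon.metastore", "filesystem");
    conf.setString("table.datalake.paimon.warehouse", "/tmp/paimon-warehouse");

    LakeStorageInfo info = LakeStorageInfoUtils.getLakeStorageInfo(conf);
    // info.getCatalogProperties() -> {metastore=filesystem, warehouse=/tmp/paimon-warehouse}
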
@@ -19,7 +19,6 @@
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.lakehouse.paimon.sink.PaimonDataBaseSyncSinkBuilder;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -149,10 +148,7 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception {
batchTEnv.executeSql(
String.format("select * from %s$lake$options", tableName)));
assertThat(paimonOptionsRows.toString())
.isEqualTo(
String.format(
"[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input], +I[table.datalake.paimon.metastore, filesystem], +I[table.datalake.paimon.warehouse, %s]]",
warehousePath));
.isEqualTo("[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input]]");

// stop sync database job
jobClient.cancel().get();
@@ -310,10 +306,7 @@ protected long createPkTable(TablePath tablePath, int bucketNum, boolean isParti
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
.distributedBy(bucketNum)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
.customProperty("table.datalake.paimon.metastore", "filesystem")
.customProperty("table.datalake.paimon.warehouse", warehousePath);
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");

if (isPartitioned) {
schemaBuilder.column("c", DataTypes.STRING());
@@ -66,7 +66,7 @@ public class FlinkPaimonTestBase
protected static Connection conn;
protected static Admin admin;
protected static Configuration clientConf;
protected static String warehousePath;
private static String warehousePath;

private static Configuration initConfig() {
Configuration conf = new Configuration();
(Only 9 of the 18 changed files loaded on this page; the remaining diffs are not shown.)