diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStorage.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStorage.java
index 64b50b57f7..9d6331d5e9 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStorage.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStorage.java
@@ -18,7 +18,9 @@
package com.alibaba.fluss.lake.lakestorage;
import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.metadata.TablePath;
/**
* The LakeStorage interface defines how to implement lakehouse storage system such as Paimon and
@@ -38,4 +40,14 @@ public interface LakeStorage {
/** Create lake catalog. */
LakeCatalog createLakeCatalog();
+
+ /**
+ * Creates a lake source instance for reading lakehouse data from the specified table path. The
+ * lake source provides capabilities for split planning and record reading, enabling efficient
+ * distributed processing of lakehouse data.
+ *
+ * @param tablePath the logical path identifying the table in the lakehouse storage
+ * @return a configured lake source instance for the specified table
+ */
+ LakeSource> createLakeSource(TablePath tablePath);
}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
index fd17fe2029..457f6747dd 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
@@ -19,6 +19,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
@@ -116,5 +117,10 @@ public LakeCatalog createLakeCatalog() {
return new ClassLoaderFixingLakeCatalog(inner.createLakeCatalog(), loader);
}
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ return inner.createLakeSource(tablePath);
+ }
}
}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSource.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSource.java
new file mode 100644
index 0000000000..08c65cd9ef
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSource.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.source;
+
+import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.predicate.Predicate;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A generic interface for lake data sources that defines how to plan splits and read data. Any data
+ * lake format supporting reading from data tiered in lake as Fluss records should implement this
+ * interface.
+ *
+ *
This interface provides methods for projection, filtering, limiting to enable query engine to
+ * push to lake source. Implementations must ensure that split planning and record reading
+ * operations properly account for these pushed-down operations during execution.
+ *
+ * @param The type of data split, which must extend {@link LakeSplit}
+ * @since 0.8
+ */
+@PublicEvolving
+public interface LakeSource extends Serializable {
+
+ /**
+ * Applies column projection to the data source. it provides the field index paths that should
+ * be used for a projection. The indices are 0-based and support fields within (possibly nested)
+ * structures.
+ *
+ * For nested, given the following SQL, CREATE TABLE t (i INT, r ROW < d DOUBLE, b BOOLEAN>,
+ * s STRING); SELECT s, r.d FROM t; the project will be [[2], [1, 0]]
+ */
+ void withProject(int[][] project);
+
+ /** Applies a row limit to the lake source. */
+ void withLimit(int limit);
+
+ /** Applies filters to the lake source. */
+ FilterPushDownResult withFilters(List predicates);
+
+ /**
+ * Creates a planner for plan splits to be read.
+ *
+ * @param context The planning context providing necessary planning information
+ * @return A planner instance for this lake source
+ * @throws IOException if an error occurs during planner creation
+ */
+ Planner createPlanner(PlannerContext context) throws IOException;
+
+ /**
+ * Creates a record reader for reading data from the lake source for the specified split.
+ *
+ * @param context The reader context containing the split to be read
+ * @return A record reader instance for the given split
+ * @throws IOException if an error occurs during reader creation
+ */
+ RecordReader createRecordReader(ReaderContext context) throws IOException;
+
+ /**
+ * Returns the serializer for the data split, used to transfer split information in distributed
+ * environment.
+ *
+ * @return The serializer for the split
+ */
+ SimpleVersionedSerializer getSplitSerializer();
+
+ /**
+ * Context interface for planners, providing the snapshot id of the table in data-lake to plan
+ * splits.
+ */
+ interface PlannerContext extends Serializable {
+ long snapshotId();
+ }
+
+ /**
+ * Context interface for record readers, providing access to the lake split being read.
+ *
+ * @param The type of lake split
+ */
+ interface ReaderContext extends Serializable {
+ Split lakeSplit();
+ }
+
+ /**
+ * Represents the result of a filter push down operation to lake source, indicating which
+ * predicates were accepted by the source and which remain to be evaluated.
+ *
+ * @since 0.8
+ */
+ @PublicEvolving
+ final class FilterPushDownResult {
+ private final List acceptedPredicates;
+ private final List remainingPredicates;
+
+ private FilterPushDownResult(
+ List acceptedPredicates, List remainingPredicates) {
+ this.acceptedPredicates = acceptedPredicates;
+ this.remainingPredicates = remainingPredicates;
+ }
+
+ /**
+ * Creates a new FilterPushDownResult instance.
+ *
+ * @param acceptedPredicates The accepted predicates
+ * @param remainingPredicates The remaining predicates
+ * @return A new FilterPushDownResult instance
+ */
+ public static FilterPushDownResult of(
+ List acceptedPredicates, List remainingPredicates) {
+ return new FilterPushDownResult(acceptedPredicates, remainingPredicates);
+ }
+
+ /**
+ * Returns the predicates that were accepted by the source.
+ *
+ * @return The list of accepted predicates
+ */
+ public List acceptedPredicates() {
+ return acceptedPredicates;
+ }
+
+ /**
+ * Returns the predicates that remain to be evaluated.
+ *
+ * @return The list of remaining predicates
+ */
+ public List remainingPredicates() {
+ return remainingPredicates;
+ }
+ }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSplit.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSplit.java
new file mode 100644
index 0000000000..99e375ef97
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/LakeSplit.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.source;
+
+import com.alibaba.fluss.annotation.PublicEvolving;
+
+import java.util.List;
+
+/**
+ * Represents a logical partition or segment of data in data-lake.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public interface LakeSplit {
+
+ /**
+ * Returns the bucket id for this data split. Any data split in lake must belong to a Fluss
+ * bucket. The bucket id is used to aggregate splits that in same Fluss bucket into same reader
+ * in bucket-aware table, such primary key table, log table with pre-defined bucket keys. If
+ * it's not bucket-aware table, it's also feasible to return -1 directly for all data splits.
+ *
+ * @return the bucket id
+ */
+ int bucket();
+
+ /**
+ * Returns the hierarchical partition values for this split, or an empty list if the split
+ * doesn't belong to a specific partition in non-partitioned table.
+ *
+ * The returned list represents the complete partition path, with each element corresponding
+ * to one level of the partitioning hierarchy in order. For example, in a table partitioned by
+ * {@code dt=20250101/hr=12}, this method would return {@code ["20250101", "12"]}.
+ *
+ *
The list size should match the table's partition column count, and each element's position
+ * corresponds to the declared partition column order. Values should be in their
+ * string-represented form as they would appear in the filesystem path.
+ *
+ * @return the resolved partition values specification, or {@code null} if this split doesn't
+ * belong to a specific partition in non-partitioned table.
+ */
+ List partition();
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/source/Planner.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/Planner.java
new file mode 100644
index 0000000000..13e1be11a5
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/Planner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.source;
+
+import com.alibaba.fluss.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A planner interface for generating readable splits for lake data sources.
+ *
+ * Implementations of this interface are responsible for determining how to divide the data into
+ * manageable splits that can be read in parallel. The planning should consider the pushed-down
+ * optimizations (filters, limits, etc.) from {@link LakeSource}.
+ *
+ * @param the type of data split this planner generates, must extend {@link LakeSplit}
+ * @since 0.8
+ */
+@PublicEvolving
+public interface Planner {
+
+ /**
+ * Plans and generates a list of readable data splits in parallel.
+ *
+ * @return the list of readable data splits
+ * @throws IOException if an I/O error occurs
+ */
+ List plan() throws IOException;
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/source/RecordReader.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/RecordReader.java
new file mode 100644
index 0000000000..e6f3d68386
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/RecordReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.source;
+
+import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import java.io.IOException;
+
+/**
+ * An interface for reading records from {@link LakeSplit}.
+ *
+ * Implementations of this interface provide an iterator-style access to records, allowing
+ * efficient sequential reading of potentially large datasets without loading all data into memory
+ * at once. The reading should consider the pushed-down optimizations (project, filters, limits,
+ * etc.) from {@link LakeSource}.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public interface RecordReader {
+
+ /**
+ * Read a {@link LakeSplit} into a closeable iterator.
+ *
+ * @return the closeable iterator of records
+ * @throws IOException if an I/O error occurs
+ */
+ CloseableIterator read() throws IOException;
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/source/SortedRecordReader.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/SortedRecordReader.java
new file mode 100644
index 0000000000..f695896839
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/source/SortedRecordReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.source;
+
+import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.row.InternalRow;
+
+import java.util.Comparator;
+
+/**
+ * A specialized {@link RecordReader} that produces records in a defined sorted order.
+ *
+ * Extends the basic record reading capability with sorting semantics, ensuring that records are
+ * returned according to a specified ordering.
+ *
+ *
Implementations must guarantee that the {@link #read()} method returns records in the order
+ * defined by the comparator from {@link #order()}.
+ *
+ *
Note: This is mainly used for union read primary key table since we will do sort merge records
+ * in lake and fluss. The records in primary key table for lake may should implement this method for
+ * union read with a better performance.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public interface SortedRecordReader extends RecordReader {
+
+ /**
+ * Returns the comparator that defines the sort order of the records.
+ *
+ * @return a non-null comparator defining the sort order of the records
+ */
+ Comparator order();
+}
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
index 4c34923374..9c8ae73d42 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
@@ -18,6 +18,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
@@ -131,6 +132,11 @@ public TestPaimonLakeStorage() {}
public TestPaimonLakeCatalog createLakeCatalog() {
return new TestPaimonLakeCatalog();
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
private static class TestPaimonLakeCatalog implements LakeCatalog {
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
index 932dd2b9e0..fab753afed 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -20,7 +20,9 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.metadata.TablePath;
/** Iceberg implementation of {@link LakeStorage}. */
public class IcebergLakeStorage implements LakeStorage {
@@ -40,4 +42,9 @@ public IcebergLakeStorage(Configuration configuration) {
public LakeCatalog createLakeCatalog() {
throw new UnsupportedOperationException("Not implemented");
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
index 11e6fef289..5864ccf77f 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
@@ -21,7 +21,9 @@
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.lance.tiering.LanceCommittable;
import com.alibaba.fluss.lake.lance.tiering.LanceWriteResult;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.metadata.TablePath;
/** Lance implementation of {@link LakeStorage}. */
public class LanceLakeStorage implements LakeStorage {
@@ -40,4 +42,9 @@ public LakeTieringFactory createLakeTieringF
public LanceLakeCatalog createLakeCatalog() {
return new LanceLakeCatalog(config);
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
index a4da3fb549..03da696a0f 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
@@ -22,7 +22,9 @@
import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.metadata.TablePath;
/** Paimon implementation of {@link LakeStorage}. */
public class PaimonLakeStorage implements LakeStorage {
@@ -42,4 +44,9 @@ public LakeTieringFactory createLakeTierin
public PaimonLakeCatalog createLakeCatalog() {
return new PaimonLakeCatalog(paimonConfig);
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index b427897610..da9258e5fb 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -22,6 +22,7 @@
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -57,6 +58,11 @@ public static class TestingPaimonLakeStorage implements LakeStorage {
public LakeCatalog createLakeCatalog() {
return new TestingPaimonCatalog();
}
+
+ @Override
+ public LakeSource> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
/** Paimon implementation of LakeCatalog for testing purpose. */
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 5feb9c8ae1..d4015908b0 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -324,6 +324,8 @@
com.alibaba.fluss.tools.ci.*
com.alibaba.fluss.predicate.*
+
+ com.alibaba.fluss.lake.source.*
com.alibaba.fluss.dist.DummyClass
com.alibaba.fluss.flink.DummyClass120