Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package com.alibaba.fluss.lake.lakestorage;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.metadata.TablePath;

/**
* The LakeStorage interface defines how to implement lakehouse storage system such as Paimon and
Expand All @@ -38,4 +40,14 @@ public interface LakeStorage {

/** Create lake catalog. */
LakeCatalog createLakeCatalog();

/**
* Creates a lake source instance for reading lakehouse data from the specified table path. The
* lake source provides capabilities for split planning and record reading, enabling efficient
* distributed processing of lakehouse data.
*
* @param tablePath the logical path identifying the table in the lakehouse storage
* @return a configured lake source instance for the specified table
*/
LakeSource<?> createLakeSource(TablePath tablePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -116,5 +117,10 @@ public LakeCatalog createLakeCatalog() {
return new ClassLoaderFixingLakeCatalog(inner.createLakeCatalog(), loader);
}
}

@Override
public LakeSource<?> createLakeSource(TablePath tablePath) {
return inner.createLakeSource(tablePath);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.predicate.Predicate;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
* A generic interface for lake data sources that defines how to plan splits and read data. Any data
* lake format supporting reading from data tiered in lake as Fluss records should implement this
* interface.
*
* <p>This interface provides methods for projection, filtering, limiting to enable query engine to
* push to lake source. Implementations must ensure that split planning and record reading
* operations properly account for these pushed-down operations during execution.
*
* @param <Split> The type of data split, which must extend {@link LakeSplit}
* @since 0.8
*/
@PublicEvolving
public interface LakeSource<Split extends LakeSplit> extends Serializable {

/**
* Applies column projection to the data source. it provides the field index paths that should
* be used for a projection. The indices are 0-based and support fields within (possibly nested)
* structures.
*
* <p>For nested, given the following SQL, CREATE TABLE t (i INT, r ROW < d DOUBLE, b BOOLEAN>,
* s STRING); SELECT s, r.d FROM t; the project will be [[2], [1, 0]]
*/
void withProject(int[][] project);

/** Applies a row limit to the lake source. */
void withLimit(int limit);

/** Applies filters to the lake source. */
FilterPushDownResult withFilters(List<Predicate> predicates);

/**
* Creates a planner for plan splits to be read.
*
* @param context The planning context providing necessary planning information
* @return A planner instance for this lake source
* @throws IOException if an error occurs during planner creation
*/
Planner<Split> createPlanner(PlannerContext context) throws IOException;

/**
* Creates a record reader for reading data from the lake source for the specified split.
*
* @param context The reader context containing the split to be read
* @return A record reader instance for the given split
* @throws IOException if an error occurs during reader creation
*/
RecordReader createRecordReader(ReaderContext<Split> context) throws IOException;

/**
* Returns the serializer for the data split, used to transfer split information in distributed
* environment.
*
* @return The serializer for the split
*/
SimpleVersionedSerializer<Split> getSplitSerializer();

/**
* Context interface for planners, providing the snapshot id of the table in data-lake to plan
* splits.
*/
interface PlannerContext extends Serializable {
long snapshotId();
}

/**
* Context interface for record readers, providing access to the lake split being read.
*
* @param <Split> The type of lake split
*/
interface ReaderContext<Split extends LakeSplit> extends Serializable {
Split lakeSplit();
}

/**
* Represents the result of a filter push down operation to lake source, indicating which
* predicates were accepted by the source and which remain to be evaluated.
*
* @since 0.8
*/
@PublicEvolving
final class FilterPushDownResult {
private final List<Predicate> acceptedPredicates;
private final List<Predicate> remainingPredicates;

private FilterPushDownResult(
List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
this.acceptedPredicates = acceptedPredicates;
this.remainingPredicates = remainingPredicates;
}

/**
* Creates a new FilterPushDownResult instance.
*
* @param acceptedPredicates The accepted predicates
* @param remainingPredicates The remaining predicates
* @return A new FilterPushDownResult instance
*/
public static FilterPushDownResult of(
List<Predicate> acceptedPredicates, List<Predicate> remainingPredicates) {
return new FilterPushDownResult(acceptedPredicates, remainingPredicates);
}

/**
* Returns the predicates that were accepted by the source.
*
* @return The list of accepted predicates
*/
public List<Predicate> acceptedPredicates() {
return acceptedPredicates;
}

/**
* Returns the predicates that remain to be evaluated.
*
* @return The list of remaining predicates
*/
public List<Predicate> remainingPredicates() {
return remainingPredicates;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;

import java.util.List;

/**
* Represents a logical partition or segment of data in data-lake.
*
* @since 0.8
*/
@PublicEvolving
public interface LakeSplit {

/**
* Returns the bucket id for this data split. Any data split in lake must belong to a Fluss
* bucket. The bucket id is used to aggregate splits that in same Fluss bucket into same reader
* in bucket-aware table, such primary key table, log table with pre-defined bucket keys. If
* it's not bucket-aware table, it's also feasible to return -1 directly for all data splits.
*
* @return the bucket id
*/
int bucket();

/**
* Returns the hierarchical partition values for this split, or an empty list if the split
* doesn't belong to a specific partition in non-partitioned table.
*
* <p>The returned list represents the complete partition path, with each element corresponding
* to one level of the partitioning hierarchy in order. For example, in a table partitioned by
* {@code dt=20250101/hr=12}, this method would return {@code ["20250101", "12"]}.
*
* <p>The list size should match the table's partition column count, and each element's position
* corresponds to the declared partition column order. Values should be in their
* string-represented form as they would appear in the filesystem path.
*
* @return the resolved partition values specification, or {@code null} if this split doesn't
* belong to a specific partition in non-partitioned table.
*/
List<String> partition();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;

import java.io.IOException;
import java.util.List;

/**
* A planner interface for generating readable splits for lake data sources.
*
* <p>Implementations of this interface are responsible for determining how to divide the data into
* manageable splits that can be read in parallel. The planning should consider the pushed-down
* optimizations (filters, limits, etc.) from {@link LakeSource}.
*
* @param <Split> the type of data split this planner generates, must extend {@link LakeSplit}
* @since 0.8
*/
@PublicEvolving
public interface Planner<Split extends LakeSplit> {

/**
* Plans and generates a list of readable data splits in parallel.
*
* @return the list of readable data splits
* @throws IOException if an I/O error occurs
*/
List<Split> plan() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.utils.CloseableIterator;

import java.io.IOException;

/**
* An interface for reading records from {@link LakeSplit}.
*
* <p>Implementations of this interface provide an iterator-style access to records, allowing
* efficient sequential reading of potentially large datasets without loading all data into memory
* at once. The reading should consider the pushed-down optimizations (project, filters, limits,
* etc.) from {@link LakeSource}.
*
* @since 0.8
*/
@PublicEvolving
public interface RecordReader {

/**
* Read a {@link LakeSplit} into a closeable iterator.
*
* @return the closeable iterator of records
* @throws IOException if an I/O error occurs
*/
CloseableIterator<LogRecord> read() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.source;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.row.InternalRow;

import java.util.Comparator;

/**
* A specialized {@link RecordReader} that produces records in a defined sorted order.
*
* <p>Extends the basic record reading capability with sorting semantics, ensuring that records are
* returned according to a specified ordering.
*
* <p>Implementations must guarantee that the {@link #read()} method returns records in the order
* defined by the comparator from {@link #order()}.
*
* <p>Note: This is mainly used for union read primary key table since we will do sort merge records
* in lake and fluss. The records in primary key table for lake may should implement this method for
* union read with a better performance.
*
* @since 0.8
*/
@PublicEvolving
public interface SortedRecordReader extends RecordReader {

/**
* Returns the comparator that defines the sort order of the records.
*
* @return a non-null comparator defining the sort order of the records
*/
Comparator<InternalRow> order();
}
Loading
Loading