Skip to content

Commit c16e78b

Browse files
committed
[client] Support log scanner and writer for multiple tables
1 parent cc89a3d commit c16e78b

45 files changed

Lines changed: 4095 additions & 505 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

fluss-client/src/main/java/org/apache/fluss/client/Connection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.client.admin.Admin;
22+
import org.apache.fluss.client.table.MultiTable;
2223
import org.apache.fluss.client.table.Table;
2324
import org.apache.fluss.config.Configuration;
2425
import org.apache.fluss.metadata.TablePath;
@@ -56,6 +57,16 @@ public interface Connection extends AutoCloseable {
5657
/** Retrieve a new Table client to operate data in table. */
5758
Table getTable(TablePath tablePath);
5859

60+
/**
61+
* Retrieve a {@link MultiTable} client to scan and write data across multiple tables in a
62+
* single unified API. Unlike {@link #getTable(TablePath)} which is bound to one table, {@code
63+
* MultiTable} is table-agnostic: tables are identified per scan subscription and per write
64+
* record.
65+
*
66+
* <p>{@link MultiTable} instances are light-weight and NOT thread-safe; obtain per-thread.
67+
*/
68+
MultiTable getMultiTable();
69+
5970
/** Close the connection and release all resources. */
6071
@Override
6172
void close() throws Exception;

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.fluss.client.lookup.LookupClient;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
2424
import org.apache.fluss.client.table.FlussTable;
25+
import org.apache.fluss.client.table.MultiTable;
26+
import org.apache.fluss.client.table.MultiTableImpl;
2527
import org.apache.fluss.client.table.Table;
2628
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
2729
import org.apache.fluss.client.token.DefaultSecurityTokenManager;
@@ -106,6 +108,11 @@ public Table getTable(TablePath tablePath) {
106108
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
107109
}
108110

111+
@Override
112+
public MultiTable getMultiTable() {
113+
return new MultiTableImpl(this);
114+
}
115+
109116
public MetadataUpdater getMetadataUpdater() {
110117
return metadataUpdater;
111118
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.Connection;
22+
import org.apache.fluss.client.table.scanner.MultiTableScan;
23+
import org.apache.fluss.client.table.writer.MultiTableWrite;
24+
import org.apache.fluss.metadata.TablePath;
25+
26+
/**
27+
* Used to communicate with multiple Fluss tables through a single client. Obtain an instance from
28+
* {@link Connection#getMultiTable()}.
29+
*
30+
* <p>{@code MultiTable} is the cross-table counterpart of {@link Table}. Useful for CDC ingestion,
31+
* cross-table streaming pipelines, and multi-table catalog sinks.
32+
*
33+
* <p>Unlike {@link Connection#getTable(TablePath)} which is bound to one table, {@code MultiTable}
34+
* is table-agnostic: tables are identified per scan subscription and per write record.
35+
*
36+
* <p>{@code MultiTable} instances are light-weight and NOT thread-safe; obtain per-thread.
37+
*
38+
* @since 0.7
39+
*/
40+
@PublicEvolving
41+
public interface MultiTable {
42+
43+
/** Build a scanner that reads data from multiple tables simultaneously. */
44+
MultiTableScan newMultiTableScan();
45+
46+
/** Build a writer that writes data (with change types) to multiple tables. */
47+
MultiTableWrite newMultiTableWrite();
48+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.FlussConnection;
22+
import org.apache.fluss.client.table.scanner.MultiTableScan;
23+
import org.apache.fluss.client.table.scanner.MultiTableScanImpl;
24+
import org.apache.fluss.client.table.writer.MultiTableWrite;
25+
import org.apache.fluss.client.table.writer.MultiTableWriteImpl;
26+
27+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
28+
29+
/**
30+
* Default implementation of {@link MultiTable}.
31+
*
32+
* <p>{@code MultiTableImpl} is light-weight and NOT thread-safe; the underlying {@link
33+
* FlussConnection} is shared and thread-safe, but each {@code MultiTableImpl} instance should be
34+
* obtained per-thread.
35+
*
36+
* @since 0.7
37+
*/
38+
@Internal
39+
public class MultiTableImpl implements MultiTable {
40+
41+
private final FlussConnection connection;
42+
43+
public MultiTableImpl(FlussConnection connection) {
44+
this.connection = checkNotNull(connection, "connection");
45+
}
46+
47+
@Override
48+
public MultiTableScan newMultiTableScan() {
49+
return new MultiTableScanImpl(connection);
50+
}
51+
52+
@Override
53+
public MultiTableWrite newMultiTableWrite() {
54+
return new MultiTableWriteImpl(connection);
55+
}
56+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Reads current data from multiple tables through a single client.
24+
*
25+
* <p>NOTE: planned for a future release; declared as a placeholder for forward compatibility. There
26+
* is no concrete implementation today.
27+
*
28+
* @since 0.7
29+
*/
30+
@PublicEvolving
31+
public interface MultiTableBatchScanner extends AutoCloseable {}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.table.scanner.log.MultiTableLogScanner;
22+
import org.apache.fluss.client.table.writer.MultiTableWriter;
23+
import org.apache.fluss.metadata.Schema;
24+
import org.apache.fluss.metadata.TablePath;
25+
import org.apache.fluss.record.ChangeType;
26+
import org.apache.fluss.row.InternalRow;
27+
28+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
29+
30+
/**
31+
* A record produced by a {@link MultiTableLogScanner}. Wraps an unchanged {@link ScanRecord} and
32+
* enriches it with the source table identity (path, id) and schema (schema object, schema id).
33+
*
34+
* <p>The underlying {@link ScanRecord} structure is NOT modified.
35+
*
36+
* <p>Read-side only. For writes via {@link MultiTableWriter}, use {@link
37+
* org.apache.fluss.client.table.writer.MultiTableWriteRecord} which carries only the write-relevant
38+
* fields.
39+
*
40+
* @since 0.7
41+
*/
42+
@PublicEvolving
43+
public final class MultiTableRecord {
44+
45+
private final TablePath tablePath;
46+
private final long tableId;
47+
private final Schema schema;
48+
private final int schemaId;
49+
private final ScanRecord scanRecord;
50+
51+
public MultiTableRecord(
52+
TablePath tablePath, long tableId, int schemaId, Schema schema, ScanRecord scanRecord) {
53+
this.tablePath = checkNotNull(tablePath, "tablePath");
54+
this.tableId = tableId;
55+
this.schema = checkNotNull(schema, "schema");
56+
this.schemaId = schemaId;
57+
this.scanRecord = checkNotNull(scanRecord, "scanRecord");
58+
}
59+
60+
// ---- table identity ----
61+
62+
public TablePath getTablePath() {
63+
return tablePath;
64+
}
65+
66+
public long getTableId() {
67+
return tableId;
68+
}
69+
70+
// ---- schema info ----
71+
72+
public Schema getSchema() {
73+
return schema;
74+
}
75+
76+
public int getSchemaId() {
77+
return schemaId;
78+
}
79+
80+
// ---- underlying record (unchanged) ----
81+
82+
public ScanRecord getScanRecord() {
83+
return scanRecord;
84+
}
85+
86+
// ---- convenience delegates ----
87+
88+
public long logOffset() {
89+
return scanRecord.logOffset();
90+
}
91+
92+
public long timestamp() {
93+
return scanRecord.timestamp();
94+
}
95+
96+
public ChangeType getChangeType() {
97+
return scanRecord.getChangeType();
98+
}
99+
100+
public InternalRow getRow() {
101+
return scanRecord.getRow();
102+
}
103+
104+
@Override
105+
public String toString() {
106+
return "MultiTableRecord{"
107+
+ "tablePath="
108+
+ tablePath
109+
+ ", tableId="
110+
+ tableId
111+
+ ", schemaId="
112+
+ schemaId
113+
+ ", scanRecord="
114+
+ scanRecord
115+
+ '}';
116+
}
117+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScan.java renamed to fluss-client/src/main/java/org/apache/fluss/client/table/scanner/MultiTableScan.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,27 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.client.table.scanner.log;
18+
package org.apache.fluss.client.table.scanner;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21-
22-
import javax.annotation.Nullable;
21+
import org.apache.fluss.client.table.scanner.log.MultiTableLogScanner;
2322

2423
/**
25-
* Used to describe the operation to scan log data by {@link LogScanner} to a table.
24+
* Builder for a multi-table scanner.
25+
*
26+
* <p>Phase 1 surface intentionally exposes no per-table builder configuration: tables are
27+
* registered dynamically on the created {@link MultiTableLogScanner} when {@code subscribe(...)} is
28+
* first called for a {@code TablePath}. Future phases may add {@code withTable(...)} overloads to
29+
* pre-declare tables together with projection / filter pushdown.
2630
*
27-
* @since 0.1
31+
* @since 0.7
2832
*/
2933
@PublicEvolving
30-
public class LogScan {
31-
32-
/** The projected fields to do projection. No projection if is null. */
33-
@Nullable private final int[] projectedFields;
34-
35-
public LogScan() {
36-
this(null);
37-
}
38-
39-
private LogScan(@Nullable int[] projectedFields) {
40-
this.projectedFields = projectedFields;
41-
}
34+
public interface MultiTableScan {
4235

4336
/**
44-
* Returns a new instance of LogScan description with column projection.
45-
*
46-
* @param projectedFields the projection fields
37+
* Creates a {@link MultiTableLogScanner} to continuously read log data from multiple tables.
38+
* Tables are registered dynamically on first {@code subscribe(...)} call.
4739
*/
48-
public LogScan withProjectedFields(int[] projectedFields) {
49-
return new LogScan(projectedFields);
50-
}
51-
52-
@Nullable
53-
public int[] getProjectedFields() {
54-
return projectedFields;
55-
}
40+
MultiTableLogScanner createLogScanner();
5641
}

0 commit comments

Comments
 (0)