Skip to content

Commit b8c74ea

Browse files
committed
[lake/hudi] Introduce Hudi lake writer support for tiering tables
1 parent c108fea commit b8c74ea

22 files changed

Lines changed: 2440 additions & 0 deletions

fluss-lake/fluss-lake-hudi/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,12 @@
156156
<scope>provided</scope>
157157
</dependency>
158158

159+
<dependency>
160+
<groupId>org.apache.flink</groupId>
161+
<artifactId>flink-table-runtime</artifactId>
162+
<scope>provided</scope>
163+
</dependency>
164+
159165
<!-- test dependency -->
160166
<dependency>
161167
<groupId>org.apache.fluss</groupId>
Lines changed: 47 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.hudi.tiering;
19+
20+
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.lake.hudi.utils.catalog.HudiCatalogUtils;
22+
23+
import org.apache.flink.table.catalog.Catalog;
24+
25+
import java.io.Serializable;
26+
27+
/** Serializable provider for creating Hudi catalogs in tiering subtasks. */
28+
public class HudiCatalogProvider implements Serializable {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
private final Configuration hudiConfig;
33+
34+
public HudiCatalogProvider(Configuration hudiConfig) {
35+
this.hudiConfig = hudiConfig;
36+
}
37+
38+
public Configuration getHudiConfig() {
39+
return hudiConfig;
40+
}
41+
42+
public Catalog get() {
43+
Catalog hudiCatalog = HudiCatalogUtils.createHudiCatalog(hudiConfig);
44+
hudiCatalog.open();
45+
return hudiCatalog;
46+
}
47+
}
Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.hudi.tiering;
19+
20+
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.lake.committer.CommitterInitContext;
22+
import org.apache.fluss.lake.committer.LakeCommitter;
23+
import org.apache.fluss.lake.hudi.utils.meta.CkpMetadataProvider;
24+
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
25+
import org.apache.fluss.lake.writer.LakeTieringFactory;
26+
import org.apache.fluss.lake.writer.LakeWriter;
27+
import org.apache.fluss.lake.writer.WriterInitContext;
28+
29+
import java.io.IOException;
30+
import java.io.Serializable;
31+
32+
/** Hudi implementation of {@link LakeTieringFactory}. */
33+
public class HudiLakeTieringFactory implements LakeTieringFactory<HudiWriteResult, Serializable> {
34+
35+
private static final long serialVersionUID = 1L;
36+
37+
private final HudiCatalogProvider hudiCatalogProvider;
38+
private final CkpMetadataProvider ckpMetadataProvider;
39+
40+
public HudiLakeTieringFactory(Configuration hudiConfig) {
41+
this.hudiCatalogProvider = new HudiCatalogProvider(hudiConfig);
42+
this.ckpMetadataProvider = new CkpMetadataProvider();
43+
}
44+
45+
@Override
46+
public LakeWriter<HudiWriteResult> createLakeWriter(WriterInitContext writerInitContext)
47+
throws IOException {
48+
return new HudiLakeWriter(hudiCatalogProvider, ckpMetadataProvider, writerInitContext);
49+
}
50+
51+
@Override
52+
public SimpleVersionedSerializer<HudiWriteResult> getWriteResultSerializer() {
53+
return new HudiWriteResultSerializer();
54+
}
55+
56+
@Override
57+
public LakeCommitter<HudiWriteResult, Serializable> createLakeCommitter(
58+
CommitterInitContext committerInitContext) {
59+
throw new UnsupportedOperationException("Hudi lake committer is not implemented yet.");
60+
}
61+
62+
@Override
63+
public SimpleVersionedSerializer<Serializable> getCommittableSerializer() {
64+
throw new UnsupportedOperationException(
65+
"Hudi lake committable serializer is not implemented yet.");
66+
}
67+
}
Lines changed: 140 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.hudi.tiering;
19+
20+
import org.apache.fluss.lake.hudi.tiering.writer.HudiRecordWriter;
21+
import org.apache.fluss.lake.hudi.utils.meta.CkpMetadata;
22+
import org.apache.fluss.lake.hudi.utils.meta.CkpMetadataProvider;
23+
import org.apache.fluss.lake.writer.LakeWriter;
24+
import org.apache.fluss.lake.writer.WriterInitContext;
25+
import org.apache.fluss.metadata.TableInfo;
26+
import org.apache.fluss.record.LogRecord;
27+
28+
import org.apache.flink.configuration.Configuration;
29+
import org.apache.hudi.client.WriteStatus;
30+
import org.apache.hudi.common.model.HoodieTableType;
31+
import org.apache.hudi.common.model.WriteOperationType;
32+
import org.apache.hudi.common.table.HoodieTableMetaClient;
33+
import org.apache.hudi.common.util.CommitUtils;
34+
import org.apache.hudi.configuration.FlinkOptions;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.io.IOException;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
43+
import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_SPLIT_INDEX;
44+
import static org.apache.fluss.lake.writer.WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP;
45+
import static org.apache.fluss.utils.Preconditions.checkArgument;
46+
47+
/** Hudi implementation of {@link LakeWriter}. */
48+
public class HudiLakeWriter implements LakeWriter<HudiWriteResult> {
49+
50+
private static final Logger LOG = LoggerFactory.getLogger(HudiLakeWriter.class);
51+
52+
private final RecordWriter recordWriter;
53+
private final TableInfo tableInfo;
54+
private final HudiWriteTableInfo hudiTableInfo;
55+
private final CkpMetadata ckpMetadata;
56+
57+
public HudiLakeWriter(
58+
HudiCatalogProvider hudiCatalogProvider,
59+
CkpMetadataProvider ckpMetadataProvider,
60+
WriterInitContext writerInitContext)
61+
throws IOException {
62+
validateWriterInitContext(writerInitContext);
63+
this.tableInfo = writerInitContext.tableInfo();
64+
this.hudiTableInfo =
65+
HudiWriteTableInfo.create(hudiCatalogProvider, tableInfo.getTablePath());
66+
this.ckpMetadata = ckpMetadataProvider.get(tableInfo.getTablePath(), hudiTableInfo);
67+
68+
if (writerInitContext.splitIndex() == 0) {
69+
ckpMetadata.bootstrap();
70+
initInstant(hudiTableInfo.getFlinkConfig(), hudiTableInfo.getMetaClient());
71+
LOG.info(
72+
"Initialized Hudi instant for first split of table {}, bucket {}.",
73+
tableInfo.getTablePath(),
74+
writerInitContext.tableBucket());
75+
}
76+
77+
this.recordWriter =
78+
new HudiRecordWriter(writerInitContext, tableInfo, hudiTableInfo, ckpMetadata);
79+
LOG.info("Created HudiLakeWriter with configuration {}.", hudiTableInfo.getFlinkConfig());
80+
}
81+
82+
@Override
83+
public void write(LogRecord record) throws IOException {
84+
try {
85+
recordWriter.write(record);
86+
} catch (Exception e) {
87+
throw new IOException("Failed to write Fluss record to Hudi.", e);
88+
}
89+
}
90+
91+
@Override
92+
public HudiWriteResult complete() throws IOException {
93+
try {
94+
Map<String, List<WriteStatus>> writeStatuses = recordWriter.complete();
95+
return new HudiWriteResult(writeStatuses, new HashMap<>());
96+
} catch (Exception e) {
97+
throw new IOException("Failed to complete Hudi write.", e);
98+
}
99+
}
100+
101+
@Override
102+
public void close() throws IOException {
103+
try {
104+
recordWriter.close();
105+
ckpMetadata.close();
106+
} catch (Exception e) {
107+
throw new IOException("Failed to close HudiLakeWriter.", e);
108+
}
109+
}
110+
111+
private void initInstant(Configuration configuration, HoodieTableMetaClient metaClient) {
112+
metaClient.reloadActiveTimeline();
113+
WriteOperationType writeOperationType =
114+
WriteOperationType.fromValue(configuration.get(FlinkOptions.OPERATION));
115+
hudiTableInfo.getWriteClient().preTxn(writeOperationType, metaClient);
116+
117+
String commitAction =
118+
CommitUtils.getCommitActionType(
119+
writeOperationType,
120+
HoodieTableType.valueOf(configuration.get(FlinkOptions.TABLE_TYPE)));
121+
String instant = hudiTableInfo.getWriteClient().startCommit(commitAction, metaClient);
122+
metaClient.getActiveTimeline().transitionRequestedToInflight(commitAction, instant);
123+
hudiTableInfo.getWriteClient().setWriteTimer(commitAction);
124+
ckpMetadata.startInstant(instant);
125+
LOG.info(
126+
"Created Hudi instant {} for table {} with type {}.",
127+
instant,
128+
tableInfo.getTablePath(),
129+
configuration.get(FlinkOptions.TABLE_TYPE));
130+
}
131+
132+
private static void validateWriterInitContext(WriterInitContext writerInitContext) {
133+
checkArgument(
134+
writerInitContext.splitIndex() != UNKNOWN_SPLIT_INDEX,
135+
"Hudi lake writer requires split index in WriterInitContext.");
136+
checkArgument(
137+
writerInitContext.tieringRoundTimestamp() != UNKNOWN_TIERING_ROUND_TIMESTAMP,
138+
"Hudi lake writer requires tiering round timestamp in WriterInitContext.");
139+
}
140+
}
Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.hudi.tiering;
19+
20+
import org.apache.hudi.client.WriteStatus;
21+
22+
import java.io.Serializable;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
/** Write result produced by the Hudi lake writer and consumed by a future Hudi committer. */
27+
public class HudiWriteResult implements Serializable {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
private final Map<String, List<WriteStatus>> writeStatuses;
32+
private final Map<String, List<WriteStatus>> compactionWriteStatuses;
33+
34+
public HudiWriteResult(
35+
Map<String, List<WriteStatus>> writeStatuses,
36+
Map<String, List<WriteStatus>> compactionWriteStatuses) {
37+
this.writeStatuses = writeStatuses;
38+
this.compactionWriteStatuses = compactionWriteStatuses;
39+
}
40+
41+
public Map<String, List<WriteStatus>> getWriteStatuses() {
42+
return writeStatuses;
43+
}
44+
45+
public Map<String, List<WriteStatus>> getCompactionWriteStatuses() {
46+
return compactionWriteStatuses;
47+
}
48+
}

0 commit comments

Comments
 (0)