From 8281cbf7624c3a4eb90bf58671daf76843d00819 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 22 Nov 2021 11:05:05 +0800 Subject: [PATCH] [HUDI-2799] Fix the classloader of flink write task (#4042) --- .../java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java | 2 ++ .../apache/hudi/sink/common/AbstractStreamWriteFunction.java | 2 ++ .../main/java/org/apache/hudi/sink/compact/CompactFunction.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 4089907243c8..a5980dbb1b83 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -111,6 +111,8 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) { @Override public void open(Configuration parameters) throws IOException { + // always use the user classloader + Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader()); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 0e7300591286..7fbbb140324d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -125,6 +125,8 @@ public AbstractStreamWriteFunction(Configuration config) { @Override public void initializeState(FunctionInitializationContext context) throws Exception { + // always use the user classloader + Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader()); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 560b5ffbad30..73392a571210 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -75,6 +75,8 @@ public CompactFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { + // always use the user classloader + Thread.currentThread().setContextClassLoader(getRuntimeContext().getUserCodeClassLoader()); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); if (this.asyncCompaction) {