Skip to content

Commit a0b27ca

Browse files
committed
streamnative#386: Check serializability of crypto key reader and encryption keys.
1 parent 50b9e9b commit a0b27ca

File tree

2 files changed

+8
-0
lines changed

2 files changed

+8
-0
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import java.util.Set;
3636
import java.util.concurrent.CompletableFuture;
3737

38+
import static org.apache.flink.util.InstantiationUtil.isSerializable;
3839
import static org.apache.flink.util.Preconditions.checkNotNull;
40+
import static org.apache.flink.util.Preconditions.checkState;
3941

4042
/**
4143
* Write data to Flink.
@@ -136,6 +138,8 @@ public FlinkPulsarSink<T> build(){
136138
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
137139
throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction.");
138140
}
141+
checkState(isSerializable(cryptoKeyReader));
142+
checkState(isSerializable(encryptionKeys));
139143
return new FlinkPulsarSink<>(this);
140144
}
141145

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,11 @@
8888
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER;
8989
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER;
9090
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP;
91+
import static org.apache.flink.util.InstantiationUtil.isSerializable;
9192
import static org.apache.flink.util.Preconditions.checkArgument;
9293
import static org.apache.flink.util.Preconditions.checkNotNull;
94+
import static org.apache.flink.util.Preconditions.checkState;
95+
9396

9497
/**
9598
* Pulsar data source.
@@ -168,6 +171,7 @@ public FlinkPulsarSource<T> build(){
168171
if (clientConf == null){
169172
throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties.");
170173
}
174+
checkState(isSerializable(cryptoKeyReader));
171175
return new FlinkPulsarSource<>(this);
172176
}
173177

0 commit comments

Comments
 (0)