Skip to content

Commit 981933e

Browse files
committed
[hotfix-38914][kafka] Throw no-restart exception when there is no data in all partitions.
Signed-off-by: tiezhu <[email protected]>
1 parent 4c46b9a commit 981933e

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/sample/OffsetFetcher.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.source.kafka.sample;
2020

21+
import com.dtstack.flink.sql.source.kafka.throwable.KafkaSamplingUnavailableException;
2122
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
2223
import org.apache.kafka.clients.consumer.ConsumerConfig;
2324
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -41,7 +42,7 @@ default OffsetMap seekOffset(Properties props, String topic) {
4142
try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
4243
OffsetMap offsetMap = fetchOffset(consumer, topic);
4344

44-
judgeKafkaSampleIsAvailable(offsetMap);
45+
judgeKafkaSampleIsAvailable(offsetMap, topic);
4546

4647
return offsetMap;
4748
}
@@ -53,7 +54,7 @@ default OffsetMap seekOffset(Properties props, String topic) {
5354
*
5455
* @param offsetMap offset map
5556
*/
56-
default void judgeKafkaSampleIsAvailable(OffsetMap offsetMap) {
57+
default void judgeKafkaSampleIsAvailable(OffsetMap offsetMap, String topic) {
5758
boolean kafkaSampleIsAvailable = false;
5859
Map<KafkaTopicPartition, Long> latest = offsetMap.getLatest();
5960
Map<KafkaTopicPartition, Long> earliest = offsetMap.getEarliest();
@@ -68,8 +69,10 @@ default void judgeKafkaSampleIsAvailable(OffsetMap offsetMap) {
6869
}
6970

7071
if (!kafkaSampleIsAvailable) {
71-
throw new RuntimeException(
72-
"Kafka sample is unavailable because there is no data in all partitions");
72+
throw new KafkaSamplingUnavailableException(
73+
String.format(
74+
"Kafka sampling of [%s] is unavailable because there is no data in all partitions",
75+
topic));
7376
}
7477
}
7578

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.source.kafka.throwable;
20+
21+
import org.apache.flink.runtime.throwable.ThrowableAnnotation;
22+
import org.apache.flink.runtime.throwable.ThrowableType;
23+
import org.apache.flink.util.FlinkRuntimeException;
24+
25+
/**
26+
* @author tiezhu
27+
* @since 2021/6/17 星期四
28+
*/
29+
@ThrowableAnnotation(ThrowableType.NonRecoverableError)
30+
public class KafkaSamplingUnavailableException extends FlinkRuntimeException {
31+
32+
public KafkaSamplingUnavailableException(String message) {
33+
super(message);
34+
}
35+
}

0 commit comments

Comments
 (0)