Merged
@@ -87,6 +87,8 @@ public class RegexSourceIntegrationTest {
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

private static final List<TopicPartition> ASSIGNED_PARTITIONS = new ArrayList<>();

@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
@@ -254,6 +256,64 @@ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopol
}
}

@Test
public void shouldNotCrashIfPatternMatchesTopicHasNoData() throws Exception {
final String topic1 = "TEST-TOPIC-1";
final String topic2 = "TEST-TOPIC-2";

try {
CLUSTER.createTopic(topic1);

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
builder.stream(Pattern.compile("not-a-match"));
final List<String> assignedTopics = new CopyOnWriteArrayList<>();

pattern1Stream
.selectKey((k, v) -> k)
.groupByKey()
.aggregate(() -> "", (k, v, a) -> v)
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

final Topology topology = builder.build();
assertThat(topology.describe().subtopologies().size(), greaterThan(1));
streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@Override
public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
}
};
}
});

startApplicationAndWaitUntilRunning(streams);
TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && !assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);

CLUSTER.createTopic(topic2);
TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);

final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
IntegrationTestUtils.produceKeyValuesSynchronously(
topic1,
Collections.singletonList(record1),
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
CLUSTER.time
);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
outputTopic,
List.of(record1)
);

streams.close();
} finally {
CLUSTER.deleteTopics(topic1, topic2);
}
}

private String createTopic(final int suffix) throws InterruptedException {
final String outputTopic = "outputTopic_" + suffix;
CLUSTER.createTopic(outputTopic);
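The wrapped consumer in the test above forwards pattern subscriptions through TheConsumerRebalanceListener, which is defined elsewhere in the test class and not shown in this hunk. As a rough sketch (structure assumed, not taken from this diff), such a delegating listener records the topics of assigned and revoked partitions and then hands the callbacks on to the original listener:

// Assumes imports of org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
// org.apache.kafka.common.TopicPartition, java.util.Collection, and java.util.List.
private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
    private final List<String> assignedTopics;
    private final ConsumerRebalanceListener listener;

    TheConsumerRebalanceListener(final List<String> assignedTopics, final ConsumerRebalanceListener listener) {
        this.assignedTopics = assignedTopics;
        this.listener = listener;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Drop the topics of revoked partitions, then delegate to the wrapped listener.
        for (final TopicPartition partition : partitions) {
            assignedTopics.remove(partition.topic());
        }
        listener.onPartitionsRevoked(partitions);
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        // Record the topics of newly assigned partitions, then delegate.
        for (final TopicPartition partition : partitions) {
            assignedTopics.add(partition.topic());
        }
        listener.onPartitionsAssigned(partitions);
    }
}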
@@ -465,38 +465,29 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean)
}
}

private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
private Optional<OffsetAndMetadata> findOffsetAndMetadata(final TopicPartition partition) {
Long offset = partitionGroup.headRecordOffset(partition);
Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
final long partitionTime = partitionGroup.partitionTimestamp(partition);
if (offset == null) {
try {
if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
offset = offsetAndMetadata.offset();
leaderEpoch = offsetAndMetadata.leaderEpoch();
} else {
// This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
}
} catch (final KafkaException fatal) {
Review comment (Contributor Author): as there are no client calls anymore, the catch is redundant

Reply (Member): Nice side cleanup.

throw new StreamsException(fatal);
final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
if (offsetAndMetadata == null) {
// it may be that we have not yet consumed any record from this partition, hence nothing to commit
return Optional.empty();
}
offset = offsetAndMetadata.offset();
leaderEpoch = offsetAndMetadata.leaderEpoch();
}
return new OffsetAndMetadata(offset,
return Optional.of(new OffsetAndMetadata(offset,
leaderEpoch,
new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode());
new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode()));
}

private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
final Map<TopicPartition, OffsetAndMetadata> committableOffsets;

switch (state()) {
case CREATED:
case RESTORING:
committableOffsets = Collections.emptyMap();

break;
return Collections.emptyMap();

case RUNNING:
case SUSPENDED:
@@ -505,12 +496,13 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
// input partitions
final Set<TopicPartition> partitionsNeedCommit = processorContext.processorMetadata().needsCommit() ?
inputPartitions() : consumedOffsets.keySet();
committableOffsets = new HashMap<>(partitionsNeedCommit.size());

for (final TopicPartition partition : partitionsNeedCommit) {
committableOffsets.put(partition, findOffsetAndMetadata(partition));
}
break;
return partitionsNeedCommit.stream()
.map(partition -> findOffsetAndMetadata(partition)
.map(offsetAndMetadata -> Map.entry(partition, offsetAndMetadata)))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
@@ -519,7 +511,6 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}

return committableOffsets;
}

@Override
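The rewritten committableOffsetsAndMetadata() relies on findOffsetAndMetadata() now returning Optional.empty() for partitions that have nothing to commit, and the stream pipeline silently drops those partitions while collecting the rest into a map. A minimal, self-contained sketch of that collect-only-present-entries pattern (hypothetical lookup and names, not from the Streams code) looks like this:

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class OptionalCollectSketch {

    // Hypothetical lookup: only partitions with consumed data yield an offset.
    static Optional<Long> committableOffset(final String partition) {
        return "topic-1".equals(partition) ? Optional.of(42L) : Optional.empty();
    }

    public static void main(final String[] args) {
        final Map<String, Long> committable = List.of("topic-0", "topic-1").stream()
            // Map each partition to an Optional entry of (partition, offset).
            .map(partition -> committableOffset(partition)
                .map(offset -> Map.entry(partition, offset)))
            // Keep only partitions that actually have an offset to commit.
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // Prints {topic-1=42}; topic-0 is absent because its lookup was empty.
        System.out.println(committable);
    }
}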