diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java
index bde6dc4513..3773e846d6 100644
--- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java
+++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -56,6 +56,8 @@ public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
 
 	private static final long DEFAULT_POLL_TIMEOUT = 30L;
 
+	private final String topicName;
+
 	private final List<TopicPartition> topicPartitions;
 
 	private Map<TopicPartition, Long> partitionOffsets;
@@ -110,6 +112,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Int
 				ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
 		this.consumerProperties = consumerProperties;
 		Assert.hasLength(topicName, "Topic name must not be null or empty");
+		this.topicName = topicName;
 		Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
 		this.topicPartitions = new ArrayList<>();
 		for (Integer partition : partitions) {
@@ -174,10 +177,10 @@ public void open(ExecutionContext executionContext) {
 			}
 		}
 		if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
-			Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
-				.get(TOPIC_PARTITION_OFFSETS);
-			for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
-				this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
+			Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
+			for (Map.Entry<String, Long> entry : offsets.entrySet()) {
+				this.partitionOffsets.put(new TopicPartition(this.topicName, Integer.parseInt(entry.getKey())),
+						entry.getValue() == 0 ? 0 : entry.getValue() + 1);
 			}
 		}
 		this.kafkaConsumer.assign(this.topicPartitions);
@@ -203,7 +206,11 @@ public V read() {
 	@Override
 	public void update(ExecutionContext executionContext) {
 		if (this.saveState) {
-			executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
+			Map<String, Long> offsets = new HashMap<>();
+			for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
+				offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
+			}
+			executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
 		}
 		this.kafkaConsumer.commitSync();
 	}
diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java
index 11d36c89e1..e358fae51e 100644
--- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java
+++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -256,8 +256,8 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
 			future.get();
 		}
 		ExecutionContext executionContext = new ExecutionContext();
-		Map<TopicPartition, Long> offsets = new HashMap<>();
-		offsets.put(new TopicPartition("topic3", 0), 1L);
+		Map<String, Long> offsets = new HashMap<>();
+		offsets.put("0", 1L);
 		executionContext.put("topic.partition.offsets", offsets);
 
 		// topic3-0: val0, val1, val2, val3, val4
@@ -297,9 +297,9 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
 		}
 		ExecutionContext executionContext = new ExecutionContext();
 
-		Map<TopicPartition, Long> offsets = new HashMap<>();
-		offsets.put(new TopicPartition("topic4", 0), 1L);
-		offsets.put(new TopicPartition("topic4", 1), 2L);
+		Map<String, Long> offsets = new HashMap<>();
+		offsets.put("0", 1L);
+		offsets.put("1", 2L);
 		executionContext.put("topic.partition.offsets", offsets);
 
 		// topic4-0: val0, val2, val4, val6
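
For reference, a minimal sketch (not part of the patch) of restarting a KafkaItemReader against the new execution-context layout, where offsets are stored under "topic.partition.offsets" keyed by the partition number as a String rather than by TopicPartition. The bootstrap servers, group id and topic below are hypothetical placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.kafka.KafkaItemReader;

public class KafkaItemReaderRestartSketch {

	public static void main(String[] args) throws Exception {
		// Hypothetical consumer configuration; adjust servers, group and topic to your setup.
		Properties consumerProperties = new Properties();
		consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "restart-sketch");
		consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

		KafkaItemReader<String, String> reader = new KafkaItemReader<>(consumerProperties, "topic3", List.of(0));

		// Offsets keyed by the partition number as a String, matching what update() now stores.
		Map<String, Long> offsets = new HashMap<>();
		offsets.put("0", 1L); // partition 0 was last read up to offset 1

		ExecutionContext executionContext = new ExecutionContext();
		executionContext.put("topic.partition.offsets", offsets);

		reader.open(executionContext); // seeks partition 0 to offset 2 (last offset + 1)
		String item;
		while ((item = reader.read()) != null) { // read() returns null once the poll timeout elapses
			System.out.println(item);
		}
		reader.update(executionContext); // writes the String-keyed offsets back
		reader.close();
	}

}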