diff --git a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java index c49d13e..aa26b37 100644 --- a/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java +++ b/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java @@ -280,8 +280,11 @@ public void open(Collection partitions) { public void close(Collection partitions) { if (singleKinesisProducerPerPartition) { for (TopicPartition topicPartition : partitions) { - producerMap.get(topicPartition.partition() + "@" + topicPartition.topic()).destroy(); - producerMap.remove(topicPartition.partition() + "@" + topicPartition.topic()); + String producerName = topicPartition.partition() + "@" + topicPartition.topic(); + if (producerMap.containsKey(producerName)) { + producerMap.get(producerName).destroy(); + producerMap.remove(producerName); + } } } } @@ -295,6 +298,7 @@ public void stop() { kp.destroy(); } } else { + kinesisProducer.flushSync(); kinesisProducer.destroy(); }