diff --git a/src/clj_kafka/consumer/simple.clj b/src/clj_kafka/consumer/simple.clj index f3281ea..01878f3 100644 --- a/src/clj_kafka/consumer/simple.clj +++ b/src/clj_kafka/consumer/simple.clj @@ -37,3 +37,10 @@ hm (java.util.HashMap. {tp pori})] (let [response (.getOffsetsBefore consumer (OffsetRequest. hm (kafka.api.OffsetRequest/CurrentVersion) "clj-kafka-id"))] (first (.offsets response topic partition))))) + +(defn oldest-topic-offset [consumer topic partition] + (let [tp (TopicAndPartition. topic partition) + pori (PartitionOffsetRequestInfo. -2 1) + hm (java.util.HashMap. {tp pori})] + (let [response (.getOffsetsBefore consumer (OffsetRequest. hm (kafka.api.OffsetRequest/CurrentVersion) "clj-kafka-id"))] + (first (.offsets response topic partition))))) \ No newline at end of file