Skip to content

Commit 8143c5b

Browse files
committed
Add KafkaHandleBase::get_offsets_for_times
1 parent c9f3b0c commit 8143c5b

File tree

6 files changed

+54
-0
lines changed

6 files changed

+54
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
build
2+
include/cppkafka/config.h

CMakeLists.txt

+12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ include_directories(${Boost_INCLUDE_DIRS})
3434
find_package(RdKafka REQUIRED)
3535
include_directories(${RDKAFKA_INCLUDE_DIR})
3636

37+
if (HAVE_OFFSETS_FOR_TIMES)
38+
message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times")
39+
set(CPPKAFKA_HAVE_OFFSET_FOR_TIMES ON)
40+
else()
41+
message(STATUS "Disabling support for KafkaHandleBase::get_offsets_for_times")
42+
endif()
43+
# Configuration file
44+
configure_file(
45+
"${PROJECT_SOURCE_DIR}/include/cppkafka/config.h.in"
46+
"${PROJECT_SOURCE_DIR}/include/cppkafka/config.h"
47+
)
48+
3749
add_subdirectory(src)
3850
add_subdirectory(include)
3951

cmake/FindRdKafka.cmake

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ include(CheckFunctionExists)
2424

2525
set(CMAKE_REQUIRED_LIBRARIES ${RDKAFKA_LIBRARY})
2626
check_function_exists(rd_kafka_committed HAVE_VALID_KAFKA_VERSION)
27+
check_function_exists(rd_kafka_offsets_for_times HAVE_OFFSETS_FOR_TIMES)
2728
set(CMAKE_REQUIRED_LIBRARIES)
2829

2930
if (HAVE_VALID_KAFKA_VERSION)

include/cppkafka/config.h.in

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#ifndef CPPKAFKA_CONFIG_H
2+
#define CPPKAFKA_CONFIG_H
3+
4+
#cmakedefine CPPKAFKA_HAVE_OFFSET_FOR_TIMES
5+
6+
#endif // CPPKAFKA_CONFIG_H

include/cppkafka/kafka_handle_base.h

+17
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@
3434
#include <memory>
3535
#include <chrono>
3636
#include <unordered_map>
37+
#include <map>
3738
#include <mutex>
3839
#include <tuple>
40+
#include <chrono>
3941
#include <librdkafka/rdkafka.h>
4042
#include "topic_partition.h"
4143
#include "topic_partition_list.h"
4244
#include "topic_configuration.h"
4345
#include "configuration.h"
4446
#include "macros.h"
47+
#include "config.h"
4548

4649
namespace cppkafka {
4750

@@ -55,6 +58,7 @@ class TopicMetadata;
5558
class CPPKAFKA_API KafkaHandleBase {
5659
public:
5760
using OffsetTuple = std::tuple<int64_t, int64_t>;
61+
using TopicPartitionsTimestampsMap = std::map<TopicPartition, std::chrono::milliseconds>;
5862

5963
virtual ~KafkaHandleBase() = default;
6064
KafkaHandleBase(const KafkaHandleBase&) = delete;
@@ -152,6 +156,19 @@ class CPPKAFKA_API KafkaHandleBase {
152156
*/
153157
TopicMetadata get_metadata(const Topic& topic) const;
154158

159+
#ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES
160+
161+
/**
162+
* \brief Gets topic/partition offsets based on timestamps
163+
*
164+
* This translates into a call to rd_kafka_offsets_for_times
165+
*
166+
* \param queries A map from topic/partition to the timestamp to be used
167+
*/
168+
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
169+
170+
#endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES
171+
155172
/**
156173
* Returns the kafka handle name
157174
*/

src/kafka_handle_base.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,23 @@ TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
119119
return topics.front();
120120
}
121121

122+
#ifdef CPPKAFKA_HAVE_OFFSET_FOR_TIMES
123+
TopicPartitionList
124+
KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const {
125+
TopicPartitionList topic_partitions;
126+
for (const auto& query : queries) {
127+
const TopicPartition& topic_partition = query.first;
128+
topic_partitions.emplace_back(topic_partition.get_topic(), topic_partition.get_partition(),
129+
query.second.count());
130+
}
131+
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
132+
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
133+
timeout_ms_.count());
134+
check_error(result);
135+
return convert(topic_list_handle);
136+
}
137+
#endif // CPPKAFKA_HAVE_OFFSET_FOR_TIMES
138+
122139
string KafkaHandleBase::get_name() const {
123140
return rd_kafka_name(handle_.get());
124141
}

0 commit comments

Comments
 (0)