Skip to content

Commit bfebae8

Browse files
committed
Test KafkaClient::commit_offset
1 parent d9d57c3 commit bfebae8

File tree

3 files changed

+44
-5
lines changed

3 files changed

+44
-5
lines changed

src/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
/// A retrieved offset for a particular partition in the context of an
66
/// already known topic.
7-
#[derive(Debug)]
7+
#[derive(Debug, Hash, PartialEq, Eq)]
88
pub struct PartitionOffset {
99
pub offset: i64,
1010
pub partition: i32,

tests/integration/client/mod.rs

+34-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use super::*;
77
use std::collections::HashSet;
88
use std::time::Duration;
9-
use kafka::client::{KafkaClient, FetchPartition, ProduceMessage, RequiredAcks};
9+
use kafka::client::{KafkaClient, PartitionOffset, FetchPartition, ProduceMessage, RequiredAcks};
1010
use kafka::client::fetch::Response;
1111

1212
fn flatten_fetched_messages(resps: &Vec<Response>) -> Vec<(&str, i32, &[u8])> {
@@ -65,9 +65,7 @@ fn test_kafka_client_load_metadata() {
6565

6666
#[test]
6767
fn test_produce_fetch_messages() {
68-
let hosts = vec![LOCAL_KAFKA_BOOTSTRAP_HOST.to_owned()];
69-
let mut client = KafkaClient::new(hosts.clone());
70-
client.load_metadata_all().unwrap();
68+
let mut client = new_ready_kafka_client();
7169

7270
// first send the messages and verify correct confirmation responses
7371
// from kafka
@@ -124,3 +122,35 @@ fn test_produce_fetch_messages() {
124122
.into_iter()
125123
.all(|c_msg| messages.contains(&c_msg)));
126124
}
125+
126+
#[test]
127+
fn test_commit_offset() {
128+
let mut client = new_ready_kafka_client();
129+
130+
for &(partition, offset) in
131+
&[(TEST_TOPIC_PARTITIONS[0], 100),
132+
(TEST_TOPIC_PARTITIONS[1], 200),
133+
(TEST_TOPIC_PARTITIONS[0], 300),
134+
(TEST_TOPIC_PARTITIONS[1], 400),
135+
(TEST_TOPIC_PARTITIONS[0], 500),
136+
(TEST_TOPIC_PARTITIONS[1], 600)] {
137+
138+
139+
client
140+
.commit_offset(TEST_GROUP_NAME, TEST_TOPIC_NAME, partition, offset)
141+
.unwrap();
142+
143+
let partition_offsets: HashSet<PartitionOffset> = client
144+
.fetch_group_topic_offsets(TEST_GROUP_NAME, TEST_TOPIC_NAME)
145+
.unwrap()
146+
.into_iter()
147+
.collect();
148+
149+
let correct_partition_offset = PartitionOffset {
150+
partition: partition,
151+
offset: offset,
152+
};
153+
154+
assert!(partition_offsets.contains(&correct_partition_offset));
155+
}
156+
}

tests/test_kafka.rs

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ extern crate kafka;
33

44
#[cfg(feature = "integration_tests")]
55
mod integration {
6+
use kafka::client::KafkaClient;
7+
68
mod client;
79
mod consumer_producer;
810

@@ -12,4 +14,11 @@ mod integration {
1214
pub const TEST_GROUP_NAME: &str = "kafka-rust-tester";
1315
pub const TEST_TOPIC_PARTITIONS: [i32; 2] = [0, 1];
1416
pub const KAFKA_CONSUMER_OFFSETS_TOPIC_NAME: &str = "__consumer_offsets";
17+
18+
pub(crate) fn new_ready_kafka_client() -> KafkaClient {
19+
let hosts = vec![LOCAL_KAFKA_BOOTSTRAP_HOST.to_owned()];
20+
let mut client = KafkaClient::new(hosts);
21+
client.load_metadata_all().unwrap();
22+
client
23+
}
1524
}

0 commit comments

Comments
 (0)