Skip to content

Commit bbb329c

Browse files
committed
Merge pull request #2 from spicavigo/refact_2
Refact 2
2 parents a904661 + 47d4cc9 commit bbb329c

File tree

6 files changed

+139
-45
lines changed

6 files changed

+139
-45
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
target
22
Cargo.lock
3+
src/main.rs

src/client.rs

+97-12
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
/// User facing module of this library.
2+
///
3+
/// Fetching message from multiple (topic, partition) pair or producing messages to multiple
4+
/// topics is not yet supported.
5+
/// It should be added very soon.
6+
17
use error::{Result, Error};
28
use utils;
39
use protocol;
@@ -11,19 +17,35 @@ const CLIENTID: &'static str = "kafka-rust";
1117
const DEFAULT_TIMEOUT: i32 = 120; // seconds
1218

1319

14-
#[derive(Default)]
15-
#[derive(Debug)]
20+
/// Client struct. It keeps track of brokers and topic metadata
21+
///
22+
/// # Examples
23+
///
24+
/// ```no_run
25+
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
26+
/// let res = client.load_metadata_all();
27+
/// ```
28+
///
29+
/// You will have to load metadata before making any other request.
30+
#[derive(Default, Debug)]
1631
pub struct KafkaClient {
17-
pub clientid: String,
18-
pub timeout: i32,
19-
pub hosts: Vec<String>,
20-
pub correlation: i32,
21-
pub conns: HashMap<String, KafkaConnection>,
32+
clientid: String,
33+
timeout: i32,
34+
hosts: Vec<String>,
35+
correlation: i32,
36+
conns: HashMap<String, KafkaConnection>,
2237
pub topic_partitions: HashMap<String, Vec<i32>>,
23-
pub topic_brokers: HashMap<String, String>
38+
topic_brokers: HashMap<String, String>
2439
}
2540

2641
impl KafkaClient {
42+
/// Create a new instance of KafkaClient
43+
///
44+
/// # Examples
45+
///
46+
/// ```no_run
47+
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
48+
/// ```
2749
pub fn new(hosts: &Vec<String>) -> KafkaClient {
2850
KafkaClient { hosts: hosts.to_vec(), clientid: CLIENTID.to_string(),
2951
timeout: DEFAULT_TIMEOUT, ..KafkaClient::default()}
@@ -48,15 +70,18 @@ impl KafkaClient {
4870
}
4971

5072

73+
/// Resets and loads metadata for all topics.
5174
pub fn load_metadata_all(&mut self) -> Result<()>{
5275
self.reset_metadata();
5376
self.load_metadata(&vec!())
5477
}
5578

79+
/// Reloads metadata for a list of supplied topics
80+
///
81+
/// returns Result<(), error::Error>
5682
pub fn load_metadata (&mut self, topics: &Vec<String>) -> Result<()>{
5783
let resp = try!(self.get_metadata(topics));
5884

59-
6085
let mut brokers: HashMap<i32, String> = HashMap::new();
6186
for broker in resp.brokers {
6287
brokers.insert(broker.nodeid, format!("{}:{}", broker.host, broker.port));
@@ -81,6 +106,8 @@ impl KafkaClient {
81106
Ok(())
82107
}
83108

109+
/// Clears metadata stored in the client. You must load metadata after this call if you want
110+
/// to use the client
84111
pub fn reset_metadata(&mut self) {
85112
self.topic_partitions.clear();
86113
self.topic_brokers.clear();
@@ -101,11 +128,25 @@ impl KafkaClient {
101128
Err(Error::NoHostReachable)
102129
}
103130

104-
pub fn fetch_offsets(&mut self) {
131+
/// Fetch offsets for a list of topics
132+
/// Not implemented as yet.
133+
pub fn fetch_offsets(&mut self, _topics: &Vec<String>) {
105134
// TODO - Implement method to fetch offsets for more than 1 topic
106135

107136
}
108137

138+
/// Fetch offset for a topic.
139+
/// It gets the latest offset only. Support for getting earliest will be added soon
140+
///
141+
/// # Examples
142+
///
143+
/// ```no_run
144+
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
145+
/// let res = client.load_metadata_all();
146+
/// let offsets = client.fetch_topic_offset(&"my-topic".to_string());
147+
/// ```
148+
/// Returns a vector of (topic, partition offset data).
149+
/// PartitionOffset will contain partition and offset info or an error code as returned by Kafka.
109150
pub fn fetch_topic_offset(&mut self, topic: &String) -> Result<Vec<(String, Vec<utils::PartitionOffset>)>> {
110151
// Doing it like this because HashMap will not return borrow of self otherwise
111152
let partitions = self.topic_partitions
@@ -160,6 +201,20 @@ impl KafkaClient {
160201
}
161202
}
162203

204+
/// Fetch messages from Kafka
205+
///
206+
/// It takes a single topic, partition and offset and returns a vector of messages
207+
/// or error::Error
208+
/// You can figure out the appropriate partition and offset using client's
209+
/// client.topic_partitions and client.fetch_topic_offset(topic)
210+
///
211+
/// # Examples
212+
///
213+
/// ```no_run
214+
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
215+
/// let res = client.load_metadata_all();
216+
/// let msgs = client.fetch_messages(&"my-topic".to_string(), 0, 0);
217+
/// ```
163218
pub fn fetch_messages(&mut self, topic: &String, partition: i32, offset: i64) -> Result<Vec<utils::OffsetMessage>>{
164219

165220
let host = self.get_broker(topic, partition).unwrap();
@@ -171,16 +226,46 @@ impl KafkaClient {
171226
Ok(resp.get_messages())
172227
}
173228

229+
/// Send a message to Kafka
230+
///
231+
/// You can figure out the appropriate partition and offset using client's
232+
/// client.topic_partitions and client.fetch_topic_offset(topic)
233+
///
234+
/// `required_acks` - indicates how many acknowledgements the servers should receive before
235+
/// responding to the request. If it is 0 the server will not send any response
236+
/// (this is the only case where the server will not reply to a request).
237+
/// If it is 1, the server will wait until the data is written to the local log before sending
238+
/// a response. If it is -1 the server will block until the message is committed by all
239+
/// in sync replicas before sending a response. For any number > 1 the server will block
240+
/// waiting for this number of acknowledgements to occur (but the server will never wait
241+
/// for more acknowledgements than there are in-sync replicas).
242+
///
243+
/// `timeout` - This provides a maximum time in milliseconds the server can await the
244+
/// receipt of the number of acknowledgements in `required_acks`
245+
/// `message` - A single message as a vector of u8s
246+
///
247+
/// # Example
248+
///
249+
/// ```no_run
250+
/// let mut client = kafka::client::KafkaClient::new(&vec!("localhost:9092".to_string()));
251+
/// let res = client.load_metadata_all();
252+
/// let msgs = client.send_message(&"my-topic".to_string(), 0, 1,
253+
/// 100, &"b".to_string().into_bytes());
254+
/// ```
255+
/// The return value will contain topic, partition, offset and error if any
256+
/// OR error::Error
174257
pub fn send_message(&mut self, topic: &String, partition: i32, required_acks: i16,
175-
timeout: i32, message: &Vec<u8>) -> Result<protocol::ProduceResponse> {
258+
timeout: i32, message: &Vec<u8>) -> Result<Vec<utils::TopicPartitionOffset>> {
176259

177260
let host = self.get_broker(topic, partition).unwrap();
178261

179262
let correlation = self.next_id();
180263
let req = protocol::ProduceRequest::new_single(topic, partition, required_acks,
181264
timeout, message, correlation, &self.clientid);
182265

183-
self.send_receive::<protocol::ProduceRequest, protocol::ProduceResponse>(&host, req)
266+
let resp = try!(self.send_receive
267+
::<protocol::ProduceRequest, protocol::ProduceResponse>(&host, req));
268+
Ok(resp.get_response())
184269

185270
}
186271

src/crc32.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ pub struct Crc32 {
66
impl Crc32 {
77
pub fn tocrc(buf: &[u8]) -> u32 {
88
let mut crc = Crc32 { table: [0; 256], value: 0xffffffff };
9-
let _ = (0..256).map(|x| crc.table[x as usize] = Crc32::calc_table(x as u32));
10-
let _ = buf.iter().map(|&x| crc.value = crc.calc_value(x as u32));
11-
crc.value ^ 0xffffffff
9+
for i in 0..256 {
10+
crc.table[i] = Crc32::calc_table(i as u32);
11+
}
12+
for &i in buf {
13+
crc.value = crc.calc_value(i as u32);
14+
}
15+
crc.value ^ 0xffffffffu32
1216
}
1317

1418
fn calc_value(&self, b: u32) -> u32 {
@@ -17,12 +21,12 @@ impl Crc32 {
1721
}
1822

1923
fn calc_table(index: u32) -> u32{
20-
let mut temp = index as u32;
24+
let mut temp = index;
2125
for _ in 0..8 {
2226
if temp & 1 == 0 {
2327
temp = temp >> 1
2428
} else {
25-
temp = 0xedb88320 ^ (temp >> 1)
29+
temp = 0xedb88320u32 ^ (temp >> 1)
2630
}
2731
}
2832
temp

src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
extern crate num;
22
extern crate byteorder;
33

4-
mod error;
5-
mod utils;
4+
pub mod error;
5+
pub mod utils;
66
mod crc32;
77
mod snappy;
88
mod codecs;

src/main.rs

-25
This file was deleted.

src/protocol.rs

+30-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ pub struct TopicPartitionProduceResponse {
8383
#[derive(Default, Debug, Clone)]
8484
pub struct PartitionProduceResponse {
8585
pub partition: i32,
86-
pub error: i32,
86+
pub error: i16,
8787
pub offset: i64
8888
}
8989

@@ -332,6 +332,35 @@ impl PartitionProduceRequest {
332332
}
333333
}
334334

335+
impl ProduceResponse {
336+
pub fn get_response(& self) -> Vec<TopicPartitionOffset>{
337+
self.topic_partitions
338+
.iter()
339+
.flat_map(|ref tp| tp.get_response(&tp.topic))
340+
.collect()
341+
}
342+
}
343+
344+
impl TopicPartitionProduceResponse {
345+
pub fn get_response(& self, topic: &String) -> Vec<TopicPartitionOffset>{
346+
self.partitions
347+
.iter()
348+
.map(|ref p| p.get_response(topic))
349+
.collect()
350+
}
351+
}
352+
353+
impl PartitionProduceResponse {
354+
pub fn get_response(& self, topic: &String) -> TopicPartitionOffset{
355+
TopicPartitionOffset{
356+
topic: topic.clone(),
357+
partition: self.partition,
358+
offset:self.offset,
359+
error: self.error
360+
}
361+
}
362+
}
363+
335364
impl FetchRequest {
336365

337366
pub fn new_single(topic: &String, partition: i32, offset: i64,

0 commit comments

Comments
 (0)