Skip to content

Commit 2fda120

Browse files
committed
Updated the Errors in examples
1 parent bc5cd92 commit 2fda120

File tree

8 files changed

+40
-29
lines changed

8 files changed

+40
-29
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ time = "0.3.7"
3232
rand = "0.8.5"
3333
lazy_static = "1.4.0"
3434
anyhow = "1.0.55"
35-
35+
log = "0.4.16"
36+
error-chain = "0.12.4"
3637

3738
[features]
3839
default = ["snappy", "gzip", "security"]

examples/console-producer.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ use std::str::FromStr;
66
use std::time::Duration;
77
use std::{env, process};
88

9+
use anyhow::anyhow;
10+
use kafka::Error;
11+
912
use kafka::client::{
1013
Compression, KafkaClient, RequiredAcks, DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
1114
};
@@ -159,7 +162,8 @@ fn send_batch(producer: &mut Producer, batch: &[Record<'_, (), Trimmed>]) -> Res
159162
for r in rs {
160163
for tpc in r.partition_confirms {
161164
if let Err(code) = tpc.offset {
162-
return Err(ErrorKind::Kafka(kafka::error::ErrorKind::Kafka(code)));
165+
//return Err(Error::Kafka(kafka::error::Error::Kafka(code)));
166+
return Err(anyhow!("{:?}", code));
163167
}
164168
}
165169
}
@@ -199,11 +203,11 @@ impl Config {
199203

200204
let m = match opts.parse(&args[1..]) {
201205
Ok(m) => m,
202-
Err(e) => return Err(e),
206+
Err(e) => return Err(anyhow!("Error {:?}", e)),
203207
};
204208
if m.opt_present("help") {
205209
let brief = format!("{} [options]", args[0]);
206-
return Err(opts.usage(&brief));
210+
return Err(anyhow!("opts usage: {:?}", opts.usage(&brief)));
207211
}
208212
Ok(Config {
209213
brokers: m
@@ -221,14 +225,21 @@ impl Config {
221225
Some(ref s) if s.eq_ignore_ascii_case("gzip") => Compression::GZIP,
222226
#[cfg(feature = "snappy")]
223227
Some(ref s) if s.eq_ignore_ascii_case("snappy") => Compression::SNAPPY,
224-
Some(s) => return Err(format!("Unsupported compression type: {}", s)),
228+
Some(s) => {
229+
return Err(anyhow!(
230+
"Error {:?}",
231+
format!("Unsupported compression type: {}", s)
232+
))
233+
}
225234
},
226235
required_acks: match m.opt_str("required-acks") {
227236
None => RequiredAcks::One,
228237
Some(ref s) if s.eq_ignore_ascii_case("none") => RequiredAcks::None,
229238
Some(ref s) if s.eq_ignore_ascii_case("one") => RequiredAcks::One,
230239
Some(ref s) if s.eq_ignore_ascii_case("all") => RequiredAcks::All,
231-
Some(s) => return Err(format!("Unknown --required-acks argument: {}", s)),
240+
Some(s) => {
241+
return Err(anyhow!("{:?}", format!("Unknown --required-acks argument: {}", s)))
242+
}
232243
},
233244
batch_size: to_number(m.opt_str("batch-size"), 1)?,
234245
conn_idle_timeout: Duration::from_millis(to_number(
@@ -248,7 +259,7 @@ fn to_number<N: FromStr>(s: Option<String>, _default: N) -> Result<N> {
248259
None => Ok(_default),
249260
Some(s) => match s.parse::<N>() {
250261
Ok(n) => Ok(n),
251-
Err(_) => return Err(format!("Not a number: {}", s)),
262+
Err(_) => return Err(anyhow!("{:?}", format!("Not a number: {}", s))),
252263
},
253264
}
254265
}

examples/example-fetch.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ fn main() {
3737
for t in resp.topics() {
3838
for p in t.partitions() {
3939
match p.data() {
40-
&Err(ref e) => {
40+
Err(ref e) => {
4141
println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
4242
}
43-
&Ok(ref data) => {
43+
Ok(ref data) => {
4444
println!(
4545
"topic: {} / partition: {} / latest available message \
4646
offset: {}",

examples/offset-monitor.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::process;
77
use std::thread;
88
//use std::time as stdtime;
99
use std::time::Duration;
10+
use std::time::SystemTime;
1011

1112
use kafka::client::{FetchOffset, GroupOffsetStorage, KafkaClient};
1213

@@ -45,7 +46,7 @@ fn run(cfg: Config) -> Result<()> {
4546
let ts = client.topics();
4647
let num_topics = ts.len();
4748
if num_topics == 0 {
48-
return Err("no topics available");
49+
return Err(Error::from("no topics available"));
4950
}
5051
let mut names: Vec<&str> = Vec::with_capacity(ts.len());
5152
names.extend(ts.names());
@@ -55,12 +56,12 @@ fn run(cfg: Config) -> Result<()> {
5556
for name in names {
5657
let _ = writeln!(buf, "topic: {}", name);
5758
}
58-
return Err("choose a topic");
59+
return Err(Error::from("choose a topic"));
5960
}
6061

6162
// ~ otherwise let's loop over the topic partition offsets
6263
let num_partitions = match client.topics().partitions(&cfg.topic) {
63-
None => return Err(format!("no such topic: {}", &cfg.topic)),
64+
None => return Err(Error::from(format!("no such topic: {}", &cfg.topic))),
6465
Some(partitions) => partitions.len(),
6566
};
6667
let mut state = State::new(num_partitions, cfg.commited_not_consumed);
@@ -70,7 +71,7 @@ fn run(cfg: Config) -> Result<()> {
7071
// ~ initialize the state
7172
let mut first_time = true;
7273
loop {
73-
let t = time::Time::now();
74+
let t = SystemTime::now();
7475
state.update_partitions(&mut client, &cfg.topic, &cfg.group)?;
7576
if first_time {
7677
state.curr_to_prev();
@@ -225,14 +226,14 @@ impl<W: Write> Printer<W> {
225226
}
226227
}
227228

228-
fn print_offsets(&mut self, time: &time::Time, partitions: &[Partition]) -> Result<()> {
229+
fn print_offsets(&mut self, time: &SystemTime, partitions: &[Partition]) -> Result<()> {
229230
self.out_buf.clear();
230231
{
231232
// ~ format
232233
use std::fmt::Write;
233234

234235
self.fmt_buf.clear();
235-
let _ = write!(self.fmt_buf, "{}", time.format(&self.timefmt));
236+
let _ = write!(self.fmt_buf, "{}", time.elapsed().unwrap().as_secs());
236237
let _ = write!(self.out_buf, "{1:<0$}", self.time_width, self.fmt_buf);
237238
if self.print_summary {
238239
let mut prev_latest = 0;
@@ -323,11 +324,11 @@ impl Config {
323324

324325
let m = match opts.parse(&args[1..]) {
325326
Ok(m) => m,
326-
Err(e) => return Err(e),
327+
Err(e) => return Err(Error::from(e)),
327328
};
328329
if m.opt_present("help") {
329330
let brief = format!("{} [options]", args[0]);
330-
return Err(opts.usage(&brief));
331+
return Err(Error::from(opts.usage(&brief)));
331332
}
332333
let mut offset_storage = GroupOffsetStorage::Zookeeper;
333334
if let Some(s) = m.opt_str("storage") {
@@ -336,14 +337,14 @@ impl Config {
336337
} else if s.eq_ignore_ascii_case("kafka") {
337338
offset_storage = GroupOffsetStorage::Kafka;
338339
} else {
339-
return Err(format!("unknown offset store: {}", s));
340+
return Err(Error::from(format!("unknown offset store: {}", s)));
340341
}
341342
}
342343
let mut period = Duration::from_secs(5);
343344
if let Some(s) = m.opt_str("sleep") {
344345
match s.parse::<u64>() {
345346
Ok(n) if n != 0 => period = Duration::from_secs(n),
346-
_ => return Err(format!("not a number greater than zero: {}", s)),
347+
_ => return Err(Error::from(format!("not a number greater than zero: {}", s))),
347348
}
348349
}
349350
Ok(Config {

src/client/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1006,10 +1006,10 @@ impl KafkaClient {
10061006
/// for t in resp.topics() {
10071007
/// for p in t.partitions() {
10081008
/// match p.data() {
1009-
/// &Err(ref e) => {
1009+
/// Err(ref e) => {
10101010
/// println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
10111011
/// }
1012-
/// &Ok(ref data) => {
1012+
/// Ok(ref data) => {
10131013
/// println!("topic: {} / partition: {} / latest available message offset: {}",
10141014
/// t.topic(), p.partition(), data.highwatermark_offset());
10151015
/// for msg in data.messages() {

src/error.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
//! Error struct and methods
22
33
use std::{io, result, sync::Arc};
4+
use thiserror::Error;
45

56
pub type Result<T> = result::Result<T, Error>;
67

7-
#[derive(Debug, thiserror::Error)]
8+
#[derive(Debug, Error)]
89
pub enum Error {
910
#[error(transparent)]
1011
Io(#[from] io::Error),

tests/integration/consumer_producer/consumer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ fn test_consumer_non_existent_topic() {
155155
.unwrap_err();
156156

157157
let error_code = match consumer_err {
158-
error::Error(error::ErrorKind::Kafka(code), _) => code,
158+
error::Error::Kafka(code) => code,
159159
_ => panic!("Should have received Kafka error"),
160160
};
161161

tests/integration/consumer_producer/producer.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn test_producer_send_non_existent_topic() {
2323
.send(&Record::from_value("non-topic", "foo".as_bytes()))
2424
.unwrap_err()
2525
{
26-
error::Error(error::ErrorKind::Kafka(code), _) => code,
26+
error::Error::Kafka(code) => code,
2727
_ => panic!("Should have received Kafka error"),
2828
};
2929

@@ -45,10 +45,7 @@ fn test_producer_send_all() {
4545
assert_eq!(TEST_TOPIC_NAME.to_owned(), confirm.topic);
4646

4747
for partition_confirm in confirm.partition_confirms {
48-
assert!(
49-
partition_confirm.offset.is_ok(),
50-
format!("should have sent successfully. Got: {:?}", partition_confirm.offset)
51-
);
48+
assert!(partition_confirm.offset.is_ok(), "should have sent successfully.");
5249
}
5350
}
5451
}
@@ -63,7 +60,7 @@ fn test_producer_send_all_non_existent_topic() {
6360
];
6461

6562
let error_code = match producer.send_all(records).unwrap_err() {
66-
error::Error(error::ErrorKind::Kafka(code), _) => code,
63+
error::Error::Kafka(code) => code,
6764
_ => panic!("Should have received Kafka error"),
6865
};
6966

0 commit comments

Comments
 (0)