Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tekjar committed May 31, 2021
1 parent 520f4a7 commit e0d459d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 54 deletions.
2 changes: 1 addition & 1 deletion rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
websocket = ["async-tungstenite", "ws_stream_tungstenite"]

[dependencies]
tokio = { version = "1.0", features = ["full"] }
tokio = { version = "1.0", features = ["net", "time"] }
bytes = "1.0"
webpki = "0.21"
tokio-rustls = "0.22"
Expand Down
105 changes: 52 additions & 53 deletions rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,13 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() {
#[tokio::test]
async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 1891);
options.set_inflight(4).set_collision_safety(true);
options.set_inflight(10);

let mut eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();

task::spawn(async move {
start_requests(8, QoS::AtLeastOnce, 0, requests_tx).await;
start_requests(15, QoS::AtLeastOnce, 0, requests_tx).await;
time::sleep(Duration::from_secs(60)).await;
});

Expand All @@ -326,23 +326,23 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
broker.ack(2).await;

// read and ack remaining packets in order
for i in 5..=10 {
for i in 5..=15 {
let packet = broker.read_publish().await;
let packet = packet.unwrap();
assert_eq!(packet.payload[0], i);
broker.ack(packet.pkid).await;
}

time::sleep(Duration::from_secs(5)).await;
time::sleep(Duration::from_secs(10)).await;
});

time::sleep(Duration::from_secs(1)).await;

// sends 4 requests. 5th request will trigger collision
// Poll until there is collision.
loop {
match eventloop.poll().await {
Err(ConnectionError::MqttState(StateError::Collision(1))) => break,
match eventloop.poll().await.unwrap() {
Event::Outgoing(Outgoing::AwaitAck(1)) => break,
v => {
println!("Poll = {:?}", v);
continue;
Expand All @@ -352,64 +352,63 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {

loop {
let start = Instant::now();
let event = eventloop.poll().await;
let event = eventloop.poll().await.unwrap();
println!("Poll = {:?}", event);

match event {
Ok(Event::Outgoing(Outgoing::Publish(ack))) => {
Event::Outgoing(Outgoing::Publish(ack)) => {
if ack == 1 {
assert_eq!(start.elapsed().as_secs(), 5)
let elapsed = start.elapsed().as_millis() as i64;
let deviation_millis: i64 = (5000 - elapsed).abs();
assert!(deviation_millis < 10);
break;
}
}
Err(_) => break,
_ => continue,
}
}
}

#[tokio::test]
async fn packet_id_collisions_are_timedout_on_second_ping() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 1892);
options
.set_inflight(4)
.set_collision_safety(true)
.set_keep_alive(5);

let mut eventloop = EventLoop::new(options, 5);
let requests_tx = eventloop.handle();

task::spawn(async move {
start_requests(10, QoS::AtLeastOnce, 0, requests_tx).await;
time::sleep(Duration::from_secs(60)).await;
});

task::spawn(async move {
let mut broker = Broker::new(1892, 0).await;
// read all incoming packets first
for i in 1..=4 {
let packet = broker.read_publish().await;
assert_eq!(packet.unwrap().payload[0], i);
}

// out of order ack
broker.ack(3).await;
broker.ack(4).await;
time::sleep(Duration::from_secs(15)).await;
});

time::sleep(Duration::from_secs(1)).await;

// Collision error but no network disconneciton
match run(&mut eventloop, false).await {
Err(ConnectionError::MqttState(StateError::Collision(1))) => (),
o => panic!("Expecting collision error. Found = {:?}", o),
}

match run(&mut eventloop, false).await {
Err(ConnectionError::MqttState(StateError::CollisionTimeout)) => (),
o => panic!("Expecting collision error. Found = {:?}", o),
}
}
// #[tokio::test]
// async fn packet_id_collisions_are_timedout_on_second_ping() {
// let mut options = MqttOptions::new("dummy", "127.0.0.1", 1892);
// options.set_inflight(4).set_keep_alive(5);
//
// let mut eventloop = EventLoop::new(options, 5);
// let requests_tx = eventloop.handle();
//
// task::spawn(async move {
// start_requests(10, QoS::AtLeastOnce, 0, requests_tx).await;
// time::sleep(Duration::from_secs(60)).await;
// });
//
// task::spawn(async move {
// let mut broker = Broker::new(1892, 0).await;
// // read all incoming packets first
// for i in 1..=4 {
// let packet = broker.read_publish().await;
// assert_eq!(packet.unwrap().payload[0], i);
// }
//
// // out of order ack
// broker.ack(3).await;
// broker.ack(4).await;
// time::sleep(Duration::from_secs(15)).await;
// });
//
// time::sleep(Duration::from_secs(1)).await;
//
// // Collision error but no network disconneciton
// match run(&mut eventloop, false).await.unwrap() {
// Event::Outgoing(Outgoing::AwaitAck(1)) => (),
// o => panic!("Expecting collision error. Found = {:?}", o),
// }
//
// match run(&mut eventloop, false).await {
// Err(ConnectionError::MqttState(StateError::CollisionTimeout)) => (),
// o => panic!("Expecting collision error. Found = {:?}", o),
// }
// }

//
// All reconnection tests here
Expand Down

0 comments on commit e0d459d

Please sign in to comment.