Skip to content

Commit a344dfe

Browse files
authored
Merge pull request #537 from bytebeamio/deferred-acks
feat(rumqttd): Add deferred acking
2 parents 2a5fc87 + 6a86513 commit a344dfe

File tree

11 files changed

+741
-67
lines changed

11 files changed

+741
-67
lines changed

rumqttd/rumqttd.toml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ max_connections = 10010
1616
[v4.1]
1717
name = "v4-1"
1818
listen = "0.0.0.0:1883"
19-
persistence = true
2019
next_connection_delay_ms = 1
2120
[v4.1.connections]
2221
connection_timeout_ms = 60000
@@ -45,10 +44,23 @@ next_connection_delay_ms = 10
4544
max_inflight_count = 100
4645
max_inflight_size = 1024
4746

47+
[v4.3]
48+
name = "v4-3"
49+
listen = "0.0.0.0:1884"
50+
persistence = true
51+
next_connection_delay_ms = 1
52+
[v4.3.connections]
53+
connection_timeout_ms = 60000
54+
max_client_id_len = 256
55+
throttle_delay_ms = 0
56+
max_payload_size = 20480
57+
max_inflight_count = 500
58+
max_inflight_size = 1024
59+
dynamic_filters = true
60+
4861
[v5.1]
4962
name = "v5-1"
50-
listen = "0.0.0.0:1884"
51-
persistence = false
63+
listen = "0.0.0.0:1885"
5264
next_connection_delay_ms = 1
5365
[v5.1.connections]
5466
connection_timeout_ms = 60000

rumqttd/src/link/local.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
use crate::protocol::{
22
ConnAck, Filter, LastWill, Packet, Publish, QoS, RetainForwardRule, Subscribe,
33
};
4-
use crate::router::Ack;
54
use crate::router::{
65
iobufs::{Incoming, Outgoing},
76
Connection, Event, Notification, ShadowRequest,
87
};
9-
use crate::ConnectionId;
8+
use crate::router::{Ack, AckData, FilterIdx};
9+
use crate::{ConnectionId, Offset};
1010
use bytes::Bytes;
1111
use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TrySendError};
1212
use parking_lot::lock_api::MutexGuard;
1313
use parking_lot::{Mutex, RawMutex};
1414

15-
use std::collections::VecDeque;
15+
use std::collections::{HashMap, VecDeque};
1616
use std::mem;
1717
use std::sync::Arc;
1818
use std::time::Instant;
@@ -88,10 +88,10 @@ impl Link {
8888
let (mut message, i, o, link_rx) =
8989
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
9090
if let Event::Connect {
91-
ref mut outgoing, ..
91+
ref mut connection, ..
9292
} = message
9393
{
94-
outgoing.persistent = persistent;
94+
connection.persistent = persistent;
9595
}
9696

9797
router_tx.send((0, message))?;
@@ -292,6 +292,15 @@ impl LinkTx {
292292
self.router_tx.try_send((self.connection_id, message))?;
293293
Ok(())
294294
}
295+
296+
pub async fn ack(&mut self, written: HashMap<FilterIdx, Offset>) -> Result<(), LinkError> {
297+
let ack_data = Event::AckData(AckData { read_map: written });
298+
299+
self.router_tx
300+
.send_async((self.connection_id, ack_data))
301+
.await?;
302+
Ok(())
303+
}
295304
}
296305

297306
#[derive(Debug)]

rumqttd/src/link/persistance.rs

Lines changed: 85 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1+
use crate::disk::Storage;
12
use crate::link::local::{LinkError, LinkRx, LinkTx};
23
use crate::link::network;
34
use crate::link::network::Network;
45
use crate::local::Link;
5-
use crate::protocol::{Protocol, Connect, LastWill, Packet, self};
6-
use crate::router::{Event, Notification};
7-
use crate::{ConnectionId, ConnectionSettings};
8-
use crate::disk::Storage;
6+
use crate::protocol::{self, Connect, LastWill, Packet, Protocol};
7+
use crate::router::{Event, FilterIdx, Notification};
8+
use crate::{ConnectionId, ConnectionSettings, Offset};
99

1010
use flume::{Receiver, RecvError, SendError, Sender, TrySendError};
11-
use tracing::{info, error, trace};
12-
use std::collections::{VecDeque};
13-
use std::{io, fs};
11+
use std::collections::{HashMap, VecDeque};
1412
use std::sync::Arc;
1513
use std::time::Duration;
14+
use std::{fs, io};
1615
use tokio::time::error::Elapsed;
1716
use tokio::{select, time};
17+
use tracing::{error, info, trace};
1818

1919
#[derive(Debug, thiserror::Error)]
2020
pub enum Error {
@@ -55,6 +55,7 @@ pub struct PersistanceLink<P: Protocol> {
5555
disk_handler: DiskHandler<P>,
5656
network_update_rx: Receiver<Network<P>>,
5757
connack: Notification,
58+
inflight_publishes: VecDeque<Notification>,
5859
}
5960

6061
pub(super) struct DiskHandler<P: Protocol> {
@@ -74,19 +75,43 @@ impl<P: Protocol> DiskHandler<P> {
7475
})
7576
}
7677

77-
pub fn write(&mut self, notifications: &mut VecDeque<Notification>) {
78+
pub fn write(
79+
&mut self,
80+
notifications: &mut VecDeque<Notification>,
81+
) -> HashMap<FilterIdx, Offset> {
82+
// let ack_list = VecDeque::new();
83+
84+
let mut stored_filter_offset_map: HashMap<FilterIdx, Offset> = HashMap::new();
7885
for notif in notifications.drain(..) {
79-
let packet_or_unscheduled = notif.into();
86+
let packet_or_unscheduled = notif.clone().into();
8087
if let Some(packet) = packet_or_unscheduled {
8188
if let Err(e) = self.protocol.write(packet, self.storage.writer()) {
8289
error!("Failed to write to storage: {e}");
90+
continue;
8391
}
8492

8593
if let Err(e) = self.storage.flush_on_overflow() {
8694
error!("Failed to flush storage: {e}");
95+
continue;
96+
}
97+
98+
match &notif {
99+
Notification::Forward(forward) => {
100+
stored_filter_offset_map
101+
.entry(forward.filter_idx)
102+
.and_modify(|cursor| {
103+
if forward.next_cursor > *cursor {
104+
*cursor = forward.next_cursor
105+
}
106+
})
107+
.or_insert(forward.next_cursor);
108+
}
109+
_ => continue,
87110
}
88111
}
89112
}
113+
114+
stored_filter_offset_map
90115
}
91116

92117
pub fn read(&mut self, buffer: &mut VecDeque<Packet>) {
@@ -105,7 +130,7 @@ impl<P: Protocol> DiskHandler<P> {
105130
let connection_buffer_length = buffer.len();
106131
//TODO: Don't hardcode max_connection_buffer_len
107132
if connection_buffer_length >= 100 {
108-
return
133+
return
109134
}
110135
}
111136
Err(protocol::Error::InsufficientBytes(_)) => {
@@ -180,6 +205,7 @@ impl<P: Protocol> PersistanceLink<P> {
180205
disk_handler: DiskHandler::new(&client_id, protocol)?,
181206
network_update_rx,
182207
connack: notification,
208+
inflight_publishes: VecDeque::with_capacity(100),
183209
},
184210
))
185211
}
@@ -220,38 +246,68 @@ impl<P: Protocol> PersistanceLink<P> {
220246
// write to disk
221247
o = self.link_rx.exchange(&mut self.notifications) => {
222248
o?;
223-
self.disk_handler.write(&mut self.notifications);
249+
// TODO: write only publishes
250+
self.write_to_disconnected_client().await?;
224251
}
225252
}
226253
}
227254
}
228255

229-
async fn write_to_client(&mut self) -> Result<(), Error> {
230-
// separate publish notifications out
231-
let mut publish = VecDeque::new();
232-
let mut non_publish = VecDeque::new();
256+
async fn write_to_disconnected_client(&mut self) -> Result<(), Error> {
233257
for notif in self.notifications.drain(..) {
234258
match notif {
235-
Notification::Forward(_) | Notification::ForwardWithProperties(_, _) => publish.push_back(notif),
236-
_ => non_publish.push_back(notif),
259+
Notification::Forward(_) | Notification::ForwardWithProperties(_, _) => {
260+
self.inflight_publishes.push_back(notif)
261+
}
262+
_ => continue,
237263
}
238-
};
264+
}
239265

240-
// write non-publishes to network
241-
let unscheduled = self.network.writev(&mut non_publish).await?;
266+
// write publishes to disk
267+
if !self.inflight_publishes.is_empty() {
268+
let acked_offsets = self.disk_handler.write(&mut self.inflight_publishes);
269+
if let Err(e) = self.link_tx.ack(acked_offsets).await {
270+
error!("Failed to inform router of read progress: {e}")
271+
};
272+
}
273+
Ok(())
274+
}
275+
276+
async fn write_to_active_client(&mut self) -> Result<(), Error> {
277+
// separate notifications out
278+
let mut unpersisted_messages = VecDeque::new();
279+
// let mut acked_publishes = VecDeque::new();
280+
for notif in self.notifications.drain(..) {
281+
match notif {
282+
Notification::Forward(_) | Notification::ForwardWithProperties(_, _) => {
283+
self.inflight_publishes.push_back(notif)
284+
}
285+
Notification::AckDone => {
286+
continue;
287+
}
288+
_ => unpersisted_messages.push_back(notif),
289+
}
290+
}
291+
292+
let unscheduled = self.network.writev(&mut unpersisted_messages).await?;
242293
if unscheduled {
243294
self.link_rx.wake().await?;
244295
};
245296

246297
// write publishes to disk
247-
if !publish.is_empty() {
248-
self.disk_handler.write(&mut publish);
298+
if !self.inflight_publishes.is_empty() {
299+
let acked_offsets = self.disk_handler.write(&mut self.inflight_publishes);
300+
if let Err(e) = self.link_tx.ack(acked_offsets).await {
301+
error!("Failed to inform router of read progress: {e}")
302+
};
249303
}
250304
// read publishes from disk
251305
let mut buffer = VecDeque::new();
252306
self.disk_handler.read(&mut buffer);
307+
253308
// TODO: if network write throws an error then this means we again got network I/O error
254309
// write publishes to network
310+
255311
let unscheduled = self.network.writev(&mut buffer).await?;
256312
if unscheduled {
257313
self.link_rx.wake().await?;
@@ -285,9 +341,12 @@ impl<P: Protocol> PersistanceLink<P> {
285341
Ok(packet) => self.read_from_client(packet).await?,
286342
// change state to disconnected on I/O connection errors and
287343
// wait for a reconnection
288-
Err(e) => match e.kind() {
289-
io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionReset => return Ok(State::Disconnected),
290-
_ => return Err(e.into())
344+
Err(e) => {
345+
println!("some error while reading from the network? {e:?}");
346+
match e.kind() {
347+
io::ErrorKind::ConnectionAborted | io::ErrorKind::ConnectionReset => return Ok(State::Disconnected),
348+
_ => return Err(e.into())
349+
}
291350
}
292351
};
293352
}
@@ -296,7 +355,7 @@ impl<P: Protocol> PersistanceLink<P> {
296355
// exchange will through error if all senders to router are dropped
297356
// which is not possible since Persistent Link always lives
298357
o?;
299-
self.write_to_client().await?;
358+
self.write_to_active_client().await?;
300359
}
301360
}
302361
}

rumqttd/src/router/connection.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub struct Connection {
2222
pub last_will: Option<LastWill>,
2323
/// Connection events
2424
pub events: ConnectionEvents,
25+
/// Flag to identify if connection is persistent
26+
pub persistent: bool,
2527
}
2628

2729
impl Connection {
@@ -51,6 +53,7 @@ impl Connection {
5153
clean,
5254
subscriptions: HashSet::default(),
5355
last_will,
56+
persistent: false,
5457
events: ConnectionEvents::default(),
5558
}
5659
}

rumqttd/src/router/iobufs.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ pub struct Outgoing {
6464
last_pkid: u16,
6565
/// Metrics of outgoing messages of this connection
6666
pub(crate) meter: OutgoingMeter,
67-
/// Flag to identify if `Outgoing` belongs to a `PersistentLink`
68-
pub(crate) persistent: bool,
6967
}
7068

7169
impl Outgoing {
@@ -86,7 +84,6 @@ impl Outgoing {
8684
handle,
8785
last_pkid: 0,
8886
meter: Default::default(),
89-
persistent: false,
9087
};
9188

9289
(outgoing, rx)
@@ -136,7 +133,7 @@ impl Outgoing {
136133
p.publish.pkid = self.last_pkid;
137134

138135
self.inflight_buffer
139-
.push_back((self.last_pkid, filter_idx, p.cursor));
136+
.push_back((self.last_pkid, filter_idx, p.curr_cursor));
140137

141138
// Place max pkid packet at index 0
142139
if self.last_pkid == MAX_PKID {

0 commit comments

Comments
 (0)