Skip to content

Commit

Permalink
feat(rumqttd): Add metering (#508)
Browse files Browse the repository at this point in the history
Adds the ability to monitor data flow through a connection, i.e. reports the quant metrics of incoming and outgoing data through a connection to the broker.
  • Loading branch information
tekjar authored Nov 21, 2022
1 parent 3b597cb commit f9d1268
Show file tree
Hide file tree
Showing 16 changed files with 316 additions and 203 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ rumqttc

rumqttd
-------
- Add meters related to router, subscriptions, and connections (#505)
- Allow multi-tenancy validation for mtls clients with `Org` set in certificates
- Add meters related to router, subscriptions, and connections (#508)
- Allow multi-tenancy validation for mtls clients with `Org` set in certificates (#505)
- Add `tracing` for structured, context-aware logging (#499, #503)
- Add the ablity to change log levels and filters dynamically at runtime (#499)
- Added properties field to `Unsubscribe`, `UnsubAck`, and `Disconnect` packets so its consistent with other packets. (#480)
- Changed default segment size in demo config to 100MB (#484)
- Allow subscription on topic's starting with `test` (#494)
Expand Down
88 changes: 88 additions & 0 deletions rumqttd/examples/meters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use rumqttd::{Broker, Config, GetMeter, Notification};
use std::{thread, time::Duration};

fn main() {
pretty_env_logger::init();

// As examples are compiled as seperate binary so this config is current path dependent. Run it
// from root of this crate
let config = config::Config::builder()
.add_source(config::File::with_name("demo.toml"))
.build()
.unwrap();

let config: Config = config.try_deserialize().unwrap();

dbg!(&config);

let broker = Broker::new(config);
let meters = broker.meters().unwrap();

let (mut link_tx, mut link_rx) = broker.link("consumer").unwrap();
link_tx.subscribe("hello/+/world").unwrap();
thread::spawn(move || {
let mut count = 0;
loop {
let notification = match link_rx.recv().unwrap() {
Some(v) => v,
None => continue,
};

match notification {
Notification::Forward(forward) => {
count += 1;
println!(
"Topic = {:?}, Count = {}, Payload = {} bytes",
forward.publish.topic,
count,
forward.publish.payload.len()
);
}
v => {
println!("{:?}", v);
}
}
}
});

for i in 0..5 {
let client_id = format!("client_{i}");
let topic = format!("hello/{}/world", client_id);
let payload = vec![0u8; 1_000]; // 0u8 is one byte, so total ~1KB
let (mut link_tx, _link_rx) = broker.link(&client_id).expect("New link should be made");

thread::spawn(move || {
for _ in 0..100 {
thread::sleep(Duration::from_secs(1));
link_tx.publish(topic.clone(), payload.clone()).unwrap();
}
});
}

loop {
// Router meters
let request = GetMeter::Router;
let v = meters.get(request).unwrap();
println!("{:#?}", v);

// Publisher meters
for i in 0..5 {
let client_id = format!("client_{i}");
let request = GetMeter::Connection(client_id);
let v = meters.get(request).unwrap();
println!("{:#?}", v);
}

// Commitlog meters
let request = GetMeter::Subscription("hello/+/world".to_owned());
let v = meters.get(request).unwrap();
println!("{:#?}", v);

// Consumer meters
let request = GetMeter::Connection("consumer".to_owned());
let v = meters.get(request).unwrap();
println!("{:#?}", v);

thread::sleep(Duration::from_secs(5));
}
}
5 changes: 4 additions & 1 deletion rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub type TopicId = usize;
pub type Offset = (u64, u64);
pub type Cursor = (u64, u64);

pub use link::local::{Link, LinkError, LinkRx, LinkTx};
pub use link::local;
pub use link::meters;

pub use router::GetMeter;
pub use router::Notification;
pub use server::Broker;

Expand Down
22 changes: 8 additions & 14 deletions rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct ConsoleLink {
config: ConsoleSettings,
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
link_rx: LinkRx,
_link_rx: LinkRx,
}

impl ConsoleLink {
Expand All @@ -23,7 +23,7 @@ impl ConsoleLink {
ConsoleLink {
config,
router_tx,
link_rx,
_link_rx: link_rx,
connection_id,
}
}
Expand All @@ -48,8 +48,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/device/{id: String}) => {
let event = Event::Metrics(MetricsRequest::Connection(id));
Expand All @@ -58,8 +57,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/subscriptions) => {
let event = Event::Metrics(MetricsRequest::Subscriptions);
Expand All @@ -68,8 +66,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/subscription/{filter: String}) => {
let filter = filter.replace('.', "/");
Expand All @@ -79,8 +76,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/waiters/{filter: String}) => {
let filter = filter.replace('.', "/");
Expand All @@ -90,8 +86,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(GET) (/readyqueue) => {
let event = Event::Metrics(MetricsRequest::ReadyQueue);
Expand All @@ -100,8 +95,7 @@ pub fn start(console: Arc<ConsoleLink>) {
return rouille::Response::empty_404()
}

let v = console.link_rx.metrics();
rouille::Response::json(&v)
rouille::Response::text("OK").with_status_code(200)
},
(POST) (/logs) => {
info!("Reloading tracing filter");
Expand Down
32 changes: 8 additions & 24 deletions rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::protocol::{
use crate::router::Ack;
use crate::router::{
iobufs::{Incoming, Outgoing},
Connection, Event, MetricsReply, Notification, ShadowRequest,
Connection, Event, Notification, ShadowRequest,
};
use crate::ConnectionId;
use bytes::Bytes;
Expand All @@ -15,7 +15,7 @@ use parking_lot::{Mutex, RawMutex};
use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

#[derive(Debug, thiserror::Error)]
pub enum LinkError {
Expand Down Expand Up @@ -50,9 +50,8 @@ impl Link {
Arc<Mutex<VecDeque<Packet>>>,
Arc<Mutex<VecDeque<Notification>>>,
Receiver<()>,
Receiver<MetricsReply>,
) {
let (connection, metrics_rx) = Connection::new(
let connection = Connection::new(
tenant_id,
client_id.to_owned(),
clean,
Expand All @@ -70,13 +69,7 @@ impl Link {
outgoing,
};

(
event,
incoming_data_buffer,
outgoing_data_buffer,
link_rx,
metrics_rx,
)
(event, incoming_data_buffer, outgoing_data_buffer, link_rx)
}

#[allow(clippy::new_ret_no_self)]
Expand All @@ -91,7 +84,7 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) =
let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send((0, message))?;

Expand All @@ -106,7 +99,7 @@ impl Link {
};

let tx = LinkTx::new(id, router_tx.clone(), i);
let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o);
let rx = LinkRx::new(id, router_tx, link_rx, o);
Ok((tx, rx, notification))
}

Expand All @@ -121,7 +114,7 @@ impl Link {
// Connect to router
// Local connections to the router shall have access to all subscriptions

let (message, i, o, link_rx, metrics_rx) =
let (message, i, o, link_rx) =
Link::prepare(tenant_id, client_id, clean, last_will, dynamic_filters);
router_tx.send_async((0, message)).await?;

Expand All @@ -135,7 +128,7 @@ impl Link {
};

let tx = LinkTx::new(id, router_tx.clone(), i);
let rx = LinkRx::new(id, router_tx, link_rx, metrics_rx, o);
let rx = LinkRx::new(id, router_tx, link_rx, o);
Ok((tx, rx, ack))
}
}
Expand Down Expand Up @@ -284,7 +277,6 @@ pub struct LinkRx {
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<()>,
metrics_rx: Receiver<MetricsReply>,
send_buffer: Arc<Mutex<VecDeque<Notification>>>,
cache: VecDeque<Notification>,
}
Expand All @@ -294,14 +286,12 @@ impl LinkRx {
connection_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<()>,
metrics_rx: Receiver<MetricsReply>,
outgoing_data_buffer: Arc<Mutex<VecDeque<Notification>>>,
) -> LinkRx {
LinkRx {
connection_id,
router_tx,
router_rx,
metrics_rx,
send_buffer: outgoing_data_buffer,
cache: VecDeque::with_capacity(100),
}
Expand Down Expand Up @@ -376,12 +366,6 @@ impl LinkRx {
.await?;
Ok(())
}

pub fn metrics(&self) -> Option<MetricsReply> {
self.metrics_rx
.recv_deadline(Instant::now() + Duration::from_secs(1))
.ok()
}
}

#[cfg(test)]
Expand Down
68 changes: 68 additions & 0 deletions rumqttd/src/link/meters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::router::{Event, GetMeter, Meter};
use crate::ConnectionId;
use flume::{Receiver, RecvError, RecvTimeoutError, SendError, Sender, TrySendError};

#[derive(Debug, thiserror::Error)]
pub enum LinkError {
#[error("Channel try send error")]
TrySend(#[from] TrySendError<(ConnectionId, Event)>),
#[error("Channel send error")]
Send(#[from] SendError<(ConnectionId, Event)>),
#[error("Channel recv error")]
Recv(#[from] RecvError),
#[error("Channel timeout recv error")]
RecvTimeout(#[from] RecvTimeoutError),
#[error("Timeout = {0}")]
Elapsed(#[from] tokio::time::error::Elapsed),
}

pub struct MetersLink {
pub(crate) meter_id: ConnectionId,
router_tx: Sender<(ConnectionId, Event)>,
router_rx: Receiver<(ConnectionId, Meter)>,
}

impl MetersLink {
pub fn new(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
let (tx, rx) = flume::bounded(5);
router_tx.send((0, Event::NewMeter(tx)))?;
let (meter_id, _meter) = rx.recv()?;

let link = MetersLink {
meter_id,
router_tx,
router_rx: rx,
};

Ok(link)
}

pub async fn init(router_tx: Sender<(ConnectionId, Event)>) -> Result<MetersLink, LinkError> {
let (tx, rx) = flume::bounded(5);
router_tx.send((0, Event::NewMeter(tx)))?;
let (meter_id, _meter) = rx.recv_async().await?;

let link = MetersLink {
meter_id,
router_tx,
router_rx: rx,
};

Ok(link)
}

pub fn get(&self, meter: GetMeter) -> Result<Meter, LinkError> {
self.router_tx
.send((self.meter_id, Event::GetMeter(meter)))?;
let (_meter_id, meter) = self.router_rx.recv()?;
Ok(meter)
}

pub async fn fetch(&self, meter: GetMeter) -> Result<Meter, LinkError> {
self.router_tx
.send_async((self.meter_id, Event::GetMeter(meter)))
.await?;
let (_meter_id, meter) = self.router_rx.recv_async().await?;
Ok(meter)
}
}
1 change: 1 addition & 0 deletions rumqttd/src/link/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// pub mod bridge;
pub mod console;
pub mod local;
pub mod meters;
pub mod network;
pub mod remote;
#[cfg(feature = "websockets")]
Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::link::local::{LinkError, LinkRx, LinkTx};
use crate::link::local::{Link, LinkError, LinkRx, LinkTx};
use crate::link::network;
use crate::link::network::Network;
use crate::protocol::{Connect, Packet, Protocol};
use crate::router::{Event, Notification};
use crate::{ConnectionId, ConnectionSettings, Link};
use crate::{ConnectionId, ConnectionSettings};

use flume::{RecvError, SendError, Sender, TrySendError};
use std::collections::VecDeque;
Expand Down
Loading

0 comments on commit f9d1268

Please sign in to comment.