Skip to content

Commit

Permalink
docs: better explain client surfaces (#380)
Browse files Browse the repository at this point in the history
* Improve description of client interfaces
* Add links in doc comments to associated components

Attribute: EdJoPaTo <[email protected]>, Devdutt Shenoi <[email protected]>, Vivek R <[email protected]>
  • Loading branch information
123vivekr authored Aug 11, 2022
1 parent e44e265 commit c2090fb
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 32 deletions.
2 changes: 2 additions & 0 deletions rumqttc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ Quick overview of features
- Queue size based flow control on outgoing packets
- Automatic reconnections by just continuing the `eventloop.poll()/connection.iter()` loop
- Natural backpressure to client APIs during bad network
- Support for WebSockets
- Secure transport using TLS

In short, everything necessary to maintain a robust connection

Expand Down
75 changes: 45 additions & 30 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,22 @@ impl From<TrySendError<Request>> for ClientError {
}
}

/// `AsyncClient` to communicate with MQTT `Eventloop`
/// This is cloneable and can be used to asynchronously Publish, Subscribe.
/// An asynchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
///
/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
/// from the broker, i.e. move ahead.
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<Request>,
}

impl AsyncClient {
/// Create a new `AsyncClient`
/// Create a new `AsyncClient`.
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.handle();
Expand All @@ -54,7 +61,7 @@ impl AsyncClient {
AsyncClient { request_tx }
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`.
pub async fn publish<S, V>(
&self,
topic: S,
Expand All @@ -73,7 +80,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Publish to the eventloop
/// Attempts to send a MQTT Publish to the `EventLoop`.
pub fn try_publish<S, V>(
&self,
topic: S,
Expand All @@ -92,7 +99,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);

Expand All @@ -102,7 +109,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
if let Some(ack) = ack {
Expand All @@ -111,7 +118,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`
pub async fn publish_bytes<S>(
&self,
topic: S,
Expand All @@ -129,23 +136,23 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Attempts to send a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.try_send(request)?;
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -156,7 +163,7 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -167,30 +174,30 @@ impl AsyncClient {
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.try_send(request)?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub async fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.send_async(request).await?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Attempts to send a MQTT disconnect to the `EventLoop`
pub fn try_disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.try_send(request)?;
Expand All @@ -207,17 +214,25 @@ fn get_ack_req(publish: &Publish) -> Option<Request> {
Some(ack)
}

/// `Client` to communicate with MQTT eventloop `Connection`.
/// A synchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
/// [`subscribe`](`AsynClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
///
/// Client is cloneable and can be used to synchronously Publish, Subscribe.
/// Asynchronous channel handle can also be extracted if necessary
/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
/// to send, receive and process packets from the broker, i.e. move ahead.
///
/// An asynchronous channel handle can also be extracted if necessary.
#[derive(Clone)]
pub struct Client {
client: AsyncClient,
}

impl Client {
/// Create a new `Client`
///
/// `cap` specifies the capacity of the bounded async channel.
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };
Expand All @@ -230,7 +245,7 @@ impl Client {
(client, connection)
}

/// Sends a MQTT Publish to the eventloop
/// Sends a MQTT Publish to the `EventLoop`
pub fn publish<S, V>(
&mut self,
topic: S,
Expand Down Expand Up @@ -261,25 +276,25 @@ impl Client {
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
pollster::block_on(self.client.ack(publish))?;
Ok(())
}

/// Sends a MQTT PubAck to the eventloop. Only needed in if `manual_acks` flag is set.
/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
self.client.try_ack(publish)?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub fn subscribe<S: Into<String>>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> {
pollster::block_on(self.client.subscribe(topic, qos))?;
Ok(())
}

/// Sends a MQTT Subscribe to the eventloop
/// Sends a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(
&mut self,
topic: S,
Expand All @@ -289,7 +304,7 @@ impl Client {
Ok(())
}

/// Sends a MQTT Subscribe for multiple topics to the eventloop
/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
pub fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
Expand All @@ -304,25 +319,25 @@ impl Client {
self.client.try_subscribe_many(topics)
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
pollster::block_on(self.client.unsubscribe(topic))?;
Ok(())
}

/// Sends a MQTT Unsubscribe to the eventloop
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
self.client.try_unsubscribe(topic)?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub fn disconnect(&mut self) -> Result<(), ClientError> {
pollster::block_on(self.client.disconnect())?;
Ok(())
}

/// Sends a MQTT disconnect to the eventloop
/// Sends a MQTT disconnect to the `EventLoop`
pub fn try_disconnect(&mut self) -> Result<(), ClientError> {
self.client.try_disconnect()?;
Ok(())
Expand Down Expand Up @@ -357,7 +372,7 @@ impl Connection {
}
}

/// Iterator which polls the eventloop for connection progress
/// Iterator which polls the `EventLoop` for connection progress
pub struct Iter<'a> {
connection: &'a mut Connection,
runtime: runtime::Runtime,
Expand Down
4 changes: 3 additions & 1 deletion rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl From<Unsubscribe> for Request {
}
}

/// Transport methods. Defaults to TCP.
#[derive(Clone)]
pub enum Transport {
Tcp,
Expand Down Expand Up @@ -279,6 +280,7 @@ impl Transport {
}
}

/// TLS configuration method
#[derive(Clone)]
#[cfg(feature = "use-rustls")]
pub enum TlsConfiguration {
Expand All @@ -304,7 +306,7 @@ impl From<ClientConfig> for TlsConfiguration {
// TODO: Should all the options be exposed as public? Drawback
// would be loosing the ability to panic when the user options
// are wrong (e.g empty client id) or aggressive (keep alive time)
/// Options to configure the behaviour of mqtt connection
/// Options to configure the behaviour of MQTT connection
#[derive(Clone)]
pub struct MqttOptions {
/// broker address that you want to connect to
Expand Down
9 changes: 8 additions & 1 deletion rumqttc/src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@ use std::io::{BufReader, Cursor};
use std::net::AddrParseError;
use std::sync::Arc;

/// TLS backend error
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error parsing IP address
#[error("Addr")]
Addr(#[from] AddrParseError),
/// I/O related error
#[error("I/O: {0}")]
Io(#[from] io::Error),
/// Certificate/Name validation error
#[error("Web Pki: {0}")]
WebPki(#[from] webpki::Error),
/// Invalid DNS name
#[error("DNS name")]
DNSName(#[from] InvalidDnsNameError),
/// Error from rustls module
#[error("TLS error: {0}")]
TLS(#[from] rustls::Error),
#[error("No valid cert in chain")]
/// No valid certificate in chain
#[error("No valid certificate in chain")]
NoValidCertInChain,
}

Expand Down

0 comments on commit c2090fb

Please sign in to comment.