Skip to content

Commit

Permalink
Added workspace, added v0.2.2 exampels, moved files
Browse files Browse the repository at this point in the history
  • Loading branch information
GunnarMorrigan committed Jan 21, 2024
1 parent cfe5225 commit 10c4516
Show file tree
Hide file tree
Showing 70 changed files with 1,388 additions and 864 deletions.
420 changes: 375 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

89 changes: 5 additions & 84 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,85 +1,6 @@
[package]
name = "mqrstt"
version = "0.3.0-alpha.5"
homepage = "https://github.com/GunnarMorrigan/mqrstt"
repository = "https://github.com/GunnarMorrigan/mqrstt"
documentation = "https://docs.rs/mqrstt"
categories = ["network-programming"]
readme = "README.md"
edition = "2021"
license = "MPL-2.0"
keywords = ["MQTT", "IoT", "MQTTv5", "messaging", "client"]
description = "Pure rust MQTTv5 client implementation Smol and Tokio"
[workspace]

rust-version = "1.75"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = [
"smol",
"tokio"
]
sync = []
tokio = ["dep:tokio", "tokio/rt"]
smol = ["dep:smol"]
logs = ["dep:tracing"]
test = []

[dependencies]
# Packets
bytes = "1.5.0"

# Errors
thiserror = "1.0.53"
tracing = { version = "0.1.40", optional = true }

async-channel = "2.1.1"
#async-mutex = "1.4.0"
futures = { version = "0.3.30", default-features = false, features = [
"std",
"async-await",
] }

# quic feature flag
# quinn = {version = "0.9.0", optional = true }

# tokio feature flag
tokio = { version = "1.35.1", features = [
"macros",
"io-util",
"net",
"time",
], optional = true }

# smol feature flag
smol = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }

tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

smol = { version = "2.0.0" }
tokio = { version = "1.33.0", features = [
"rt-multi-thread",
"rt",
"macros",
"sync",
"io-util",
"net",
"time",
] }

rustls = { version = "0.21.7" }
rustls-pemfile = { version = "1.0.3" }
webpki = { version = "0.22.4" }
async-rustls = { version = "0.4.1" }
tokio-rustls = "0.24.1"
rstest = "0.18.2"
rand = "0.8.5"


[[bench]]
name = "bench_main"
harness = false
members = [
"mqrstt",
"examples/*",
]
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**/Cargo.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "smol_tls"
name = "smol_tcp_v0_2_2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mqrstt = { path = "../..", default-features = false, features = ["smol"]}
mqrstt = { version = "0.2.2", default-features = false, features = ["smol"]}

smol = { version = "1.3.0" }
futures = { version = "0.3.27", default-features = false, features = ["std", "async-await"] }
Expand Down
72 changes: 72 additions & 0 deletions examples/smol_tcp_v0.2.2/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::{io::{BufReader, Cursor}, sync::Arc};

use async_trait::async_trait;
use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, new_smol, smol::NetworkStatus};
use rustls::{RootCertStore, OwnedTrustAnchor, ClientConfig, Certificate, ServerName};

pub struct PingPong {
pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
"pong",
)
.await
.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
_ => (),
}
}
}

fn main() {
smol::block_on(async {
let client_id = "SmolTls_MQrsTT_Example".to_string();
let options = ConnectOptions::new(client_id);

let address = "broker.emqx.io";
let port = 8883;

let (mut network, client) = new_smol(options);

let stream = smol::net::TcpStream::connect((address, port)).await.unwrap();

let mut pingpong = PingPong { client: client.clone() };

network.connect(stream, &mut pingpong).await.unwrap();

client.subscribe("mqrstt").await.unwrap();

let (n, _) = futures::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
});
}
19 changes: 19 additions & 0 deletions examples/smol_tls_v0.2.2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "smol_tls_v0_2_2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mqrstt = { version = "0.2.2", default-features = false, features = ["smol"]}

smol = { version = "1.3.0" }
futures = { version = "0.3.27", default-features = false, features = ["std", "async-await"] }

async-trait = "0.1.68"

rustls = { version = "0.20.7" }
rustls-pemfile = { version = "1.0.1" }
webpki = { version = "0.22.0" }
async-rustls = { version = "0.3.0" }
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{io::{BufReader, Cursor}, sync::Arc};

use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, new_smol, NetworkStatus};
use async_trait::async_trait;
use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, new_smol, smol::NetworkStatus};
use rustls::{RootCertStore, OwnedTrustAnchor, ClientConfig, Certificate, ServerName};


Expand All @@ -11,6 +12,7 @@ pub struct PingPong {
pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "tokio_tls"
name = "tokio_tcp_v0_2_2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mqrstt = { path = "../..", default-features = false, features = ["tokio"]}
mqrstt = { version = "0.2.2", default-features = false, features = ["tokio"]}

tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt", "macros", "sync", "io-util", "net", "time"] }
tokio-rustls = "0.24.0"
Expand Down
71 changes: 71 additions & 0 deletions examples/tokio_tcp_v0.2.2/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::{io::{BufReader, Cursor}, sync::Arc, time::Duration};

use async_trait::async_trait;
use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, tokio::NetworkStatus, new_tokio};
use tokio_rustls::rustls::{ClientConfig, RootCertStore, OwnedTrustAnchor, Certificate, ServerName};

pub struct PingPong {
pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
"pong",
)
.await
.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
_ => (),
}
}
}

#[tokio::main]
async fn main() {
let client_id = "TokioTls_MQrsTT_Example".to_string();
let options = ConnectOptions::new(client_id);

let address = "broker.emqx.io";
let port = 8883;

let (mut network, client) = new_tokio(options);

let stream = tokio::net::TcpStream::connect((address, port)).await.unwrap();

let mut pingpong = PingPong { client: client.clone() };

network.connect(stream, &mut pingpong).await.unwrap();

client.subscribe("mqrstt").await.unwrap();

let (n, _) = tokio::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
}
18 changes: 18 additions & 0 deletions examples/tokio_tls_v0.2.2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "tokio_tls_v0_2_2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mqrstt = { version = "0.2.2", default-features = false, features = ["tokio"]}

tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt", "macros", "sync", "io-util", "net", "time"] }
tokio-rustls = "0.24.0"

async-trait = "0.1.68"

rustls = { version = "0.20.7" }
rustls-pemfile = { version = "1.0.1" }
webpki = { version = "0.22.0" }
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::{io::{BufReader, Cursor}, sync::Arc, time::Duration};

use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, NetworkStatus, new_tokio};
use async_trait::async_trait;
use mqrstt::{MqttClient, AsyncEventHandler, packets::{self, Packet}, ConnectOptions, tokio::NetworkStatus, new_tokio};
use tokio_rustls::rustls::{ClientConfig, RootCertStore, OwnedTrustAnchor, Certificate, ServerName};


pub const EMQX_CERT: &[u8] = include_bytes!("broker.emqx.io-ca.crt");


pub struct PingPong {
pub client: MqttClient,
}

#[async_trait]
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
Expand Down Expand Up @@ -42,7 +45,7 @@ pub enum PrivateKey {
ECC(Vec<u8>),
}

pub fn simple_rust_tls(ca: Vec<u8>, alpn: Option<Vec<Vec<u8>>>, client_auth: Option<(Vec<u8>, PrivateKey)>) -> Result<Arc<ClientConfig>, tokio_rustls::rustls::Error> {
pub fn simple_rust_tls(ca: Vec<u8>, alpn: Option<Vec<Vec<u8>>>, client_auth: Option<(Vec<u8>, PrivateKey)>) -> Result<Arc<ClientConfig>, rustls::Error> {
let mut root_cert_store = RootCertStore::empty();

let ca_certs = rustls_pemfile::certs(&mut BufReader::new(Cursor::new(ca))).unwrap();
Expand Down Expand Up @@ -73,7 +76,7 @@ pub fn simple_rust_tls(ca: Vec<u8>, alpn: Option<Vec<Vec<u8>>>, client_auth: Opt
let client_certs = rustls_pemfile::certs(&mut BufReader::new(Cursor::new(client_cert_info))).unwrap();
let client_cert_chain = client_certs.into_iter().map(Certificate).collect();

config.with_single_cert(client_cert_chain, tokio_rustls::rustls::PrivateKey(key))?
config.with_single_cert(client_cert_chain, tokio_rustls::rustls::PrivateKey(key)).unwrap()
}
None => config.with_no_client_auth(),
};
Expand Down
Loading

0 comments on commit 10c4516

Please sign in to comment.