Skip to content

Commit

Permalink
MQTT client examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Jan 23, 2024
1 parent 0d2d0e9 commit e54bc21
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 0 deletions.
112 changes: 112 additions & 0 deletions examples/mqtt_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use core::time::Duration;

use esp_idf_svc::eventloop::EspSystemEventLoop;
use esp_idf_svc::hal::peripherals::Peripherals;
use esp_idf_svc::mqtt::client::*;
use esp_idf_svc::nvs::EspDefaultNvsPartition;
use esp_idf_svc::sys::EspError;
use esp_idf_svc::wifi::*;

use log::*;

const SSID: &str = env!("WIFI_SSID");
const PASSWORD: &str = env!("WIFI_PASS");

const MQTT_URL: &str = "mqtt://broker.emqx.io:1883";
const MQTT_CLIENT_ID: &str = "esp-mqtt-demo";
const MQTT_TOPIC: &str = "esp-mqtt-demo";

fn main() -> Result<(), EspError> {
esp_idf_svc::sys::link_patches();
esp_idf_svc::log::EspLogger::initialize_default();

let _wifi = wifi_create()?;

let (mut client, mut conn) = mqtt_create(MQTT_URL, MQTT_CLIENT_ID)?;

run(&mut client, &mut conn, MQTT_TOPIC)
}

fn run(
client: &mut EspMqttClient<'_>,
connection: &mut EspMqttConnection,
topic: &str,
) -> Result<(), EspError> {
std::thread::scope(|s| {
info!("About to start the MQTT client");

info!("MQTT client started");

s.spawn(move || {
info!("MQTT Listening for messages");

while let Ok(event) = connection.next() {
info!("[Queue] Event: {}", event.payload());
}

info!("Connection closed");
});

client.subscribe(topic, QoS::AtMostOnce)?;

info!("Subscribed to topic \"{topic}\"");

// Just to give a chance of our connection to get even the first published message
std::thread::sleep(Duration::from_millis(500));

let payload = "Hello from esp-mqtt-demo!";

loop {
client.enqueue(topic, QoS::AtMostOnce, false, payload.as_bytes())?;

info!("Published \"{payload}\" to topic \"{topic}\"");

let sleep_secs = 2;

info!("Now sleeping for {sleep_secs}s...");
std::thread::sleep(Duration::from_secs(sleep_secs));
}
})
}

fn mqtt_create(
url: &str,
client_id: &str,
) -> Result<(EspMqttClient<'static>, EspMqttConnection), EspError> {
let (mqtt_client, mqtt_conn) = EspMqttClient::new_with_conn(
url,
&MqttClientConfiguration {
client_id: Some(client_id),
..Default::default()
},
)?;

Ok((mqtt_client, mqtt_conn))
}

fn wifi_create() -> Result<EspWifi<'static>, EspError> {
let peripherals = Peripherals::take()?;

let sys_loop = EspSystemEventLoop::take()?;
let nvs = EspDefaultNvsPartition::take()?;

let mut esp_wifi = EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?;

let mut wifi = BlockingWifi::wrap(&mut esp_wifi, sys_loop)?;

wifi.set_configuration(&Configuration::Client(ClientConfiguration {
ssid: SSID.try_into().unwrap(),
password: PASSWORD.try_into().unwrap(),
..Default::default()
}))?;

wifi.start()?;
wifi.wait_netif_up()?;

info!(
"Created Wi-Fi with WIFI_SSID `{}` and WIFI_PASS `{}`",
SSID, PASSWORD
);

Ok(esp_wifi)
}
129 changes: 129 additions & 0 deletions examples/mqtt_client_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use core::time::Duration;

use embassy_futures::select::{select, Either};

use esp_idf_svc::eventloop::EspSystemEventLoop;
use esp_idf_svc::hal::peripherals::Peripherals;
use esp_idf_svc::mqtt::client::*;
use esp_idf_svc::nvs::EspDefaultNvsPartition;
use esp_idf_svc::sys::EspError;
use esp_idf_svc::timer::{EspAsyncTimer, EspTaskTimerService, EspTimerService};
use esp_idf_svc::wifi::*;

use log::*;

const SSID: &str = env!("WIFI_SSID");
const PASSWORD: &str = env!("WIFI_PASS");

const MQTT_URL: &str = "mqtt://broker.emqx.io:1883";
const MQTT_CLIENT_ID: &str = "esp-mqtt-demo";
const MQTT_TOPIC: &str = "esp-mqtt-demo";

fn main() -> Result<(), EspError> {
esp_idf_svc::sys::link_patches();
esp_idf_svc::log::EspLogger::initialize_default();

esp_idf_svc::hal::task::block_on(async {
let timer_service = EspTimerService::new()?;
let _wifi = wifi_create(&timer_service).await?;

let (mut client, mut conn) = mqtt_create(MQTT_URL, MQTT_CLIENT_ID)?;

let mut timer = timer_service.timer_async()?;
run(&mut client, &mut conn, &mut timer, MQTT_TOPIC).await
})
}

async fn run(
client: &mut EspMqttClient<'_>,
connection: &mut EspMqttConnection,
timer: &mut EspAsyncTimer,
topic: &str,
) -> Result<(), EspError> {
info!("About to start the MQTT client");

info!("MQTT client started");

let res = select(
async move {
info!("MQTT Listening for messages");

while let Ok(event) = connection.next_async().await {
info!("[Queue] Event: {}", event.payload());
}

info!("Connection closed");

Ok(())
},
async move {
client.subscribe(topic, QoS::AtMostOnce)?;

info!("Subscribed to topic \"{topic}\"");

// Just to give a chance of our connection to get even the first published message
timer.after(Duration::from_millis(500)).await?;

let payload = "Hello from esp-mqtt-demo!";

loop {
client.enqueue(topic, QoS::AtMostOnce, false, payload.as_bytes())?;

info!("Published \"{payload}\" to topic \"{topic}\"");

let sleep_secs = 2;

info!("Now sleeping for {sleep_secs}s...");
timer.after(Duration::from_secs(sleep_secs)).await?;
}
},
)
.await;

match res {
Either::First(res) => res,
Either::Second(res) => res,
}
}

fn mqtt_create(
url: &str,
client_id: &str,
) -> Result<(EspMqttClient<'static>, EspMqttConnection), EspError> {
let (mqtt_client, mqtt_conn) = EspMqttClient::new_with_conn(
url,
&MqttClientConfiguration {
client_id: Some(client_id),
..Default::default()
},
)?;

Ok((mqtt_client, mqtt_conn))
}

async fn wifi_create(timer_service: &EspTaskTimerService) -> Result<EspWifi<'static>, EspError> {
let peripherals = Peripherals::take()?;

let sys_loop = EspSystemEventLoop::take()?;
let nvs = EspDefaultNvsPartition::take()?;

let mut esp_wifi = EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?;

let mut wifi = AsyncWifi::wrap(&mut esp_wifi, sys_loop, timer_service.clone())?;

wifi.set_configuration(&Configuration::Client(ClientConfiguration {
ssid: SSID.try_into().unwrap(),
password: PASSWORD.try_into().unwrap(),
..Default::default()
}))?;

wifi.start().await?;
wifi.wait_netif_up().await?;

info!(
"Created Wi-Fi with WIFI_SSID `{}` and WIFI_PASS `{}`",
SSID, PASSWORD
);

Ok(esp_wifi)
}
2 changes: 2 additions & 0 deletions examples/ws_guessing_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{borrow::Cow, collections::BTreeMap, str, sync::Mutex};

const SSID: &str = env!("WIFI_SSID");
const PASSWORD: &str = env!("WIFI_PASS");

static INDEX_HTML: &str = include_str!("ws_guessing_game.html");

// Max payload length
Expand Down Expand Up @@ -122,6 +123,7 @@ fn nth(n: u32) -> Cow<'static, str> {
fn main() -> anyhow::Result<()> {
esp_idf_svc::sys::link_patches();
esp_idf_svc::log::EspLogger::initialize_default();

let mut server = create_server()?;

server.fn_handler("/", Method::Get, |req| {
Expand Down

0 comments on commit e54bc21

Please sign in to comment.