diff --git a/.gitignore b/.gitignore index b83d222..42288c7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target/ +config +session diff --git a/Cargo.lock b/Cargo.lock index 5ff6526..9685469 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -585,11 +585,8 @@ name = "joke_bot" version = "0.1.0" dependencies = [ "futures-util", - "http", "http-body-util", "hyper", - "hyper-tls", - "hyper-util", "ruma", "ruma-client", "serde_json", diff --git a/hello_world/README.md b/hello_world/README.md index 67e1e02..963bcc9 100644 --- a/hello_world/README.md +++ b/hello_world/README.md @@ -1,5 +1,5 @@ -A simple example to demonstrate `ruma-client` functionality. Sends "Hello -World!" to the given room. +A simple example to demonstrate `ruma-client` functionality. Joins the given +room and sends "Hello World!". # Usage diff --git a/hello_world/src/main.rs b/hello_world/src/main.rs index 125f698..79f513a 100644 --- a/hello_world/src/main.rs +++ b/hello_world/src/main.rs @@ -1,8 +1,10 @@ use std::{env, process::exit}; use ruma::{ - OwnedRoomAliasId, TransactionId, - api::client::{alias::get_alias, membership::join_room_by_id, message::send_message_event}, + OwnedRoomId, OwnedRoomOrAliasId, TransactionId, + api::client::{ + alias::get_alias, membership::join_room_by_id_or_alias, message::send_message_event, + }, events::room::message::RoomMessageEventContent, }; @@ -12,8 +14,9 @@ async fn hello_world( homeserver_url: String, username: &str, password: &str, - room_alias: OwnedRoomAliasId, + room_id_or_alias: OwnedRoomOrAliasId, ) -> anyhow::Result<()> { + // Construct and log in the client. let client = ruma_client::Client::builder() .homeserver_url(homeserver_url) .build::() @@ -22,13 +25,25 @@ async fn hello_world( .log_in(username, password, None, Some("ruma-example-client")) .await?; - let room_id = client - .send_request(get_alias::v3::Request::new(room_alias)) - .await? - .room_id; + // Join the room. client - .send_request(join_room_by_id::v3::Request::new(room_id.clone())) + .send_request(join_room_by_id_or_alias::v3::Request::new( + room_id_or_alias.clone(), + )) .await?; + + // Resolve the room alias to a room ID if necessary. + let room_id = match OwnedRoomId::try_from(room_id_or_alias) { + Ok(room_id) => room_id, + Err(room_alias) => { + client + .send_request(get_alias::v3::Request::new(room_alias)) + .await? + .room_id + } + }; + + // Send the message. client .send_request(send_message_event::v3::Request::new( room_id, diff --git a/joke_bot/Cargo.toml b/joke_bot/Cargo.toml index 211a30c..0f04f9f 100644 --- a/joke_bot/Cargo.toml +++ b/joke_bot/Cargo.toml @@ -7,20 +7,10 @@ publish = false [dependencies] ruma = { version = "0.13.0", features = ["client-api-c", "rand"] } ruma-client = { version = "0.16.0", features = ["client-api", "hyper-native-tls"] } -# For building locally: use the git dependencies below. -# -# Browse the source at this revision here: https://github.com/ruma/ruma/tree/bb825c7280a4b49302e74ecd89e8a4c5c37e9a36 -# ruma = { git = "https://github.com/ruma/ruma", rev = "bb825c7280a4b49302e74ecd89e8a4c5c37e9a36", features = ["client-api-c", "rand"] } -# -# Browse the source at this revision here: https://github.com/ruma/ruma-client/tree/464eda0e196a9a43c85db12734693e0c99214e69 -# ruma-client = { git = "https://github.com/ruma/ruma-client", rev = "464eda0e196a9a43c85db12734693e0c99214e69", features = ["client-api", "hyper-native-tls"] } futures-util = { version = "0.3.21", default-features = false, features = ["std"] } -http = "1.1.0" http-body-util = "0.1.1" hyper = "1.3.1" -hyper-tls = "0.6.0" -hyper-util = { version = "0.1.3", features = ["client-legacy", "http1", "http2", "tokio"] } serde_json = "1.0" tokio = { version = "1", features = ["full"] } tokio-stream = "0.1.7" diff --git a/joke_bot/README.md b/joke_bot/README.md index 2f7054c..355852d 100644 --- a/joke_bot/README.md +++ b/joke_bot/README.md @@ -1,11 +1,5 @@ A simple bot to demonstrate `ruma-client` functionality. Tells jokes when you ask for them. -# Note on dependency versions - -This example was written against pre-release versions of `ruma` and -`ruma-client-api`. Check the comments in the `[dependencies]` section of -[`Cargo.toml`](Cargo.toml) for more information. - # Usage Create a file called `config` and populate it with the following values in `key=value` format: diff --git a/joke_bot/src/main.rs b/joke_bot/src/main.rs index 561f3be..a4826ff 100644 --- a/joke_bot/src/main.rs +++ b/joke_bot/src/main.rs @@ -1,10 +1,9 @@ -use std::{error::Error, io, process::exit, time::Duration}; +use std::{error::Error, io, sync::LazyLock, time::Duration}; use futures_util::future::{join, join_all}; use http_body_util::BodyExt as _; -use hyper_util::rt::TokioExecutor; use ruma::{ - OwnedRoomId, OwnedUserId, TransactionId, UserId, + OwnedRoomId, OwnedUserId, TransactionId, api::client::{ filter::FilterDefinition, membership::join_room_by_id, message::send_message_event, sync::sync_events, @@ -17,124 +16,101 @@ use ruma::{ presence::PresenceState, serde::Raw, }; +use ruma_client::DefaultConstructibleHttpClient as _; use serde_json::Value as JsonValue; use tokio::fs; use tokio_stream::StreamExt as _; #[tokio::main] async fn main() -> Result<(), Box> { - if let Err(e) = run().await { - eprintln!("{e}"); - exit(1) - } + let bot = Bot::build().await?; + bot.run().await?; + Ok(()) } +/// The URI used to request a new joke. +static JOKE_API_URI: LazyLock = LazyLock::new(|| { + "https://v2.jokeapi.dev/joke/Programming,Pun,Misc?safe-mode&type=single" + .parse() + .expect("URI should be valid") +}); + type HttpClient = ruma_client::http_client::HyperNativeTls; -type MatrixClient = ruma_client::Client; - -async fn run() -> Result<(), Box> { - let config = read_config() - .await - .map_err(|e| format!("configuration in ./config is invalid: {e}"))?; - let http_client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) - .build(hyper_tls::HttpsConnector::new()); - let matrix_client = if let Some(state) = read_state().await.ok().flatten() { - ruma_client::Client::builder() - .homeserver_url(config.homeserver.clone()) - .access_token(Some(state.access_token)) - .http_client(http_client.clone()) - .await? - } else if config.password.is_some() { - let client = create_matrix_session(http_client.clone(), &config).await?; - - if let Err(err) = write_state(&State { - access_token: client - .access_token() - .expect("Matrix access token is missing"), - }) - .await - { - eprintln!( - "Failed to persist access token to disk. \ - Re-authentication will be required on the next startup: {err}", - ); - } - client - } else { - return Err("No previous session found and no credentials stored in config".into()); - }; - - let filter = FilterDefinition::ignore_all().into(); - let initial_sync_response = matrix_client - .send_request(assign!(sync_events::v3::Request::new(), { - filter: Some(filter), - })) - .await?; - let user_id = &config.username; - let not_senders = vec![user_id.clone()]; - let filter = { - let mut filter = FilterDefinition::empty(); - filter.room.timeline.not_senders = not_senders; - filter - } - .into(); - - let mut sync_stream = Box::pin(matrix_client.sync( - Some(filter), - initial_sync_response.next_batch, - PresenceState::Online, - Some(Duration::from_secs(30)), - )); - - // Prevent the clients being moved by `async move` blocks - let http_client = &http_client; - let matrix_client = &matrix_client; - - println!("Listening..."); - while let Some(response) = sync_stream.try_next().await? { - let message_futures = response - .rooms - .join - .iter() - .map(|(room_id, room_info)| async move { - // Use a regular for loop for the messages within one room to handle them sequentially - for e in &room_info.timeline.events { - if let Err(err) = - handle_message(http_client, matrix_client, e, room_id.to_owned(), user_id) - .await - { - eprintln!("failed to respond to message: {err}"); - } - } - }); +type MatrixClient = ruma_client::Client; + +/// The bot. +struct Bot { + /// The client to use to make HTTP requests outside of the Matrix API. + http_client: HttpClient, + /// The client to use to make requests against the Matrix API. + matrix_client: MatrixClient, + /// The user ID of the Matrix account used by the bot. + user_id: OwnedUserId, +} + +impl Bot { + /// Build the `Bot` from the config. + async fn build() -> Result> { + let config = Config::read() + .await + .map_err(|e| format!("configuration in ./config is invalid: {e}"))?; + let http_client = HttpClient::default(); - let invite_futures = response.rooms.invite.into_keys().map(|room_id| async move { - if let Err(err) = handle_invitations(http_client, matrix_client, room_id.clone()).await - { - eprintln!("failed to accept invitation for room {room_id}: {err}"); + let matrix_client = if let Some(state) = State::read().await.ok().flatten() { + ruma_client::Client::builder() + .homeserver_url(config.homeserver) + .access_token(Some(state.access_token)) + .http_client(http_client.clone()) + .await? + } else if config.password.is_some() { + let client = Self::create_matrix_session(http_client.clone(), &config).await?; + let state = State { + access_token: client + .access_token() + .expect("Matrix access token is missing"), + }; + + if let Err(err) = state.write().await { + eprintln!( + "Failed to persist access token to disk. \ + Re-authentication will be required on the next startup: {err}", + ); } - }); + client + } else { + return Err("No previous session found and no credentials stored in config".into()); + }; - // Handle messages from different rooms as well as invites concurrently - join(join_all(message_futures), join_all(invite_futures)).await; + Ok(Self { + http_client, + matrix_client, + user_id: config.username, + }) } - Ok(()) -} - -async fn create_matrix_session( - http_client: HttpClient, - config: &Config, -) -> Result> { - if let Some(password) = &config.password { + /// Create a new matrix session, aka log in. + /// + /// This methods panics if the password is not set in the config. + async fn create_matrix_session( + http_client: HttpClient, + config: &Config, + ) -> Result> { let client = ruma_client::Client::builder() .homeserver_url(config.homeserver.clone()) .http_client(http_client) .await?; if let Err(e) = client - .log_in(config.username.as_ref(), password, None, None) + .log_in( + config.username.as_ref(), + config + .password + .as_deref() + .expect("we should have already checked that the password is set"), + None, + None, + ) .await { let reason = match e { @@ -153,144 +129,217 @@ async fn create_matrix_session( } Ok(client) - } else { - Err("Failed to create session: no password stored in config" - .to_owned() - .into()) } -} -async fn handle_message( - http_client: &HttpClient, - matrix_client: &MatrixClient, - ev: &Raw, - room_id: OwnedRoomId, - bot_user_id: &UserId, -) -> Result<(), Box> { - if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage( - SyncMessageLikeEvent::Original(m), - ))) = ev.deserialize() - // workaround because Conduit does not implement filtering - && m.sender != bot_user_id - && let MessageType::Text(t) = m.content.msgtype - { + /// Run the bot. + async fn run(&self) -> Result<(), Box> { + // Perform an initial sync to ignore messages before the bot was launched. + let filter = FilterDefinition::ignore_all().into(); + let initial_sync_response = self + .matrix_client + .send_request(assign!(sync_events::v3::Request::new(), { + filter: Some(filter), + })) + .await?; + + // Ignore events from our bot. + let not_senders = vec![self.user_id.clone()]; + let filter = { + let mut filter = FilterDefinition::empty(); + filter.room.timeline.not_senders = not_senders; + filter + } + .into(); + + // Launch a sync loop to listen to messages and invites. + let mut sync_stream = Box::pin(self.matrix_client.sync( + Some(filter), + initial_sync_response.next_batch, + PresenceState::Online, + Some(Duration::from_secs(30)), + )); + + println!("Listening..."); + while let Some(response) = sync_stream.try_next().await? { + let message_futures = + response + .rooms + .join + .iter() + .map(|(room_id, room_info)| async move { + // Use a regular for loop for the messages within one room to handle them sequentially + for e in &room_info.timeline.events { + if let Err(err) = self.handle_message(e, room_id.to_owned()).await { + eprintln!("failed to respond to message: {err}"); + } + } + }); + + let invite_futures = response.rooms.invite.into_keys().map(|room_id| async move { + if let Err(err) = self.handle_invitations(room_id.clone()).await { + eprintln!("failed to accept invitation for room {room_id}: {err}"); + } + }); + + // Handle messages from different rooms as well as invites concurrently + join(join_all(message_futures), join_all(invite_futures)).await; + } + + Ok(()) + } + + /// Handle the given message from the given room. + async fn handle_message( + &self, + ev: &Raw, + room_id: OwnedRoomId, + ) -> Result<(), Box> { + // We are only interested in text messages that contain the word "joke". + let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage( + SyncMessageLikeEvent::Original(m), + ))) = ev.deserialize() + else { + return Ok(()); + }; + let MessageType::Text(t) = m.content.msgtype else { + return Ok(()); + }; + println!("{}:\t{}", m.sender, t.body); - if t.body.to_ascii_lowercase().contains("joke") { - let joke = match get_joke(http_client).await { - Ok(joke) => joke, - Err(_) => "I thought of a joke... but I just forgot it.".to_owned(), - }; - let joke_content = RoomMessageEventContent::text_plain(joke); - let txn_id = TransactionId::new(); - let req = - send_message_event::v3::Request::new(room_id.to_owned(), txn_id, &joke_content)?; - // Do nothing if we can't send the message. - let _ = matrix_client.send_request(req).await; + if !t.body.to_ascii_lowercase().contains("joke") { + return Ok(()); } + + let joke = self + .get_joke() + .await + .unwrap_or_else(|_| "I thought of a joke... but I just forgot it.".to_owned()); + let joke_content = RoomMessageEventContent::notice_plain(joke); + + let txn_id = TransactionId::new(); + let req = send_message_event::v3::Request::new(room_id.to_owned(), txn_id, &joke_content)?; + // Do nothing if we can't send the message. + let _ = self.matrix_client.send_request(req).await; + + Ok(()) } - Ok(()) -} + /// Handle an invitation to the given room. + async fn handle_invitations(&self, room_id: OwnedRoomId) -> Result<(), Box> { + println!("invited to {room_id}"); + self.matrix_client + .send_request(join_room_by_id::v3::Request::new(room_id.clone())) + .await?; -async fn handle_invitations( - http_client: &HttpClient, - matrix_client: &MatrixClient, - room_id: OwnedRoomId, -) -> Result<(), Box> { - println!("invited to {room_id}"); - matrix_client - .send_request(join_room_by_id::v3::Request::new(room_id.clone())) - .await?; - - let greeting = "Hello! My name is Mr. Bot! I like to tell jokes. Like this one: "; - let joke = get_joke(http_client) - .await - .unwrap_or_else(|_| "err... never mind.".to_owned()); - let content = RoomMessageEventContent::text_plain(format!("{greeting}\n{joke}")); - let txn_id = TransactionId::new(); - let message = send_message_event::v3::Request::new(room_id, txn_id, &content)?; - matrix_client.send_request(message).await?; - Ok(()) -} + let greeting = "Hello! My name is Mr. Bot! I like to tell jokes. Like this one: "; + let joke = self + .get_joke() + .await + .unwrap_or_else(|_| "err... never mind.".to_owned()); + let content = RoomMessageEventContent::notice_plain(format!("{greeting}\n{joke}")); + let txn_id = TransactionId::new(); + let message = send_message_event::v3::Request::new(room_id, txn_id, &content)?; + self.matrix_client.send_request(message).await?; + Ok(()) + } + + /// Get a new joke from the API. + async fn get_joke(&self) -> Result> { + let rsp = self.http_client.get(JOKE_API_URI.clone()).await?; + let bytes = rsp.into_body().collect().await?.to_bytes(); + + let joke_obj = serde_json::from_slice::(&bytes) + .map_err(|_| "invalid JSON returned from joke API")?; + let joke = joke_obj["joke"] + .as_str() + .ok_or("joke field missing from joke API response")?; -async fn get_joke(client: &HttpClient) -> Result> { - let uri = "https://v2.jokeapi.dev/joke/Programming,Pun,Misc?safe-mode&type=single" - .parse::()?; - let rsp = client.get(uri).await?; - let bytes = rsp.into_body().collect().await?.to_bytes(); - let joke_obj = serde_json::from_slice::(&bytes) - .map_err(|_| "invalid JSON returned from joke API")?; - let joke = joke_obj["joke"] - .as_str() - .ok_or("joke field missing from joke API response")?; - Ok(joke.to_owned()) + Ok(joke.to_owned()) + } } +/// The session data to persist. struct State { + /// The token used for authorizing requests. access_token: String, } -async fn write_state(state: &State) -> io::Result<()> { - let content = &state.access_token; - fs::write("./session", content).await?; - Ok(()) -} +impl State { + /// The path of the file where the session is persisted. + const PATH: &str = "./session"; + + /// Persist the [`State`] to a file. + async fn write(&self) -> io::Result<()> { + let content = &self.access_token; + fs::write(Self::PATH, content).await?; + Ok(()) + } -async fn read_state() -> io::Result> { - match fs::read_to_string("./session").await { - Ok(access_token) => Ok(Some(State { access_token })), - Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), - Err(e) => Err(e), + /// Try to read the persisted [`State`] from a file. + async fn read() -> io::Result> { + match fs::read_to_string(Self::PATH).await { + Ok(access_token) => Ok(Some(Self { access_token })), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } } } +/// The bot configuration. struct Config { + /// The URL of the homeserver to interact with. homeserver: String, + /// The user ID of the account to use for the bot. username: OwnedUserId, + /// The password of the account. + /// + /// Only required the first time. password: Option, } -async fn read_config() -> io::Result { - let content = fs::read_to_string("./config").await?; - let lines = content.split('\n'); - - let mut homeserver = None; - let mut username = Err("required field `username` is missing".to_owned()); - let mut password = None; - for line in lines { - if let Some((key, value)) = line.split_once('=') { - match key.trim() { - "homeserver" => homeserver = Some(value.trim().to_owned()), - // TODO: infer domain from `homeserver` - "username" => { - username = - value.trim().to_owned().try_into().map_err(|e| { +impl Config { + /// Read the config file. + async fn read() -> io::Result { + let content = fs::read_to_string("./config").await?; + let lines = content.split('\n'); + + let mut homeserver = None; + let mut username = Err("required field `username` is missing".to_owned()); + let mut password = None; + for line in lines { + if let Some((key, value)) = line.split_once('=') { + match key.trim() { + "homeserver" => homeserver = Some(value.trim().to_owned()), + // TODO: infer domain from `homeserver` + "username" => { + username = value.trim().to_owned().try_into().map_err(|e| { format!("invalid Matrix user ID format for `username`: {e}") }); + } + "password" => password = Some(value.trim().to_owned()), + _ => {} } - "password" => password = Some(value.trim().to_owned()), - _ => {} } } - } - match (homeserver, username) { - (Some(homeserver), Ok(username)) => Ok(Config { - homeserver, - username, - password, - }), - (homeserver, username) => { - let mut error = String::from("Invalid config specified:"); - if homeserver.is_none() { - error.push_str("\n required field `homeserver` is missing"); - } - if let Err(e) = username { - error.push_str("\n "); - error.push_str(&e); + match (homeserver, username) { + (Some(homeserver), Ok(username)) => Ok(Self { + homeserver, + username, + password, + }), + (homeserver, username) => { + let mut error = String::from("Invalid config specified:"); + if homeserver.is_none() { + error.push_str("\n required field `homeserver` is missing"); + } + if let Err(e) = username { + error.push_str("\n "); + error.push_str(&e); + } + Err(io::Error::new(io::ErrorKind::InvalidData, error)) } - Err(io::Error::new(io::ErrorKind::InvalidData, error)) } } } diff --git a/message_log/src/main.rs b/message_log/src/main.rs index 96a1faf..137ea1b 100644 --- a/message_log/src/main.rs +++ b/message_log/src/main.rs @@ -18,6 +18,7 @@ async fn log_messages( username: &str, password: &str, ) -> anyhow::Result<()> { + // Construct and log in the client. let client = ruma_client::Client::builder() .homeserver_url(homeserver_url) .build::() @@ -25,6 +26,7 @@ async fn log_messages( client.log_in(username, password, None, None).await?; + // Perform an initial sync to ignore messages before the bot was launched. let filter = FilterDefinition::ignore_all().into(); let initial_sync_response = client .send_request(assign!(sync_events::v3::Request::new(), { @@ -32,6 +34,7 @@ async fn log_messages( })) .await?; + // Launch a sync loop to listen to new messages. let mut sync_stream = Box::pin(client.sync( None, initial_sync_response.next_batch,