diff --git a/src/messaging/discord.rs b/src/messaging/discord.rs index ef9c7c13f..a2cd7ed89 100644 --- a/src/messaging/discord.rs +++ b/src/messaging/discord.rs @@ -400,34 +400,63 @@ impl Messaging for DiscordAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { - let http = self.get_http().await?; + crate::messaging::traits::ensure_supported_broadcast_response( + "discord", + &response, + |response| { + matches!( + response, + OutboundResponse::Text(_) | OutboundResponse::RichMessage { .. } + ) + }, + )?; + + let http = self + .get_http() + .await + .map_err(crate::messaging::traits::mark_retryable_broadcast)?; // Support "dm:{user_id}" targets for opening DM channels let channel_id = if let Some(user_id_str) = target.strip_prefix("dm:") { let user_id = UserId::new( user_id_str .parse::() - .context("invalid discord user id for DM broadcast target")?, + .context("invalid discord user id for DM broadcast target") + .map_err(crate::messaging::traits::mark_permanent_broadcast)?, ); user_id .create_dm_channel(&*http) .await - .context("failed to open DM channel")? + .context("failed to open DM channel") + .map_err(crate::messaging::traits::mark_classified_broadcast)? .id } else { ChannelId::new( target .parse::() - .context("invalid discord channel id for broadcast target")?, + .context("invalid discord channel id for broadcast target") + .map_err(crate::messaging::traits::mark_permanent_broadcast)?, ) }; if let OutboundResponse::Text(text) = response { + let mut sent_any = false; for chunk in split_message(&text, 2000) { - channel_id + match channel_id .say(&*http, &chunk) .await - .context("failed to broadcast discord message")?; + .context("failed to broadcast discord message") + { + Ok(_) => sent_any = true, + Err(error) => { + let error = crate::messaging::traits::mark_classified_broadcast(error); + return Err( + crate::messaging::traits::classify_chunked_broadcast_failure( + "discord", error, sent_any, + ), + ); + } + } } } else if let OutboundResponse::RichMessage { text, @@ -446,6 +475,7 @@ impl Messaging for DiscordAdapter { } let chunks = split_message(&parts.text, 2000); + let mut sent_any = false; for (i, chunk) in chunks.iter().enumerate() { let is_last = i == chunks.len() - 1; let mut msg = CreateMessage::new(); @@ -474,10 +504,21 @@ impl Messaging for DiscordAdapter { } } - channel_id + match channel_id .send_message(&*http, msg) .await - .context("failed to broadcast discord rich message")?; + .context("failed to broadcast discord rich message") + { + Ok(_) => sent_any = true, + Err(error) => { + let error = crate::messaging::traits::mark_classified_broadcast(error); + return Err( + crate::messaging::traits::classify_chunked_broadcast_failure( + "discord", error, sent_any, + ), + ); + } + } } } @@ -1243,6 +1284,7 @@ fn build_poll( #[cfg(test)] mod tests { use super::*; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; use crate::{Button, ButtonStyle, Card, CardField, InteractiveElements, Poll}; #[test] @@ -1356,4 +1398,18 @@ mod tests { assert_eq!(parts.text, "Status\n\nAll green"); assert!(!parts.dropped_invalid_poll); } + + #[test] + fn discord_partial_delivery_failures_become_permanent() { + let error = crate::messaging::traits::classify_chunked_broadcast_failure( + "discord", + crate::messaging::traits::mark_classified_broadcast(anyhow::anyhow!("timeout")), + true, + ); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } } diff --git a/src/messaging/email.rs b/src/messaging/email.rs index 884dab226..23fda5003 100644 --- a/src/messaging/email.rs +++ b/src/messaging/email.rs @@ -517,17 +517,30 @@ impl Messaging for EmailAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response( + "email", + &response, + supports_email_broadcast_response, + )?; + let recipient = normalize_email_target(target) - .ok_or_else(|| anyhow::anyhow!("invalid email target '{target}'"))?; + .ok_or_else(|| anyhow::anyhow!("invalid email target '{target}'")) + .map_err(crate::messaging::traits::mark_permanent_broadcast)?; match response { OutboundResponse::Text(text) => { self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) - .await?; + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } - OutboundResponse::RichMessage { text, .. } => { - self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) - .await?; + OutboundResponse::RichMessage { + text, cards, poll, .. + } => { + let body = rich_message_plaintext_fallback(&text, &cards, poll.as_ref()) + .expect("supported rich email broadcasts must produce plaintext"); + self.send_email(&recipient, "Spacebot message", body, None, Vec::new(), None) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } OutboundResponse::File { filename, @@ -544,12 +557,14 @@ impl Messaging for EmailAdapter { Vec::new(), Some((filename, data, mime_type)), ) - .await?; + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } OutboundResponse::ThreadReply { text, .. } | OutboundResponse::Ephemeral { text, .. } => { self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) - .await?; + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } OutboundResponse::ScheduledMessage { text, post_at } => { tracing::warn!( @@ -558,14 +573,10 @@ impl Messaging for EmailAdapter { "email adapter does not support scheduled delivery; sending immediately" ); self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) - .await?; + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } - OutboundResponse::Reaction(_) - | OutboundResponse::RemoveReaction(_) - | OutboundResponse::Status(_) - | OutboundResponse::StreamStart - | OutboundResponse::StreamChunk(_) - | OutboundResponse::StreamEnd => {} + _ => unreachable!("unsupported broadcast responses are rejected up front"), } Ok(()) @@ -675,6 +686,56 @@ impl Messaging for EmailAdapter { } } +fn supports_email_broadcast_response(response: &OutboundResponse) -> bool { + match response { + OutboundResponse::Text(_) + | OutboundResponse::File { .. } + | OutboundResponse::ThreadReply { .. } + | OutboundResponse::Ephemeral { .. } + | OutboundResponse::ScheduledMessage { .. } => true, + OutboundResponse::RichMessage { + text, cards, poll, .. + } => rich_message_plaintext_fallback(text, cards, poll.as_ref()).is_some(), + _ => false, + } +} + +fn rich_message_plaintext_fallback( + text: &str, + cards: &[crate::Card], + poll: Option<&crate::Poll>, +) -> Option { + let mut sections = Vec::new(); + let text = text.trim(); + if !text.is_empty() { + sections.push(text.to_string()); + } else { + let card_text = OutboundResponse::text_from_cards(cards); + if !card_text.trim().is_empty() { + sections.push(card_text); + } + } + + if let Some(poll) = poll { + let question = poll.question.trim(); + if !question.is_empty() { + let answers = poll + .answers + .iter() + .map(|answer| answer.trim()) + .filter(|answer| !answer.is_empty()) + .map(|answer| format!("- {answer}")) + .collect::>(); + let mut poll_lines = vec![question.to_string()]; + poll_lines.extend(answers); + sections.push(poll_lines.join("\n")); + } + } + + let combined = sections.join("\n\n"); + (!combined.trim().is_empty()).then_some(combined) +} + fn build_smtp_transport(config: &EmailConfig) -> crate::Result> { let builder = if config.smtp_use_starttls { AsyncSmtpTransport::::starttls_relay(&config.smtp_host) @@ -1758,7 +1819,10 @@ mod tests { EmailSearchHit, EmailSearchQuery, build_imap_search_criterion, derive_thread_key, extract_message_ids, is_local_mail_host, normalize_email_target, normalize_reply_subject, normalize_search_folders, parse_primary_mailbox, sort_and_limit_search_hits, + supports_email_broadcast_response, }; + use crate::OutboundResponse; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; #[test] fn parse_primary_mailbox_parses_display_name() { @@ -1917,4 +1981,71 @@ mod tests { assert_eq!(results[0].subject, "newest"); assert_eq!(results[1].subject, "middle"); } + + #[test] + fn email_supported_broadcast_variants_include_degraded_message_forms() { + assert!(supports_email_broadcast_response(&OutboundResponse::Text( + "hello".to_string() + ))); + assert!(supports_email_broadcast_response( + &OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: vec![crate::Card { + title: Some("Digest".to_string()), + description: Some("One item needs attention".to_string()), + color: None, + url: None, + fields: Vec::new(), + footer: None, + thumbnail: None, + image: None, + author: None, + timestamp: None, + }], + interactive_elements: Vec::new(), + poll: None, + } + )); + assert!(supports_email_broadcast_response( + &OutboundResponse::ScheduledMessage { + text: "hello".to_string(), + post_at: 123, + } + )); + assert!(supports_email_broadcast_response(&OutboundResponse::File { + filename: "note.txt".to_string(), + data: vec![1, 2, 3], + mime_type: "text/plain".to_string(), + caption: None, + })); + } + + #[test] + fn email_unsupported_broadcast_variants_are_permanent_failures() { + let error = crate::messaging::traits::ensure_supported_broadcast_response( + "email", + &OutboundResponse::Reaction("thumbsup".to_string()), + supports_email_broadcast_response, + ) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } + + #[test] + fn email_rejects_rich_message_without_plaintext_fallback() { + assert!(!supports_email_broadcast_response( + &OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: None, + } + )); + } } diff --git a/src/messaging/mattermost.rs b/src/messaging/mattermost.rs index 01b011478..bcf244d82 100644 --- a/src/messaging/mattermost.rs +++ b/src/messaging/mattermost.rs @@ -999,19 +999,50 @@ impl Messaging for MattermostAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response( + "mattermost", + &response, + supports_mattermost_broadcast_response, + )?; + // Resolve DM targets (dm:{user_id}) to a real Mattermost channel ID. let resolved_target; let target = if let Some(user_id) = target.strip_prefix("dm:") { - resolved_target = self.get_or_create_dm_channel(user_id).await?; + Self::validate_id(user_id) + .map_err(crate::messaging::traits::mark_permanent_broadcast)?; + resolved_target = self + .get_or_create_dm_channel(user_id) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; resolved_target.as_str() } else { + Self::validate_id(target) + .map_err(crate::messaging::traits::mark_permanent_broadcast)?; target }; match response { OutboundResponse::Text(text) => { + let mut sent_any = false; for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { - self.create_post(target, &chunk, None).await?; + match self.create_post(target, &chunk, None).await { + Ok(_) => sent_any = true, + Err(error) => { + let error = match error { + crate::error::Error::Other(error) => { + crate::messaging::traits::mark_classified_broadcast(error) + } + other => other, + }; + return Err( + crate::messaging::traits::classify_chunked_broadcast_failure( + "mattermost", + error, + sent_any, + ), + ); + } + } } } OutboundResponse::File { @@ -1021,17 +1052,19 @@ impl Messaging for MattermostAdapter { caption, } => { if data.len() > self.max_attachment_bytes { - return Err(anyhow::anyhow!( - "file too large: {} bytes (max: {})", - data.len(), - self.max_attachment_bytes - ) - .into()); + return Err(crate::messaging::traits::mark_permanent_broadcast( + anyhow::anyhow!( + "file too large: {} bytes (max: {})", + data.len(), + self.max_attachment_bytes + ), + )); } let part = reqwest::multipart::Part::bytes(data) .file_name(filename) .mime_str(&mime_type) - .context("invalid mime type")?; + .context("invalid mime type") + .map_err(crate::messaging::traits::mark_permanent_broadcast)?; let form = reqwest::multipart::Form::new() .part("files", part) .text("channel_id", target.to_string()); @@ -1042,20 +1075,23 @@ impl Messaging for MattermostAdapter { .multipart(form) .send() .await - .context("failed to upload file")?; + .context("failed to upload file") + .map_err(crate::messaging::traits::mark_classified_broadcast)?; let upload_status = upload_response.status(); if !upload_status.is_success() { let body = upload_response.text().await.unwrap_or_default(); - return Err(anyhow::anyhow!( - "mattermost file upload failed with status {}: {body}", - upload_status.as_u16() - ) - .into()); + return Err(crate::messaging::traits::mark_classified_broadcast( + anyhow::anyhow!( + "mattermost file upload failed with status {}: {body}", + upload_status.as_u16() + ), + )); } let upload: MattermostFileUpload = upload_response .json() .await - .context("failed to parse file upload response")?; + .context("failed to parse file upload response") + .map_err(crate::messaging::traits::mark_classified_broadcast)?; let file_ids: Vec<_> = upload.file_infos.iter().map(|f| f.id.as_str()).collect(); let post_response = self .client @@ -1068,28 +1104,32 @@ impl Messaging for MattermostAdapter { })) .send() .await - .context("failed to create post with file")?; + .context("failed to create post with file") + .map_err(crate::messaging::traits::mark_classified_broadcast)?; let post_status = post_response.status(); if !post_status.is_success() { let body = post_response.text().await.unwrap_or_default(); - return Err(anyhow::anyhow!( - "mattermost create post with file failed with status {}: {body}", - post_status.as_u16() - ) - .into()); + return Err(crate::messaging::traits::mark_classified_broadcast( + anyhow::anyhow!( + "mattermost create post with file failed with status {}: {body}", + post_status.as_u16() + ), + )); } } - other => { - tracing::debug!( - ?other, - "mattermost broadcast does not support this response type" - ); - } + _ => unreachable!("unsupported broadcast responses are rejected up front"), } Ok(()) } } +fn supports_mattermost_broadcast_response(response: &OutboundResponse) -> bool { + matches!( + response, + OutboundResponse::Text(_) | OutboundResponse::File { .. } + ) +} + /// Convert a [`MattermostPost`] from a WebSocket event into an [`InboundMessage`], /// applying all permission filters. /// @@ -1525,6 +1565,8 @@ fn split_message(text: &str, max_len: usize) -> Vec { #[cfg(test)] mod tests { use super::*; + use crate::OutboundResponse; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; // --- helpers --- @@ -1922,4 +1964,73 @@ mod tests { let result = split_message(text, 10); assert_eq!(result, vec!["abcdefghij", "klmnopqrst", "uvwxyz"]); } + + #[test] + fn mattermost_supported_broadcast_variants_are_explicit() { + assert!(supports_mattermost_broadcast_response( + &OutboundResponse::Text("hello".to_string()) + )); + assert!(supports_mattermost_broadcast_response( + &OutboundResponse::File { + filename: "note.txt".to_string(), + data: vec![1, 2, 3], + mime_type: "text/plain".to_string(), + caption: None, + } + )); + } + + #[test] + fn mattermost_unsupported_broadcast_variants_are_permanent_failures() { + let error = crate::messaging::traits::ensure_supported_broadcast_response( + "mattermost", + &OutboundResponse::RichMessage { + text: "hello".to_string(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: None, + }, + supports_mattermost_broadcast_response, + ) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } + + #[test] + fn mattermost_partial_delivery_failures_become_permanent() { + let error = crate::messaging::traits::classify_chunked_broadcast_failure( + "mattermost", + crate::messaging::traits::mark_classified_broadcast(anyhow::anyhow!("timeout")), + true, + ); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } + + #[test] + fn mattermost_invalid_target_validation_is_permanent() { + let direct_error = MattermostAdapter::validate_id("bad:channel") + .map_err(crate::messaging::traits::mark_permanent_broadcast) + .expect_err("invalid direct target should fail"); + let dm_error = MattermostAdapter::validate_id("bad:user") + .map_err(crate::messaging::traits::mark_permanent_broadcast) + .expect_err("invalid dm user should fail"); + + assert_eq!( + broadcast_failure_kind(&direct_error), + BroadcastFailureKind::Permanent + ); + assert_eq!( + broadcast_failure_kind(&dm_error), + BroadcastFailureKind::Permanent + ); + } } diff --git a/src/messaging/portal.rs b/src/messaging/portal.rs index f438b0a92..b7cb0b9c2 100644 --- a/src/messaging/portal.rs +++ b/src/messaging/portal.rs @@ -74,30 +74,32 @@ impl Messaging for PortalAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { - let text = match &response { - OutboundResponse::Text(text) => text.clone(), - OutboundResponse::RichMessage { text, .. } => text.clone(), - _ => return Ok(()), - }; + let text = extract_portal_broadcast_text(response)?; // Target format is the full conversation_id: "portal:chat:{agent_id}" let agent_id = target .strip_prefix("portal:chat:") - .context("portal broadcast target must be in 'portal:chat:{agent_id}' format")?; + .context("portal broadcast target must be in 'portal:chat:{agent_id}' format") + .map_err(crate::messaging::traits::mark_permanent_broadcast)?; let tx = self .event_tx .read() .unwrap() .clone() - .context("portal event_tx not configured")?; + .context("portal event_tx not configured") + .map_err(crate::messaging::traits::mark_retryable_broadcast)?; tx.send(ApiEvent::OutboundMessage { agent_id: agent_id.to_string(), channel_id: target.to_string(), text, }) - .ok(); + .map_err(|_| { + crate::messaging::traits::mark_retryable_broadcast(anyhow::anyhow!( + "portal broadcast dropped: no active event subscribers" + )) + })?; Ok(()) } @@ -167,10 +169,72 @@ impl Messaging for PortalAdapter { } } +fn extract_portal_broadcast_text(response: OutboundResponse) -> crate::Result { + match response { + OutboundResponse::Text(text) => Ok(text), + OutboundResponse::RichMessage { + text, cards, poll, .. + } => { + let text = + rich_message_plaintext_fallback(&text, &cards, poll.as_ref()).ok_or_else(|| { + crate::messaging::traits::unsupported_broadcast_variant_error( + "portal", + &OutboundResponse::RichMessage { + text, + blocks: Vec::new(), + cards, + interactive_elements: Vec::new(), + poll, + }, + ) + })?; + Ok(text) + } + other => { + Err(crate::messaging::traits::unsupported_broadcast_variant_error("portal", &other)) + } + } +} + +fn rich_message_plaintext_fallback( + text: &str, + cards: &[crate::Card], + poll: Option<&crate::Poll>, +) -> Option { + let text = text.trim(); + if !text.is_empty() { + return Some(text.to_string()); + } + + let card_text = OutboundResponse::text_from_cards(cards); + if !card_text.trim().is_empty() { + return Some(card_text); + } + + poll.and_then(|poll| { + let question = poll.question.trim(); + if question.is_empty() { + return None; + } + + let answers = poll + .answers + .iter() + .map(|answer| answer.trim()) + .filter(|answer| !answer.is_empty()) + .map(|answer| format!("- {answer}")) + .collect::>(); + let mut lines = vec![question.to_string()]; + lines.extend(answers); + Some(lines.join("\n")) + }) +} + #[cfg(test)] mod tests { use super::*; - use crate::MessageContent; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; + use crate::{Card, MessageContent, OutboundResponse, Poll}; use chrono::Utc; #[tokio::test] @@ -258,4 +322,110 @@ mod tests { assert_eq!(history[1].content, "hello Alice"); assert!(history[1].is_bot); } + + #[test] + fn unsupported_portal_broadcast_variants_are_permanent_failures() { + let error = + extract_portal_broadcast_text(OutboundResponse::Reaction("thumbsup".to_string())) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + assert!( + error + .to_string() + .contains("unsupported portal broadcast response variant: Reaction") + ); + } + + #[test] + fn portal_extracts_card_text_fallback_for_rich_broadcasts() { + let text = extract_portal_broadcast_text(OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: vec![Card { + title: Some("Digest".to_string()), + description: Some("One item needs attention".to_string()), + color: None, + url: None, + fields: Vec::new(), + footer: None, + thumbnail: None, + image: None, + author: None, + timestamp: None, + }], + interactive_elements: Vec::new(), + poll: None, + }) + .expect("card-only portal broadcasts should derive plaintext"); + + assert!(text.contains("Digest")); + assert!(text.contains("One item needs attention")); + } + + #[test] + fn portal_extracts_poll_text_fallback_for_rich_broadcasts() { + let text = extract_portal_broadcast_text(OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: Some(Poll { + question: "Ship it?".to_string(), + answers: vec!["Yes".to_string(), "No".to_string()], + allow_multiselect: false, + duration_hours: 24, + }), + }) + .expect("poll-only portal broadcasts should derive plaintext"); + + assert!(text.contains("Ship it?")); + assert!(text.contains("- Yes")); + } + + #[test] + fn portal_rejects_empty_rich_broadcasts() { + let error = extract_portal_broadcast_text(OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: None, + }) + .expect_err("empty rich broadcasts should be rejected"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } + + #[tokio::test] + async fn portal_broadcast_fails_when_event_subscribers_are_gone() { + let adapter = PortalAdapter::new(HashMap::new()); + let (event_tx, event_rx) = broadcast::channel(4); + drop(event_rx); + adapter.set_event_tx(event_tx); + + let error = adapter + .broadcast( + "portal:chat:agent-a", + OutboundResponse::Text("hello".to_string()), + ) + .await + .expect_err("dropped send must surface as an error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Transient + ); + assert!( + error + .to_string() + .contains("portal broadcast dropped: no active event subscribers") + ); + } } diff --git a/src/messaging/signal.rs b/src/messaging/signal.rs index ff4fe72b5..9003eb91e 100644 --- a/src/messaging/signal.rs +++ b/src/messaging/signal.rs @@ -935,6 +935,12 @@ impl Messaging for SignalAdapter { target: &str, response: crate::OutboundResponse, ) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response( + "signal", + &response, + supports_signal_broadcast_response, + )?; + // Parse target into RecipientTarget // Format: uuid:xxx, group:xxx, or +xxx let recipient = if let Some(uuid) = target.strip_prefix("uuid:") { @@ -944,18 +950,22 @@ impl Messaging for SignalAdapter { } else if target.starts_with('+') { RecipientTarget::Direct(target.to_string()) } else { - return Err(crate::Error::Other(anyhow::anyhow!( - "invalid signal broadcast target format: {target}" - ))); + return Err(crate::messaging::traits::mark_permanent_broadcast( + anyhow::anyhow!("invalid signal broadcast target format"), + )); }; match response { crate::OutboundResponse::Text(text) => { - self.send_text(&recipient, &text).await?; + self.send_text(&recipient, &text) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } crate::OutboundResponse::RichMessage { text, .. } => { // Signal has no rich formatting — send plain text - self.send_text(&recipient, &text).await?; + self.send_text(&recipient, &text) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } crate::OutboundResponse::File { filename, @@ -964,27 +974,28 @@ impl Messaging for SignalAdapter { .. } => { self.send_file(&recipient, &filename, &data, caption.as_deref()) - .await?; + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } crate::OutboundResponse::ThreadReply { text, .. } => { // Signal has no threads — send as regular message - self.send_text(&recipient, &text).await?; + self.send_text(&recipient, &text) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } crate::OutboundResponse::Ephemeral { text, .. } => { // Signal has no ephemeral messages — send as regular text - self.send_text(&recipient, &text).await?; + self.send_text(&recipient, &text) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } crate::OutboundResponse::ScheduledMessage { text, .. } => { // Signal has no scheduled messages — send immediately - self.send_text(&recipient, &text).await?; - } - _ => { - // Other response types are not supported for broadcast - tracing::debug!( - target = %redact_identifier(target), - "signal: unsupported broadcast response type, dropping" - ); + self.send_text(&recipient, &text) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } + _ => unreachable!("unsupported broadcast responses are rejected up front"), } Ok(()) @@ -1030,6 +1041,18 @@ impl Messaging for SignalAdapter { } } +fn supports_signal_broadcast_response(response: &crate::OutboundResponse) -> bool { + matches!( + response, + crate::OutboundResponse::Text(_) + | crate::OutboundResponse::RichMessage { .. } + | crate::OutboundResponse::File { .. } + | crate::OutboundResponse::ThreadReply { .. } + | crate::OutboundResponse::Ephemeral { .. } + | crate::OutboundResponse::ScheduledMessage { .. } + ) +} + // ── SSE listener ──────────────────────────────────────────────── /// Long-running SSE listener that reconnects with exponential backoff. @@ -1450,6 +1473,8 @@ fn parse_recipient_target(target: &str) -> RecipientTarget { #[cfg(test)] mod tests { use super::*; + use crate::OutboundResponse; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; #[test] fn parse_recipient_target_dm() { @@ -1995,6 +2020,42 @@ mod tests { "Group message should be rejected when sender not in allowed users" ); } + + #[test] + fn signal_supported_broadcast_variants_include_textual_fallbacks() { + assert!(supports_signal_broadcast_response(&OutboundResponse::Text( + "hello".to_string() + ))); + assert!(supports_signal_broadcast_response( + &OutboundResponse::ScheduledMessage { + text: "hello".to_string(), + post_at: 123, + } + )); + assert!(supports_signal_broadcast_response( + &OutboundResponse::File { + filename: "note.txt".to_string(), + data: vec![1, 2, 3], + mime_type: "text/plain".to_string(), + caption: Some("note".to_string()), + } + )); + } + + #[test] + fn signal_unsupported_broadcast_variants_are_permanent_failures() { + let error = crate::messaging::traits::ensure_supported_broadcast_response( + "signal", + &OutboundResponse::Reaction("thumbsup".to_string()), + supports_signal_broadcast_response, + ) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } } #[cfg(test)] diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index d3ace875d..e42faad82 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -1122,6 +1122,7 @@ impl Messaging for SlackAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + ensure_supported_broadcast_response(&response)?; let session = self.session(); // Parse an optional thread target encoded as `#thread:` suffix. @@ -1136,7 +1137,8 @@ impl Messaging for SlackAdapter { let open_resp = session .conversations_open(&open_req) .await - .context("failed to open Slack DM conversation")?; + .context("failed to open Slack DM conversation") + .map_err(crate::messaging::traits::mark_classified_broadcast)?; open_resp.channel.id } else { SlackChannelId(bare_target.to_string()) @@ -1144,16 +1146,28 @@ impl Messaging for SlackAdapter { match response { OutboundResponse::Text(text) => { + let mut sent_any = false; for chunk in split_message(&text, 12_000) { let mut req = SlackApiChatPostMessageRequest::new( channel_id.clone(), markdown_content(chunk), ); req = req.opt_thread_ts(thread_ts.clone()); - session + match session .chat_post_message(&req) .await - .context("failed to broadcast slack message")?; + .context("failed to broadcast slack message") + { + Ok(_) => sent_any = true, + Err(error) => { + let error = crate::messaging::traits::mark_classified_broadcast(error); + return Err( + crate::messaging::traits::classify_chunked_broadcast_failure( + "slack", error, sent_any, + ), + ); + } + } } } OutboundResponse::RichMessage { text, blocks, .. } => { @@ -1170,18 +1184,10 @@ impl Messaging for SlackAdapter { session .chat_post_message(&req) .await - .context("failed to broadcast slack rich message")?; - } - // Other variants are not meaningful for broadcast (e.g. Ephemeral requires a - // specific user_id from a live conversation, Reaction requires an existing ts, - // Scheduled/Stream are respond()-only flows). - other => { - tracing::warn!( - variant = %variant_name(&other), - target = %target, - "broadcast() received a variant that is not supported for broadcast — ignoring" - ); + .context("failed to broadcast slack rich message") + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } + _ => unreachable!("unsupported broadcast responses are rejected up front"), } Ok(()) @@ -1625,22 +1631,13 @@ fn strip_bot_mention(text: &str, bot_user_id: &str) -> String { .to_string() } -/// Return a short human-readable name for an `OutboundResponse` variant for log messages. -fn variant_name(response: &OutboundResponse) -> &'static str { - match response { - OutboundResponse::Text(_) => "Text", - OutboundResponse::ThreadReply { .. } => "ThreadReply", - OutboundResponse::File { .. } => "File", - OutboundResponse::Reaction(_) => "Reaction", - OutboundResponse::RemoveReaction(_) => "RemoveReaction", - OutboundResponse::Ephemeral { .. } => "Ephemeral", - OutboundResponse::RichMessage { .. } => "RichMessage", - OutboundResponse::ScheduledMessage { .. } => "ScheduledMessage", - OutboundResponse::StreamStart => "StreamStart", - OutboundResponse::StreamChunk(_) => "StreamChunk", - OutboundResponse::StreamEnd => "StreamEnd", - OutboundResponse::Status(_) => "Status", - } +fn ensure_supported_broadcast_response(response: &OutboundResponse) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response("slack", response, |response| { + matches!( + response, + OutboundResponse::Text(_) | OutboundResponse::RichMessage { .. } + ) + }) } /// Split a message into UTF-8-safe chunks at line/word boundaries. @@ -1731,6 +1728,7 @@ fn resolve_slack_user_identity(user: &SlackUser, user_id: &str) -> SlackUserIden #[cfg(test)] mod tests { use super::*; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; #[test] fn sanitize_reaction_name_unicode_emoji_with_shortcode() { @@ -1803,4 +1801,54 @@ mod tests { let result = sanitize_reaction_name(":partyparrot:"); assert_eq!(result, "partyparrot"); } + + #[test] + fn supported_broadcast_variants_validate() { + assert!( + ensure_supported_broadcast_response(&OutboundResponse::Text("hello".to_string())) + .is_ok() + ); + assert!( + ensure_supported_broadcast_response(&OutboundResponse::RichMessage { + text: "hello".to_string(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: None, + }) + .is_ok() + ); + } + + #[test] + fn unsupported_broadcast_variants_are_permanent_failures() { + let error = ensure_supported_broadcast_response(&OutboundResponse::Reaction( + "thumbsup".to_string(), + )) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + assert!( + error + .to_string() + .contains("unsupported slack broadcast response variant: Reaction") + ); + } + + #[test] + fn slack_partial_delivery_failures_become_permanent() { + let error = crate::messaging::traits::classify_chunked_broadcast_failure( + "slack", + crate::messaging::traits::mark_classified_broadcast(anyhow::anyhow!("timeout")), + true, + ); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } } diff --git a/src/messaging/telegram.rs b/src/messaging/telegram.rs index dc6f762d1..49172834a 100644 --- a/src/messaging/telegram.rs +++ b/src/messaging/telegram.rs @@ -544,19 +544,37 @@ impl Messaging for TelegramAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response( + "telegram", + &response, + supports_telegram_broadcast_response, + )?; + let chat_id = ChatId( target .parse::() - .context("invalid telegram chat id for broadcast target")?, + .context("invalid telegram chat id for broadcast target") + .map_err(crate::messaging::traits::mark_permanent_broadcast)?, ); if let OutboundResponse::Text(text) = response { - send_formatted(&self.bot, chat_id, &text, None).await?; - } else if let OutboundResponse::RichMessage { text, poll, .. } = response { - send_formatted(&self.bot, chat_id, &text, None).await?; + send_formatted(&self.bot, chat_id, &text, None) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; + } else if let OutboundResponse::RichMessage { + text, cards, poll, .. + } = response + { + if let Some(text) = telegram_rich_message_text(&text, &cards) { + send_formatted(&self.bot, chat_id, &text, None) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; + } if let Some(poll_data) = poll { - send_poll(&self.bot, chat_id, &poll_data).await?; + send_poll(&self.bot, chat_id, &poll_data) + .await + .map_err(crate::messaging::traits::mark_classified_broadcast)?; } } @@ -589,6 +607,33 @@ impl Messaging for TelegramAdapter { } } +fn supports_telegram_broadcast_response(response: &OutboundResponse) -> bool { + match response { + OutboundResponse::Text(_) => true, + OutboundResponse::RichMessage { + text, + cards, + interactive_elements, + poll, + .. + } => { + interactive_elements.is_empty() + && (telegram_rich_message_text(text, cards).is_some() || poll.is_some()) + } + _ => false, + } +} + +fn telegram_rich_message_text(text: &str, cards: &[crate::Card]) -> Option { + let text = text.trim(); + if !text.is_empty() { + return Some(text.to_string()); + } + + let derived = OutboundResponse::text_from_cards(cards); + (!derived.trim().is_empty()).then_some(derived) +} + // -- Helper functions -- /// Extract text content from a Telegram message. @@ -1266,6 +1311,7 @@ async fn send_formatted( #[cfg(test)] mod tests { use super::*; + use crate::{Card, InteractiveElements, OutboundResponse, Poll}; #[test] fn bold() { @@ -1434,4 +1480,61 @@ mod tests { assert!(should_retry_plain_caption(&parse_error)); assert!(!should_retry_plain_caption(&non_parse_error)); } + + #[test] + fn telegram_supports_card_only_rich_message_with_text_fallback() { + assert!(supports_telegram_broadcast_response( + &OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: vec![Card { + title: Some("Digest".to_string()), + description: Some("One item needs attention".to_string()), + color: None, + url: None, + fields: Vec::new(), + footer: None, + thumbnail: None, + image: None, + author: None, + timestamp: None, + }], + interactive_elements: Vec::new(), + poll: None, + } + )); + } + + #[test] + fn telegram_rejects_interactive_only_rich_messages() { + assert!(!supports_telegram_broadcast_response( + &OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: vec![InteractiveElements::Buttons { + buttons: Vec::new() + }], + poll: None, + } + )); + } + + #[test] + fn telegram_supports_poll_only_rich_messages() { + assert!(supports_telegram_broadcast_response( + &OutboundResponse::RichMessage { + text: String::new(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: Some(Poll { + question: "Ship it?".to_string(), + answers: vec!["Yes".to_string(), "No".to_string()], + allow_multiselect: false, + duration_hours: 24, + }), + } + )); + } } diff --git a/src/messaging/traits.rs b/src/messaging/traits.rs index ced0dee1a..6414461b1 100644 --- a/src/messaging/traits.rs +++ b/src/messaging/traits.rs @@ -74,6 +74,30 @@ pub fn mark_classified_broadcast(error: impl Into) -> crate::Erro mark_broadcast_failure(kind, error) } +/// Classify a proactive broadcast failure for chunked/multi-send delivery. +/// +/// `broadcast_proactive` retries the whole response payload, not individual chunks. +/// Once any chunk has already been accepted by the platform, a later failure must +/// become terminal so the retry loop does not replay previously delivered chunks. +pub fn classify_chunked_broadcast_failure( + platform: &str, + error: crate::Error, + sent_any: bool, +) -> crate::Error { + if sent_any { + match error { + crate::error::Error::Other(error) => mark_permanent_broadcast(error.context(format!( + "{platform} broadcast partially delivered before failure" + ))), + other => mark_permanent_broadcast(anyhow::anyhow!( + "{platform} broadcast partially delivered before failure: {other}" + )), + } + } else { + error + } +} + /// Reject unsupported proactive broadcast variants unless an adapter opts into an explicit fallback. pub fn unsupported_broadcast_variant_error( platform: &str, @@ -374,7 +398,8 @@ pub fn apply_runtime_adapter_to_conversation_id( mod tests { use super::{ BroadcastFailureKind, broadcast_failure_kind, broadcast_variant_name, - ensure_supported_broadcast_response, mark_broadcast_failure, mark_classified_broadcast, + classify_chunked_broadcast_failure, ensure_supported_broadcast_response, + mark_broadcast_failure, mark_classified_broadcast, }; use crate::OutboundResponse; @@ -474,4 +499,37 @@ mod tests { BroadcastFailureKind::Permanent ); } + + #[test] + fn chunked_broadcast_failures_preserve_retryability_before_any_send() { + let error = classify_chunked_broadcast_failure( + "test", + mark_classified_broadcast(anyhow::anyhow!("HTTP 503")), + false, + ); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Transient + ); + } + + #[test] + fn chunked_broadcast_failures_become_permanent_after_partial_delivery() { + let error = classify_chunked_broadcast_failure( + "test", + mark_classified_broadcast(anyhow::anyhow!("HTTP 503")), + true, + ); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + assert!( + error + .to_string() + .contains("test broadcast partially delivered before failure") + ); + } } diff --git a/src/messaging/twitch.rs b/src/messaging/twitch.rs index 0974a1326..633b2bc05 100644 --- a/src/messaging/twitch.rs +++ b/src/messaging/twitch.rs @@ -365,7 +365,8 @@ impl Messaging for TwitchAdapter { let client_guard = self.client.read().await; let client = client_guard .as_ref() - .context("twitch client not connected")?; + .context("twitch client not connected") + .map_err(crate::messaging::traits::mark_retryable_broadcast)?; let channel = message .metadata @@ -457,26 +458,39 @@ impl Messaging for TwitchAdapter { } async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + crate::messaging::traits::ensure_supported_broadcast_response( + "twitch", + &response, + supports_twitch_broadcast_response, + )?; + let client_guard = self.client.read().await; let client = client_guard .as_ref() - .context("twitch client not connected")?; + .context("twitch client not connected") + .map_err(crate::messaging::traits::mark_retryable_broadcast)?; - if let OutboundResponse::Text(text) = response { - let channel = target.strip_prefix('#').unwrap_or(target); - for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { - client - .say(channel.to_owned(), chunk) - .await - .context("failed to broadcast twitch message")?; - } - } else if let OutboundResponse::RichMessage { text, .. } = response { - let channel = target.strip_prefix('#').unwrap_or(target); - for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { - client - .say(channel.to_owned(), chunk) - .await - .context("failed to broadcast twitch message")?; + let text = match response { + OutboundResponse::Text(text) | OutboundResponse::RichMessage { text, .. } => text, + _ => unreachable!("unsupported broadcast responses are rejected up front"), + }; + let channel = target.strip_prefix('#').unwrap_or(target); + let mut sent_any = false; + for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { + match client + .say(channel.to_owned(), chunk) + .await + .context("failed to broadcast twitch message") + { + Ok(()) => sent_any = true, + Err(error) => { + let error = crate::messaging::traits::mark_classified_broadcast(error); + return Err( + crate::messaging::traits::classify_chunked_broadcast_failure( + "twitch", error, sent_any, + ), + ); + } } } @@ -505,6 +519,13 @@ impl Messaging for TwitchAdapter { } } +fn supports_twitch_broadcast_response(response: &OutboundResponse) -> bool { + matches!( + response, + OutboundResponse::Text(_) | OutboundResponse::RichMessage { .. } + ) +} + /// Split a message into chunks that fit within Twitch's character limit. /// Tries to split at newlines, then spaces, then hard-cuts. fn split_message(text: &str, max_len: usize) -> Vec { @@ -532,3 +553,54 @@ fn split_message(text: &str, max_len: usize) -> Vec { chunks } + +#[cfg(test)] +mod tests { + use super::supports_twitch_broadcast_response; + use crate::OutboundResponse; + use crate::messaging::traits::{BroadcastFailureKind, broadcast_failure_kind}; + + #[test] + fn twitch_supported_broadcast_variants_are_explicit() { + assert!(supports_twitch_broadcast_response(&OutboundResponse::Text( + "hello".to_string() + ))); + assert!(supports_twitch_broadcast_response( + &OutboundResponse::RichMessage { + text: "hello".to_string(), + blocks: Vec::new(), + cards: Vec::new(), + interactive_elements: Vec::new(), + poll: None, + } + )); + } + + #[test] + fn twitch_unsupported_broadcast_variants_are_permanent_failures() { + let error = crate::messaging::traits::ensure_supported_broadcast_response( + "twitch", + &OutboundResponse::Reaction("thumbsup".to_string()), + supports_twitch_broadcast_response, + ) + .expect_err("unsupported variants should error"); + + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } + + #[test] + fn twitch_partial_delivery_failures_become_permanent() { + let error = crate::messaging::traits::classify_chunked_broadcast_failure( + "twitch", + crate::messaging::traits::mark_classified_broadcast(anyhow::anyhow!("timeout")), + true, + ); + assert_eq!( + broadcast_failure_kind(&error), + BroadcastFailureKind::Permanent + ); + } +}