diff --git a/mqttbytes/src/v4/suback.rs b/mqttbytes/src/v4/suback.rs index 143a12598..4eadcaf75 100644 --- a/mqttbytes/src/v4/suback.rs +++ b/mqttbytes/src/v4/suback.rs @@ -12,10 +12,7 @@ pub struct SubAck { impl SubAck { pub fn new(pkid: u16, return_codes: Vec) -> SubAck { - SubAck { - pkid, - return_codes, - } + SubAck { pkid, return_codes } } pub fn len(&self) -> usize { @@ -39,10 +36,7 @@ impl SubAck { return_codes.push(return_code.try_into()?); } - let suback = SubAck { - pkid, - return_codes, - }; + let suback = SubAck { pkid, return_codes }; Ok(suback) } @@ -53,28 +47,23 @@ impl SubAck { let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?; buffer.put_u16(self.pkid); - let p: Vec = self.return_codes.iter().map(|code| *code as u8).collect(); + let p: Vec = self + .return_codes + .iter() + .map(|&code| match code { + SubscribeReasonCode::Success(qos) => qos as u8, + SubscribeReasonCode::Failure => 0x80, + }) + .collect(); buffer.extend_from_slice(&p); Ok(1 + remaining_len_bytes + remaining_len) } } - -#[repr(u8)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SubscribeReasonCode { - QoS0 = 0, - QoS1 = 1, - QoS2 = 2, - Unspecified = 128, - ImplementationSpecific = 131, - NotAuthorized = 135, - TopicFilterInvalid = 143, - PkidInUse = 145, - QuotaExceeded = 151, - SharedSubscriptionsNotSupported = 158, - SubscriptionIdNotSupported = 161, - WildcardSubscriptionsNotSupported = 162, + Success(QoS), + Failure, } impl TryFrom for SubscribeReasonCode { @@ -82,18 +71,10 @@ impl TryFrom for SubscribeReasonCode { fn try_from(value: u8) -> Result { let v = match value { - 0 => SubscribeReasonCode::QoS0, - 1 => SubscribeReasonCode::QoS1, - 2 => SubscribeReasonCode::QoS2, - 128 => SubscribeReasonCode::Unspecified, - 131 => SubscribeReasonCode::ImplementationSpecific, - 135 => SubscribeReasonCode::NotAuthorized, - 143 => SubscribeReasonCode::TopicFilterInvalid, - 145 => SubscribeReasonCode::PkidInUse, - 151 => SubscribeReasonCode::QuotaExceeded, - 158 => SubscribeReasonCode::SharedSubscriptionsNotSupported, - 161 => SubscribeReasonCode::SubscriptionIdNotSupported, - 162 => SubscribeReasonCode::WildcardSubscriptionsNotSupported, + 0 => SubscribeReasonCode::Success(QoS::AtMostOnce), + 1 => SubscribeReasonCode::Success(QoS::AtLeastOnce), + 2 => SubscribeReasonCode::Success(QoS::ExactlyOnce), + 128 => SubscribeReasonCode::Failure, v => return Err(crate::Error::InvalidSubscribeReasonCode(v)), }; @@ -126,9 +107,11 @@ mod test { packet, SubAck { pkid: 15, - return_codes: vec![SubscribeReasonCode::QoS1, SubscribeReasonCode::Unspecified,], + return_codes: vec![ + SubscribeReasonCode::Success(QoS::AtLeastOnce), + SubscribeReasonCode::Failure, + ], } ); } } - diff --git a/rumqttlog/src/router/router.rs b/rumqttlog/src/router/router.rs index 15467856d..04b75e20a 100644 --- a/rumqttlog/src/router/router.rs +++ b/rumqttlog/src/router/router.rs @@ -7,8 +7,8 @@ use thiserror::Error; use super::connection::ConnectionType; use super::readyqueue::ReadyQueue; use super::slab::Slab; -use crate::logs::acks::Acks; use super::*; +use crate::logs::acks::Acks; use crate::logs::{ConnectionsLog, DataLog, TopicsLog}; use crate::router::metrics::RouterMetrics; @@ -388,10 +388,9 @@ impl Router { let mut return_codes = Vec::new(); for filter in subscribe.filters.iter() { if filter.path.starts_with("test") || filter.path.starts_with("$") { - return_codes.push(SubscribeReasonCode::TopicFilterInvalid); + return_codes.push(SubscribeReasonCode::Failure); } else { - // TODO: Fix subscribe return code - return_codes.push(SubscribeReasonCode::QoS0); + return_codes.push(SubscribeReasonCode::Success(filter.qos)); } } @@ -792,7 +791,9 @@ mod test { let client_id = &format!("{}", i); add_new_remote_connection(&mut router, client_id); add_new_subscription(&mut router, i, "hello/world"); - router.data_waiters.register(i, DataRequest::new("hello/world".to_owned(), 1)); + router + .data_waiters + .register(i, DataRequest::new("hello/world".to_owned(), 1)); } let payload = Bytes::from(vec![1, 2, 3]);