From 61ca96dcbba65a4f2073d9ef0becd999c524af59 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sat, 18 Jan 2025 23:51:59 +0000 Subject: [PATCH] remove logging tag --- src/Simplex/Messaging/Client.hs | 72 +++++++++---------- src/Simplex/Messaging/Notifications/Client.hs | 2 +- src/Simplex/Messaging/Protocol.hs | 2 +- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 386e22c23..bde663b32 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -171,7 +171,7 @@ data PClient v err msg = PClient timeoutErrorCount :: TVar Int, clientCorrId :: TVar ChaChaDRG, sentCommands :: TMap CorrId (Request err msg), - sndQ :: TBQueue (Maybe (Request err msg), ByteString, Maybe (Tag (ProtoCommand msg))), + sndQ :: TBQueue (Maybe (Request err msg), ByteString), rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)), msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg)) } @@ -566,14 +566,12 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO () send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= sendPending where - sendPending (Nothing, s, tag) = send_ s tag Nothing - sendPending (Just Request {pending, responseVar}, s, tag) = whenM (readTVarIO pending) $ send_ s tag (Just responseVar) - send_ s tag responseVar_ = - tPutLog h s >>= \case - Right () -> pure () - Left e -> do - putStrLn $ "send error: " <> show tag <> ", " <> show e - atomically $ forM_ responseVar_ $ \v -> putTMVar v $ Left $ PCETransportError e + sendPending (r, s) = case r of + Nothing -> void $ tPutLog h s + Just Request {pending, responseVar} -> + whenM (readTVarIO pending) $ tPutLog h s >>= either responseErr pure + where + responseErr = atomically . putTMVar responseVar . Left . PCETransportError receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO () receive ProtocolClient {client_ = PClient {rcvQ, lastReceived, timeoutErrorCount}} h = forever $ do @@ -593,7 +591,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize if remaining > 1_000_000 -- delay pings only for significant time then loop remaining else do - whenM (readTVarIO sendPings) $ void . runExceptT $ sendProtocolCommand c Nothing NoEntity (protocolPing @v @err @msg) Nothing + whenM (readTVarIO sendPings) $ void . runExceptT $ sendProtocolCommand c Nothing NoEntity (protocolPing @v @err @msg) -- sendProtocolCommand/getResponse updates counter for each command cnt <- readTVarIO timeoutErrorCount -- drop client when maxCnt of commands have timed out in sequence, but only after some time has passed after last received response @@ -715,7 +713,7 @@ createSMPQueue :: Bool -> ExceptT SMPClientError IO QueueIdsKeys createSMPQueue c (rKey, rpKey) dhKey auth subMode sndSecure = - sendSMPCommand c (Just rpKey) NoEntity (NEW rKey dhKey auth subMode sndSecure) (Just NEW_) >>= \case + sendSMPCommand c (Just rpKey) NoEntity (NEW rKey dhKey auth subMode sndSecure) >>= \case IDS qik -> pure qik r -> throwE $ unexpectedResponse r @@ -725,7 +723,7 @@ createSMPQueue c (rKey, rpKey) dhKey auth subMode sndSecure = subscribeSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO () subscribeSMPQueue c rpKey rId = do liftIO $ enablePings c - sendSMPCommand c (Just rpKey) rId SUB (Just SUB_) >>= \case + sendSMPCommand c (Just rpKey) rId SUB >>= \case OK -> pure () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd r -> throwE $ unexpectedResponse r @@ -763,7 +761,7 @@ serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionI -- https://github.covm/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue getSMPMessage :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO (Maybe RcvMessage) getSMPMessage c rpKey rId = - sendSMPCommand c (Just rpKey) rId GET (Just GET_) >>= \case + sendSMPCommand c (Just rpKey) rId GET >>= \case OK -> pure Nothing cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg r -> throwE $ unexpectedResponse r @@ -774,7 +772,7 @@ getSMPMessage c rpKey rId = subscribeSMPQueueNotifications :: SMPClient -> NtfPrivateAuthKey -> NotifierId -> ExceptT SMPClientError IO () subscribeSMPQueueNotifications c npKey nId = do liftIO $ enablePings c - okSMPCommand NSUB (Just NSUB_) c npKey nId + okSMPCommand NSUB c npKey nId {-# INLINE subscribeSMPQueueNotifications #-} -- | Subscribe to multiple SMP queues notifications batching commands if supported. @@ -792,12 +790,12 @@ enablePings ProtocolClient {client_ = PClient {sendPings}} = atomically $ writeT -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#secure-queue-command secureSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> SndPublicAuthKey -> ExceptT SMPClientError IO () -secureSMPQueue c rpKey rId senderKey = okSMPCommand (KEY senderKey) (Just KEY_) c rpKey rId +secureSMPQueue c rpKey rId senderKey = okSMPCommand (KEY senderKey) c rpKey rId {-# INLINE secureSMPQueue #-} -- | Secure the SMP queue via sender queue ID. secureSndSMPQueue :: SMPClient -> SndPrivateAuthKey -> SenderId -> SndPublicAuthKey -> ExceptT SMPClientError IO () -secureSndSMPQueue c spKey sId senderKey = okSMPCommand (SKEY senderKey) (Just SKEY_) c spKey sId +secureSndSMPQueue c spKey sId senderKey = okSMPCommand (SKEY senderKey) c spKey sId {-# INLINE secureSndSMPQueue #-} proxySecureSndSMPQueue :: SMPClient -> ProxiedRelay -> SndPrivateAuthKey -> SenderId -> SndPublicAuthKey -> ExceptT SMPClientError IO (Either ProxyClientError ()) @@ -809,7 +807,7 @@ proxySecureSndSMPQueue c proxiedRelay spKey sId senderKey = proxySMPCommand c pr -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> ExceptT SMPClientError IO (NotifierId, RcvNtfPublicDhKey) enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey = - sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) (Just NKEY_) >>= \case + sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey) r -> throwE $ unexpectedResponse r @@ -827,7 +825,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#disable-notifications-command disableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO () -disableSMPQueueNotifications = okSMPCommand NDEL (Just NDEL_) +disableSMPQueueNotifications = okSMPCommand NDEL {-# INLINE disableSMPQueueNotifications #-} -- | Disable notifications for multiple queues for push notifications server. @@ -840,7 +838,7 @@ disableSMPQueuesNtfs = okSMPCommands NDEL -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO () sendSMPMessage c spKey sId flags msg = - sendSMPCommand c spKey sId (SEND flags msg) (Just SEND_) >>= \case + sendSMPCommand c spKey sId (SEND flags msg) >>= \case OK -> pure () r -> throwE $ unexpectedResponse r @@ -852,7 +850,7 @@ proxySMPMessage c proxiedRelay spKey sId flags msg = proxySMPCommand c proxiedRe -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery ackSMPMessage :: SMPClient -> RcvPrivateAuthKey -> QueueId -> MsgId -> ExceptT SMPClientError IO () ackSMPMessage c rpKey rId msgId = - sendSMPCommand c (Just rpKey) rId (ACK msgId) (Just ACK_) >>= \case + sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case OK -> return () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd r -> throwE $ unexpectedResponse r @@ -862,14 +860,14 @@ ackSMPMessage c rpKey rId msgId = -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#suspend-queue suspendSMPQueue :: SMPClient -> RcvPrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () -suspendSMPQueue = okSMPCommand OFF (Just OFF_) +suspendSMPQueue = okSMPCommand OFF {-# INLINE suspendSMPQueue #-} -- | Irreversibly delete SMP queue and all messages in it. -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#delete-queue deleteSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO () -deleteSMPQueue = okSMPCommand DEL (Just DEL_) +deleteSMPQueue = okSMPCommand DEL {-# INLINE deleteSMPQueue #-} -- | Delete multiple SMP queues batching commands if supported. @@ -882,7 +880,7 @@ deleteSMPQueues = okSMPCommands DEL connectSMPProxiedRelay :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, tcpTimeout}} relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth | thVersion (thParams c) >= sendingProxySMPVersion = - sendProtocolCommand_ c Nothing tOut Nothing NoEntity (Cmd SProxiedClient (PRXY relayServ proxyAuth)) (Just $ CT SProxiedClient PRXY_) >>= \case + sendProtocolCommand_ c Nothing tOut Nothing NoEntity (Cmd SProxiedClient (PRXY relayServ proxyAuth)) >>= \case PKEY sId vr (chain, key) -> case supportedClientSMPRelayVRange `compatibleVersion` vr of Nothing -> throwE $ transportErr TEVersion @@ -984,7 +982,7 @@ proxySMPCommand c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedTLength -- proxy interaction errors are wrapped let tOut = Just $ 2 * tcpTimeout - tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing (EntityId sessionId) (Cmd SProxiedClient (PFWD v cmdPubKey et)) (Just $ CT SProxiedClient PFWD_)) >>= \case + tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing (EntityId sessionId) (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case Right r -> case r of PRES (EncResponse er) -> do -- server interaction errors are thrown directly @@ -1020,7 +1018,7 @@ forwardSMPTransmission c@ProtocolClient {thParams, client_ = PClient {clientCorr let fwdT = FwdTransmission {fwdCorrId, fwdVersion, fwdKey, fwdTransmission} eft = EncFwdTransmission $ C.cbEncryptNoPad sessSecret nonce (smpEncode fwdT) -- send - sendProtocolCommand_ c (Just nonce) Nothing Nothing NoEntity (Cmd SSender (RFWD eft)) (Just $ CT SSender RFWD_) >>= \case + sendProtocolCommand_ c (Just nonce) Nothing Nothing NoEntity (Cmd SSender (RFWD eft)) >>= \case RRES (EncFwdResponse efr) -> do -- unwrap r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr @@ -1030,13 +1028,13 @@ forwardSMPTransmission c@ProtocolClient {thParams, client_ = PClient {clientCorr getSMPQueueInfo :: SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO QueueInfo getSMPQueueInfo c pKey qId = - sendSMPCommand c (Just pKey) qId QUE (Just QUE_) >>= \case + sendSMPCommand c (Just pKey) qId QUE >>= \case INFO info -> pure info r -> throwE $ unexpectedResponse r -okSMPCommand :: PartyI p => Command p -> Maybe (CommandTag p) -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () -okSMPCommand cmd cmdTag c pKey qId = - sendSMPCommand c (Just pKey) qId cmd cmdTag >>= \case +okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () +okSMPCommand cmd c pKey qId = + sendSMPCommand c (Just pKey) qId cmd >>= \case OK -> return () r -> throwE $ unexpectedResponse r @@ -1051,8 +1049,8 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs Left e -> Left e -- | Send SMP command -sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> Maybe (CommandTag p) -> ExceptT SMPClientError IO BrokerMsg -sendSMPCommand c pKey qId cmd cmdTag = sendProtocolCommand c pKey qId (Cmd sParty cmd) (CT sParty <$> cmdTag) +sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg +sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd) {-# INLINE sendSMPCommand #-} type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg) @@ -1088,23 +1086,23 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do pure [Response entityId $ Left $ PCETransportError e] TBTransmissions s n rs | n > 0 -> do - atomically $ writeTBQueue sndQ (Nothing, s, Nothing) -- do not expire batched responses + atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses mapConcurrently (getResponse c Nothing) rs | otherwise -> pure [] TBTransmission s r -> do - atomically $ writeTBQueue sndQ (Nothing, s, Nothing) + atomically $ writeTBQueue sndQ (Nothing, s) (: []) <$> getResponse c Nothing r -- | Send Protocol command -sendProtocolCommand :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> Maybe (Tag (ProtoCommand msg)) -> ExceptT (ProtocolClientError err) IO msg +sendProtocolCommand :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg sendProtocolCommand c = sendProtocolCommand_ c Nothing Nothing -- Currently there is coupling - batch commands do not expire, and individually sent commands do. -- This is to reflect the fact that we send subscriptions only as batches, and also because we do not track a separate timeout for the whole batch, so it is not obvious when should we expire it. -- We could expire a batch of deletes, for example, either when the first response expires or when the last one does. -- But a better solution is to process delayed delete responses. -sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> Maybe (Tag (ProtoCommand msg)) -> ExceptT (ProtocolClientError err) IO msg -sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd cmdTag = +sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg +sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd = ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd) where -- two separate "atomically" needed to avoid blocking @@ -1114,7 +1112,7 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan Right t | B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg | otherwise -> do - atomically $ writeTBQueue sndQ (Just r, s, cmdTag) + atomically $ writeTBQueue sndQ (Just r, s) response <$> getResponse c tOut r where s diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index af80474a2..273010c2c 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -87,7 +87,7 @@ ntfDeleteSubscription = okNtfCommand SDEL -- | Send notification server command sendNtfCommand :: NtfEntityI e => NtfClient -> Maybe C.APrivateAuthKey -> NtfEntityId -> NtfCommand e -> ExceptT NtfClientError IO NtfResponse -sendNtfCommand c pKey entId cmd = sendProtocolCommand c pKey entId (NtfCmd sNtfEntity cmd) Nothing +sendNtfCommand c pKey entId cmd = sendProtocolCommand c pKey entId (NtfCmd sNtfEntity cmd) okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey -> NtfEntityId -> ExceptT NtfClientError IO () okNtfCommand cmd c pKey entId = diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 2d54629b4..43e032d0b 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -1359,7 +1359,7 @@ instance Protocol SMPVersion ErrorType BrokerMsg where ERR e -> Just e _ -> Nothing -class (ProtocolMsgTag (Tag msg), Show (Tag msg)) => ProtocolEncoding v err msg | msg -> err, msg -> v where +class ProtocolMsgTag (Tag msg) => ProtocolEncoding v err msg | msg -> err, msg -> v where type Tag msg encodeProtocol :: Version v -> msg -> ByteString protocolP :: Version v -> Tag msg -> Parser msg