Skip to content

Commit

Permalink
remove logging tag
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Jan 18, 2025
1 parent 15513dc commit 61ca96d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 39 deletions.
72 changes: 35 additions & 37 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ data PClient v err msg = PClient
timeoutErrorCount :: TVar Int,
clientCorrId :: TVar ChaChaDRG,
sentCommands :: TMap CorrId (Request err msg),
sndQ :: TBQueue (Maybe (Request err msg), ByteString, Maybe (Tag (ProtoCommand msg))),
sndQ :: TBQueue (Maybe (Request err msg), ByteString),
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
}
Expand Down Expand Up @@ -566,14 +566,12 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= sendPending
where
sendPending (Nothing, s, tag) = send_ s tag Nothing
sendPending (Just Request {pending, responseVar}, s, tag) = whenM (readTVarIO pending) $ send_ s tag (Just responseVar)
send_ s tag responseVar_ =
tPutLog h s >>= \case
Right () -> pure ()
Left e -> do
putStrLn $ "send error: " <> show tag <> ", " <> show e
atomically $ forM_ responseVar_ $ \v -> putTMVar v $ Left $ PCETransportError e
sendPending (r, s) = case r of
Nothing -> void $ tPutLog h s
Just Request {pending, responseVar} ->
whenM (readTVarIO pending) $ tPutLog h s >>= either responseErr pure
where
responseErr = atomically . putTMVar responseVar . Left . PCETransportError

receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
receive ProtocolClient {client_ = PClient {rcvQ, lastReceived, timeoutErrorCount}} h = forever $ do
Expand All @@ -593,7 +591,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
if remaining > 1_000_000 -- delay pings only for significant time
then loop remaining
else do
whenM (readTVarIO sendPings) $ void . runExceptT $ sendProtocolCommand c Nothing NoEntity (protocolPing @v @err @msg) Nothing
whenM (readTVarIO sendPings) $ void . runExceptT $ sendProtocolCommand c Nothing NoEntity (protocolPing @v @err @msg)
-- sendProtocolCommand/getResponse updates counter for each command
cnt <- readTVarIO timeoutErrorCount
-- drop client when maxCnt of commands have timed out in sequence, but only after some time has passed after last received response
Expand Down Expand Up @@ -715,7 +713,7 @@ createSMPQueue ::
Bool ->
ExceptT SMPClientError IO QueueIdsKeys
createSMPQueue c (rKey, rpKey) dhKey auth subMode sndSecure =
sendSMPCommand c (Just rpKey) NoEntity (NEW rKey dhKey auth subMode sndSecure) (Just NEW_) >>= \case
sendSMPCommand c (Just rpKey) NoEntity (NEW rKey dhKey auth subMode sndSecure) >>= \case
IDS qik -> pure qik
r -> throwE $ unexpectedResponse r

Expand All @@ -725,7 +723,7 @@ createSMPQueue c (rKey, rpKey) dhKey auth subMode sndSecure =
subscribeSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
subscribeSMPQueue c rpKey rId = do
liftIO $ enablePings c
sendSMPCommand c (Just rpKey) rId SUB (Just SUB_) >>= \case
sendSMPCommand c (Just rpKey) rId SUB >>= \case
OK -> pure ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE $ unexpectedResponse r
Expand Down Expand Up @@ -763,7 +761,7 @@ serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionI
-- https://github.covm/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue
getSMPMessage :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO (Maybe RcvMessage)
getSMPMessage c rpKey rId =
sendSMPCommand c (Just rpKey) rId GET (Just GET_) >>= \case
sendSMPCommand c (Just rpKey) rId GET >>= \case
OK -> pure Nothing
cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
r -> throwE $ unexpectedResponse r
Expand All @@ -774,7 +772,7 @@ getSMPMessage c rpKey rId =
subscribeSMPQueueNotifications :: SMPClient -> NtfPrivateAuthKey -> NotifierId -> ExceptT SMPClientError IO ()
subscribeSMPQueueNotifications c npKey nId = do
liftIO $ enablePings c
okSMPCommand NSUB (Just NSUB_) c npKey nId
okSMPCommand NSUB c npKey nId
{-# INLINE subscribeSMPQueueNotifications #-}

-- | Subscribe to multiple SMP queues notifications batching commands if supported.
Expand All @@ -792,12 +790,12 @@ enablePings ProtocolClient {client_ = PClient {sendPings}} = atomically $ writeT
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#secure-queue-command
secureSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> SndPublicAuthKey -> ExceptT SMPClientError IO ()
secureSMPQueue c rpKey rId senderKey = okSMPCommand (KEY senderKey) (Just KEY_) c rpKey rId
secureSMPQueue c rpKey rId senderKey = okSMPCommand (KEY senderKey) c rpKey rId
{-# INLINE secureSMPQueue #-}

-- | Secure the SMP queue via sender queue ID.
secureSndSMPQueue :: SMPClient -> SndPrivateAuthKey -> SenderId -> SndPublicAuthKey -> ExceptT SMPClientError IO ()
secureSndSMPQueue c spKey sId senderKey = okSMPCommand (SKEY senderKey) (Just SKEY_) c spKey sId
secureSndSMPQueue c spKey sId senderKey = okSMPCommand (SKEY senderKey) c spKey sId
{-# INLINE secureSndSMPQueue #-}

proxySecureSndSMPQueue :: SMPClient -> ProxiedRelay -> SndPrivateAuthKey -> SenderId -> SndPublicAuthKey -> ExceptT SMPClientError IO (Either ProxyClientError ())
Expand All @@ -809,7 +807,7 @@ proxySecureSndSMPQueue c proxiedRelay spKey sId senderKey = proxySMPCommand c pr
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command
enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> ExceptT SMPClientError IO (NotifierId, RcvNtfPublicDhKey)
enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) (Just NKEY_) >>= \case
sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case
NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey)
r -> throwE $ unexpectedResponse r

Expand All @@ -827,7 +825,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#disable-notifications-command
disableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
disableSMPQueueNotifications = okSMPCommand NDEL (Just NDEL_)
disableSMPQueueNotifications = okSMPCommand NDEL
{-# INLINE disableSMPQueueNotifications #-}

-- | Disable notifications for multiple queues for push notifications server.
Expand All @@ -840,7 +838,7 @@ disableSMPQueuesNtfs = okSMPCommands NDEL
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message
sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO ()
sendSMPMessage c spKey sId flags msg =
sendSMPCommand c spKey sId (SEND flags msg) (Just SEND_) >>= \case
sendSMPCommand c spKey sId (SEND flags msg) >>= \case
OK -> pure ()
r -> throwE $ unexpectedResponse r

Expand All @@ -852,7 +850,7 @@ proxySMPMessage c proxiedRelay spKey sId flags msg = proxySMPCommand c proxiedRe
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery
ackSMPMessage :: SMPClient -> RcvPrivateAuthKey -> QueueId -> MsgId -> ExceptT SMPClientError IO ()
ackSMPMessage c rpKey rId msgId =
sendSMPCommand c (Just rpKey) rId (ACK msgId) (Just ACK_) >>= \case
sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case
OK -> return ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE $ unexpectedResponse r
Expand All @@ -862,14 +860,14 @@ ackSMPMessage c rpKey rId msgId =
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#suspend-queue
suspendSMPQueue :: SMPClient -> RcvPrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
suspendSMPQueue = okSMPCommand OFF (Just OFF_)
suspendSMPQueue = okSMPCommand OFF
{-# INLINE suspendSMPQueue #-}

-- | Irreversibly delete SMP queue and all messages in it.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#delete-queue
deleteSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
deleteSMPQueue = okSMPCommand DEL (Just DEL_)
deleteSMPQueue = okSMPCommand DEL
{-# INLINE deleteSMPQueue #-}

-- | Delete multiple SMP queues batching commands if supported.
Expand All @@ -882,7 +880,7 @@ deleteSMPQueues = okSMPCommands DEL
connectSMPProxiedRelay :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay
connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, tcpTimeout}} relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth
| thVersion (thParams c) >= sendingProxySMPVersion =
sendProtocolCommand_ c Nothing tOut Nothing NoEntity (Cmd SProxiedClient (PRXY relayServ proxyAuth)) (Just $ CT SProxiedClient PRXY_) >>= \case
sendProtocolCommand_ c Nothing tOut Nothing NoEntity (Cmd SProxiedClient (PRXY relayServ proxyAuth)) >>= \case
PKEY sId vr (chain, key) ->
case supportedClientSMPRelayVRange `compatibleVersion` vr of
Nothing -> throwE $ transportErr TEVersion
Expand Down Expand Up @@ -984,7 +982,7 @@ proxySMPCommand c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedTLength
-- proxy interaction errors are wrapped
let tOut = Just $ 2 * tcpTimeout
tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing (EntityId sessionId) (Cmd SProxiedClient (PFWD v cmdPubKey et)) (Just $ CT SProxiedClient PFWD_)) >>= \case
tryE (sendProtocolCommand_ c (Just nonce) tOut Nothing (EntityId sessionId) (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case
Right r -> case r of
PRES (EncResponse er) -> do
-- server interaction errors are thrown directly
Expand Down Expand Up @@ -1020,7 +1018,7 @@ forwardSMPTransmission c@ProtocolClient {thParams, client_ = PClient {clientCorr
let fwdT = FwdTransmission {fwdCorrId, fwdVersion, fwdKey, fwdTransmission}
eft = EncFwdTransmission $ C.cbEncryptNoPad sessSecret nonce (smpEncode fwdT)
-- send
sendProtocolCommand_ c (Just nonce) Nothing Nothing NoEntity (Cmd SSender (RFWD eft)) (Just $ CT SSender RFWD_) >>= \case
sendProtocolCommand_ c (Just nonce) Nothing Nothing NoEntity (Cmd SSender (RFWD eft)) >>= \case
RRES (EncFwdResponse efr) -> do
-- unwrap
r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr
Expand All @@ -1030,13 +1028,13 @@ forwardSMPTransmission c@ProtocolClient {thParams, client_ = PClient {clientCorr

getSMPQueueInfo :: SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO QueueInfo
getSMPQueueInfo c pKey qId =
sendSMPCommand c (Just pKey) qId QUE (Just QUE_) >>= \case
sendSMPCommand c (Just pKey) qId QUE >>= \case
INFO info -> pure info
r -> throwE $ unexpectedResponse r

okSMPCommand :: PartyI p => Command p -> Maybe (CommandTag p) -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd cmdTag c pKey qId =
sendSMPCommand c (Just pKey) qId cmd cmdTag >>= \case
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId =
sendSMPCommand c (Just pKey) qId cmd >>= \case
OK -> return ()
r -> throwE $ unexpectedResponse r

Expand All @@ -1051,8 +1049,8 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
Left e -> Left e

-- | Send SMP command
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> Maybe (CommandTag p) -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c pKey qId cmd cmdTag = sendProtocolCommand c pKey qId (Cmd sParty cmd) (CT sParty <$> cmdTag)
sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd)
{-# INLINE sendSMPCommand #-}

type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)
Expand Down Expand Up @@ -1088,23 +1086,23 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
pure [Response entityId $ Left $ PCETransportError e]
TBTransmissions s n rs
| n > 0 -> do
atomically $ writeTBQueue sndQ (Nothing, s, Nothing) -- do not expire batched responses
atomically $ writeTBQueue sndQ (Nothing, s) -- do not expire batched responses
mapConcurrently (getResponse c Nothing) rs
| otherwise -> pure []
TBTransmission s r -> do
atomically $ writeTBQueue sndQ (Nothing, s, Nothing)
atomically $ writeTBQueue sndQ (Nothing, s)
(: []) <$> getResponse c Nothing r

-- | Send Protocol command
sendProtocolCommand :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> Maybe (Tag (ProtoCommand msg)) -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand c = sendProtocolCommand_ c Nothing Nothing

-- Currently there is coupling - batch commands do not expire, and individually sent commands do.
-- This is to reflect the fact that we send subscriptions only as batches, and also because we do not track a separate timeout for the whole batch, so it is not obvious when should we expire it.
-- We could expire a batch of deletes, for example, either when the first response expires or when the last one does.
-- But a better solution is to process delayed delete responses.
sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> Maybe (Tag (ProtoCommand msg)) -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd cmdTag =
sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ tOut pKey entId cmd =
ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd)
where
-- two separate "atomically" needed to avoid blocking
Expand All @@ -1114,7 +1112,7 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan
Right t
| B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
| otherwise -> do
atomically $ writeTBQueue sndQ (Just r, s, cmdTag)
atomically $ writeTBQueue sndQ (Just r, s)
response <$> getResponse c tOut r
where
s
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Notifications/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ ntfDeleteSubscription = okNtfCommand SDEL

-- | Send notification server command
sendNtfCommand :: NtfEntityI e => NtfClient -> Maybe C.APrivateAuthKey -> NtfEntityId -> NtfCommand e -> ExceptT NtfClientError IO NtfResponse
sendNtfCommand c pKey entId cmd = sendProtocolCommand c pKey entId (NtfCmd sNtfEntity cmd) Nothing
sendNtfCommand c pKey entId cmd = sendProtocolCommand c pKey entId (NtfCmd sNtfEntity cmd)

okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey -> NtfEntityId -> ExceptT NtfClientError IO ()
okNtfCommand cmd c pKey entId =
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ instance Protocol SMPVersion ErrorType BrokerMsg where
ERR e -> Just e
_ -> Nothing

class (ProtocolMsgTag (Tag msg), Show (Tag msg)) => ProtocolEncoding v err msg | msg -> err, msg -> v where
class ProtocolMsgTag (Tag msg) => ProtocolEncoding v err msg | msg -> err, msg -> v where
type Tag msg
encodeProtocol :: Version v -> msg -> ByteString
protocolP :: Version v -> Tag msg -> Parser msg
Expand Down

0 comments on commit 61ca96d

Please sign in to comment.