Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: combine connection deletion events #1442

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 18 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -1805,8 +1805,8 @@
-- ! if it was used to notify about the result, it might be necessary to differentiate
-- ! between completed deletions of connections, and deletions delayed due to wait for delivery (see deleteConn)
deliveryTimeout <- if waitDelivery then asks (Just . connDeleteDeliveryTimeout . config) else pure Nothing
rs' <- lift $ catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) (M.keys delRs))
forM_ rs' $ \cId -> notify ("", cId, AEvt SAEConn DEL_CONN)
cIds_ <- lift $ L.nonEmpty . catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) (M.keys delRs))
forM_ cIds_ $ \cIds -> notify ("", "", AEvt SAEConn $ DEL_CONNS cIds)
pure (errs' <> delRs, rqs, connIds')
where
rcvQueues :: SomeConn -> Either (Either AgentErrorType ()) [RcvQueue]
Expand All @@ -1826,32 +1826,33 @@
rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs)
let connIds = M.keys $ M.filter isRight rs
deliveryTimeout <- if waitDelivery then asks (Just . connDeleteDeliveryTimeout . config) else pure Nothing
rs' <- catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) connIds)
forM_ rs' $ \cId -> notify ("", cId, AEvt SAEConn DEL_CONN)
cIds_ <- L.nonEmpty . catMaybes . rights <$> withStoreBatch' c (\db -> map (deleteConn db deliveryTimeout) connIds)
forM_ cIds_ $ \cIds -> notify ("", "", AEvt SAEConn $ DEL_CONNS cIds)
pure rs
where
deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> AM' [(RcvQueue, Either AgentErrorType ())]
deleteQueueRecs rs = do
maxErrs <- asks $ deleteErrorCount . config
(rs', notifyActions) <- unzip . rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs)
mapM_ sequence_ notifyActions
pure rs'
rs' <- rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs)
let delQ ((rq, _), err_) = (qConnId rq,qServer rq,queueId rq,) <$> err_
delQs_ = L.nonEmpty $ mapMaybe delQ rs'
forM_ delQs_ $ \delQs -> notify ("", "", AEvt SAEConn $ DEL_RCVQS delQs)
pure $ map fst rs'
where
deleteQueueRec ::
DB.Connection ->
Int ->
(RcvQueue, Either AgentErrorType ()) ->
IO ((RcvQueue, Either AgentErrorType ()), Maybe (AM' ()))
IO ((RcvQueue, Either AgentErrorType ()), Maybe (Maybe AgentErrorType)) -- Nothing - no event, Just Nothing - no error
deleteQueueRec db maxErrs (rq@RcvQueue {userId, server}, r) = case r of
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing))
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just Nothing)
Left e
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing)
| otherwise -> do
deleteConnRcvQueue db rq
-- attempts and successes are counted in deleteQueues function
atomically $ incSMPServerStat c userId server connDeleted
pure ((rq, Right ()), Just (notifyRQ rq (Just e)))
notifyRQ rq e_ = notify ("", qConnId rq, AEvt SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_)
pure ((rq, Right ()), Just (Just e))
notify = when ntf . atomically . writeTBQueue (subQ c)
connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())
connResults = M.map snd . foldl' addResult M.empty
Expand Down
25 changes: 9 additions & 16 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
Expand Down Expand Up @@ -168,12 +167,13 @@ import Data.Time.Clock.System (SystemTime)
import Data.Type.Equality
import Data.Typeable ()
import Data.Word (Word16, Word32)
import Database.SQLite.Simple.FromField
import Database.SQLite.Simple.ToField
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Transport (XFTPErrorType)
import Simplex.FileTransfer.Types (FileErrorType)
import Simplex.Messaging.Agent.QueryString
import Simplex.Messaging.Agent.Store.DB (Binary (..))
import Simplex.Messaging.Client (ProxyClientError)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet
Expand Down Expand Up @@ -224,13 +224,6 @@ import Simplex.Messaging.Version
import Simplex.Messaging.Version.Internal
import Simplex.RemoteControl.Types
import UnliftIO.Exception (Exception)
#if defined(dbPostgres)
import Database.PostgreSQL.Simple.FromField (FromField (..))
import Database.PostgreSQL.Simple.ToField (ToField (..))
#else
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
#endif

-- SMP agent protocol version history:
-- 1 - binary protocol encoding (1/1/2022)
Expand Down Expand Up @@ -366,8 +359,8 @@ data AEvent (e :: AEntity) where
MSGNTF :: MsgId -> Maybe UTCTime -> AEvent AEConn
RCVD :: MsgMeta -> NonEmpty MsgReceipt -> AEvent AEConn
QCONT :: AEvent AEConn
DEL_RCVQ :: SMPServer -> SMP.RecipientId -> Maybe AgentErrorType -> AEvent AEConn
DEL_CONN :: AEvent AEConn
DEL_RCVQS :: NonEmpty (ConnId, SMPServer, SMP.RecipientId, Maybe AgentErrorType) -> AEvent AEConn
DEL_CONNS :: NonEmpty ConnId -> AEvent AEConn
DEL_USER :: Int64 -> AEvent AENone
STAT :: ConnectionStats -> AEvent AEConn
OK :: AEvent AEConn
Expand Down Expand Up @@ -437,8 +430,8 @@ data AEventTag (e :: AEntity) where
MSGNTF_ :: AEventTag AEConn
RCVD_ :: AEventTag AEConn
QCONT_ :: AEventTag AEConn
DEL_RCVQ_ :: AEventTag AEConn
DEL_CONN_ :: AEventTag AEConn
DEL_RCVQS_ :: AEventTag AEConn
DEL_CONNS_ :: AEventTag AEConn
DEL_USER_ :: AEventTag AENone
STAT_ :: AEventTag AEConn
OK_ :: AEventTag AEConn
Expand Down Expand Up @@ -492,8 +485,8 @@ aEventTag = \case
MSGNTF {} -> MSGNTF_
RCVD {} -> RCVD_
QCONT -> QCONT_
DEL_RCVQ {} -> DEL_RCVQ_
DEL_CONN -> DEL_CONN_
DEL_RCVQS _ -> DEL_RCVQS_
DEL_CONNS _ -> DEL_CONNS_
DEL_USER _ -> DEL_USER_
STAT _ -> STAT_
OK -> OK_
Expand Down Expand Up @@ -651,7 +644,7 @@ instance ToJSON NotificationsMode where
instance FromJSON NotificationsMode where
parseJSON = strParseJSON "NotificationsMode"

instance ToField NotificationsMode where toField = toField . Binary . strEncode
instance ToField NotificationsMode where toField = toField . strEncode

instance FromField NotificationsMode where fromField = blobFieldDecoder $ parseAll strP

Expand Down
50 changes: 23 additions & 27 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2055,8 +2055,8 @@ testAsyncCommands sqSecured alice bob baseId =
ackMessageAsync alice "7" bobId (baseId + 4) Nothing
get alice =##> \case ("7", _, OK) -> True; _ -> False
deleteConnectionAsync alice False bobId
get alice =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bobId; _ -> False
get alice =##> \case ("", c, DEL_CONN) -> c == bobId; _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing)]) -> c == bobId; _ -> False
get alice =##> \case ("", "", DEL_CONNS [c]) -> c == bobId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
where
msgId = subtract baseId
Expand Down Expand Up @@ -2123,12 +2123,9 @@ testDeleteConnectionAsync t =
runRight_ $ do
deleteConnectionsAsync a False connIds
nGet a =##> \case ("", "", DOWN {}) -> True; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c `elem` connIds; _ -> False
let delOk = \case (c, _, _, Just (BROKER _ e)) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", "", DEL_RCVQS rs) -> length rs == 3 && all delOk rs; _ -> False
get a =##> \case ("", "", DEL_CONNS cs) -> length cs == 3 && all (`elem` connIds) cs; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"

testWaitDeliveryNoPending :: ATransport -> IO ()
Expand All @@ -2147,8 +2144,8 @@ testWaitDeliveryNoPending t = withAgentClients2 $ \alice bob ->
ackMessage alice bobId (baseId + 2) Nothing

deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", cId, DEL_RCVQ _ _ Nothing) -> cId == bobId; _ -> False
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Nothing)]) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False

3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
get bob =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == aliceId && mId == (baseId + 3); _ -> False
Expand Down Expand Up @@ -2184,14 +2181,14 @@ testWaitDelivery t =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"

withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
get alice ##> ("", bobId, SENT $ baseId + 3)
get alice ##> ("", bobId, SENT $ baseId + 4)
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False

liftIO $
getInAnyOrder
Expand Down Expand Up @@ -2231,8 +2228,8 @@ testWaitDeliveryAUTHErr t =
ackMessage alice bobId (baseId + 2) Nothing

deleteConnectionsAsync bob False [aliceId]
get bob =##> \case ("", cId, DEL_RCVQ _ _ Nothing) -> cId == aliceId; _ -> False
get bob =##> \case ("", cId, DEL_CONN) -> cId == aliceId; _ -> False
get bob =##> \case ("", "", DEL_RCVQS [(cId, _, _, Nothing)]) -> cId == aliceId; _ -> False
get bob =##> \case ("", "", DEL_CONNS [cId]) -> cId == aliceId; _ -> False

pure (aliceId, bobId)

Expand All @@ -2241,14 +2238,14 @@ testWaitDeliveryAUTHErr t =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"

withSmpServerStoreLogOn t testPort $ \_ -> do
get alice =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == bobId && mId == (baseId + 3); _ -> False
get alice =##> \case ("", cId, MERR mId (SMP _ AUTH)) -> cId == bobId && mId == (baseId + 4); _ -> False
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False

liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
Expand Down Expand Up @@ -2281,8 +2278,8 @@ testWaitDeliveryTimeout t =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"

Expand Down Expand Up @@ -2321,8 +2318,8 @@ testWaitDeliveryTimeout2 t =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", cId, DEL_RCVQ _ _ (Just (BROKER _ e))) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"

Expand Down Expand Up @@ -2430,8 +2427,8 @@ testUsers =
(aId', bId') <- makeConnectionForUsers a auId b 1
exchangeGreetings a bId' b aId'
deleteUser a auId True
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId'; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing)]) -> c == bId'; _ -> False
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId'; _ -> False
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
exchangeGreetingsMsgId 4 a bId b aId
liftIO $ noMessages a "nothing else should be delivered to alice"
Expand Down Expand Up @@ -2462,8 +2459,8 @@ testUsersNoServer t = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do
nGet b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
runRight_ $ do
deleteUser a auId True
get a =##> \case ("", c, DEL_RCVQ _ _ (Just (BROKER _ e))) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, DEL_CONN) -> c == bId'; _ -> False
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Just (BROKER _ e))]) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId'; _ -> False
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do
Expand Down Expand Up @@ -2581,9 +2578,8 @@ testSwitchDelete servers =
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
deleteConnectionAsync a False bId
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
get a =##> \case ("", c, DEL_CONN) -> c == bId; _ -> False
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Nothing), (c', _, _, Nothing)]) -> c == bId && c' == bId; _ -> False
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"

testAbortSwitchStarted :: HasCallStack => InitialAgentServers -> IO ()
Expand Down
Loading