Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix memory leak in SMP reconnects #1058

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -117,6 +117,7 @@
)
where

import Control.Concurrent.STM (retry)
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad
import Control.Monad.Except
Expand All @@ -138,6 +139,7 @@
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
Expand Down Expand Up @@ -179,7 +181,6 @@
import Simplex.RemoteControl.Client
import Simplex.RemoteControl.Invitation
import Simplex.RemoteControl.Types
import UnliftIO.Async (race_)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import UnliftIO.STM

Expand All @@ -200,7 +201,16 @@
pure c
runAgentThreads c
| backgroundMode = subscriber c
| otherwise = raceAny_ [subscriber c, runNtfSupervisor c, cleanupManager c]
| otherwise = raceAny_ [subscriber c, smpSupervisor c, runNtfSupervisor c, cleanupManager c]

smpSupervisor :: AgentMonad' m => AgentClient -> m ()
smpSupervisor c@AgentClient {smpSubRequests} = forever $ do
subs <- atomically $ do
subs <- readTVar smpSubRequests
if S.null subs then retry else writeTVar smpSubRequests S.empty
pure subs
mapM_ (resubscribeSMPSession c) subs
threadDelay 1000000 -- try to aggregate reconnect requests in the Set

disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m ()
disconnectAgentClient c@AgentClient {agentEnv = Env {ntfSupervisor = ns, xftpAgent = xa}} = do
Expand Down Expand Up @@ -504,7 +514,7 @@

-- | Runs an SMP agent instance that receives commands and sends responses via 'TBQueue's.
runAgentClient :: AgentMonad' m => AgentClient -> m ()
runAgentClient c = race_ (subscriber c) (client c)
runAgentClient c = raceAny_ [subscriber c, client c, smpSupervisor c]

client :: forall m. AgentMonad' m => AgentClient -> m ()
client c@AgentClient {rcvQ, subQ} = forever $ do
Expand Down Expand Up @@ -730,7 +740,7 @@
AgentConfig {smpClientVRange, smpAgentVRange} <- asks config
pure $
(,)
<$> (qUri `compatibleVersion` smpClientVRange)
<$> (qUri `compatibleVersion` smpClientVRange)
<*> (crAgentVRange `compatibleVersion` smpAgentVRange pqSup)

versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport
Expand Down
33 changes: 21 additions & 12 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
closeAgentClient,
closeProtocolServerClients,
reconnectServerClients,
resubscribeSMPSession,
closeXFTPServerClient,
runSMPServerTest,
runXFTPServerTest,
Expand Down Expand Up @@ -286,6 +287,7 @@
deleteLock :: Lock,
-- smpSubWorkers for SMP servers sessions
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
smpSubRequests :: TVar (Set SMPTransportSession),
agentStats :: TMap AgentStatsKey (TVar Int),
clientId :: Int,
agentEnv :: Env
Expand Down Expand Up @@ -425,6 +427,7 @@
invLocks <- TM.empty
deleteLock <- createLock
smpSubWorkers <- TM.empty
smpSubRequests <- newTVar mempty
agentStats <- TM.empty
return
AgentClient
Expand Down Expand Up @@ -458,6 +461,7 @@
invLocks,
deleteLock,
smpSubWorkers,
smpSubRequests,
agentStats,
clientId,
agentEnv
Expand Down Expand Up @@ -516,16 +520,15 @@
-- make it expensive to check for pending subscriptions.
newClient v =
newProtocolClient c tSess smpClients connectClient v
`catchAgentError` \e -> resubscribeSMPSession c tSess >> throwError e
`catchAgentError` \e -> submitSMPResubscribe c tSess >> throwError e
connectClient :: SMPClientVar -> m SMPClient
connectClient v = do
cfg <- getClientConfig c smpCfg
g <- asks random
u <- askUnliftIO
liftEitherError (protocolClientError SMP $ B.unpack $ strEncode srv) (getProtocolClient g tSess cfg (Just msgQ) $ clientDisconnected u v)
liftEitherError (protocolClientError SMP $ B.unpack $ strEncode srv) $ getProtocolClient g tSess cfg (Just msgQ) (clientDisconnected v)

clientDisconnected :: UnliftIO m -> SMPClientVar -> SMPClient -> IO ()
clientDisconnected u v client = do
clientDisconnected :: SMPClientVar -> SMPClient -> IO ()
clientDisconnected v client = do
removeClientAndSubs >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
where
Expand All @@ -548,11 +551,18 @@
unless (null conns) $ notifySub "" $ DOWN srv conns
unless (null qs) $ do
atomically $ mapM_ (releaseGetLock c) qs
unliftIO u $ resubscribeSMPSession c tSess
submitSMPResubscribe c tSess

notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)

-- | Schedule SMP resubscription job
--
-- Attempting to resubscribe directly in exception handlers results in untraceable resource leaking.
-- A supervisor agent thread starts workers to process session resubscription requests instead allowing the handler to finish quickly.
submitSMPResubscribe :: MonadIO m => AgentClient -> SMPTransportSession -> m ()
submitSMPResubscribe c = atomically . modifyTVar' (smpSubRequests c) . S.insert

resubscribeSMPSession :: AgentMonad' m => AgentClient -> SMPTransportSession -> m ()
resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ()))
Expand Down Expand Up @@ -1066,19 +1076,18 @@
atomically $ do
modifyTVar' (subscrConns c) (`S.union` S.fromList (map qConnId qs'))
RQ.batchAddQueues (pendingSubs c) qs'
u <- askUnliftIO
-- only "checked" queues are subscribed
(errs <>) <$> sendTSessionBatches "SUB" 90 id (subscribeQueues_ u) c qs'
(errs <>) <$> sendTSessionBatches "SUB" 90 id subscribeQueues_ c qs'
where
checkQueue rq = do
prohibited <- atomically $ hasGetLock c rq
pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED) else Right rq
subscribeQueues_ :: UnliftIO m -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
subscribeQueues_ u smp qs' = do
subscribeQueues_ :: SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
subscribeQueues_ smp qs' = do
rs <- sendBatch subscribeSMPQueues smp qs'
mapM_ (uncurry $ processSubResult c) rs
when (any temporaryClientError . lefts . map snd $ L.toList rs) . unliftIO u $
resubscribeSMPSession c (transportSession' smp)
when (any temporaryClientError . lefts . map snd $ L.toList rs) $
submitSMPResubscribe c (transportSession' smp)
pure rs

type BatchResponses e r = (NonEmpty (RcvQueue, Either e r))
Expand Down Expand Up @@ -1515,7 +1524,7 @@
where
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}

userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))

Check warning on line 1527 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 1527 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: UserProtocol p
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
Expand Down
Loading