From 1bdf8bc3e3c15c9242f64347881aed889b4f39b7 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <evgeny@poberezkin.com> Date: Thu, 12 Dec 2024 07:46:06 +0000 Subject: [PATCH] parameterize journal store for queue storage --- src/Simplex/Messaging/Server.hs | 33 ++- src/Simplex/Messaging/Server/Env/STM.hs | 8 +- src/Simplex/Messaging/Server/Main.hs | 2 +- .../Messaging/Server/MsgStore/Journal.hs | 206 ++++++++++++------ src/Simplex/Messaging/Server/MsgStore/STM.hs | 35 ++- .../Messaging/Server/MsgStore/Types.hs | 18 +- .../Messaging/Server/QueueStore/STM.hs | 89 +++----- src/Simplex/Messaging/Server/StoreLog.hs | 38 +++- tests/CoreTests/MsgStoreTests.hs | 22 +- tests/CoreTests/StoreLogTests.hs | 4 +- 10 files changed, 280 insertions(+), 175 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 988639f5c..07d8b0e00 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -100,7 +100,6 @@ import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo -import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -423,9 +422,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats AMS _ st <- asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st - interval = 1000000 * logInterval + QueueCounts {queueCount, notifierCount} <- liftIO $ queueCounts st + let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do hSetBuffering h LineBuffering @@ -478,8 +476,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT pMsgFwdsOwn' <- getResetProxyStatsData pMsgFwdsOwn pMsgFwdsRecv' <- atomicSwapIORef pMsgFwdsRecv 0 qCount' <- readIORef qCount - qCount'' <- M.size <$> readTVarIO queues - notifierCount' <- M.size <$> readTVarIO notifiers msgCount' <- readIORef msgCount ntfCount' <- readIORef ntfCount hPutStrLn h $ @@ -532,13 +528,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT "0", -- dayCount psSub; psSub is removed to reduce memory usage "0", -- weekCount psSub "0", -- monthCount psSub - show qCount'', + show queueCount, show ntfCreated', show ntfDeleted', show ntfSub', show ntfSubAuth', show ntfSubDuplicate', - show notifierCount', + show notifierCount, show qDeletedAllB', show qSubAllB', show qSubEnd', @@ -625,9 +621,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats AMS _ st <- unliftIO u $ asks msgStore - let queues = activeMsgQueues st - notifiers = notifiers' st - getStat :: (ServerStats -> IORef a) -> IO a + QueueCounts {queueCount, notifierCount} <- queueCounts st + let getStat :: (ServerStats -> IORef a) -> IO a getStat var = readIORef (var ss) putStat :: Show a => String -> (ServerStats -> IORef a) -> IO () putStat label var = getStat var >>= \v -> hPutStrLn h $ label <> ": " <> show v @@ -664,9 +659,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT putStat "msgNtfsB" msgNtfsB putStat "msgNtfExpired" msgNtfExpired putStat "qCount" qCount - qCount2 <- M.size <$> readTVarIO queues - hPutStrLn h $ "qCount 2: " <> show qCount2 - notifierCount <- M.size <$> readTVarIO notifiers + hPutStrLn h $ "qCount 2: " <> show queueCount hPutStrLn h $ "notifiers: " <> show notifierCount putStat "msgCount" msgCount putStat "ntfCount" ntfCount @@ -841,7 +834,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio c <- liftIO $ newClient msType clientId q thVersion sessionId ts runClientThreads msType ms active c clientId `finally` clientDisconnected c where - runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M () + runClientThreads :: MsgStoreClass (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M () runClientThreads msType ms active c clientId = do atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c) s <- asks server @@ -897,7 +890,7 @@ cancelSub s = case subThread s of _ -> pure () ProhibitSub -> pure () -receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M () +receive :: forall c s. (Transport c, MsgStoreClass s) => THandleSMP c 'TServer -> s -> Client s -> M () receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" sa <- asks serverActive @@ -997,7 +990,7 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail -- - the queue or party key do not exist. -- In all cases, the time of the verification should depend only on the provided authorization type, -- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result. -verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s) +verifyTransmission :: forall s. MsgStoreClass s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s) verifyTransmission ms auth_ tAuth authorized queueId cmd = case cmd of Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k @@ -1074,7 +1067,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do action `finally` atomically (modifyTVar' endThreads $ IM.delete tId) mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId -client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () +client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M () client thParams' Server {subscribedQ, ntfSubscribedQ, subscribers} @@ -1768,7 +1761,7 @@ processServerMessages = do stored'' <- getQueueSize ms rId q liftIO $ closeMsgQueue q pure (stored'', expired'') - processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats + processValidateQueue :: RecipientId -> JournalQueue 'MSMemory -> IO MessageStats processValidateQueue rId q = runExceptT (getQueueSize ms rId q) >>= \case Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1} @@ -1777,7 +1770,7 @@ processServerMessages = do exitFailure -- TODO this function should be called after importing queues from store log -importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats +importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats importMessages tty ms f old_ = do logInfo $ "restoring messages from file " <> T.pack f LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index f598bdcb8..05322e0f1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -184,9 +184,9 @@ data Env = Env type family MsgStore s where MsgStore 'MSMemory = STMMsgStore - MsgStore 'MSJournal = JournalMsgStore + MsgStore 'MSJournal = JournalMsgStore 'MSMemory -data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s) +data AMsgStore = forall s. MsgStoreClass (MsgStore s) => AMS (SMSType s) (MsgStore s) data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s)) @@ -295,7 +295,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota} AMSType SMSJournal -> case storeMsgsFile of Just storePath -> - let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} + let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval} in AMS SMSJournal <$> newMsgStore cfg Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure ntfStore <- NtfStore <$> TM.emptyIO @@ -359,5 +359,5 @@ newSMPProxyAgent smpAgentCfg random = do smpAgent <- newSMPClientAgent smpAgentCfg random pure ProxyAgent {smpAgent} -readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode) +readWriteQueueStore :: MsgStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode) readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 3da2aaeb4..d5a3157dd 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = doesFileExist iniFile >>= \case True -> readIniFile iniFile >>= either exitError a _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." - newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration} + newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration} iniFile = combine cfgPath "smp-server.ini" serverVersion = "SMP server v" <> simplexMQVersion defaultServerPorts = "5223,443" diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 4e5496f66..78be1980f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE InstanceSigs #-} @@ -15,7 +16,7 @@ {-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.MsgStore.Journal - ( JournalMsgStore (queues, senders, notifiers, random), + ( JournalMsgStore (queueStore, random), JournalQueue, JournalMsgQueue (queue, state), JMQueue (queueDirectory, statePath), @@ -49,6 +50,7 @@ import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate) +import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isNothing) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -73,19 +75,32 @@ import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) -data JournalMsgStore = JournalMsgStore - { config :: JournalStoreConfig, +data JournalMsgStore s = JournalMsgStore + { config :: JournalStoreConfig s, random :: TVar StdGen, queueLocks :: TMap RecipientId Lock, - queues :: TMap RecipientId JournalQueue, - senders :: TMap SenderId RecipientId, - notifiers :: TMap NotifierId RecipientId, - storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + queueStore :: QueueStore s } -data JournalStoreConfig = JournalStoreConfig +data QueueStore (s :: MSType) where + MQStore :: + { queues :: TMap RecipientId (JournalQueue 'MSMemory), + senders :: TMap SenderId RecipientId, + notifiers :: TMap NotifierId RecipientId, + storeLog :: TVar (Maybe (StoreLog 'WriteMode)) + } -> QueueStore 'MSMemory + -- maps store cached queues + -- Nothing in map indicates that the queue doesn't exist + -- JQStore :: + -- { queues_ :: TMap RecipientId (Maybe (JournalQueue 'MSJournal)), + -- senders_ :: TMap SenderId (Maybe RecipientId), + -- notifiers_ :: TMap NotifierId (Maybe RecipientId) + -- } -> QueueStore 'MSJournal + +data JournalStoreConfig s = JournalStoreConfig { storePath :: FilePath, pathParts :: Int, + queueStoreType :: SMSType s, quota :: Int, -- Max number of messages per journal file - ignored in STM store. -- When this limit is reached, the file will be changed. @@ -97,12 +112,12 @@ data JournalStoreConfig = JournalStoreConfig idleInterval :: Int64 } -data JournalQueue = JournalQueue +data JournalQueue (s :: MSType) = JournalQueue { queueLock :: Lock, -- To avoid race conditions and errors when restoring queues, -- Nothing is written to TVar when queue is deleted. queueRec :: TVar (Maybe QueueRec), - msgQueue_ :: TVar (Maybe JournalMsgQueue), + msgQueue_ :: TVar (Maybe (JournalMsgQueue s)), -- system time in seconds since epoch activeAt :: TVar Int64, -- Just True - empty, Just False - non-empty, Nothing - unknown @@ -114,7 +129,7 @@ data JMQueue = JMQueue statePath :: FilePath } -data JournalMsgQueue = JournalMsgQueue +data JournalMsgQueue (s :: MSType) = JournalMsgQueue { queue :: JMQueue, state :: TVar MsgQueueState, -- tipMsg contains last message and length incl. newline @@ -215,14 +230,14 @@ msgLogFileName = "messages" logFileExt :: String logFileExt = ".log" -newtype StoreIO a = StoreIO {unStoreIO :: IO a} +newtype StoreIO (s :: MSType) a = StoreIO {unStoreIO :: IO a} deriving newtype (Functor, Applicative, Monad) -instance STMQueueStore JournalMsgStore where - queues' = queues - senders' = senders - notifiers' = notifiers - storeLog' = storeLog +instance STMQueueStore (JournalMsgStore 'MSMemory) where + queues' = queues . queueStore + senders' = senders . queueStore + notifiers' = notifiers . queueStore + storeLog' = storeLog . queueStore mkQueue st qr = do lock <- getMapLock (queueLocks st) $ recipientId qr q <- newTVar $ Just qr @@ -230,39 +245,45 @@ instance STMQueueStore JournalMsgStore where activeAt <- newTVar 0 isEmpty <- newTVar Nothing pure $ JournalQueue lock q mq activeAt isEmpty - msgQueue_' = msgQueue_ -instance MsgStoreClass JournalMsgStore where - type StoreMonad JournalMsgStore = StoreIO - type StoreQueue JournalMsgStore = JournalQueue - type MsgQueue JournalMsgStore = JournalMsgQueue - type MsgStoreConfig JournalMsgStore = JournalStoreConfig +instance MsgStoreClass (JournalMsgStore s) where + type StoreMonad (JournalMsgStore s) = StoreIO s + type StoreQueue (JournalMsgStore s) = JournalQueue s + type MsgQueue (JournalMsgStore s) = JournalMsgQueue s + type MsgStoreConfig (JournalMsgStore s) = (JournalStoreConfig s) - newMsgStore :: JournalStoreConfig -> IO JournalMsgStore + newMsgStore :: JournalStoreConfig s -> IO (JournalMsgStore s) newMsgStore config = do random <- newTVarIO =<< newStdGen - queueLocks <- TM.emptyIO - queues <- TM.emptyIO - senders <- TM.emptyIO - notifiers <- TM.emptyIO - storeLog <- newTVarIO Nothing - pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog} - - setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO () - setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl) - - closeMsgStore st = do - readTVarIO (storeLog st) >>= mapM_ closeStoreLog - readTVarIO (queues st) >>= mapM_ closeMsgQueue - - activeMsgQueues = queues + queueLocks :: TMap RecipientId Lock <- TM.emptyIO + case queueStoreType config of + SMSMemory -> do + queues <- TM.emptyIO + senders <- TM.emptyIO + notifiers <- TM.emptyIO + storeLog <- newTVarIO Nothing + let queueStore = MQStore {queues, senders, notifiers, storeLog} + pure JournalMsgStore {config, random, queueLocks, queueStore} + SMSJournal -> undefined + + setStoreLog :: JournalMsgStore s -> StoreLog 'WriteMode -> IO () + setStoreLog st sl = case queueStore st of + MQStore {storeLog} -> atomically $ writeTVar storeLog (Just sl) + + closeMsgStore st = case queueStore st of + MQStore {queues, storeLog} -> do + readTVarIO storeLog >>= mapM_ closeStoreLog + readTVarIO queues >>= mapM_ closeMsgQueue + + activeMsgQueues st = case queueStore st of + MQStore {queues} -> queues {-# INLINE activeMsgQueues #-} -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. -- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID. -- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name. - withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalQueue -> IO a) -> IO a + withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore s -> (RecipientId -> JournalQueue s -> IO a) -> IO a withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty) where processStore = do @@ -302,10 +323,10 @@ instance MsgStoreClass JournalMsgStore where (pure $ Just (queueId', path')) (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) - logQueueStates :: JournalMsgStore -> IO () + logQueueStates :: JournalMsgStore s -> IO () logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState - logQueueState :: JournalQueue -> StoreIO () + logQueueState :: JournalQueue s -> StoreIO s () logQueueState q = StoreIO . void $ readTVarIO (msgQueue_ q) @@ -315,7 +336,49 @@ instance MsgStoreClass JournalMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} - getMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO JournalMsgQueue + msgQueue_' = msgQueue_ + {-# INLINE msgQueue_' #-} + + queueCounts :: JournalMsgStore s -> IO QueueCounts + queueCounts st = case queueStore st of + MQStore {queues, notifiers} -> do + queueCount <- M.size <$> readTVarIO queues + notifierCount <- M.size <$> readTVarIO notifiers + pure QueueCounts {queueCount, notifierCount} + + addQueue :: JournalMsgStore s -> QueueRec -> IO (Either ErrorType (JournalQueue s)) + addQueue st qr = case queueStore st of + MQStore {} -> addQueue' st qr + + getQueue :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s)) + getQueue st party qId = case queueStore st of + MQStore {} -> getQueue' st party qId + + getQueueRec :: DirectParty p => JournalMsgStore s -> SParty p -> QueueId -> IO (Either ErrorType (JournalQueue s, QueueRec)) + getQueueRec st party qId = case queueStore st of + MQStore {} -> getQueueRec' st party qId + + secureQueue :: JournalMsgStore s -> JournalQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) + secureQueue st sq sKey = case queueStore st of + MQStore {} -> secureQueue' st sq sKey + + addQueueNotifier :: JournalMsgStore s -> JournalQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) + addQueueNotifier st sq ntfCreds = case queueStore st of + MQStore {} -> addQueueNotifier' st sq ntfCreds + + deleteQueueNotifier :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (Maybe NotifierId)) + deleteQueueNotifier st sq = case queueStore st of + MQStore {} -> deleteQueueNotifier' st sq + + suspendQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType ()) + suspendQueue st sq = case queueStore st of + MQStore {} -> suspendQueue' st sq + + updateQueueTime :: JournalMsgStore s -> JournalQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) + updateQueueTime st sq t = case queueStore st of + MQStore {} -> updateQueueTime' st sq t + + getMsgQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> StoreIO s (JournalMsgQueue s) getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} = StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure where @@ -327,14 +390,14 @@ instance MsgStoreClass JournalMsgStore where atomically $ writeTVar msgQueue_ $ Just q pure q where - createQ :: JMQueue -> IO JournalMsgQueue + createQ :: JMQueue -> IO (JournalMsgQueue s) createQ queue = do -- folder and files are not created here, -- to avoid file IO for queues without messages during subscription journalId <- newJournalId random mkJournalQueue queue (newMsgQueueState journalId) Nothing - getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message)) + getPeekMsgQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> StoreIO s (Maybe (JournalMsgQueue s, Message)) getPeekMsgQueue ms rId q@JournalQueue {isEmpty} = StoreIO (readTVarIO isEmpty) >>= \case Just True -> pure Nothing @@ -354,7 +417,7 @@ instance MsgStoreClass JournalMsgStore where (mq,) <$$> tryPeekMsg_ q mq -- only runs action if queue is not empty - withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int) + withIdleMsgQueue :: Int64 -> JournalMsgStore s -> RecipientId -> JournalQueue s -> (JournalMsgQueue s -> StoreIO s a) -> StoreIO s (Maybe a, Int) withIdleMsgQueue now ms@JournalMsgStore {config} rId q action = StoreIO $ readTVarIO (msgQueue_ q) >>= \case Nothing -> @@ -375,18 +438,18 @@ instance MsgStoreClass JournalMsgStore where sz <- unStoreIO $ getQueueSize_ mq pure (r, sz) - deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec) + deleteQueue :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType QueueRec) deleteQueue ms rId q = fst <$$> deleteQueue_ ms rId q - deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int)) + deleteQueueSize :: JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Int)) deleteQueueSize ms rId q = deleteQueue_ ms rId q >>= mapM (traverse getSize) -- traverse operates on the second tuple element where getSize = maybe (pure (-1)) (fmap size . readTVarIO . state) - getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message] + getQueueMessages_ :: Bool -> JournalMsgQueue s -> StoreIO s [Message] getQueueMessages_ drainMsgs q = StoreIO (run []) where run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs) @@ -397,7 +460,7 @@ instance MsgStoreClass JournalMsgStore where updateReadPos q drainMsgs len hs (msg :) <$> run msgs - writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) + writeMsg :: JournalMsgStore s -> RecipientId -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool)) writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do q <- getMsgQueue ms rId q' StoreIO $ (`E.finally` updateActiveAt q') $ do @@ -446,15 +509,15 @@ instance MsgStoreClass JournalMsgStore where pure (newJournalState journalId, wh) -- can ONLY be used while restoring messages, not while server running - setOverQuota_ :: JournalQueue -> IO () + setOverQuota_ :: JournalQueue s -> IO () setOverQuota_ q = readTVarIO (msgQueue_ q) >>= mapM_ (\JournalMsgQueue {state} -> atomically $ modifyTVar' state $ \st -> st {canWrite = False}) - getQueueSize_ :: JournalMsgQueue -> StoreIO Int + getQueueSize_ :: JournalMsgQueue s -> StoreIO s Int getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state - tryPeekMsg_ :: JournalQueue -> JournalMsgQueue -> StoreIO (Maybe Message) + tryPeekMsg_ :: JournalQueue s -> JournalMsgQueue s -> StoreIO s (Maybe Message) tryPeekMsg_ q mq@JournalMsgQueue {tipMsg, handles} = StoreIO $ (readTVarIO handles $>>= chooseReadJournal mq True $>>= peekMsg) >>= setEmpty where @@ -468,7 +531,7 @@ instance MsgStoreClass JournalMsgStore where atomically $ writeTVar (isEmpty q) (Just $ isNothing msg) pure msg - tryDeleteMsg_ :: JournalQueue -> JournalMsgQueue -> Bool -> StoreIO () + tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s () tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $ void $ readTVarIO tipMsg -- if there is no cached tipMsg, do nothing @@ -476,11 +539,11 @@ instance MsgStoreClass JournalMsgStore where $>>= \len -> readTVarIO handles $>>= \hs -> updateReadPos mq logState len hs $> Just () - isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a + isolateQueue :: RecipientId -> JournalQueue s -> String -> StoreIO s a -> ExceptT ErrorType IO a isolateQueue rId JournalQueue {queueLock} op = tryStore' op rId . withLock' queueLock op . unStoreIO -updateActiveAt :: JournalQueue -> IO () +updateActiveAt :: JournalQueue s -> IO () updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a @@ -494,17 +557,17 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure let e' = intercalate ", " [op, B.unpack $ strEncode rId, show e] in logError ("STORE: " <> T.pack e') $> Left (STORE e') -isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a +isolateQueueId :: String -> JournalMsgStore s -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op -openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue +openMsgQueue :: JournalMsgStore s -> JMQueue -> IO (JournalMsgQueue s) openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do (st, sh) <- readWriteQueueState ms statePath (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} mkJournalQueue q st' (Just hs) -mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue +mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO (JournalMsgQueue s) mkJournalQueue queue st hs_ = do state <- newTVarIO st tipMsg <- newTVarIO Nothing @@ -513,7 +576,7 @@ mkJournalQueue queue st hs_ = do -- to avoid map lookup on queue operations pure JournalMsgQueue {queue, state, tipMsg, handles} -chooseReadJournal :: JournalMsgQueue -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle)) +chooseReadJournal :: JournalMsgQueue s -> Bool -> MsgQueueHandles -> IO (Maybe (JournalState 'JTRead, Handle)) chooseReadJournal q log' hs = do st@MsgQueueState {writeState = ws, readState = rs} <- readTVarIO (state q) case writeHandle hs of @@ -529,7 +592,7 @@ chooseReadJournal q log' hs = do _ | msgPos rs >= msgCount rs && journalId rs == journalId ws -> pure Nothing _ -> pure $ Just (rs, readHandle hs) -updateQueueState :: JournalMsgQueue -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO () +updateQueueState :: JournalMsgQueue s -> Bool -> MsgQueueHandles -> MsgQueueState -> STM () -> IO () updateQueueState q log' hs st a = do unless (validQueueState st) $ E.throwIO $ userError $ "updateQueueState invalid state: " <> show st when log' $ appendState (stateHandle hs) st @@ -538,7 +601,7 @@ updateQueueState q log' hs st a = do appendState :: Handle -> MsgQueueState -> IO () appendState h st = E.uninterruptibleMask_ $ B.hPutStr h $ strEncode st `B.snoc` '\n' -updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO () +updateReadPos :: JournalMsgQueue s -> Bool -> Int64 -> MsgQueueHandles -> IO () updateReadPos q log' len hs = do st@MsgQueueState {readState = rs, size} <- readTVarIO (state q) let JournalState {msgPos, bytePos} = rs @@ -547,7 +610,7 @@ updateReadPos q log' len hs = do st' = st {readState = rs', size = size - 1} updateQueueState q log' hs st' $ writeTVar (tipMsg q) Nothing -msgQueueDirectory :: JournalMsgStore -> RecipientId -> FilePath +msgQueueDirectory :: JournalMsgStore s -> RecipientId -> FilePath msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathParts}} rId = storePath </> B.unpack (B.intercalate "/" $ splitSegments pathParts $ strEncode rId) where @@ -570,7 +633,7 @@ createNewJournal dir journalId = do newJournalId :: TVar StdGen -> IO ByteString newJournalId g = strEncode <$> atomically (stateTVar g $ genByteString 12) -openJournals :: JournalMsgStore -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle) +openJournals :: JournalMsgStore s -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle) openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do let rjId = journalId rs wjId = journalId ws @@ -643,7 +706,7 @@ removeJournal dir JournalState {journalId} = do -- This function is supposed to be resilient to crashes while updating state files, -- and also resilient to crashes during its execution. -readWriteQueueState :: JournalMsgStore -> FilePath -> IO (MsgQueueState, Handle) +readWriteQueueState :: JournalMsgStore s -> FilePath -> IO (MsgQueueState, Handle) readWriteQueueState JournalMsgStore {random, config} statePath = ifM (doesFileExist tempBackup) @@ -721,20 +784,21 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws -deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue)) +deleteQueue_ :: forall s. JournalMsgStore s -> RecipientId -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) deleteQueue_ ms rId q = - runExceptT $ isolateQueueId "deleteQueue_" ms rId $ - deleteQueue' ms rId q >>= mapM remove + runExceptT $ isolateQueueId "deleteQueue_" ms rId $ case queueStore ms of + MQStore {} -> deleteQueue' ms rId q >>= mapM remove where + remove :: (QueueRec, Maybe (JournalMsgQueue s)) -> IO (QueueRec, Maybe (JournalMsgQueue s)) remove r@(_, mq_) = do mapM_ closeMsgQueueHandles mq_ removeQueueDirectory ms rId pure r -closeMsgQueue :: JournalQueue -> IO () +closeMsgQueue :: JournalQueue s -> IO () closeMsgQueue JournalQueue {msgQueue_} = atomically (swapTVar msgQueue_ Nothing) >>= mapM_ closeMsgQueueHandles -closeMsgQueueHandles :: JournalMsgQueue -> IO () +closeMsgQueueHandles :: JournalMsgQueue s -> IO () closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles where closeHandles (MsgQueueHandles sh rh wh_) = do @@ -742,7 +806,7 @@ closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles hClose rh mapM_ hClose wh_ -removeQueueDirectory :: JournalMsgStore -> RecipientId -> IO () +removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO () removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st removeQueueDirectory_ :: FilePath -> IO () diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index cbeb75f9c..8a29461b4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -21,6 +21,7 @@ import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) +import qualified Data.Map.Strict as M import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -63,7 +64,6 @@ instance STMQueueStore STMMsgStore where notifiers' = notifiers storeLog' = storeLog mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing - msgQueue_' = msgQueue_ instance MsgStoreClass STMMsgStore where type StoreMonad STMMsgStore = STM @@ -97,6 +97,39 @@ instance MsgStoreClass STMMsgStore where queueRec' = queueRec {-# INLINE queueRec' #-} + msgQueue_' = msgQueue_ + {-# INLINE msgQueue_' #-} + + queueCounts :: STMMsgStore -> IO QueueCounts + queueCounts st = do + queueCount <- M.size <$> readTVarIO (queues st) + notifierCount <- M.size <$> readTVarIO (notifiers st) + pure QueueCounts {queueCount, notifierCount} + + addQueue = addQueue' + {-# INLINE addQueue #-} + + getQueue = getQueue' + {-# INLINE getQueue #-} + + getQueueRec = getQueueRec' + {-# INLINE getQueueRec #-} + + secureQueue = secureQueue' + {-# INLINE secureQueue #-} + + addQueueNotifier = addQueueNotifier' + {-# INLINE addQueueNotifier #-} + + deleteQueueNotifier = deleteQueueNotifier' + {-# INLINE deleteQueueNotifier #-} + + suspendQueue = suspendQueue' + {-# INLINE suspendQueue #-} + + updateQueueTime = updateQueueTime' + {-# INLINE updateQueueTime #-} + getMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM STMMsgQueue getMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure where diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 8754767cd..f43fad442 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -35,7 +35,6 @@ class MsgStoreClass s => STMQueueStore s where notifiers' :: s -> TMap NotifierId RecipientId storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode)) mkQueue :: s -> QueueRec -> STM (StoreQueue s) - msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) class Monad (StoreMonad s) => MsgStoreClass s where type StoreMonad s = (m :: Type -> Type) | m -> s @@ -50,6 +49,18 @@ class Monad (StoreMonad s) => MsgStoreClass s where logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueRec' :: StoreQueue s -> TVar (Maybe QueueRec) + msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s)) + queueCounts :: s -> IO QueueCounts + + addQueue :: s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) + getQueue :: DirectParty p => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) + getQueueRec :: DirectParty p => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) + secureQueue :: s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) + addQueueNotifier :: s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) + deleteQueueNotifier :: s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) + suspendQueue :: s -> StoreQueue s -> IO (Either ErrorType ()) + updateQueueTime :: s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) + getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message)) getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s) @@ -65,6 +76,11 @@ class Monad (StoreMonad s) => MsgStoreClass s where tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s () isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a +data QueueCounts = QueueCounts + { queueCount :: Int, + notifierCount :: Int + } + data MSType = MSMemory | MSJournal data SMSType :: MSType -> Type where diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 7bf4f3a4a..65dd828a6 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -14,14 +14,14 @@ {-# LANGUAGE UndecidableInstances #-} module Simplex.Messaging.Server.QueueStore.STM - ( addQueue, - getQueue, - getQueueRec, - secureQueue, - addQueueNotifier, - deleteQueueNotifier, - suspendQueue, - updateQueueTime, + ( addQueue', + getQueue', + getQueueRec', + secureQueue', + addQueueNotifier', + deleteQueueNotifier', + suspendQueue', + updateQueueTime', deleteQueue', readQueueStore, withLog', @@ -31,31 +31,25 @@ where import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Except import Data.Bitraversable (bimapM) -import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) import qualified Data.Text as T -import Data.Text.Encoding (decodeLatin1) -import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.StoreLog import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$)) +import Simplex.Messaging.Util (ifM, ($>>=), (<$$)) import System.IO import UnliftIO.STM -addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= +addQueue' :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s)) +addQueue' st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= atomically add $>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr) where add = ifM hasId (pure $ Left DUPLICATE_) $ do - q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing + q <- mkQueue st qr TM.insert rId q $ queues' st TM.insert sId rId $ senders' st forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st @@ -63,20 +57,20 @@ addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}= hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier] hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier -getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) -getQueue st party qId = +getQueue' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) +getQueue' st party qId = maybe (Left AUTH) Right <$> case party of SRecipient -> TM.lookupIO qId $ queues' st SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st) SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st) -getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) -getQueueRec st party qId = +getQueueRec' :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec)) +getQueueRec' st party qId = getQueue st party qId $>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q)) -secureQueue :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) -secureQueue st sq sKey = +secureQueue' :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ()) +secureQueue' st sq sKey = atomically (readQueueRec qr $>>= secure) $>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey where @@ -87,8 +81,8 @@ secureQueue st sq sKey = writeTVar qr $ Just q {senderKey = Just sKey} pure $ Right rId -addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) -addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = +addQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) +addQueueNotifier' st sq ntfCreds@NtfCreds {notifierId = nId} = atomically (readQueueRec qr $>>= add) $>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds) where @@ -100,8 +94,8 @@ addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} = TM.insert nId rId $ notifiers' st pure $ Right (rId, nId_) -deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) -deleteQueueNotifier st sq = +deleteQueueNotifier' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId)) +deleteQueueNotifier' st sq = atomically (readQueueRec qr >>= mapM delete) $>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) where @@ -111,8 +105,8 @@ deleteQueueNotifier st sq = writeTVar qr $! Just q {notifier = Nothing} pure notifierId -suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) -suspendQueue st sq = +suspendQueue' :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ()) +suspendQueue' st sq = atomically (readQueueRec qr >>= mapM suspend) $>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId) where @@ -121,8 +115,8 @@ suspendQueue st sq = writeTVar qr $! Just q {status = QueueOff} pure $ recipientId q -updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) -updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' +updateQueueTime' :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec) +updateQueueTime' st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log' where qr = queueRec' sq update q@QueueRec {updatedAt} @@ -163,34 +157,3 @@ withLog' name sl action = withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ()) withLog name = withLog' name . storeLog' - -readQueueStore :: forall s. STMQueueStore s => FilePath -> s -> IO () -readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines - where - processLine :: LB.ByteString -> IO () - processLine s' = either printError procLogRecord (strDecode s) - where - s = LB.toStrict s' - procLogRecord :: StoreLogRecord -> IO () - procLogRecord = \case - CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue" - SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey - AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds - SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st - DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId - DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st - UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t - printError :: String -> IO () - printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s - withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () - withQueue qId op a = runExceptT go >>= qError qId op - where - go = do - q <- ExceptT $ getQueue st SRecipient qId - liftIO (readTVarIO $ queueRec' q) >>= \case - Nothing -> logWarn $ logPfx qId op <> "already deleted" - Just _ -> void $ ExceptT $ a q - qError qId op = \case - Left e -> logError $ logPfx qId op <> tshow e - Right _ -> pure () - logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", " diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 2da3398f2..889cb6046 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -26,6 +26,7 @@ module Simplex.Messaging.Server.StoreLog logUpdateQueueTime, readWriteStoreLog, writeQueueStore, + readQueueStore, ) where @@ -34,10 +35,14 @@ import Control.Concurrent.STM import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Except import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import qualified Data.Map.Strict as M import qualified Data.Text as T +import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (getCurrentTime) import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IO (catchAny) @@ -223,7 +228,7 @@ readWriteStoreLog readStore writeStore f st = renameFile tempBackup timedBackup logInfo $ "original state preserved as " <> T.pack timedBackup -writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO () +writeQueueStore :: MsgStoreClass s => StoreLog 'WriteMode -> s -> IO () writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs where writeQueue (rId, q) = @@ -231,3 +236,34 @@ writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M. Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them Nothing -> atomically $ TM.delete rId $ activeMsgQueues st active QueueRec {status} = status == QueueActive + +readQueueStore :: forall s. MsgStoreClass s => FilePath -> s -> IO () +readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines + where + processLine :: LB.ByteString -> IO () + processLine s' = either printError procLogRecord (strDecode s) + where + s = LB.toStrict s' + procLogRecord :: StoreLogRecord -> IO () + procLogRecord = \case + CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue" + SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey + AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds + SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st + DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId + DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st + UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t + printError :: String -> IO () + printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s + withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO () + withQueue qId op a = runExceptT go >>= qError qId op + where + go = do + q <- ExceptT $ getQueue st SRecipient qId + liftIO (readTVarIO $ queueRec' q) >>= \case + Nothing -> logWarn $ logPfx qId op <> "already deleted" + Just _ -> void $ ExceptT $ a q + qError qId op = \case + Left e -> logError $ logPfx qId op <> tshow e + Right _ -> pure () + logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", " diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 35c27c22e..bca1cc872 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -35,7 +35,6 @@ import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore -import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) @@ -69,11 +68,12 @@ withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore testSMTStoreConfig :: STMStoreConfig testSMTStoreConfig = STMStoreConfig {storePath = Nothing, quota = 3} -testJournalStoreCfg :: JournalStoreConfig +testJournalStoreCfg :: JournalStoreConfig 'MSMemory testJournalStoreCfg = JournalStoreConfig { storePath = testStoreMsgsDir, pathParts = journalMsgStoreDepth, + queueStoreType = SMSMemory, quota = 3, maxMsgCount = 4, maxStateLines = 2, @@ -178,7 +178,7 @@ testChangeReadJournal ms = do (Msg "message 5", Nothing) <- tryDelPeekMsg ms rId q mId5 void $ ExceptT $ deleteQueue ms rId q -testExportImportStore :: JournalMsgStore -> IO () +testExportImportStore :: JournalMsgStore s -> IO () testExportImportStore ms = do g <- C.newRandom (rId1, qr1) <- testNewQueueRec g True @@ -209,7 +209,7 @@ testExportImportStore ms = do closeStoreLog sl exportMessages False ms testStoreMsgsFile False (B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy") - let cfg = (testJournalStoreCfg :: JournalStoreConfig) {storePath = testStoreMsgsDir2} + let cfg = (testJournalStoreCfg :: JournalStoreConfig 'MSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- @@ -226,7 +226,7 @@ testExportImportStore ms = do exportMessages False stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) -testQueueState :: JournalMsgStore -> IO () +testQueueState :: JournalMsgStore s -> IO () testQueueState ms = do g <- C.newRandom rId <- EntityId <$> atomically (C.randomBytes 24 g) @@ -291,7 +291,7 @@ testQueueState ms = do let f = dir </> name in unless (f == keep) $ removeFile f -testMessageState :: JournalMsgStore -> IO () +testMessageState :: JournalMsgStore s -> IO () testMessageState ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -316,7 +316,7 @@ testMessageState ms = do (Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3 liftIO $ closeMsgQueue q -testReadFileMissing :: JournalMsgStore -> IO () +testReadFileMissing :: JournalMsgStore s -> IO () testReadFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -340,7 +340,7 @@ testReadFileMissing ms = do Msg "message 2" <- tryPeekMsg ms rId q' pure () -testReadFileMissingSwitch :: JournalMsgStore -> IO () +testReadFileMissingSwitch :: JournalMsgStore s -> IO () testReadFileMissingSwitch ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -358,7 +358,7 @@ testReadFileMissingSwitch ms = do Msg "message 5" <- tryPeekMsg ms rId q' pure () -testWriteFileMissing :: JournalMsgStore -> IO () +testWriteFileMissing :: JournalMsgStore s -> IO () testWriteFileMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -381,7 +381,7 @@ testWriteFileMissing ms = do Msg "message 6" <- tryPeekMsg ms rId q' pure () -testReadAndWriteFilesMissing :: JournalMsgStore -> IO () +testReadAndWriteFilesMissing :: JournalMsgStore s -> IO () testReadAndWriteFilesMissing ms = do g <- C.newRandom (rId, qr) <- testNewQueueRec g True @@ -400,7 +400,7 @@ testReadAndWriteFilesMissing ms = do Msg "message 6" <- tryPeekMsg ms rId q' pure () -writeMessages :: JournalMsgStore -> RecipientId -> QueueRec -> IO JournalQueue +writeMessages :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) writeMessages ms rId qr = runRight $ do q <- ExceptT $ addQueue ms qr let write s = writeMsg ms rId q True =<< mkMessage s diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index e24f9f1ea..5de40f0ef 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -108,5 +108,5 @@ testSMPStoreLog testSuite tests = closeStoreLog l ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile compacted' `shouldBe` compacted - storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec) - storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec')) + storeState :: JournalMsgStore 'MSMemory -> IO (M.Map RecipientId QueueRec) + storeState st = M.mapMaybe id <$> (readTVarIO (queues' st) >>= mapM (readTVarIO . queueRec'))