Skip to content

Commit

Permalink
parameterize journal store for queue storage
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Dec 12, 2024
1 parent 79e9447 commit fd190d1
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 165 deletions.
13 changes: 6 additions & 7 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.Stats
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
Expand Down Expand Up @@ -841,7 +840,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
c <- liftIO $ newClient msType clientId q thVersion sessionId ts
runClientThreads msType ms active c clientId `finally` clientDisconnected c
where
runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads :: MsgStoreClass (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads msType ms active c clientId = do
atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c)
s <- asks server
Expand Down Expand Up @@ -897,7 +896,7 @@ cancelSub s = case subThread s of
_ -> pure ()
ProhibitSub -> pure ()

receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive :: forall c s. (Transport c, MsgStoreClass s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
sa <- asks serverActive
Expand Down Expand Up @@ -997,7 +996,7 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail
-- - the queue or party key do not exist.
-- In all cases, the time of the verification should depend only on the provided authorization type,
-- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result.
verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission :: forall s. MsgStoreClass s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission ms auth_ tAuth authorized queueId cmd =
case cmd of
Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k
Expand Down Expand Up @@ -1074,7 +1073,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
action `finally` atomically (modifyTVar' endThreads $ IM.delete tId)
mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId

client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client :: forall s. MsgStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client
thParams'
Server {subscribedQ, ntfSubscribedQ, subscribers}
Expand Down Expand Up @@ -1768,7 +1767,7 @@ processServerMessages = do
stored'' <- getQueueSize ms rId q
liftIO $ closeMsgQueue q
pure (stored'', expired'')
processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats
processValidateQueue :: RecipientId -> JournalQueue 'MSMemory -> IO MessageStats
processValidateQueue rId q =
runExceptT (getQueueSize ms rId q) >>= \case
Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1}
Expand All @@ -1777,7 +1776,7 @@ processServerMessages = do
exitFailure

-- TODO this function should be called after importing queues from store log
importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages :: forall s. MsgStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages tty ms f old_ = do
logInfo $ "restoring messages from file " <> T.pack f
LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/Env/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ data Env = Env

type family MsgStore s where
MsgStore 'MSMemory = STMMsgStore
MsgStore 'MSJournal = JournalMsgStore
MsgStore 'MSJournal = JournalMsgStore 'MSMemory

data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
data AMsgStore = forall s. (StoreClass (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)

data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s))

Expand Down Expand Up @@ -295,7 +295,7 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
Expand Down Expand Up @@ -359,5 +359,5 @@ newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}

readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: MsgStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
doesFileExist iniFile >>= \case
True -> readIniFile iniFile >>= either exitError a
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, queueStoreType = SMSMemory, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
iniFile = combine cfgPath "smp-server.ini"
serverVersion = "SMP server v" <> simplexMQVersion
defaultServerPorts = "5223,443"
Expand Down
Loading

0 comments on commit fd190d1

Please sign in to comment.