From 0a827307d666de819d5cef17f70a78be5cd0e410 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sat, 14 Dec 2024 15:22:06 +0000 Subject: [PATCH 1/3] build: remove some modules from the client build (#1418) * build: remove some modules from the client build * remove websockets from client_library --- simplexmq.cabal | 26 +++++----- src/Simplex/FileTransfer/Agent.hs | 3 +- src/Simplex/FileTransfer/Client.hs | 42 +++++++++++++++ src/Simplex/FileTransfer/Client/Main.hs | 69 +------------------------ src/Simplex/FileTransfer/Description.hs | 19 +++++++ src/Simplex/FileTransfer/Types.hs | 10 ++++ src/Simplex/Messaging/Client.hs | 2 - 7 files changed, 86 insertions(+), 85 deletions(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index 519eaf366..06b7f61c8 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -73,7 +73,6 @@ library Simplex.FileTransfer.Chunks Simplex.FileTransfer.Client Simplex.FileTransfer.Client.Agent - Simplex.FileTransfer.Client.Main Simplex.FileTransfer.Client.Presets Simplex.FileTransfer.Crypto Simplex.FileTransfer.Description @@ -152,7 +151,6 @@ library Simplex.Messaging.Notifications.Types Simplex.Messaging.Parsers Simplex.Messaging.Protocol - Simplex.Messaging.Server.CLI Simplex.Messaging.Server.Expiration Simplex.Messaging.Server.QueueStore.QueueInfo Simplex.Messaging.ServiceScheme @@ -168,7 +166,6 @@ library Simplex.Messaging.Transport.HTTP2.Server Simplex.Messaging.Transport.KeepAlive Simplex.Messaging.Transport.Server - Simplex.Messaging.Transport.WebSockets Simplex.Messaging.Util Simplex.Messaging.Version Simplex.Messaging.Version.Internal @@ -179,6 +176,7 @@ library Simplex.RemoteControl.Types if !flag(client_library) exposed-modules: + Simplex.FileTransfer.Client.Main Simplex.FileTransfer.Server Simplex.FileTransfer.Server.Control Simplex.FileTransfer.Server.Env @@ -186,7 +184,17 @@ library Simplex.FileTransfer.Server.Stats Simplex.FileTransfer.Server.Store Simplex.FileTransfer.Server.StoreLog + Simplex.Messaging.Notifications.Server + Simplex.Messaging.Notifications.Server.Control + Simplex.Messaging.Notifications.Server.Env + Simplex.Messaging.Notifications.Server.Main + Simplex.Messaging.Notifications.Server.Push.APNS + Simplex.Messaging.Notifications.Server.Push.APNS.Internal + Simplex.Messaging.Notifications.Server.Stats + Simplex.Messaging.Notifications.Server.Store + Simplex.Messaging.Notifications.Server.StoreLog Simplex.Messaging.Server + Simplex.Messaging.Server.CLI Simplex.Messaging.Server.Control Simplex.Messaging.Server.Env.STM Simplex.Messaging.Server.Information @@ -201,15 +209,7 @@ library Simplex.Messaging.Server.Stats Simplex.Messaging.Server.StoreLog Simplex.Messaging.Server.StoreLog.Types - Simplex.Messaging.Notifications.Server - Simplex.Messaging.Notifications.Server.Control - Simplex.Messaging.Notifications.Server.Env - Simplex.Messaging.Notifications.Server.Main - Simplex.Messaging.Notifications.Server.Push.APNS - Simplex.Messaging.Notifications.Server.Push.APNS.Internal - Simplex.Messaging.Notifications.Server.Stats - Simplex.Messaging.Notifications.Server.Store - Simplex.Messaging.Notifications.Server.StoreLog + Simplex.Messaging.Transport.WebSockets other-modules: Paths_simplexmq hs-source-dirs: @@ -270,7 +270,6 @@ library , transformers ==0.6.* , unliftio ==0.2.* , unliftio-core ==0.2.* - , websockets ==0.12.* , yaml ==0.11.* , zstd ==0.1.3.* default-language: Haskell2010 @@ -280,6 +279,7 @@ library build-depends: case-insensitive ==1.2.* , hashable ==1.4.* + , websockets ==0.12.* if impl(ghc >= 9.6.2) build-depends: bytestring ==0.11.* diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 2b6c1c5af..9506b465c 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -51,8 +51,7 @@ import Data.Text (Text) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) import Simplex.FileTransfer.Chunks (toKB) -import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) -import Simplex.FileTransfer.Client.Main +import Simplex.FileTransfer.Client (XFTPChunkSpec (..), getChunkDigest, prepareChunkSizes, prepareChunkSpecs, singleChunkSize) import Simplex.FileTransfer.Crypto import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..), SFileParty (..)) diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index e5c66e764..de4da07f2 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -20,14 +20,18 @@ import Data.Bifunctor (first) import Data.ByteString.Builder (Builder, byteString) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy as LB import Data.Int (Int64) +import Data.List (foldl') import Data.List.NonEmpty (NonEmpty (..)) +import Data.Maybe (listToMaybe) import Data.Time.Clock (UTCTime) import Data.Word (Word32) import qualified Data.X509 as X import qualified Data.X509.Validation as XV import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Client as H +import Simplex.FileTransfer.Chunks import Simplex.FileTransfer.Protocol import Simplex.FileTransfer.Transport import Simplex.Messaging.Client @@ -298,3 +302,41 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of -- FACK :: FileCommand Recipient -- PING :: FileCommand Recipient + +singleChunkSize :: Int64 -> Maybe Word32 +singleChunkSize size' = + listToMaybe $ dropWhile (< chunkSize) serverChunkSizes + where + chunkSize = fromIntegral size' + +prepareChunkSizes :: Int64 -> [Word32] +prepareChunkSizes size' = prepareSizes size' + where + (smallSize, bigSize) + | size' > size34 chunkSize3 = (chunkSize2, chunkSize3) + | size' > size34 chunkSize2 = (chunkSize1, chunkSize2) + | otherwise = (chunkSize0, chunkSize1) + size34 sz = (fromIntegral sz * 3) `div` 4 + prepareSizes 0 = [] + prepareSizes size + | size >= fromIntegral bigSize = replicate (fromIntegral n1) bigSize <> prepareSizes remSz + | size > size34 bigSize = [bigSize] + | otherwise = replicate (fromIntegral n2') smallSize + where + (n1, remSz) = size `divMod` fromIntegral bigSize + n2' = let (n2, remSz2) = (size `divMod` fromIntegral smallSize) in if remSz2 == 0 then n2 else n2 + 1 + +prepareChunkSpecs :: FilePath -> [Word32] -> [XFTPChunkSpec] +prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) chunkSizes + where + addSpec :: (Int64, [XFTPChunkSpec]) -> Word32 -> (Int64, [XFTPChunkSpec]) + addSpec (chunkOffset, specs) sz = + let spec = XFTPChunkSpec {filePath, chunkOffset, chunkSize = sz} + in (chunkOffset + fromIntegral sz, spec : specs) + +getChunkDigest :: XFTPChunkSpec -> IO ByteString +getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = + withFile chunkPath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + chunk <- LB.hGet h (fromIntegral chunkSize) + pure $! LC.sha256Hash chunk diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 079392a1b..a330e21a7 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -19,11 +19,7 @@ module Simplex.FileTransfer.Client.Main singleChunkSize, prepareChunkSizes, prepareChunkSpecs, - maxFileSize, - maxFileSizeHard, - fileSizeLen, getChunkDigest, - SentRecipientReplica (..), ) where @@ -34,7 +30,6 @@ import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) -import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char (toLower) @@ -45,7 +40,7 @@ import Data.List.NonEmpty (NonEmpty (..), nonEmpty) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (fromMaybe, listToMaybe) +import Data.Maybe (fromMaybe) import qualified Data.Text as T import Data.Word (Word32) import GHC.Records (HasField (getField)) @@ -80,20 +75,6 @@ import UnliftIO.Directory xftpClientVersion :: String xftpClientVersion = "1.0.1" --- | Soft limit for XFTP clients. Should be checked and reported to user. -maxFileSize :: Int64 -maxFileSize = gb 1 - -maxFileSizeStr :: String -maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize - --- | Hard internal limit for XFTP agent after which it refuses to prepare chunks. -maxFileSizeHard :: Int64 -maxFileSizeHard = gb 5 - -fileSizeLen :: Int64 -fileSizeLen = 8 - newtype CLIError = CLIError String deriving (Eq, Show, Exception) @@ -231,16 +212,6 @@ data SentFileChunkReplica = SentFileChunkReplica } deriving (Show) -data SentRecipientReplica = SentRecipientReplica - { chunkNo :: Int, - server :: XFTPServer, - rcvNo :: Int, - replicaId :: ChunkReplicaId, - replicaKey :: C.APrivateAuthKey, - digest :: FileDigest, - chunkSize :: FileSize Word32 - } - logCfg :: LogConfig logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} @@ -414,13 +385,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re B.writeFile fdSndPath $ strEncode fdSnd pure (fdRcvPaths, fdSndPath) -getChunkDigest :: XFTPChunkSpec -> IO ByteString -getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = - withFile chunkPath ReadMode $ \h -> do - hSeek h AbsoluteSeek $ fromIntegral chunkOffset - chunk <- LB.hGet h (fromIntegral chunkSize) - pure $! LC.sha256Hash chunk - cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO () cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} = getFileDescription' fileDescription >>= receive @@ -536,37 +500,6 @@ getFileDescription' path = getFileDescription path >>= \case AVFD fd -> either (throwE . CLIError) pure $ checkParty fd -singleChunkSize :: Int64 -> Maybe Word32 -singleChunkSize size' = - listToMaybe $ dropWhile (< chunkSize) serverChunkSizes - where - chunkSize = fromIntegral size' - -prepareChunkSizes :: Int64 -> [Word32] -prepareChunkSizes size' = prepareSizes size' - where - (smallSize, bigSize) - | size' > size34 chunkSize3 = (chunkSize2, chunkSize3) - | size' > size34 chunkSize2 = (chunkSize1, chunkSize2) - | otherwise = (chunkSize0, chunkSize1) - size34 sz = (fromIntegral sz * 3) `div` 4 - prepareSizes 0 = [] - prepareSizes size - | size >= fromIntegral bigSize = replicate (fromIntegral n1) bigSize <> prepareSizes remSz - | size > size34 bigSize = [bigSize] - | otherwise = replicate (fromIntegral n2') smallSize - where - (n1, remSz) = size `divMod` fromIntegral bigSize - n2' = let (n2, remSz2) = (size `divMod` fromIntegral smallSize) in if remSz2 == 0 then n2 else n2 + 1 - -prepareChunkSpecs :: FilePath -> [Word32] -> [XFTPChunkSpec] -prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) chunkSizes - where - addSpec :: (Int64, [XFTPChunkSpec]) -> Word32 -> (Int64, [XFTPChunkSpec]) - addSpec (chunkOffset, specs) sz = - let spec = XFTPChunkSpec {filePath, chunkOffset, chunkSize = sz} - in (chunkOffset + fromIntegral sz, spec : specs) - getEncPath :: MonadIO m => Maybe FilePath -> String -> m FilePath getEncPath path name = (`uniqueCombine` (name <> ".encrypted")) =<< maybe (liftIO getCanonicalTemporaryDirectory) pure path diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index 8cb98fd32..865daf23d 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -37,6 +37,10 @@ module Simplex.FileTransfer.Description FileClientData, fileDescriptionURI, qrSizeLimit, + maxFileSize, + maxFileSizeStr, + maxFileSizeHard, + fileSizeLen, ) where @@ -266,6 +270,21 @@ instance StrEncoding FileDescriptionURI where qrSizeLimit :: Int qrSizeLimit = 1002 -- ~2 chunks in URLencoded YAML with some spare size for server hosts +-- | Soft limit for XFTP clients. Should be checked and reported to user. +maxFileSize :: Int64 +maxFileSize = gb 1 + +maxFileSizeStr :: String +maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize + +-- | Hard internal limit for XFTP agent after which it refuses to prepare chunks. +maxFileSizeHard :: Int64 +maxFileSizeHard = gb 5 + +fileSizeLen :: Int64 +fileSizeLen = 8 + + instance (Integral a, Show a) => StrEncoding (FileSize a) where strEncode (FileSize b) | b' /= 0 = bshow b diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index 8569bdd12..571cf3748 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -246,6 +246,16 @@ data DeletedSndChunkReplica = DeletedSndChunkReplica } deriving (Show) +data SentRecipientReplica = SentRecipientReplica + { chunkNo :: Int, + server :: XFTPServer, + rcvNo :: Int, + replicaId :: ChunkReplicaId, + replicaKey :: C.APrivateAuthKey, + digest :: FileDigest, + chunkSize :: FileSize Word32 + } + data FileErrorType = -- | cannot proceed with download from not approved relays without proxy NOT_APPROVED diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 2ba7d9edf..9a030d4a5 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -145,7 +145,6 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client (SocksAuth (..), SocksProxyWithAuth (..), TransportClientConfig (..), TransportHost (..), defaultSMPPort, defaultTcpConnectTimeout, runTransportClient) import Simplex.Messaging.Transport.KeepAlive -import Simplex.Messaging.Transport.WebSockets (WS) import Simplex.Messaging.Util (bshow, diffToMicroseconds, ifM, liftEitherWith, raceAny_, threadDelay', tshow, whenM) import Simplex.Messaging.Version import System.Mem.Weak (Weak, deRefWeak) @@ -544,7 +543,6 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize "" -> case protocolTypeI @(ProtoType msg) of SPSMP | smpWebPort -> ("443", transport @TLS) _ -> defaultTransport cfg - "80" -> ("80", transport @WS) p -> (p, transport @TLS) client :: forall c. Transport c => TProxy c -> PClient v err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient v err msg)) -> c -> IO () From 4d640c16aaaad5c8d206b690b6c901411d28098d Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 18 Dec 2024 11:19:13 +0000 Subject: [PATCH 2/3] smp server: log prometheus metrics (#1411) * smp server: log prometheus metrics * save metrics * diff * lines * version * do not include Prometheus into client * corrections Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com> * corrections Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com> * corrections Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com> * add timestamp to metrics * remove type * remove version --------- Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com> --- simplexmq.cabal | 1 + src/Simplex/Messaging/Server.hs | 64 +++- src/Simplex/Messaging/Server/Env/STM.hs | 3 + src/Simplex/Messaging/Server/Main.hs | 6 +- src/Simplex/Messaging/Server/Prometheus.hs | 385 +++++++++++++++++++++ src/Simplex/Messaging/Server/Stats.hs | 8 + src/Simplex/Messaging/Transport/Server.hs | 10 + tests/SMPClient.hs | 7 +- tests/ServerTests.hs | 10 +- 9 files changed, 484 insertions(+), 10 deletions(-) create mode 100644 src/Simplex/Messaging/Server/Prometheus.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 06b7f61c8..e722ef4ee 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -204,6 +204,7 @@ library Simplex.Messaging.Server.MsgStore.STM Simplex.Messaging.Server.MsgStore.Types Simplex.Messaging.Server.NtfStore + Simplex.Messaging.Server.Prometheus Simplex.Messaging.Server.QueueStore Simplex.Messaging.Server.QueueStore.STM Simplex.Messaging.Server.Stats diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 988639f5c..afec6e332 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -71,6 +71,7 @@ import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) import Data.Semigroup (Sum (..)) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) +import qualified Data.Text.IO as T import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) @@ -98,6 +99,7 @@ import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue, closeMsgQueue) import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.NtfStore +import Simplex.Messaging.Server.Prometheus import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.STM @@ -176,7 +178,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT : receiveFromProxyAgent pa : expireNtfsThread cfg : sigIntHandlerThread - : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg + : map runServer transports + <> expireMessagesThread_ cfg + <> serverStatsThread_ cfg + <> prometheusMetricsThread_ cfg + <> controlPortThread_ cfg ) `finally` stopServer s where @@ -555,6 +561,50 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = [show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther] + prometheusMetricsThread_ :: ServerConfig -> [M ()] + prometheusMetricsThread_ ServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} = + [savePrometheusMetrics interval prometheusMetricsFile] + prometheusMetricsThread_ _ = [] + + savePrometheusMetrics :: Int -> FilePath -> M () + savePrometheusMetrics saveInterval metricsFile = do + labelMyThread "savePrometheusMetrics" + liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile + AMS _ st <- asks msgStore + ss <- asks serverStats + env <- ask + let interval = 1000000 * saveInterval + liftIO $ forever $ do + threadDelay interval + ts <- getCurrentTime + sm <- getServerMetrics st ss + rtm <- getRealTimeMetrics env + T.writeFile metricsFile $ prometheusMetrics sm rtm ts + + getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics + getServerMetrics st ss = do + d <- getServerStatsData ss + let ps = periodStatDataCounts $ _activeQueues d + psNtf = periodStatDataCounts $ _activeQueuesNtf d + queueCount <- M.size <$> readTVarIO (activeMsgQueues st) + notifierCount <- M.size <$> readTVarIO (notifiers' st) + pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} + + getRealTimeMetrics :: Env -> IO RealTimeMetrics + getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do + socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets +#if MIN_VERSION_base(4,18,0) + threadsCount <- length <$> listThreads +#else + let threadsCount = 0 +#endif + clientsCount <- IM.size <$> readTVarIO clients + smpSubsCount <- M.size <$> readTVarIO subscribers + smpSubClientsCount <- IM.size <$> readTVarIO subClients + ntfSubsCount <- M.size <$> readTVarIO notifiers + ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount} + runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do kh <- asks serverIdentity @@ -695,13 +745,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT #endif CPSockets -> withUserRole $ unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSockets where - putSockets (tcpPort, (accepted', closed', active')) = do - (accepted, closed, active) <- (,,) <$> readTVarIO accepted' <*> readTVarIO closed' <*> readTVarIO active' + putSockets (tcpPort, socketsState) = do + ss <- getSocketStats socketsState hPutStrLn h $ "Sockets for port " <> tcpPort <> ":" - hPutStrLn h $ "accepted: " <> show accepted - hPutStrLn h $ "closed: " <> show closed - hPutStrLn h $ "active: " <> show (IM.size active) - hPutStrLn h $ "leaked: " <> show (accepted - closed - IM.size active) + hPutStrLn h $ "accepted: " <> show (socketsAccepted ss) + hPutStrLn h $ "closed: " <> show (socketsClosed ss) + hPutStrLn h $ "active: " <> show (socketsActive ss) + hPutStrLn h $ "leaked: " <> show (socketsLeaked ss) CPSocketThreads -> withAdminRole $ do #if MIN_VERSION_base(4,18,0) unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSocketThreads diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index f598bdcb8..f7a9cc7e8 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -96,6 +96,9 @@ data ServerConfig = ServerConfig serverStatsLogFile :: FilePath, -- | file to save and restore stats serverStatsBackupFile :: Maybe FilePath, + -- | interval and file to save prometheus metrics + prometheusInterval :: Maybe Int, + prometheusMetricsFile :: FilePath, -- | notification delivery interval ntfDeliveryInterval :: Int, -- | interval between sending pending END events to unsubscribed clients, seconds diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 3da2aaeb4..89b032661 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -253,7 +253,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = <> ("expire_ntfs_hours: " <> tshow defNtfExpirationHours <> "\n\n") <> "# Log daily server statistics to CSV file\n" <> ("log_stats: " <> onOff logStats <> "\n\n") - <> "[AUTH]\n\ + <> "# Log interval for real-time Prometheus metrics\n\ + \# prometheus_interval: 300\n\n\ + \[AUTH]\n\ \# Set new_queues option to off to completely prohibit creating new messaging queues.\n\ \# This can be useful when you want to decommission the server, but not all connections are switched yet.\n\ \new_queues: on\n\n\ @@ -431,6 +433,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", + prometheusInterval = eitherToMaybe $ read . T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini, + prometheusMetricsFile = combine logPath "smp-server-metrics.txt", pendingENDInterval = 15000000, -- 15 seconds ntfDeliveryInterval = 3000000, -- 3 seconds smpServerVRange = supportedServerSMPRelayVRange, diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs new file mode 100644 index 000000000..869c13e63 --- /dev/null +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -0,0 +1,385 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-} + +module Simplex.Messaging.Server.Prometheus where + +import Data.Int (Int64) +import Data.Text (Text) +import qualified Data.Text as T +import Data.Time.Clock (UTCTime (..), diffUTCTime) +import Data.Time.Clock.System (systemEpochDay) +import Data.Time.Format.ISO8601 (iso8601Show) +import Network.Socket (ServiceName) +import Simplex.Messaging.Server.Stats + +data ServerMetrics = ServerMetrics + { statsData :: ServerStatsData, + activeQueueCounts :: PeriodStatCounts, + activeNtfCounts :: PeriodStatCounts, + queueCount :: Int, + notifierCount :: Int + } + +data RealTimeMetrics = RealTimeMetrics + { socketStats :: [(ServiceName, SocketStats)], + threadsCount :: Int, + clientsCount :: Int, + smpSubsCount :: Int, + smpSubClientsCount :: Int, + ntfSubsCount :: Int, + ntfSubClientsCount :: Int + } + +data SocketStats = SocketStats + { socketsAccepted :: Int, + socketsClosed :: Int, + socketsActive :: Int, + socketsLeaked :: Int + } + +{-# FOURMOLU_DISABLE\n#-} +prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text +prometheusMetrics sm rtm ts = + time <> queues <> subscriptions <> messages <> ntfMessages <> ntfs <> relays <> info + where + ServerMetrics {statsData, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} = sm + RealTimeMetrics + { socketStats, + threadsCount, + clientsCount, + smpSubsCount, + smpSubClientsCount, + ntfSubsCount, + ntfSubClientsCount + } = rtm + ServerStatsData + { _fromTime, + _qCreated, + _qSecured, + _qDeletedAll, + _qDeletedAllB, + _qDeletedNew, + _qDeletedSecured, + _qSub, + _qSubAllB, + _qSubAuth, + _qSubDuplicate, + _qSubProhibited, + _qSubEnd, + _qSubEndB, + _ntfCreated, + _ntfDeleted, + _ntfDeletedB, + _ntfSub, + _ntfSubB, + _ntfSubAuth, + _ntfSubDuplicate, + _msgSent, + _msgSentAuth, + _msgSentQuota, + _msgSentLarge, + _msgRecv, + _msgRecvGet, + _msgGet, + _msgGetNoMsg, + _msgGetAuth, + _msgGetDuplicate, + _msgGetProhibited, + _msgExpired, + _activeQueues, + _msgSentNtf, + _msgRecvNtf, + _activeQueuesNtf, + _msgNtfs, + _msgNtfsB, + _msgNtfNoSub, + _msgNtfLost, + _msgNtfExpired, + _pRelays, + _pRelaysOwn, + _pMsgFwds, + _pMsgFwdsOwn, + _pMsgFwdsRecv, + _qCount, + _msgCount, + _ntfCount + } = statsData + time = + "# Recorded at: " <> T.pack (iso8601Show ts) <> "\n\ + \# Stats from: " <> T.pack (iso8601Show _fromTime) <> "\n\ + \\n" + queues = + "# Queues\n\ + \# ------\n\ + \\n\ + \# HELP simplex_smp_queues_created Created queues\n\ + \# TYPE simplex_smp_queues_created counter\n\ + \simplex_smp_queues_created " <> mshow _qCreated <> "\n# qCreated\n\ + \\n\ + \# HELP simplex_smp_queues_secured Secured queues\n\ + \# TYPE simplex_smp_queues_secured counter\n\ + \simplex_smp_queues_secured " <> mshow _qSecured <> "\n# qSecured\n\ + \\n\ + \# HELP simplex_smp_queues_deleted Deleted queues\n\ + \# TYPE simplex_smp_queues_deleted counter\n\ + \simplex_smp_queues_deleted{type=\"all\"} " <> mshow _qDeletedAll <> "\n# qDeleted\n\ + \simplex_smp_queues_deleted{type=\"new\"} " <> mshow _qDeletedNew <> "\n# qDeletedNew\n\ + \simplex_smp_queues_deleted{type=\"secured\"} " <> mshow _qDeletedSecured <> "\n# qDeletedSecured\n\ + \\n\ + \# HELP simplex_smp_queues_deleted_batch Batched requests to delete queues\n\ + \# TYPE simplex_smp_queues_deleted_batch counter\n\ + \simplex_smp_queues_deleted_batch " <> mshow _qDeletedAllB <> "\n# qDeletedAllB\n\ + \\n\ + \# HELP simplex_smp_queues_total1 Total number of stored queues (first type of count).\n\ + \# TYPE simplex_smp_queues_total1 gauge\n\ + \simplex_smp_queues_total1 " <> mshow _qCount <> "\n# qCount\n\ + \\n\ + \# HELP simplex_smp_queues_total2 Total number of stored queues (second type of count).\n\ + \# TYPE simplex_smp_queues_total2 gauge\n\ + \simplex_smp_queues_total2 " <> mshow queueCount <> "\n# qCount2\n\ + \\n\ + \# HELP simplex_smp_queues_daily Daily active queues.\n\ + \# TYPE simplex_smp_queues_daily gauge\n\ + \simplex_smp_queues_daily " <> mstr (dayCount ps) <> "\n# dayMsgQueues\n\ + \\n\ + \# HELP simplex_smp_queues_weekly Weekly active queues.\n\ + \# TYPE simplex_smp_queues_weekly gauge\n\ + \simplex_smp_queues_weekly " <> mstr (weekCount ps) <> "\n# weekMsgQueues\n\ + \\n\ + \# HELP simplex_smp_queues_monthly Monthly active queues.\n\ + \# TYPE simplex_smp_queues_monthly gauge\n\ + \simplex_smp_queues_monthly " <> mstr (monthCount ps) <> "\n# monthMsgQueues\n\ + \\n\ + \# HELP simplex_smp_queues_notify_daily Daily active queues with notifications.\n\ + \# TYPE simplex_smp_queues_notify_daily gauge\n\ + \simplex_smp_queues_notify_daily " <> mstr (dayCount psNtf) <> "\n# dayCountNtf\n\ + \\n\ + \# HELP simplex_smp_queues_notify_weekly Weekly active queues with notifications.\n\ + \# TYPE simplex_smp_queues_notify_weekly gauge\n\ + \simplex_smp_queues_notify_weekly " <> mstr (weekCount psNtf) <> "\n# weekCountNtf\n\ + \\n\ + \# HELP simplex_smp_queues_notify_monthly Monthly active queues with notifications.\n\ + \# TYPE simplex_smp_queues_notify_monthly gauge\n\ + \simplex_smp_queues_notify_monthly " <> mstr (monthCount psNtf) <> "\n# monthCountNtf\n\ + \\n" + subscriptions = + "# Subscriptions\n\ + \# -------------\n\ + \\n\ + \# HELP simplex_smp_subscribtion_successes Successful subscriptions.\n\ + \# TYPE simplex_smp_subscribtion_successes counter\n\ + \simplex_smp_subscribtion_successes " <> mshow _qSub <> "\n# qSub\n\ + \\n\ + \# HELP simplex_smp_subscribtion_successes_batch Batched successful subscriptions.\n\ + \# TYPE simplex_smp_subscribtion_successes_batch counter\n\ + \simplex_smp_subscribtion_successes_batch " <> mshow _qSubAllB <> "\n# qSubAllB\n\ + \\n\ + \# HELP simplex_smp_subscribtion_end Ended subscriptions.\n\ + \# TYPE simplex_smp_subscribtion_end counter\n\ + \simplex_smp_subscribtion_end " <> mshow _qSubEnd <> "\n# qSubEnd\n\ + \\n\ + \# HELP simplex_smp_subscribtion_end_batch Batched ended subscriptions.\n\ + \# TYPE simplex_smp_subscribtion_end_batch counter\n\ + \simplex_smp_subscribtion_end_batch " <> mshow _qSubEndB <> "\n# qSubEndB\n\ + \\n\ + \# HELP simplex_smp_subscribtion_errors Subscription errors.\n\ + \# TYPE simplex_smp_subscribtion_errors counter\n\ + \simplex_smp_subscribtion_errors{type=\"auth\"} " <> mshow _qSubAuth <> "\n# qSubAuth\n\ + \simplex_smp_subscribtion_errors{type=\"duplicate\"} " <> mshow _qSubDuplicate <> "\n# qSubDuplicate\n\ + \simplex_smp_subscribtion_errors{type=\"prohibited\"} " <> mshow _qSubProhibited <> "\n# qSubProhibited\n\ + \\n" + messages = + "# Messages\n\ + \# --------\n\ + \\n\ + \# HELP simplex_smp_messages_sent Sent messages.\n\ + \# TYPE simplex_smp_messages_sent counter\n\ + \simplex_smp_messages_sent " <> mshow _msgSent <> "\n# msgSent\n\ + \\n\ + \# HELP simplex_smp_messages_sent_errors Total number of messages errors by type.\n\ + \# TYPE simplex_smp_messages_sent_errors counter\n\ + \simplex_smp_messages_sent_errors{type=\"auth\"} " <> mshow _msgSentAuth <> "\n# msgSentAuth\n\ + \simplex_smp_messages_sent_errors{type=\"quota\"} " <> mshow _msgSentQuota <> "\n# msgSentQuota\n\ + \simplex_smp_messages_sent_errors{type=\"large\"} " <> mshow _msgSentLarge <> "\n# msgSentLarge\n\ + \\n\ + \# HELP simplex_smp_messages_received Received messages.\n\ + \# TYPE simplex_smp_messages_received counter\n\ + \simplex_smp_messages_received " <> mshow _msgRecv <> "\n# msgRecv\n\ + \\n\ + \# HELP simplex_smp_messages_expired Expired messages.\n\ + \# TYPE simplex_smp_messages_expired counter\n\ + \simplex_smp_messages_expired " <> mshow _msgExpired <> "\n# msgExpired\n\ + \\n\ + \# HELP simplex_smp_messages_total Total number of messages stored.\n\ + \# TYPE simplex_smp_messages_total gauge\n\ + \simplex_smp_messages_total " <> mshow _msgCount <> "\n# msgCount\n\ + \\n" + ntfMessages = + "# Notification messages (client)\n\ + \# ------------------------------\n\ + \\n\ + \# HELP simplex_smp_messages_notify_sent Sent messages with notification flag (cleint).\n\ + \# TYPE simplex_smp_messages_notify_sent counter\n\ + \simplex_smp_messages_notify_sent " <> mshow _msgSentNtf <> "\n# msgSentNtf\n\ + \\n\ + \# HELP simplex_smp_messages_notify_received Received messages with notification flag (client).\n\ + \# TYPE simplex_smp_messages_notify_received counter\n\ + \simplex_smp_messages_notify_received " <> mshow _msgRecvNtf <> "\n# msgRecvNtf\n\ + \\n\ + \# HELP simplex_smp_messages_notify_get_sent Requests to get messages with notification flag (client).\n\ + \# TYPE simplex_smp_messages_notify_get_sent counter\n\ + \simplex_smp_messages_notify_get_sent " <> mshow _msgGet <> "\n# msgGet\n\ + \\n\ + \# HELP simplex_smp_messages_notify_get_received Succesfully received get requests messages with notification flag (client).\n\ + \# TYPE simplex_smp_messages_notify_get_received counter\n\ + \simplex_smp_messages_notify_get_received " <> mshow _msgRecvGet <> "\n# msgRecvGet\n\ + \\n\ + \# HELP simplex_smp_messages_notify_get_errors Error events with messages with notification flag (client). \n\ + \# TYPE simplex_smp_messages_notify_get_errors counter\n\ + \simplex_smp_messages_notify_get_errors{type=\"nomsg\"} " <> mshow _msgGetNoMsg <> "\n# msgGetNoMsg\n\ + \simplex_smp_messages_notify_get_errors{type=\"auth\"} " <> mshow _msgGetAuth <> "\n# msgGetAuth\n\ + \simplex_smp_messages_notify_get_errors{type=\"duplicate\"} " <> mshow _msgGetDuplicate <> "\n# msgGetDuplicate\n\ + \simplex_smp_messages_notify_get_errors{type=\"prohibited\"} " <> mshow _msgGetProhibited <> "\n# msgGetProhibited\n\ + \\n\ + \# HELP simplex_smp_queues_notify_created Created queues with notification flag (client).\n\ + \# TYPE simplex_smp_queues_notify_created counter\n\ + \simplex_smp_queues_notify_created " <> mshow _ntfCreated <> "\n# ntfCreated\n\ + \\n\ + \# HELP simplex_smp_queues_notify_deleted Deleted queues with notification flag (client).\n\ + \# TYPE simplex_smp_queues_notify_deleted counter\n\ + \simplex_smp_queues_notify_deleted " <> mshow _ntfDeleted <> "\n# ntfDeleted\n\ + \\n\ + \# HELP simplex_smp_queues_notify_deleted_batch Deleted batched queues with notification flag (client).\n\ + \# TYPE simplex_smp_queues_notify_deleted_batch counter\n\ + \simplex_smp_queues_notify_deleted_batch " <> mshow _ntfDeletedB <> "\n# ntfDeletedB\n\ + \\n\ + \# HELP simplex_smp_queues_notify_total1 Total number of stored queues with notification flag (first type of count).\n\ + \# TYPE simplex_smp_queues_notify_total1 gauge\n\ + \simplex_smp_queues_notify_total1 " <> mshow _ntfCount <> "\n# ntfCount1\n\ + \\n\ + \# HELP simplex_smp_queues_notify_total2 Total number of stored queues with notification flag (second type of count).\n\ + \# TYPE simplex_smp_queues_notify_total2 gauge\n\ + \simplex_smp_queues_notify_total2 " <> mshow notifierCount <> "\n# ntfCount2\n\ + \\n" + ntfs = + "# Notifications (server)\n\ + \# ----------------------\n\ + \\n\ + \# HELP simplex_smp_messages_ntf_successes Successful events with notification messages (to ntf server). \n\ + \# TYPE simplex_smp_messages_ntf_successes counter\n\ + \simplex_smp_messages_ntf_successes " <> mshow _msgNtfs <> "\n# msgNtfs\n\ + \\n\ + \# HELP simplex_smp_messages_ntf_successes_batch Successful batched events with notification messages (to ntf server). \n\ + \# TYPE simplex_smp_messages_ntf_successes_batch counter\n\ + \simplex_smp_messages_ntf_successes_batch " <> mshow _msgNtfsB <> "\n# msgNtfsB\n\ + \\n\ + \# HELP simplex_smp_messages_ntf_errors Error events with notification messages (to ntf server). \n\ + \# TYPE simplex_smp_messages_ntf_errors counter\n\ + \simplex_smp_messages_ntf_errors{type=\"nosub\"} " <> mshow _msgNtfNoSub <> "\n# msgNtfNoSub\n\ + \simplex_smp_messages_ntf_errors{type=\"lost\"} " <> mshow _msgNtfLost <> "\n# msgNtfLost\n\ + \simplex_smp_messages_ntf_errors{type=\"expired\"} " <> mshow _msgNtfExpired <> "\n# msgNtfExpired\n\ + \\n\ + \# HELP simplex_smp_subscription_ntf_requests Subscription requests with notification flag (from ntf server). \n\ + \# TYPE simplex_smp_subscription_ntf_requests counter\n\ + \simplex_smp_subscription_ntf_requests " <> mshow _ntfSub <> "\n# ntfSub\n\ + \\n\ + \# HELP simplex_smp_subscription_ntf_requests_batch Batched subscription requests with notification flag (from ntf server). \n\ + \# TYPE simplex_smp_subscription_ntf_requests_batch counter\n\ + \simplex_smp_subscription_ntf_requests_batch " <> mshow _ntfSubB <> "\n# ntfSubB\n\ + \\n\ + \# HELP simplex_smp_subscribtion_ntf_errors Subscription errors with notification flag (from ntf server). \n\ + \# TYPE simplex_smp_subscribtion_ntf_errors counter\n\ + \simplex_smp_subscribtion_ntf_errors{type=\"auth\"} " <> mshow _ntfSubAuth <> "\n# ntfSubAuth\n\ + \simplex_smp_subscribtion_ntf_errors{type=\"duplicate\"} " <> mshow _ntfSubDuplicate <> "\n# ntfSubDuplicate\n\ + \\n" + relays = + "# Relays\n\ + \# ------\n\ + \\n\ + \# HELP simplex_smp_relay_sessions_requests Session requests through relay.\n\ + \# TYPE simplex_smp_relay_sessions_requests counter\n\ + \simplex_smp_relay_sessions_requests{source=\"all\"} " <> mshow (_pRequests _pRelays) <> "\n# pRelays_pRequests\n\ + \simplex_smp_relay_sessions_requests{source=\"own\"} " <> mshow (_pRequests _pRelaysOwn) <> "\n# pRelaysOwn_pRequests\n\ + \\n\ + \# HELP simplex_smp_relay_sessions_successes Successful session events through relay.\n\ + \# TYPE simplex_smp_relay_sessions_successes counter\n\ + \simplex_smp_relay_sessions_successes{source=\"all\"} " <> mshow (_pSuccesses _pRelays) <> "\n# pRelays_pSuccesses\n\ + \simplex_smp_relay_sessions_successes{source=\"own\"} " <> mshow (_pSuccesses _pRelaysOwn) <> "\n# pRelaysOwn_pSuccesses\n\ + \\n\ + \# HELP simplex_smp_relay_sessions_errors Error session events through relay.\n\ + \# TYPE simplex_smp_relay_sessions_errors counter\n\ + \simplex_smp_relay_sessions_errors{source=\"all\",type=\"connect\"} " <> mshow (_pErrorsConnect _pRelays) <> "\n# pRelays_pErrorsConnect\n\ + \simplex_smp_relay_sessions_errors{source=\"all\",type=\"compat\"} " <> mshow (_pErrorsCompat _pRelays) <> "\n# pRelays_pErrorsCompat\n\ + \simplex_smp_relay_sessions_errors{source=\"all\",type=\"other\"} " <> mshow (_pErrorsOther _pRelays) <> "\n# pRelays_pErrorsOther\n\ + \simplex_smp_relay_sessions_errors{source=\"own\",type=\"connect\"} " <> mshow (_pErrorsConnect _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsConnect\n\ + \simplex_smp_relay_sessions_errors{source=\"own\",type=\"compat\"} " <> mshow (_pErrorsCompat _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsCompat\n\ + \simplex_smp_relay_sessions_errors{source=\"own\",type=\"other\"} " <> mshow (_pErrorsOther _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsOther\n\ + \\n\ + \# HELP simplex_smp_relay_messages_requests Message requests sent through relay.\n\ + \# TYPE simplex_smp_relay_messages_requests counter\n\ + \simplex_smp_relay_messages_requests{source=\"all\"} " <> mshow (_pRequests _pMsgFwds) <> "\n# pMsgFwds_pRequests\n\ + \simplex_smp_relay_messages_requests{source=\"own\"} " <> mshow (_pRequests _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pRequests\n\ + \\n\ + \# HELP simplex_smp_relay_messages_successes Successful messages sent through relay.\n\ + \# TYPE simplex_smp_relay_messages_successes counter\n\ + \simplex_smp_relay_messages_successes{source=\"all\"} " <> mshow (_pSuccesses _pMsgFwds) <> "\n# pMsgFwds_pSuccesses\n\ + \simplex_smp_relay_messages_successes{source=\"own\"} " <> mshow (_pSuccesses _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pSuccesses\n\ + \\n\ + \# HELP simplex_smp_relay_messages_errors Error events with messages sent through relay.\n\ + \# TYPE simplex_smp_relay_messages_errors counter\n\ + \simplex_smp_relay_messages_errors{source=\"all\",type=\"connect\"} " <> mshow (_pErrorsConnect _pMsgFwds) <> "\n# pMsgFwds_pErrorsConnect\n\ + \simplex_smp_relay_messages_errors{source=\"all\",type=\"compat\"} " <> mshow (_pErrorsCompat _pMsgFwds) <> "\n# pMsgFwds_pErrorsCompat\n\ + \simplex_smp_relay_messages_errors{source=\"all\",type=\"other\"} " <> mshow (_pErrorsOther _pMsgFwds) <> "\n# pMsgFwds_pErrorsOther\n\ + \simplex_smp_relay_messages_errors{source=\"own\",type=\"connect\"} " <> mshow (_pErrorsConnect _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsConnect\n\ + \simplex_smp_relay_messages_errors{source=\"own\",type=\"compat\"} " <> mshow (_pErrorsCompat _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsCompat\n\ + \simplex_smp_relay_messages_errors{source=\"own\",type=\"other\"} " <> mshow (_pErrorsOther _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsOther\n\ + \\n\ + \# HELP simplex_smp_relay_messages_received Relay messages statistics.\n\ + \# TYPE simplex_smp_relay_messages_received counter\n\ + \simplex_smp_relay_messages_received " <> mshow _pMsgFwdsRecv <> "\n# pMsgFwdsRecv\n\ + \\n" + info = + "# Info\n\ + \# ----\n\ + \\n" + <> socketsMetric socketsAccepted "simplex_smp_sockets_accepted" "Accepted sockets" + <> socketsMetric socketsClosed "simplex_smp_sockets_closed" "Closed sockets" + <> socketsMetric socketsActive "simplex_smp_sockets_active" "Active sockets" + <> socketsMetric socketsLeaked "simplex_smp_sockets_leaked" "Leaked sockets" + <> "# HELP simplex_smp_threads_total Threads\n\ + \# TYPE simplex_smp_threads_total gauge\n\ + \simplex_smp_threads_total " <> mshow threadsCount <> "\n\ + \\n\ + \# HELP simplex_smp_clients_total Clients\n\ + \# TYPE simplex_smp_clients_total gauge\n\ + \simplex_smp_clients_total " <> mshow clientsCount <> "\n\ + \\n\ + \# HELP simplex_smp_subscribtion_total Total subscriptions\n\ + \# TYPE simplex_smp_subscribtion_total gauge\n\ + \simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\ + \\n\ + \# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\ + \# TYPE simplex_smp_subscribtion_clients_total gauge\n\ + \simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\ + \\n\ + \# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\ + \# TYPE simplex_smp_subscription_ntf_total gauge\n\ + \simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\ + \\n\ + \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\ + \# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\ + \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n" + socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text + socketsMetric sel metric descr = + "# HELP " <> metric <> " " <> descr <> "\n" + <> "# TYPE " <> metric <> " gauge\n" + <> T.concat (map (\(port, ss) -> metric <> "{port=\"" <> T.pack port <> "\"} " <> mshow (sel ss) <> "\n") socketStats) + <> "\n" + mstr a = T.pack a <> " " <> tsEpoch + mshow :: Show a => a -> Text + mshow = mstr . show + tsEpoch = T.pack $ show @Int64 $ floor @Double $ realToFrac (ts `diffUTCTime` epoch) * 1000 + epoch = UTCTime systemEpochDay 0 +{-# FOURMOLU_ENABLE\n#-} diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 385ba119b..b384ad9b9 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -638,6 +638,14 @@ data PeriodStatCounts = PeriodStatCounts monthCount :: String } +periodStatDataCounts :: PeriodStatsData -> PeriodStatCounts +periodStatDataCounts PeriodStatsData {_day, _week, _month} = + PeriodStatCounts + { dayCount = show $ IS.size _day, + weekCount = show $ IS.size _week, + monthCount = show $ IS.size _month + } + periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts periodStatCounts ps ts = do let d = utctDay ts diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 8913ba0c3..95afb5947 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -13,6 +13,7 @@ module Simplex.Messaging.Transport.Server runTransportServerState_, SocketState, newSocketState, + getSocketStats, runTransportServer, runTransportServerSocket, runLocalTCPServer, @@ -43,6 +44,7 @@ import Foreign.C.Error import GHC.IO.Exception (ioe_errno) import Network.Socket import qualified Network.TLS as T +import Simplex.Messaging.Server.Prometheus import Simplex.Messaging.Transport import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow) import System.Exit (exitFailure) @@ -166,6 +168,14 @@ type SocketState = (TVar Int, TVar Int, TVar (IntMap (Weak ThreadId))) newSocketState :: IO SocketState newSocketState = (,,) <$> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO mempty +getSocketStats :: SocketState -> IO SocketStats +getSocketStats (accepted, closed, active) = do + socketsAccepted <- readTVarIO accepted + socketsClosed <- readTVarIO closed + socketsActive <- IM.size <$> readTVarIO active + let socketsLeaked = socketsAccepted - socketsClosed - socketsActive + pure SocketStats {socketsAccepted, socketsClosed, socketsActive, socketsLeaked} + closeServer :: TMVar Bool -> TVar (IntMap (Weak ThreadId)) -> Socket -> IO () closeServer started clients sock = do close sock diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index d658c30a6..5f7935cd9 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -79,6 +79,9 @@ testStoreNtfsFile = "tests/tmp/smp-server-ntfs.log" testStoreNtfsFile2 :: FilePath testStoreNtfsFile2 = "tests/tmp/smp-server-ntfs.log.2" +testPrometheusMetricsFile :: FilePath +testPrometheusMetricsFile = "tests/tmp/smp-server-metrics.txt" + testServerStatsBackupFile :: FilePath testServerStatsBackupFile = "tests/tmp/smp-server-stats.log" @@ -141,8 +144,10 @@ cfgMS msType = inactiveClientExpiration = Just defaultInactiveClientExpiration, logStatsInterval = Nothing, logStatsStartTime = 0, - serverStatsLogFile = "tests/smp-server-stats.daily.log", + serverStatsLogFile = "tests/tmp/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, + prometheusInterval = Nothing, + prometheusMetricsFile = testPrometheusMetricsFile, pendingENDInterval = 500000, ntfDeliveryInterval = 200000, smpCredentials = diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 744ceb437..bdc5f4dc3 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -46,7 +46,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog) import Simplex.Messaging.Transport import Simplex.Messaging.Util (whenM) import Simplex.Messaging.Version (mkVersionRange) -import System.Directory (doesDirectoryExist, removeDirectoryRecursive, removeFile) +import System.Directory (doesDirectoryExist, doesFileExist, removeDirectoryRecursive, removeFile) import System.TimeIt (timeItT) import System.Timeout import Test.HUnit @@ -71,6 +71,7 @@ serverTests = do describe "Store log" testWithStoreLog describe "Restore messages" testRestoreMessages describe "Restore messages (old / v2)" testRestoreExpireMessages + describe "Save prometheus metrics" testPrometheusMetrics describe "Timing of AUTH error" testTiming describe "Message notifications" testMessageNotifications describe "Message expiration" $ do @@ -825,6 +826,13 @@ testRestoreExpireMessages = runClient :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> Expectation runClient _ test' = testSMPClient test' `shouldReturn` () +testPrometheusMetrics :: SpecWith (ATransport, AMSType) +testPrometheusMetrics = + it "should save Prometheus metrics" $ \(at, msType) -> do + let cfg' = (cfgMS msType) {prometheusInterval = Just 1} + withSmpServerConfigOn at cfg' testPort $ \_ -> threadDelay 1000000 + doesFileExist testPrometheusMetricsFile `shouldReturn` True + createAndSecureQueue :: Transport c => THandleSMP c 'TClient -> SndPublicAuthKey -> IO (SenderId, RecipientId, RcvPrivateAuthKey, RcvDhSecret) createAndSecureQueue h sPub = do g <- C.newRandom From 77a5ed2ec69fd00cc7cc9a1a65396a21e743f344 Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Thu, 19 Dec 2024 20:45:17 +0000 Subject: [PATCH 3/3] 6.2.1.0 (#1424) --- simplexmq.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index e722ef4ee..0e610933f 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.2.0.7 +version: 6.2.1.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and