Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xftp server: use recipient ID in control port to delete and block files, smp server: fix version negotiation #1434

Merged
merged 20 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 1.12

name: simplexmq
version: 6.3.0.1
version: 6.3.0.105
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
16 changes: 8 additions & 8 deletions src/Simplex/FileTransfer/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,13 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
CPDelete fileId -> withUserRole $ unliftIO u $ do
fs <- asks store
r <- runExceptT $ do
(fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId
(fr, _) <- ExceptT $ atomically $ getFile fs SFRecipient fileId
ExceptT $ deleteServerFile_ fr
liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r
CPBlock fileId info -> withUserRole $ unliftIO u $ do
fs <- asks store
r <- runExceptT $ do
(fr, _) <- ExceptT $ atomically $ getFile fs SFSender fileId
(fr, _) <- ExceptT $ atomically $ getFile fs SFRecipient fileId
ExceptT $ blockServerFile fr info
liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r
CPHelp -> hPutStrLn h "commands: stats-rts, delete, help, quit"
Expand Down Expand Up @@ -540,12 +540,12 @@ blockServerFile fr@FileRec {senderId} info = do

deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (FileStore -> STM (Either XFTPErrorType ())) -> M (Either XFTPErrorType ())
deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do
path <- readTVarIO filePath
stats <- asks serverStats
ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats))
st <- asks store
void $ atomically $ storeAction st
lift $ incFileStat stat
path <- readTVarIO filePath
stats <- asks serverStats
ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats))
st <- asks store
void $ atomically $ storeAction st
lift $ incFileStat stat
where
deletedStats stats = do
liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1)
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/FileTransfer/Transport.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ supportedFileServerVRange :: VersionRangeXFTP
supportedFileServerVRange = mkVersionRange initialXFTPVersion currentXFTPVersion

-- XFTP protocol does not use this handshake method
xftpClientHandshakeStub :: c -> Maybe C.KeyPairX25519 -> C.KeyHash -> VersionRangeXFTP -> ExceptT TransportError IO (THandle XFTPVersion c 'TClient)
xftpClientHandshakeStub _c _ks _keyHash _xftpVRange = throwE TEVersion
xftpClientHandshakeStub :: c -> Maybe C.KeyPairX25519 -> C.KeyHash -> VersionRangeXFTP -> Bool -> ExceptT TransportError IO (THandle XFTPVersion c 'TClient)
xftpClientHandshakeStub _c _ks _keyHash _xftpVRange _proxyServer = throwE TEVersion

supportedXFTPhandshakes :: [ALPN]
supportedXFTPhandshakes = ["xftp/1"]
Expand Down
22 changes: 14 additions & 8 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ data PClient v err msg = PClient
timeoutErrorCount :: TVar Int,
clientCorrId :: TVar ChaChaDRG,
sentCommands :: TMap CorrId (Request err msg),
sndQ :: TBQueue (Maybe (TVar Bool), ByteString),
sndQ :: TBQueue (Maybe (Request err msg), ByteString),
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
}
Expand Down Expand Up @@ -406,6 +406,8 @@ data ProtocolClientConfig v = ProtocolClientConfig
serverVRange :: VersionRange v,
-- | agree shared session secret (used in SMP proxy for additional encryption layer)
agreeSecret :: Bool,
-- | Whether connecting client is a proxy server. See comment in ClientHandshake
proxyServer :: Bool,
-- | send SNI to server, False for SMP
useSNI :: Bool
}
Expand All @@ -420,6 +422,7 @@ defaultClientConfig clientALPN useSNI serverVRange =
clientALPN,
serverVRange,
agreeSecret = False,
proxyServer = False,
useSNI
}
{-# INLINE defaultClientConfig #-}
Expand Down Expand Up @@ -489,7 +492,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString)
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret, useSNI} msgQ proxySessTs disconnected = do
getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret, proxyServer, useSNI} msgQ proxySessTs disconnected = do
case chooseTransportHost networkConfig (host srv) of
Right useHost ->
(getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost)
Expand Down Expand Up @@ -548,7 +551,7 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
client :: forall c. Transport c => TProxy c -> PClient v err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient v err msg)) -> c -> IO ()
client _ c cVar h = do
ks <- if agreeSecret then Just <$> atomically (C.generateKeyPair g) else pure Nothing
runExceptT (protocolClientHandshake @v @err @msg h ks (keyHash srv) serverVRange) >>= \case
runExceptT (protocolClientHandshake @v @err @msg h ks (keyHash srv) serverVRange proxyServer) >>= \case
Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
Right th@THandle {params} -> do
sessionTs <- getCurrentTime
Expand All @@ -563,9 +566,12 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= sendPending
where
sendPending (Nothing, s) = send_ s
sendPending (Just pending, s) = whenM (readTVarIO pending) $ send_ s
send_ = void . tPutLog h
sendPending (r, s) = case r of
Nothing -> void $ tPutLog h s
Just Request {pending, responseVar} ->
whenM (readTVarIO pending) $ tPutLog h s >>= either responseErr pure
where
responseErr = atomically . putTMVar responseVar . Left . PCETransportError

receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
receive ProtocolClient {client_ = PClient {rcvQ, lastReceived, timeoutErrorCount}} h = forever $ do
Expand Down Expand Up @@ -1101,12 +1107,12 @@ sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THan
where
-- two separate "atomically" needed to avoid blocking
sendRecv :: Either TransportError SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
sendRecv t_ r@Request {pending} = case t_ of
sendRecv t_ r = case t_ of
Left e -> pure . Left $ PCETransportError e
Right t
| B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
| otherwise -> do
atomically $ writeTBQueue sndQ (Just pending, s)
atomically $ writeTBQueue sndQ (Just r, s)
response <$> getResponse c tOut r
where
s
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Notifications/Transport.hs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ ntfServerHandshake serverSignKey c (k, pk) kh ntfVRange = do
Nothing -> throwE TEVersion

-- | Notifcations server client transport handshake.
ntfClientHandshake :: forall c. Transport c => c -> C.KeyHash -> VersionRangeNTF -> ExceptT TransportError IO (THandleNTF c 'TClient)
ntfClientHandshake c keyHash ntfVRange = do
ntfClientHandshake :: forall c. Transport c => c -> C.KeyHash -> VersionRangeNTF -> Bool -> ExceptT TransportError IO (THandleNTF c 'TClient)
ntfClientHandshake c keyHash ntfVRange _proxyServer = do
let th@THandle {params = THandleParams {sessionId}} = ntfTHandle c
NtfServerHandshake {sessionId = sessId, ntfVersionRange, authPubKey = sk'} <- getHandshake th
if sessionId /= sessId
Expand Down
14 changes: 5 additions & 9 deletions src/Simplex/Messaging/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ supportedSMPClientVRange = mkVersionRange initialSMPClientVersion currentSMPClie
-- TODO v6.0 remove dependency on version
maxMessageLength :: VersionSMP -> Int
maxMessageLength v
| v >= encryptedBlockSMPVersion = 16048 -- max 16051
| v >= encryptedBlockSMPVersion = 16048 -- max 16048
| v >= sendingProxySMPVersion = 16064 -- max 16067
| otherwise = 16088 -- 16048 - always use this size to determine allowed ranges

Expand Down Expand Up @@ -1343,7 +1343,7 @@ transmissionP THandleParams {sessionId, implySessId} = do
class (ProtocolTypeI (ProtoType msg), ProtocolEncoding v err msg, ProtocolEncoding v err (ProtoCommand msg), Show err, Show msg) => Protocol v err msg | msg -> v, msg -> err where
type ProtoCommand msg = cmd | cmd -> msg
type ProtoType msg = (sch :: ProtocolType) | sch -> msg
protocolClientHandshake :: forall c. Transport c => c -> Maybe C.KeyPairX25519 -> C.KeyHash -> VersionRange v -> ExceptT TransportError IO (THandle v c 'TClient)
protocolClientHandshake :: forall c. Transport c => c -> Maybe C.KeyPairX25519 -> C.KeyHash -> VersionRange v -> Bool -> ExceptT TransportError IO (THandle v c 'TClient)
protocolPing :: ProtoCommand msg
protocolError :: msg -> Maybe err

Expand All @@ -1370,9 +1370,7 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
encodeProtocol v = \case
NEW rKey dhKey auth_ subMode sndSecure
| v >= sndAuthKeySMPVersion -> new <> e (auth_, subMode, sndSecure)
| v >= subModeSMPVersion -> new <> auth <> e subMode
| v == basicAuthSMPVersion -> new <> auth
| otherwise -> new
| otherwise -> new <> auth <> e subMode
where
new = e (NEW_, ' ', rKey, dhKey)
auth = maybe "" (e . ('A',)) auth_
Expand Down Expand Up @@ -1441,9 +1439,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
Cmd SRecipient <$> case tag of
NEW_
| v >= sndAuthKeySMPVersion -> new <*> smpP <*> smpP <*> smpP
| v >= subModeSMPVersion -> new <*> auth <*> smpP <*> pure False
| v == basicAuthSMPVersion -> new <*> auth <*> pure SMSubscribe <*> pure False
| otherwise -> new <*> pure Nothing <*> pure SMSubscribe <*> pure False
| otherwise -> new <*> auth <*> smpP <*> pure False
where
new = NEW <$> _smpP <*> smpP
auth = optional (A.char 'A' *> smpP)
Expand Down Expand Up @@ -1495,7 +1491,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
INFO info -> e (INFO_, ' ', info)
OK -> e OK_
ERR err -> case err of
BLOCKED _ | v < blockedEntityErrorSMPVersion -> e (ERR_, ' ', AUTH)
BLOCKED _ | v < blockedEntitySMPVersion -> e (ERR_, ' ', AUTH)
_ -> e (ERR_, ' ', err)
PONG -> e PONG_
where
Expand Down
6 changes: 3 additions & 3 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ import Simplex.Messaging.Server.Information
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..))
import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore)
import Simplex.Messaging.Server.QueueStore.STM (readQueueStore)
import Simplex.Messaging.Transport (batchCmdsSMPVersion, currentServerSMPRelayVersion, simplexMQVersion, supportedServerSMPRelayVRange)
import Simplex.Messaging.Transport (simplexMQVersion, supportedProxyClientSMPRelayVRange, supportedServerSMPRelayVRange)
import Simplex.Messaging.Transport.Client (SocksProxy, TransportHost (..), defaultSocksProxy)
import Simplex.Messaging.Transport.Server (ServerCredentials (..), TransportServerConfig (..), defaultTransportServerConfig)
import Simplex.Messaging.Util (eitherToMaybe, ifM, safeDecodeUtf8, tshow)
import Simplex.Messaging.Version (mkVersionRange)
import System.Directory (createDirectoryIfMissing, doesDirectoryExist, doesFileExist)
import System.Exit (exitFailure)
import System.FilePath (combine)
Expand Down Expand Up @@ -447,8 +446,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
defaultSMPClientAgentConfig
{ smpCfg =
(smpCfg defaultSMPClientAgentConfig)
{ serverVRange = mkVersionRange batchCmdsSMPVersion currentServerSMPRelayVersion,
{ serverVRange = supportedProxyClientSMPRelayVRange,
agreeSecret = True,
proxyServer = True,
networkConfig =
defaultNetworkConfig
{ socksProxy = either error id <$!> strDecodeIni "PROXY" "socks_proxy" ini,
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Server/Prometheus.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ prometheusMetrics sm rtm ts =
\simplex_smp_queues_deleted{type=\"new\"} " <> mshow _qDeletedNew <> "\n# qDeletedNew\n\
\simplex_smp_queues_deleted{type=\"secured\"} " <> mshow _qDeletedSecured <> "\n# qDeletedSecured\n\
\\n\
\# HELP simplex_smp_queues_deleted Deleted queues\n\
\# TYPE simplex_smp_queues_deleted counter\n\
\# HELP simplex_smp_queues_blocked Deleted queues\n\
\# TYPE simplex_smp_queues_blocked counter\n\
\simplex_smp_queues_blocked " <> mshow _qBlocked <> "\n# qBlocked\n\
\\n\
\# HELP simplex_smp_queues_deleted_batch Batched requests to delete queues\n\
Expand Down
Loading
Loading