From 91d334f8a96257783416abd773cb95f45007731b Mon Sep 17 00:00:00 2001 From: Armando Santos Date: Mon, 12 Aug 2024 15:24:03 +0100 Subject: [PATCH] Write TxSubmission V2 simulation test This new test generates an `TxsubmissionV2State` and passes it to the new tx submission protocol. It tests multiple pairs of client server, where the server instances share state. The test then checks for the same semantics/behavior as the old V1 but now for all results from all pairs client/server. - Adds debugTracer - Fixes arbitrary instance of `ArbTxDecisionPolicy` --- .../Ouroboros/Network/TxSubmission/Common.hs | 13 +- .../Network/TxSubmission/TxSubmissionV2.hs | 372 +++++++++++------- 2 files changed, 233 insertions(+), 152 deletions(-) diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/Common.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/Common.hs index 380c05028db..406c558eed6 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/Common.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/Common.hs @@ -303,6 +303,9 @@ verboseTracer :: forall a m. => Tracer m a verboseTracer = threadAndTimeTracer $ showTracing $ Tracer say +debugTracer :: forall a s. Show a => Tracer (IOSim s) a +debugTracer = threadAndTimeTracer $ showTracing $ Tracer (traceM . show) + threadAndTimeTracer :: forall a m. ( MonadAsync m , MonadDelay m @@ -1118,8 +1121,8 @@ instance Arbitrary ArbTxDecisionPolicy where arbitrary = ArbTxDecisionPolicy . fixupTxDecisionPolicy <$> ( TxDecisionPolicy - <$> (getSmall <$> arbitrary) - <*> (getSmall <$> arbitrary) + <$> (getSmall . getPositive <$> arbitrary) + <*> (getSmall . getPositive <$> arbitrary) <*> (SizeInBytes . getPositive <$> arbitrary) <*> (SizeInBytes . getPositive <$> arbitrary) <*> (getPositive <$> arbitrary)) @@ -1130,17 +1133,17 @@ instance Arbitrary ArbTxDecisionPolicy where maxTxsSizeInflight, txInflightMultiplicity }) = [ ArbTxDecisionPolicy a { maxNumTxIdsToRequest = NumTxIdsToReq x } - | x <- shrink (getNumTxIdsToReq maxNumTxIdsToRequest) + | (Positive x) <- shrink (Positive (getNumTxIdsToReq maxNumTxIdsToRequest)) ] ++ [ ArbTxDecisionPolicy . fixupTxDecisionPolicy $ a { txsSizeInflightPerPeer = SizeInBytes s } - | s <- shrink (getSizeInBytes txsSizeInflightPerPeer) + | Positive s <- shrink (Positive (getSizeInBytes txsSizeInflightPerPeer)) ] ++ [ ArbTxDecisionPolicy . fixupTxDecisionPolicy $ a { maxTxsSizeInflight = SizeInBytes s } - | s <- shrink (getSizeInBytes maxTxsSizeInflight) + | Positive s <- shrink (Positive (getSizeInBytes maxTxsSizeInflight)) ] ++ [ ArbTxDecisionPolicy . fixupTxDecisionPolicy diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxSubmissionV2.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxSubmissionV2.hs index d3a950204b7..f32057cd4c6 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxSubmissionV2.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxSubmissionV2.hs @@ -27,7 +27,7 @@ import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Monad.IOSim hiding (SimResult) -import Control.Tracer (Tracer (..), contramap, nullTracer) +import Control.Tracer (Tracer (..), contramap) import Data.ByteString.Lazy (ByteString) @@ -37,7 +37,6 @@ import Data.List (intercalate, nubBy) import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map import Data.Maybe (fromMaybe) -import Data.Word (Word16) import Ouroboros.Network.Channel @@ -52,7 +51,7 @@ import Ouroboros.Network.TxSubmission.Inbound.Policy import Ouroboros.Network.TxSubmission.Outbound import Ouroboros.Network.Util.ShowProxy -import Ouroboros.Network.Testing.Utils +import Ouroboros.Network.Testing.Utils hiding (debugTracer) import Test.QuickCheck import Test.Tasty (TestTree, testGroup) @@ -62,9 +61,9 @@ import Control.Concurrent.Class.MonadMVar.Strict qualified as Strict import Control.Concurrent.Class.MonadSTM.Strict (StrictTVar) import Control.Concurrent.Class.MonadSTM.Strict qualified as Strict import Control.Monad (forM) +import Data.Foldable (traverse_) import Data.Void (Void) import Ouroboros.Network.DeltaQ (PeerGSV) -import Ouroboros.Network.Snocket (TestAddress (..)) import Ouroboros.Network.TxSubmission.Inbound.Registry import Ouroboros.Network.TxSubmission.Inbound.Server (txSubmissionInboundV2) import Ouroboros.Network.TxSubmission.Inbound.State @@ -76,8 +75,44 @@ tests = testGroup "Ouroboros.Network.TxSubmission.TxSubmissionV2" [ testProperty "txSubmission" prop_txSubmission ] +data TxSubmissionV2State = + TxSubmissionV2State { + peerMap :: Map Int ( [Tx Int] + , Maybe (Positive SmallDelay) + , Maybe (Positive SmallDelay) + -- ^ The delay must be smaller (<) than 5s, so that overall + -- delay is less than 10s, otherwise 'smallDelay' in + -- 'timeLimitsTxSubmission2' will kick in. + ) + , decisionPolicy :: TxDecisionPolicy + } deriving (Show) + +instance Arbitrary TxSubmissionV2State where + arbitrary = do + ArbTxDecisionPolicy decisionPolicy <- arbitrary + peersN <- choose (1, 10) + txsN <- choose (1, 10) + txs <- divvy txsN . nubBy (on (==) getTxId) <$> vectorOf (peersN * txsN) arbitrary + peers <- vectorOf peersN arbitrary + peersState <- map (\(a, (b, c)) -> (a, b, c)) + . zip txs + <$> vectorOf peersN arbitrary + return (TxSubmissionV2State (Map.fromList (zip peers peersState)) decisionPolicy) + shrink (TxSubmissionV2State peerMap decisionPolicy) = + TxSubmissionV2State <$> shrinkMap1 peerMap + <*> [ policy + | ArbTxDecisionPolicy policy <- shrink (ArbTxDecisionPolicy decisionPolicy) + ] + where + shrinkMap1 :: (Ord k, Arbitrary k, Arbitrary v) => Map k v -> [Map k v] + shrinkMap1 m + | Map.size m <= 1 = [m] + | otherwise = [Map.delete k m | k <- Map.keys m] ++ singletonMaps + where + singletonMaps = [Map.singleton k v | (k, v) <- Map.toList m] + txSubmissionSimulation - :: forall m txid. + :: forall m peeraddr txid. ( MonadAsync m , MonadDelay m , MonadFork m @@ -94,110 +129,105 @@ txSubmissionSimulation , Eq txid , ShowProxy txid , NoThunks (Tx txid) + , Show peeraddr + , Ord peeraddr , txid ~ Int ) => Tracer m (String, TraceSendRecv (TxSubmission2 txid (Tx txid))) - -> Tracer m (DebugSharedTxState (TestAddress Int) txid (Tx txid)) - -> NumTxIdsToAck - -> [Tx txid] - -> ControlMessageSTM m - -> Maybe DiffTime - -> Maybe DiffTime - -> m ([Tx txid], [Tx txid]) -txSubmissionSimulation tracer tracerDST maxUnacked outboundTxs - controlMessageSTM - inboundDelay outboundDelay = do - - inboundMempool <- emptyMempool - outboundMempool <- newMempool outboundTxs - (outboundChannel, inboundChannel) <- createConnectedChannels + -> Tracer m (DebugSharedTxState peeraddr txid (Tx txid)) + -> Map peeraddr ( [Tx txid] + , ControlMessageSTM m + , Maybe DiffTime + , Maybe DiffTime + ) + -> TxDecisionPolicy + -> m ([Tx txid], [[Tx txid]]) +txSubmissionSimulation tracer tracerDST state txDecisionPolicy = do + + state' <- traverse (\(b, c, d, e) -> do + mempool <- newMempool b + (outChannel, inChannel) <- createConnectedChannels + return (mempool, c, d, e, outChannel, inChannel) + ) state + + inboundMempool <- emptyMempool txChannelsMVar <- Strict.newMVar (TxChannels Map.empty) sharedTxStateVar <- newSharedTxStateVar gsvVar <- Strict.newTVarIO Map.empty - asyncs <- runTxSubmission [(TestAddress 0, outboundChannel, inboundChannel)] - txChannelsMVar - sharedTxStateVar - (outboundMempool, inboundMempool) - gsvVar - undefined - (pure . snd) + runTxSubmission state' + txChannelsMVar + sharedTxStateVar + inboundMempool + gsvVar + (\(a, as) -> do + _ <- waitAnyCancel as + cancel a - _ <- waitAnyCancel asyncs + inmp <- readMempool inboundMempool + outmp <- forM (Map.elems state') + (\(outMempool, _, _, _, _, _) -> readMempool outMempool) + return (inmp, outmp) + ) - inmp <- readMempool inboundMempool - outmp <- readMempool outboundMempool - return (inmp, outmp) where - - runTxSubmission :: [(TestAddress Int, Channel m ByteString, Channel m ByteString)] - -> TxChannelsVar m (TestAddress Int) txid (Tx txid) - -> SharedTxStateVar m (TestAddress Int) txid (Tx txid) - -> (Mempool m txid, Mempool m txid) - -> StrictTVar m (Map (TestAddress Int) PeerGSV) - -> TxDecisionPolicy + runTxSubmission :: Map peeraddr ( Mempool m txid -- ^ Outbound mempool + , ControlMessageSTM m + , Maybe DiffTime -- ^ Outbound delay + , Maybe DiffTime -- ^ Inbound delay + , Channel m ByteString -- ^ Outbound channel + , Channel m ByteString -- ^ Inbound channel + ) + -> TxChannelsVar m peeraddr txid (Tx txid) + -> SharedTxStateVar m peeraddr txid (Tx txid) + -> Mempool m txid -- ^ Inbound mempool + -> StrictTVar m (Map peeraddr PeerGSV) -> ((Async m Void, [Async m ((), Maybe ByteString)]) -> m b) -> m b - runTxSubmission addrs txChannelsVar sharedTxStateVar - (outboundMempool, inboundMempool) gsvVar txDecisionPolicy k = + runTxSubmission st txChannelsVar sharedTxStateVar + inboundMempool gsvVar k = withAsync (decisionLogicThread tracerDST txDecisionPolicy gsvVar txChannelsVar sharedTxStateVar) $ \a -> do - -- Construct txSubmission outbound client - let clients = - (\(addr, outboundChannel, _) -> - ( addr - , outboundChannel - , txSubmissionOutbound nullTracer - maxUnacked - (getMempoolReader outboundMempool) - NodeToNodeV_7 - controlMessageSTM - )) <$> addrs - - -- Construct txSubmission inbound server - servers <- forM addrs - (\(addr, _, inboundChannel) -> - withPeer - tracerDST - txChannelsVar - sharedTxStateVar - (getMempoolReader inboundMempool) - addr $ \api -> - pure $ - ( addr - , inboundChannel - , txSubmissionInboundV2 nullTracer - (getMempoolWriter inboundMempool) - api - ) - ) - - -- Construct txSubmission outbound client miniprotocol peer runner - let runPeerClients = - (\(_, channel, client) -> - runPeerWithLimits - (("OUTBOUND",) `contramap` tracer) - txSubmissionCodec2 - (byteLimitsTxSubmission2 (fromIntegral . BSL.length)) - timeLimitsTxSubmission2 - (maybe id delayChannel outboundDelay channel) - (txSubmissionClientPeer client)) - <$> clients - -- Construct txSubmission inbound server miniprotocol peer runner - runPeerServers = - (\(addr, channel, server) -> - runPipelinedPeerWithLimits - (("INBOUND " ++ show addr,) `contramap` verboseTracer) - txSubmissionCodec2 - (byteLimitsTxSubmission2 (fromIntegral . BSL.length)) - timeLimitsTxSubmission2 - (maybe id delayChannel inboundDelay channel) - (txSubmissionServerPeerPipelined server)) - <$> servers + -- Construct txSubmission outbound client + let clients = (\(addr, (mempool, ctrlMsgSTM, outDelay, _, outChannel, _)) -> do + let client = txSubmissionOutbound verboseTracer + (NumTxIdsToAck $ getNumTxIdsToReq + $ maxUnacknowledgedTxIds + $ txDecisionPolicy) + (getMempoolReader mempool) + NodeToNodeV_7 + ctrlMsgSTM + runPeerWithLimits (("OUTBOUND " ++ show addr,) `contramap` tracer) + txSubmissionCodec2 + (byteLimitsTxSubmission2 (fromIntegral . BSL.length)) + timeLimitsTxSubmission2 + (maybe id delayChannel outDelay outChannel) + (txSubmissionClientPeer client) + ) + <$> Map.assocs st + + -- Construct txSubmission inbound server + servers = (\(addr, (_, _, _, inDelay, _, inChannel)) -> + withPeer tracerDST + txChannelsVar + sharedTxStateVar + (getMempoolReader inboundMempool) + addr $ \api -> do + let server = txSubmissionInboundV2 verboseTracer + (getMempoolWriter inboundMempool) + api + runPipelinedPeerWithLimits + (("INBOUND " ++ show addr,) `contramap` verboseTracer) + txSubmissionCodec2 + (byteLimitsTxSubmission2 (fromIntegral . BSL.length)) + timeLimitsTxSubmission2 + (maybe id delayChannel inDelay inChannel) + (txSubmissionServerPeerPipelined server) + ) <$> Map.assocs st -- Run clients and servers - withAsyncAll (runPeerClients ++ runPeerServers) (\asyncs -> k (a, asyncs)) + withAsyncAll (clients ++ servers) (\asyncs -> k (a, asyncs)) withAsyncAll :: MonadAsync m => [m a] -> ([Async m a] -> m b) -> m b withAsyncAll xs0 action = go [] xs0 @@ -205,59 +235,107 @@ txSubmissionSimulation tracer tracerDST maxUnacked outboundTxs go as [] = action (reverse as) go as (x:xs) = withAsync x (\a -> go (a:as) xs) +prop_txSubmission + :: TxSubmissionV2State + -> Property +prop_txSubmission (TxSubmissionV2State state txDecisionPolicy) = + ioProperty $ do + tr' <- evaluateTrace (runSimTrace sim) + case tr' of + SimException e trace -> do + return $ counterexample (intercalate "\n" $ show e : trace) False + SimDeadLock trace -> do + return $ counterexample (intercalate "\n" $ "Deadlock" : trace) False + SimReturn (inmp, outmps) _trace -> do + r <- mapM (\outmp -> do + let outUniqueTxIds = nubBy (on (==) getTxId) outmp + outValidTxs = filter getTxValid outmp + case ( length outUniqueTxIds == length outmp + , length outValidTxs == length outmp + ) of + (True, True) -> + -- If we are presented with a stream of unique txids for valid + -- transactions the inbound transactions should match the outbound + -- transactions exactly. + return $ counterexample ("(True, True) " ++ show outmp) + $ checkMempools inmp (take (length inmp) outValidTxs) + + (True, False) -> + -- If we are presented with a stream of unique txids then we should have + -- fetched all valid transactions. + return $ counterexample ("(True, False) " ++ show outmp) + $ checkMempools inmp (take (length inmp) outValidTxs) + + (False, True) -> + -- If we are presented with a stream of valid txids then we should have + -- fetched some version of those transactions. + return $ counterexample ("(False, True) " ++ show outmp) + $ checkMempools (map getTxId inmp) + (take (length inmp) + (map getTxId $ filter getTxValid outUniqueTxIds)) + + (False, False) -> + -- If we are presented with a stream of valid and invalid Txs with + -- duplicate txids we're content with completing the protocol + -- without error. + return $ property True) + outmps + return $ counterexample (intercalate "\n" _trace) + $ conjoin r + where + sim :: forall s . IOSim s ([Tx Int], [[Tx Int]]) + sim = do + state' <- traverse (\(txs, mbOutDelay, mbInDelay) -> do + let mbOutDelayTime = getSmallDelay . getPositive <$> mbOutDelay + mbInDelayTime = getSmallDelay . getPositive <$> mbInDelay + controlMessageVar <- newTVarIO Continue + return ( txs + , controlMessageVar + , mbOutDelayTime + , mbInDelayTime + ) + ) + state + + state'' <- traverse (\(txs, var, mbOutDelay, mbInDelay) -> do + return ( txs + , readTVar var + , mbOutDelay + , mbInDelay + ) + ) + state' + + let simDelayTime = Map.foldl' (\m (txs, _, mbInDelay, mbOutDelay) -> + max m ( fromMaybe 1 (max <$> mbInDelay <*> mbOutDelay) + * realToFrac (length txs `div` 4) + ) + ) + 0 + $ state'' + controlMessageVars = (\(_, x, _, _) -> x) + <$> Map.elems state' + + _ <- async do + threadDelay (simDelayTime + 1000) + atomically (traverse_ (`writeTVar` Terminate) controlMessageVars) + + let tracer = verboseTracer <> debugTracer + tracer' = verboseTracer <> debugTracer + txSubmissionSimulation tracer tracer' state'' txDecisionPolicy + +checkMempools :: (Eq a, Show a) => [a] -> [a] -> Property +checkMempools [] [] = property True +checkMempools _ [] = property True +checkMempools [] _ = property False +checkMempools inp@(i : is) outp@(o : os) = + if o == i then counterexample (show inp ++ " " ++ show outp) + $ checkMempools is os + else counterexample (show inp ++ " " ++ show outp) + $ checkMempools is outp -prop_txSubmission :: Positive Word16 - -> NonEmptyList (Tx Int) - -> Maybe (Positive SmallDelay) - -- ^ The delay must be smaller (<) than 5s, so that overall - -- delay is less than 10s, otherwise 'smallDelay' in - -- 'timeLimitsTxSubmission2' will kick in. - -> Property -prop_txSubmission (Positive maxUnacked) (NonEmpty outboundTxs) delay = - let mbDelayTime = getSmallDelay . getPositive <$> delay - tr = (runSimTrace $ do - controlMessageVar <- newTVarIO Continue - _ <- - async $ do - threadDelay - (fromMaybe 1 mbDelayTime - * realToFrac (length outboundTxs `div` 4)) - atomically (writeTVar controlMessageVar Terminate) - txSubmissionSimulation - verboseTracer - verboseTracer - (NumTxIdsToAck maxUnacked) outboundTxs - (readTVar controlMessageVar) - mbDelayTime mbDelayTime - ) in - ioProperty $ do - tr' <- evaluateTrace tr - case tr' of - SimException e trace -> do - return $ counterexample (intercalate "\n" $ show e : trace) False - SimDeadLock trace -> do - return $ counterexample (intercalate "\n" $ "Deadlock" : trace) False - SimReturn (inmp, outmp) _trace -> do - -- printf "Log: %s\n" (intercalate "\n" _trace) - let outUniqueTxIds = nubBy (on (==) getTxId) outmp - outValidTxs = filter getTxValid outmp - case (length outUniqueTxIds == length outmp, length outValidTxs == length outmp) of - (True, True) -> - -- If we are presented with a stream of unique txids for valid - -- transactions the inbound transactions should match the outbound - -- transactions exactly. - return $ inmp === take (length inmp) outValidTxs - (True, False) -> - -- If we are presented with a stream of unique txids then we should have - -- fetched all valid transactions. - return $ inmp === take (length inmp) outValidTxs - (False, True) -> - -- If we are presented with a stream of valid txids then we should have - -- fetched some version of those transactions. - return $ map getTxId inmp === take (length inmp) (map getTxId $ - filter getTxValid outUniqueTxIds) - (False, False) - -- If we are presented with a stream of valid and invalid Txs with - -- duplicate txids we're content with completing the protocol - -- without error. - -> return $ property True +-- | Split a list into sub list of at most `n` elements. +-- +divvy :: Int -> [a] -> [[a]] +divvy _ [] = [] +divvy n as = take n as : divvy n (drop n as)