Skip to content

Commit

Permalink
Added tx submission ratio & improved simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Sep 2, 2024
1 parent 906fa14 commit cd0ef52
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.Registry (decisionLogicThread)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (addBlock,
getBlockPointSet)
Expand Down Expand Up @@ -200,8 +201,9 @@ run :: forall resolver m.
ResolverException m
-> Tracer m (TraceLabelPeer NtNAddr (TraceFetchClientState BlockHeader))
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> Tracer m (DebugSharedTxState NtNAddr Int (Tx Int))
-> m Void
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmissionInbound =
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmissionInbound tracerTxSubmissionDebug =
Node.withNodeKernelThread blockGeneratorArgs (aTxs na)
$ \ nodeKernel nodeKernelThread -> do
dnsTimeoutScriptVar <- newTVarIO (aDNSTimeoutScript na)
Expand Down Expand Up @@ -275,7 +277,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmis
, Diff.P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) tracerTxSubmissionInbound nodeKernel Node.cborCodecs limits appArgs blockHeader
let apps = Node.applications (aDebugTracer na) tracerTxSubmissionInbound tracerTxSubmissionDebug nodeKernel Node.cborCodecs limits appArgs blockHeader

withAsync
(Diff.P2P.runM interfaces
Expand All @@ -287,7 +289,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmis
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->

withAsync (decisionLogicThread
(contramap show $ aDebugTracer na)
tracerTxSubmissionDebug
(aTxDecisionPolicy na)
(readPeerGSVs (nkFetchClientRegistry nodeKernel))
(nkTxChannelsVar nodeKernel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy (..))
import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannelsVar, withPeer)
import Ouroboros.Network.TxSubmission.Inbound.Server (txSubmissionInboundV2)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Ouroboros.Network.TxSubmission.Outbound (txSubmissionOutbound)
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel
Expand Down Expand Up @@ -260,6 +261,7 @@ applications :: forall block header s m.
)
=> Tracer m String
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> Tracer m (DebugSharedTxState NtNAddr Int (Tx Int))
-> NodeKernel header block s Int m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
Expand All @@ -268,7 +270,7 @@ applications :: forall block header s m.
-> Diff.Applications NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
m ()
applications debugTracer txSubmissionInboundTracer nodeKernel
applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug nodeKernel
Codecs { chainSyncCodec, blockFetchCodec
, keepAliveCodec, pingPongCodec
, peerSharingCodec
Expand Down Expand Up @@ -678,7 +680,7 @@ applications debugTracer txSubmissionInboundTracer nodeKernel
MiniProtocolCb $
\ ResponderContext { rcConnectionId = connId@ConnectionId { remoteAddress = them }} channel
-> do
withPeer ((show . (connId,)) `contramap` debugTracer)
withPeer txSubmissionInboundDebug
txChannelsVar
sharedTxStateVar
(getMempoolReader mempool)
Expand Down
60 changes: 56 additions & 4 deletions ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

#if defined(mingw32_HOST_OS)
{-# OPTIONS_GHC -Wno-unused-top-binds #-}
Expand All @@ -22,7 +22,7 @@ import Control.Monad.Class.MonadTime.SI (DiffTime, Time (Time), addTime,
import Control.Monad.IOSim
import Data.Bifoldable (bifoldMap)

import Data.Foldable (fold)
import Data.Foldable (fold, foldr')
import Data.IP qualified as IP
import Data.List as List (find, foldl', intercalate, sort, tails)
import Data.List.Trace qualified as Trace
Expand Down Expand Up @@ -101,9 +101,13 @@ import Test.Ouroboros.Network.LedgerPeers (LedgerPools (..))
import Control.Monad.Class.MonadTest (exploreRaces)
import Data.Bifunctor (bimap)
import Data.Char (ord)
import Data.Ratio (Ratio)
import Ouroboros.Network.PeerSelection.Bootstrap (requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.TxSubmission.Inbound.Policy (defaultTxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.Policy (defaultTxDecisionPolicy,
txInflightMultiplicity)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState (..),
inflightTxs)
import Ouroboros.Network.TxSubmission.Inbound.Types
(TraceTxSubmissionInbound (..))
import Ouroboros.Network.TxSubmission.Outbound (TxSubmissionProtocolError (..))
Expand Down Expand Up @@ -238,6 +242,8 @@ tests =
(testWithIOSim prop_no_txSubmission_error 125000)
, testProperty "all transactions"
unit_txSubmission_allTransactions
, testProperty "inflight coverage"
prop_check_inflight_ratio
]
, testGroup "Churn"
[ testProperty "no timeouts"
Expand Down Expand Up @@ -575,7 +581,7 @@ unit_txSubmission_allTransactions (ArbTxDecisionPolicy decisionPolicy)
case x of
-- When we add txids to the mempool, we collect them
-- into the map
DiffusionTxSubmission (TraceTxInboundAddedToMempool txids) ->
DiffusionTxSubmissionInbound (TraceTxInboundAddedToMempool txids) ->
Map.alter (maybe (Just []) (Just . sort . (txids ++))) n rr
-- When the node is shutdown we have to reset the accepted
-- txids list
Expand Down Expand Up @@ -615,6 +621,52 @@ unit_txSubmission_allTransactions (ArbTxDecisionPolicy decisionPolicy)
_ -> counterexample "Didn't find any entry in the map!"
$ False

-- | This test checks the ratio of the inflight txs against the allowed by the
-- TxDecisionPolicy.
--
prop_check_inflight_ratio :: AbsBearerInfo
-> DiffusionScript
-> Property
prop_check_inflight_ratio bi ds@(DiffusionScript simArgs _ _) =
let sim :: forall s . IOSim s Void
sim = diffusionSimulation (toBearerInfo bi)
ds
iosimTracer

events :: Events DiffusionTestTrace
events = Signal.eventsFromList
. Trace.toList
. fmap ( (\(WithTime t (WithName _ b)) -> (t, b))
)
. withTimeNameTraceEvents
@DiffusionTestTrace
@NtNAddr
. Trace.take 500000
$ runSimTrace
$ sim

inflightTxsMap =
foldr'
(\(_, m) r -> Map.unionWith (max) m r
)
Map.empty
$ Signal.eventsToList
$ Signal.selectEvents
(\case
DiffusionTxSubmissionDebug (DebugSharedTxState _ d) -> Just (inflightTxs d)
_ -> Nothing
)
$ events

txDecisionPolicy = saTxDecisionPolicy simArgs

in tabulate "Max observeed ratio of inflight multiplicity by the max stipulated by the policy"
(map (\m -> "has " ++ show m ++ " in flight - ratio: "
++ show @(Ratio Int) (fromIntegral m / fromIntegral (txInflightMultiplicity txDecisionPolicy))
)
(Map.elems inflightTxsMap))
$ True

-- | This test coverage of InboundGovernor transitions.
--
prop_inbound_governor_transitions_coverage :: AbsBearerInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import Control.Monad.IOSim (IOSim, traceM)
import Data.ByteString.Char8 qualified as BSC
import Data.ByteString.Lazy qualified as BL
import Data.IP (IP (..))
import Data.List (delete, foldl', nubBy)
import Data.List (delete, nubBy)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe (catMaybes, fromMaybe, maybeToList)
Expand Down Expand Up @@ -116,7 +116,6 @@ import Test.Ouroboros.Network.PeerSelection.RootPeersDNS qualified as PeerSelect
(tests)

import Data.Bool (bool)
import Data.Char (ord)
import Data.Function (on)
import Data.Typeable (Typeable)
import Ouroboros.Network.BlockFetch (FetchMode (..), TraceFetchClientState,
Expand All @@ -141,6 +140,7 @@ import Ouroboros.Network.Protocol.PeerSharing.Codec (byteLimitsPeerSharing,
import Ouroboros.Network.Protocol.TxSubmission2.Codec (byteLimitsTxSubmission2,
timeLimitsTxSubmission2)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Test.Ouroboros.Network.LedgerPeers (LedgerPools (..), genLedgerPoolsFrom)
import Test.Ouroboros.Network.PeerSelection.LocalRootPeers ()
Expand Down Expand Up @@ -414,8 +414,9 @@ genNodeArgs :: [RelayAccessInfo]
-> Int
-> [(HotValency, WarmValency, Map RelayAccessPoint (PeerAdvertise, PeerTrustable))]
-> RelayAccessInfo
-> [Tx Int]
-> Gen NodeArgs
genNodeArgs relays minConnected localRootPeers relay = flip suchThat hasUpstream $ do
genNodeArgs relays minConnected localRootPeers relay txs = flip suchThat hasUpstream $ do
-- Slot length needs to be greater than 0 else we get a livelock on
-- the IOSim.
--
Expand Down Expand Up @@ -486,10 +487,6 @@ genNodeArgs relays minConnected localRootPeers relay = flip suchThat hasUpstream
<$> arbitrary)
let bootstrapPeersDomain = Script (firstBootstrapPeer :| bootstrapPeers)

nTxs <- chooseInt (0, 10)
placeholderTxs <- vectorOf nTxs (arbitrary :: Gen (Tx Int))
let txs = map (\(t, i) -> t { getTxId = (foldl' (+) 0 $ map ord $ show relay) + i })
(zip placeholderTxs [0 :: Int ..])
return
$ NodeArgs
{ naSeed = seed
Expand Down Expand Up @@ -741,22 +738,35 @@ genDiffusionScript genLocalRootPeers
= do
ArbTxDecisionPolicy txDecisionPolicy <- arbitrary
let simArgs = mainnetSimArgs (length relays') txDecisionPolicy
nodesWithCommands <- mapM go (nubBy ((==) `on` getRelayIP) relays')
txs <- makeUniqueIds 0
<$> vectorOf (length relays') (choose (10, 100) >>= \c -> vectorOf c arbitrary)
nodesWithCommands <- mapM go (zip (nubBy ((==) `on` getRelayIP) relays') txs)
return (simArgs, dnsMapScript, nodesWithCommands)
where
makeUniqueIds :: Int -> [[Tx Int]] -> [[Tx Int]]
makeUniqueIds _ [] = []
makeUniqueIds i (l:ls) =
let (r, i') = makeUniqueIds' l i
in r : makeUniqueIds i' ls

makeUniqueIds' :: [Tx Int] -> Int -> ([Tx Int], Int)
makeUniqueIds' l i = ( map (\(tx, x) -> tx {getTxId = x}) (zip l [i..])
, i + length l + 1
)

getRelayIP :: RelayAccessInfo -> IP
getRelayIP (RelayAddrInfo ip _ _) = ip
getRelayIP (RelayDomainInfo _ ip _ _) = ip

relays' :: [RelayAccessInfo]
relays' = getRelayAccessInfos relays

go :: RelayAccessInfo -> Gen (NodeArgs, [Command])
go relay = do
go :: (RelayAccessInfo, [Tx Int]) -> Gen (NodeArgs, [Command])
go (relay, txs) = do
let otherRelays = relay `delete` relays'
minConnected = 3 `max` (length relays' - 1)
localRts <- genLocalRootPeers otherRelays relay
nodeArgs <- genNodeArgs relays' minConnected localRts relay
nodeArgs <- genNodeArgs relays' minConnected localRts relay txs
commands <- genCommands localRts
return (nodeArgs, commands)

Expand Down Expand Up @@ -982,7 +992,8 @@ data DiffusionTestTrace =
| DiffusionInboundGovernorTrace (InboundGovernorTrace NtNAddr)
| DiffusionServerTrace (ServerTrace NtNAddr)
| DiffusionFetchTrace (TraceFetchClientState BlockHeader)
| DiffusionTxSubmission (TraceTxSubmissionInbound Int (Tx Int))
| DiffusionTxSubmissionInbound (TraceTxSubmissionInbound Int (Tx Int))
| DiffusionTxSubmissionDebug (DebugSharedTxState NtNAddr Int (Tx Int))
| DiffusionDebugTrace String
deriving (Show)

Expand Down Expand Up @@ -1276,7 +1287,11 @@ diffusionSimulation
. tracerWithName addr
. tracerWithTime
$ nodeTracer)
( contramap DiffusionTxSubmission
( contramap DiffusionTxSubmissionInbound
. tracerWithName addr
. tracerWithTime
$ nodeTracer)
( contramap DiffusionTxSubmissionDebug
. tracerWithName addr
. tracerWithTime
$ nodeTracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,15 +1112,15 @@ instance Arbitrary ArbTxDecisionPolicy where
<*> (getSmall . getPositive <$> arbitrary)
<*> (SizeInBytes . getPositive <$> arbitrary)
<*> (SizeInBytes . getPositive <$> arbitrary)
<*> (getPositive <$> arbitrary))
<*> (getSmall . getPositive <$> arbitrary))

shrink (ArbTxDecisionPolicy a@TxDecisionPolicy {
maxNumTxIdsToRequest,
txsSizeInflightPerPeer,
maxTxsSizeInflight,
txInflightMultiplicity }) =
[ ArbTxDecisionPolicy a { maxNumTxIdsToRequest = NumTxIdsToReq x }
| (Positive x) <- shrink (Positive (getNumTxIdsToReq maxNumTxIdsToRequest))
| (Positive (Small x)) <- shrink (Positive (Small (getNumTxIdsToReq maxNumTxIdsToRequest)))
]
++
[ ArbTxDecisionPolicy . fixupTxDecisionPolicy
Expand All @@ -1135,7 +1135,7 @@ instance Arbitrary ArbTxDecisionPolicy where
++
[ ArbTxDecisionPolicy . fixupTxDecisionPolicy
$ a { txInflightMultiplicity = x }
| Positive x <- shrink (Positive txInflightMultiplicity)
| Positive (Small x) <- shrink (Positive (Small txInflightMultiplicity))
]


Expand Down

0 comments on commit cd0ef52

Please sign in to comment.