Skip to content

Commit

Permalink
Add tx sybmission V2 protocol to the testnet sim
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Aug 22, 2024
1 parent 3f32750 commit eecc50f
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 22 deletions.
1 change: 1 addition & 0 deletions ouroboros-network-api/src/Ouroboros/Network/SizeInBytes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Quiet (Quiet (..))
newtype SizeInBytes = SizeInBytes { getSizeInBytes :: Word32 }
deriving (Eq, Ord)
deriving Show via Quiet SizeInBytes
deriving Bounded via Word32
deriving Enum via Word32
deriving Num via Word32
deriving Real via Word32
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import Ouroboros.Network.Testing.Data.Script (Script (..), stepScriptSTM')

import Simulation.Network.Snocket (AddressType (..), FD)

import Ouroboros.Network.BlockFetch.ClientRegistry (readPeerGSVs)
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface, UseLedgerPeers)
Expand All @@ -103,6 +104,8 @@ import Ouroboros.Network.PeerSelection.RelayAccessPoint (DomainAccessPoint,
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSLookupType)
import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.Registry (decisionLogicThread)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (addBlock,
getBlockPointSet)
import Test.Ouroboros.Network.Diffusion.Node.MiniProtocols qualified as Node
Expand Down Expand Up @@ -155,6 +158,7 @@ data Arguments m = Arguments
, aDNSTimeoutScript :: Script DNSTimeout
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
, aTxDecisionPolicy :: TxDecisionPolicy
}

-- The 'mockDNSActions' is not using \/ specifying 'resolverException', thus we
Expand Down Expand Up @@ -277,9 +281,17 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
(mkArgsExtra useBootstrapPeersScriptVar) apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->
wait diffusionThread
<> wait blockFetchLogicThread
<> wait nodeKernelThread

withAsync (decisionLogicThread
nullTracer
(aTxDecisionPolicy na)
(readPeerGSVs (nkFetchClientRegistry nodeKernel))
(nkTxChannelsVar nodeKernel)
(nkSharedTxStateVar nodeKernel)) $ \decLogicThread ->
wait diffusionThread
<> wait blockFetchLogicThread
<> wait nodeKernelThread
<> wait decLogicThread
where
blockFetch :: NodeKernel BlockHeader Block s m
-> m Void
Expand Down Expand Up @@ -415,6 +427,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
, Node.aaOwnPeerSharing = aOwnPeerSharing na
, Node.aaUpdateOutboundConnectionsState =
iUpdateOutboundConnectionsState ni
, Node.aaTxDecisionPolicy = aTxDecisionPolicy na
}

--- Utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ import Pipes qualified

import Ouroboros.Network.NodeToNode (blockFetchMiniProtocolNum,
chainSyncMiniProtocolNum, keepAliveMiniProtocolNum,
peerSharingMiniProtocolNum)
peerSharingMiniProtocolNum, txSubmissionMiniProtocolNum)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.LocalRootPeers (OutboundConnectionsState)
import Ouroboros.Network.PeerSelection.PeerSharing qualified as PSTypes
Expand All @@ -96,7 +96,19 @@ import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (codecPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Client (txSubmissionClientPeer)
import Ouroboros.Network.Protocol.TxSubmission2.Server
(txSubmissionServerPeerPipelined)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..),
NumTxIdsToReq (..), TxSubmission2)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy (..))
import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannelsVar, withPeer)
import Ouroboros.Network.TxSubmission.Inbound.Server (txSubmissionInboundV2)
import Ouroboros.Network.TxSubmission.Outbound (txSubmissionOutbound)
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx,
getMempoolReader, getMempoolWriter, txSubmissionCodec2)


-- | Protocol codecs.
Expand All @@ -112,6 +124,8 @@ data Codecs addr header block m = Codecs
CBOR.DeserialiseFailure m ByteString
, peerSharingCodec :: Codec (PeerSharing addr)
CBOR.DeserialiseFailure m ByteString
, txSubmissionCodec :: Codec (TxSubmission2 Int (Tx Int))
CBOR.DeserialiseFailure m ByteString
}

cborCodecs :: MonadST m => Codecs NtNAddr BlockHeader Block m
Expand All @@ -125,6 +139,7 @@ cborCodecs = Codecs
, keepAliveCodec = codecKeepAlive_v2
, pingPongCodec = codecPingPong
, peerSharingCodec = codecPeerSharing encodeNtNAddr decodeNtNAddr
, txSubmissionCodec = txSubmissionCodec2
}


Expand Down Expand Up @@ -178,6 +193,14 @@ data LimitsAndTimeouts header block = LimitsAndTimeouts
:: ProtocolTimeLimits (PeerSharing NtNAddr)
, peerSharingSizeLimits
:: ProtocolSizeLimits (PeerSharing NtNAddr) ByteString

-- tx submission
, txSubmissionLimits
:: MiniProtocolLimits
, txSubmissionTimeLimits
:: ProtocolTimeLimits (TxSubmission2 Int (Tx Int))
, txSubmissionSizeLimits
:: ProtocolSizeLimits (TxSubmission2 Int (Tx Int)) ByteString
}


Expand Down Expand Up @@ -208,6 +231,8 @@ data AppArgs header block m = AppArgs
:: PSTypes.PeerSharing
, aaUpdateOutboundConnectionsState
:: OutboundConnectionsState -> STM m ()

, aaTxDecisionPolicy :: TxDecisionPolicy
}


Expand Down Expand Up @@ -245,6 +270,7 @@ applications debugTracer nodeKernel
Codecs { chainSyncCodec, blockFetchCodec
, keepAliveCodec, pingPongCodec
, peerSharingCodec
, txSubmissionCodec
}
limits
AppArgs
Expand All @@ -257,6 +283,7 @@ applications debugTracer nodeKernel
, aaChainSyncEarlyExit
, aaOwnPeerSharing
, aaUpdateOutboundConnectionsState
, aaTxDecisionPolicy
}
toHeader =
Diff.Applications
Expand Down Expand Up @@ -316,6 +343,17 @@ applications debugTracer nodeKernel
blockFetchInitiator
blockFetchResponder
}

, MiniProtocol {
miniProtocolNum = txSubmissionMiniProtocolNum,
miniProtocolLimits = txSubmissionLimits limits,
miniProtocolRun =
InitiatorAndResponderProtocol
(txSubmissionInitiator aaTxDecisionPolicy (nkMempool nodeKernel))
(txSubmissionResponder (nkMempool nodeKernel)
(nkTxChannelsVar nodeKernel)
(nkSharedTxStateVar nodeKernel))
}
]
, withWarm = WithWarm
[ MiniProtocol
Expand Down Expand Up @@ -600,6 +638,76 @@ applications debugTracer nodeKernel
$ peerSharingServerPeer
$ peerSharingServer psAPI

txSubmissionInitiator
:: TxDecisionPolicy
-> Mempool m Int
-> MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
txSubmissionInitiator txDecisionPolicy mempool =
MiniProtocolCb $
\ ExpandedInitiatorContext {
eicConnectionId = connId,
eicControlMessage = controlMessageSTM
}
channel
-> do
let client = txSubmissionOutbound
((show . (connId,)) `contramap` debugTracer)
(NumTxIdsToAck $ getNumTxIdsToReq
$ maxUnacknowledgedTxIds
$ txDecisionPolicy)
(getMempoolReader mempool)
maxBound
controlMessageSTM
labelThisThread "TxSubmissionClient"
runPeerWithLimits
((show . (connId,)) `contramap` debugTracer)
txSubmissionCodec
(txSubmissionSizeLimits limits)
(txSubmissionTimeLimits limits)
channel
(txSubmissionClientPeer client)

txSubmissionResponder
:: Mempool m Int
-> TxChannelsVar m NtNAddr Int (Tx Int)
-> SharedTxStateVar m NtNAddr Int (Tx Int)
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
txSubmissionResponder mempool txChannelsVar sharedTxStateVar =
MiniProtocolCb $
\ ResponderContext { rcConnectionId = connId@ConnectionId { remoteAddress = them }} channel
-> do
withPeer ((show . (connId,)) `contramap` debugTracer)
txChannelsVar
sharedTxStateVar
(getMempoolReader mempool)
them $ \api -> do
let server = txSubmissionInboundV2
((show . (connId,)) `contramap` debugTracer)
(getMempoolWriter mempool)
api
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
((show . (connId,)) `contramap` debugTracer)
txSubmissionCodec
(txSubmissionSizeLimits limits)
(txSubmissionTimeLimits limits)
channel
(txSubmissionServerPeerPipelined server)

-- aTxSubmission2Server
-- :: NodeToNodeVersion
-- -> ResponderContext addrNTN
-- -> Channel m bTX
-- -> m ((), Maybe bTX)
-- aTxSubmission2Server version ResponderContext { rcConnectionId = them } channel = do
-- labelThisThread "TxSubmissionServer"
-- runPipelinedPeerWithLimits
-- (contramap (TraceLabelPeer them) tTxSubmission2Tracer)
-- (cTxSubmission2Codec (mkCodecs version))
-- blTxSubmission2
-- timeLimitsTxSubmission2
-- channel
-- (txSubmissionServerPeerPipelined (hTxSubmissionServer version them))

--
-- Orphaned Instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import Test.Ouroboros.Network.Orphans ()

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Control.Concurrent.Class.MonadMVar.Strict qualified as Strict
import Ouroboros.Network.Mock.Chain (Chain (..))
import Ouroboros.Network.NodeToNode ()
import Ouroboros.Network.PeerSelection.Governor (PublicPeerSelectionState,
Expand All @@ -85,8 +86,11 @@ import Ouroboros.Network.PeerSelection.RelayAccessPoint (RelayAccessPoint (..))
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry (..),
newPeerSharingAPI, newPeerSharingRegistry,
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannels (..), TxChannelsVar, newSharedTxStateVar)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (ChainDB (..))
import Test.Ouroboros.Network.Diffusion.Node.ChainDB qualified as ChainDB
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx, emptyMempool)
import Test.QuickCheck (Arbitrary (..), choose, chooseInt, frequency, oneof)


Expand Down Expand Up @@ -268,10 +272,17 @@ data NodeKernel header block s m = NodeKernel {

nkPeerSharingAPI :: PeerSharingAPI NtNAddr s m,

nkPublicPeerSelectionVar :: StrictTVar m (PublicPeerSelectionState NtNAddr)
nkPublicPeerSelectionVar :: StrictTVar m (PublicPeerSelectionState NtNAddr),

nkMempool :: Mempool m Int,

nkTxChannelsVar :: TxChannelsVar m NtNAddr Int (Tx Int),

nkSharedTxStateVar :: SharedTxStateVar m NtNAddr Int (Tx Int)
}

newNodeKernel :: ( MonadSTM m
, Strict.MonadMVar m
, RandomGen s
)
=> s -> m (NodeKernel header block s m)
Expand All @@ -287,6 +298,9 @@ newNodeKernel rng = do
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS
<*> pure publicStateVar
<*> emptyMempool
<*> Strict.newMVar (TxChannels Map.empty)
<*> newSharedTxStateVar

-- | Register a new upstream chain-sync client.
--
Expand Down Expand Up @@ -364,6 +378,7 @@ withNodeKernelThread
, MonadTimer m
, MonadThrow m
, MonadThrow (STM m)
, Strict.MonadMVar m
, HasFullHeader block
, RandomGen seed
)
Expand Down
24 changes: 17 additions & 7 deletions ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ import Test.Ouroboros.Network.LedgerPeers (LedgerPools (..))
import Control.Monad.Class.MonadTest (exploreRaces)
import Ouroboros.Network.PeerSelection.Bootstrap (requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.TxSubmission.Inbound.Policy (defaultTxDecisionPolicy)

tests :: TestTree
tests =
Expand Down Expand Up @@ -734,6 +735,7 @@ unit_4177 = prop_inbound_governor_transitions_coverage absNoAttenuation script
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [JoinNetwork 1.742857142857
,Reconfigure 6.33333333333 [(1,1,Map.fromList [(RelayAccessDomain "test2" 65535,(DoAdvertisePeer, IsNotTrustable))]),
(1,1,Map.fromList [(RelayAccessAddress "0:6:0:3:0:6:0:5" 65530,(DoAdvertisePeer, IsNotTrustable))
Expand Down Expand Up @@ -766,6 +768,7 @@ unit_4177 = prop_inbound_governor_transitions_coverage absNoAttenuation script
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [JoinNetwork 0.183783783783
,Reconfigure 4.533333333333 [(1,1,Map.fromList [])]
]
Expand Down Expand Up @@ -1347,6 +1350,7 @@ unit_4191 = testWithIOSim prop_diffusion_dns_can_recover 125000 absInfo script
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [ JoinNetwork 6.710144927536
, Kill 7.454545454545
, JoinNetwork 10.763157894736
Expand Down Expand Up @@ -2275,7 +2279,8 @@ async_demotion_network_script =
naChainSyncEarlyExit
= False,
naPeerSharing = PeerSharingDisabled,
naFetchModeScript = singletonScript FetchModeDeadline
naFetchModeScript = singletonScript FetchModeDeadline,
naTxDecisionPolicy = defaultTxDecisionPolicy
}


Expand Down Expand Up @@ -2719,6 +2724,7 @@ prop_unit_4258 =
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [ JoinNetwork 4.166666666666,
Kill 0.3,
JoinNetwork 1.517857142857,
Expand Down Expand Up @@ -2759,6 +2765,7 @@ prop_unit_4258 =
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [ JoinNetwork 3.384615384615,
Reconfigure 3.583333333333 [(1,1,Map.fromList [(RelayAccessAddress "0.0.0.4" 9,(DoNotAdvertisePeer, IsNotTrustable))])],
Kill 15.55555555555,
Expand Down Expand Up @@ -2821,6 +2828,7 @@ prop_unit_reconnect =
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [ JoinNetwork 0
])
, (NodeArgs
Expand All @@ -2843,11 +2851,12 @@ prop_unit_reconnect =
targetNumberOfEstablishedBigLedgerPeers = 0,
targetNumberOfActiveBigLedgerPeers = 0
}
(Script (DNSTimeout {getDNSTimeout = 10} :| [ ]))
(Script (DNSLookupDelay {getDNSLookupDelay = 0} :| []))
Nothing
False
(Script (FetchModeDeadline :| []))
(Script (DNSTimeout {getDNSTimeout = 10} :| [ ]))
(Script (DNSLookupDelay {getDNSLookupDelay = 0} :| []))
Nothing
False
(Script (FetchModeDeadline :| []))
defaultTxDecisionPolicy
, [ JoinNetwork 10
])
]
Expand Down Expand Up @@ -3253,7 +3262,8 @@ unit_peer_sharing =
naDNSLookupDelayScript = singletonScript (DNSLookupDelay 0.01),
naChainSyncEarlyExit = False,
naChainSyncExitOnBlockNo = Nothing,
naFetchModeScript = singletonScript FetchModeDeadline
naFetchModeScript = singletonScript FetchModeDeadline,
naTxDecisionPolicy = defaultTxDecisionPolicy
}

script = DiffusionScript
Expand Down
Loading

0 comments on commit eecc50f

Please sign in to comment.