Skip to content

Commit

Permalink
Adds tx submission diffusion testnet test
Browse files Browse the repository at this point in the history
This test checks that even in the presence of a node that keeps
disconnecting, but eventually stays online, we manage to learn about all
its transactions.
  • Loading branch information
bolt12 committed Aug 30, 2024
1 parent 97ca973 commit 906fa14
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.Registry (decisionLogicThread)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (addBlock,
getBlockPointSet)
import Test.Ouroboros.Network.Diffusion.Node.MiniProtocols qualified as Node
Expand All @@ -115,6 +116,7 @@ import Test.Ouroboros.Network.Diffusion.Node.NodeKernel (NodeKernel (..),
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel qualified as Node
import Test.Ouroboros.Network.PeerSelection.RootPeersDNS (DNSLookupDelay,
DNSTimeout, mockDNSActions)
import Test.Ouroboros.Network.TxSubmission.Common (Tx)


data Interfaces m = Interfaces
Expand Down Expand Up @@ -159,6 +161,7 @@ data Arguments m = Arguments
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
, aTxDecisionPolicy :: TxDecisionPolicy
, aTxs :: [Tx Int]
}

-- The 'mockDNSActions' is not using \/ specifying 'resolverException', thus we
Expand Down Expand Up @@ -196,9 +199,10 @@ run :: forall resolver m.
NtCAddr NtCVersion NtCVersionData
ResolverException m
-> Tracer m (TraceLabelPeer NtNAddr (TraceFetchClientState BlockHeader))
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> m Void
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
Node.withNodeKernelThread blockGeneratorArgs
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmissionInbound =
Node.withNodeKernelThread blockGeneratorArgs (aTxs na)
$ \ nodeKernel nodeKernelThread -> do
dnsTimeoutScriptVar <- newTVarIO (aDNSTimeoutScript na)
dnsLookupDelayScriptVar <- newTVarIO (aDNSLookupDelayScript na)
Expand Down Expand Up @@ -271,7 +275,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
, Diff.P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs blockHeader
let apps = Node.applications (aDebugTracer na) tracerTxSubmissionInbound nodeKernel Node.cborCodecs limits appArgs blockHeader

withAsync
(Diff.P2P.runM interfaces
Expand All @@ -293,7 +297,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
<> wait nodeKernelThread
<> wait decLogicThread
where
blockFetch :: NodeKernel BlockHeader Block s m
blockFetch :: NodeKernel BlockHeader Block s txid m
-> m Void
blockFetch nodeKernel = do
blockFetchLogic
Expand All @@ -309,7 +313,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
bfcSalt = 0
})

blockFetchPolicy :: NodeKernel BlockHeader Block s m
blockFetchPolicy :: NodeKernel BlockHeader Block s txid m
-> BlockFetchConsensusInterface NtNAddr BlockHeader Block m
blockFetchPolicy nodeKernel =
BlockFetchConsensusInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy (..))
import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannelsVar, withPeer)
import Ouroboros.Network.TxSubmission.Inbound.Server (txSubmissionInboundV2)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Ouroboros.Network.TxSubmission.Outbound (txSubmissionOutbound)
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx,
Expand Down Expand Up @@ -258,15 +259,16 @@ applications :: forall block header s m.
, RandomGen s
)
=> Tracer m String
-> NodeKernel header block s m
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> NodeKernel header block s Int m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
-> AppArgs header block m
-> (block -> header)
-> Diff.Applications NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
m ()
applications debugTracer nodeKernel
applications debugTracer txSubmissionInboundTracer nodeKernel
Codecs { chainSyncCodec, blockFetchCodec
, keepAliveCodec, pingPongCodec
, peerSharingCodec
Expand Down Expand Up @@ -682,7 +684,7 @@ applications debugTracer nodeKernel
(getMempoolReader mempool)
them $ \api -> do
let server = txSubmissionInboundV2
((show . (connId,)) `contramap` debugTracer)
txSubmissionInboundTracer
(getMempoolWriter mempool)
api
labelThisThread "TxSubmissionServer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannels (..), TxChannelsVar, newSharedTxStateVar)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (ChainDB (..))
import Test.Ouroboros.Network.Diffusion.Node.ChainDB qualified as ChainDB
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx, emptyMempool)
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx, newMempool)
import Test.QuickCheck (Arbitrary (..), choose, chooseInt, frequency, oneof)


Expand Down Expand Up @@ -255,7 +255,7 @@ randomBlockGenerationArgs bgaSlotDuration bgaSeed quota =
, bgaSeed
}

data NodeKernel header block s m = NodeKernel {
data NodeKernel header block s txid m = NodeKernel {
-- | upstream chains
nkClientChains
:: StrictTVar m (Map NtNAddr (StrictTVar m (Chain header))),
Expand All @@ -274,19 +274,22 @@ data NodeKernel header block s m = NodeKernel {

nkPublicPeerSelectionVar :: StrictTVar m (PublicPeerSelectionState NtNAddr),

nkMempool :: Mempool m Int,
nkMempool :: Mempool m txid,

nkTxChannelsVar :: TxChannelsVar m NtNAddr Int (Tx Int),
nkTxChannelsVar :: TxChannelsVar m NtNAddr txid (Tx txid),

nkSharedTxStateVar :: SharedTxStateVar m NtNAddr Int (Tx Int)
nkSharedTxStateVar :: SharedTxStateVar m NtNAddr txid (Tx txid)
}

newNodeKernel :: ( MonadSTM m
, Strict.MonadMVar m
, RandomGen s
, Eq txid
)
=> s -> m (NodeKernel header block s m)
newNodeKernel rng = do
=> s
-> [Tx txid]
-> m (NodeKernel header block s txid m)
newNodeKernel rng txs = do
publicStateVar <- makePublicPeerSelectionStateVar
NodeKernel
<$> newTVarIO Map.empty
Expand All @@ -298,14 +301,14 @@ newNodeKernel rng = do
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS
<*> pure publicStateVar
<*> emptyMempool
<*> newMempool txs
<*> Strict.newMVar (TxChannels Map.empty)
<*> newSharedTxStateVar

-- | Register a new upstream chain-sync client.
--
registerClientChains :: MonadSTM m
=> NodeKernel header block s m
=> NodeKernel header block s txid m
-> NtNAddr
-> m (StrictTVar m (Chain header))
registerClientChains NodeKernel { nkClientChains } peerAddr = atomically $ do
Expand All @@ -317,7 +320,7 @@ registerClientChains NodeKernel { nkClientChains } peerAddr = atomically $ do
-- | Unregister an upstream chain-sync client.
--
unregisterClientChains :: MonadSTM m
=> NodeKernel header block s m
=> NodeKernel header block s txid m
-> NtNAddr
-> m ()
unregisterClientChains NodeKernel { nkClientChains } peerAddr = atomically $
Expand Down Expand Up @@ -370,7 +373,7 @@ instance Exception NodeKernelError where
-- | Run chain selection \/ block production thread.
--
withNodeKernelThread
:: forall block header m seed a.
:: forall block header m seed txid a.
( Alternative (STM m)
, MonadAsync m
, MonadDelay m
Expand All @@ -381,21 +384,24 @@ withNodeKernelThread
, Strict.MonadMVar m
, HasFullHeader block
, RandomGen seed
, Eq txid
)
=> BlockGeneratorArgs block seed
-> (NodeKernel header block seed m -> Async m Void -> m a)
-> [Tx txid]
-> (NodeKernel header block seed txid m -> Async m Void -> m a)
-- ^ The continuation which has a handle to the chain selection \/ block
-- production thread. The thread might throw an exception.
-> m a
withNodeKernelThread BlockGeneratorArgs { bgaSlotDuration, bgaBlockGenerator, bgaSeed }
txs
k = do
kernel <- newNodeKernel psSeed
kernel <- newNodeKernel psSeed txs
withSlotTime bgaSlotDuration $ \waitForSlot ->
withAsync (blockProducerThread kernel waitForSlot) (k kernel)
where
(bpSeed, psSeed) = split bgaSeed

blockProducerThread :: NodeKernel header block seed m
blockProducerThread :: NodeKernel header block seed txid m
-> (SlotNo -> STM m SlotNo)
-> m Void
blockProducerThread NodeKernel { nkChainProducerState, nkChainDB }
Expand Down
Loading

0 comments on commit 906fa14

Please sign in to comment.