Skip to content

Commit

Permalink
Fix V2 simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Aug 12, 2024
1 parent ceb31e5 commit e1c982d
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,8 @@ prop_receivedTxIds_generator (ArbReceivedTxIds _ someTxsToAck _peeraddr _ps st)
prop_acknowledgeTxIds :: ArbReceivedTxIds
-> Property
prop_acknowledgeTxIds (ArbReceivedTxIds _mempoolHasTxFun _txs _peeraddr ps st) =
case TXS.acknowledgeTxIds st ps of
(numTxIdsToAck, txs, TXS.RefCountDiff { TXS.txIdsToAck }, ps') ->
case TXS.acknowledgeTxIds undefined st ps of
(numTxIdsToAck, _, txs, TXS.RefCountDiff { TXS.txIdsToAck }, ps') ->
counterexample "number of tx ids to ack must agree with RefCountDiff"
( fromIntegral numTxIdsToAck
===
Expand Down Expand Up @@ -882,9 +882,9 @@ prop_hasTxIdsToAcknowledge
-> Property
prop_hasTxIdsToAcknowledge (ArbReceivedTxIds _mempoolHasTxFun _txs _peeraddr ps st) =
case ( TXS.hasTxIdsToAcknowledge st ps
, TXS.acknowledgeTxIds st ps
, TXS.acknowledgeTxIds undefined st ps
) of
(canAck, (numTxIdsToAck, _, _, _)) ->
(canAck, (numTxIdsToAck, _, _, _, _)) ->
canAck === (numTxIdsToAck > 0)


Expand Down Expand Up @@ -1612,8 +1612,8 @@ prop_makeDecisions_acknowledged

ackFromState :: Map PeerAddr NumTxIdsToAck
ackFromState =
Map.map (\ps -> case TXS.acknowledgeTxIds sharedTxState ps of
(a, _, _, _) -> a)
Map.map (\ps -> case TXS.acknowledgeTxIds undefined sharedTxState ps of
(a, _, _, _, _) -> a)
. peerTxStates
$ sharedTxState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

module Ouroboros.Network.TxSubmission.Inbound.Decision
( TxDecision (..)
, emptyTxDecision
-- * Internal API exposed for testing
, makeDecisions
, filterActivePeers
Expand All @@ -28,7 +29,8 @@ import Data.Set (Set)
import Data.Set qualified as Set

import Data.Sequence.Strict qualified as StrictSeq
import Ouroboros.Network.DeltaQ (PeerGSV (..), gsvRequestResponseDuration)
import Ouroboros.Network.DeltaQ (PeerGSV (..), defaultGSV,
gsvRequestResponseDuration)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Policy
import Ouroboros.Network.TxSubmission.Inbound.State
Expand Down Expand Up @@ -90,6 +92,14 @@ instance Ord txid => Semigroup (TxDecision txid tx) where
txdTxsToMempool = txdTxsToMempool ++ txdTxsToMempool'
}

emptyTxDecision :: TxDecision txid tx
emptyTxDecision = TxDecision {
txdTxIdsToAcknowledge = 0,
txdTxIdsToRequest = 0,
txdPipelineTxIds = False,
txdTxsToRequest = Set.empty,
txdTxsToMempool = []
}

data SharedDecisionContext peeraddr txid tx = SharedDecisionContext {
-- TODO: check how to access it.
Expand Down Expand Up @@ -148,7 +158,10 @@ orderByDeltaQ :: forall peeraddr txid tx.
orderByDeltaQ dq =
sortOn (\(peeraddr, _) ->
gsvRequestResponseDuration
(dq Map.! peeraddr) reqSize respSize)
(Map.findWithDefault defaultGSV peeraddr dq)
reqSize
respSize
)
. Map.toList
where
-- according to calculations in `txSubmissionProtocolLimits`: sizes of
Expand All @@ -168,7 +181,7 @@ data St peeraddr txid tx =
-- ^ size of all `tx`s in-flight.

stInflight :: !(Map txid Int),
-- ^ `txid`s in-flight.
-- ^ `txid`s in-flight.

stAcknowledged :: !(Map txid Int)
-- ^ acknowledged `txid` with multiplicities. It is used to update
Expand Down Expand Up @@ -248,25 +261,32 @@ pickTxsToDownload policy@TxDecisionPolicy { txsSizeInflightPerPeer,
sizeInflightOther = sizeInflightAll - requestedTxsInflightSize

in if sizeInflightAll >= maxTxsSizeInflight
then let (numTxIdsToAck, txsToMempool, RefCountDiff { txIdsToAck }, peerTxState') =
acknowledgeTxIds sharedState peerTxState
(numTxIdsToReq, peerTxState'') = numTxIdsToRequest policy peerTxState'
then let (numTxIdsToAck, numTxIdsToReq, txsToMempool, RefCountDiff { txIdsToAck }, peerTxState') =
acknowledgeTxIds policy sharedState peerTxState

stAcknowledged' = Map.unionWith (+) stAcknowledged txIdsToAck
in
( st { stAcknowledged = stAcknowledged' }
, ( (peeraddr, peerTxState'')
, TxDecision { txdTxIdsToAcknowledge = numTxIdsToAck,
txdTxIdsToRequest = numTxIdsToReq,
txdPipelineTxIds = not
. StrictSeq.null
. unacknowledgedTxIds
$ peerTxState',
txdTxsToRequest = Set.empty,
txdTxsToMempool = txsToMempool
}
)
)
if requestedTxIdsInflight peerTxState' > 0
then
( st { stAcknowledged = stAcknowledged' }
, ( (peeraddr, peerTxState')
, TxDecision { txdTxIdsToAcknowledge = numTxIdsToAck,
txdTxIdsToRequest = numTxIdsToReq,
txdPipelineTxIds = not
. StrictSeq.null
. unacknowledgedTxIds
$ peerTxState',
txdTxsToRequest = Set.empty,
txdTxsToMempool = txsToMempool
}
)
)
else
( st
, ( (peeraddr, peerTxState')
, emptyTxDecision
)
)
else
let requestedTxsInflightSize' :: SizeInBytes
txsToRequest :: Set txid
Expand Down Expand Up @@ -321,8 +341,8 @@ pickTxsToDownload policy@TxDecisionPolicy { txsSizeInflightPerPeer,
<> txsToRequest
}

(numTxIdsToAck, txsToMempool, RefCountDiff { txIdsToAck }, peerTxState'') =
acknowledgeTxIds sharedState peerTxState'
(numTxIdsToAck, numTxIdsToReq, txsToMempool, RefCountDiff { txIdsToAck }, peerTxState'') =
acknowledgeTxIds policy sharedState peerTxState'

stAcknowledged' = Map.unionWith (+) stAcknowledged txIdsToAck

Expand All @@ -333,25 +353,32 @@ pickTxsToDownload policy@TxDecisionPolicy { txsSizeInflightPerPeer,

stInflight' :: Map txid Int
stInflight' = Map.unionWith (+) stInflightDelta stInflight

(numTxIdsToReq, peerTxState''') = numTxIdsToRequest policy peerTxState''


in ( St { stInflight = stInflight',
stInflightSize = sizeInflightOther + requestedTxsInflightSize',
stAcknowledged = stAcknowledged' }
, ( (peeraddr, peerTxState''')
, TxDecision { txdTxIdsToAcknowledge = numTxIdsToAck,
txdPipelineTxIds = not
. StrictSeq.null
. unacknowledgedTxIds
$ peerTxState''',
txdTxIdsToRequest = numTxIdsToReq,
txdTxsToRequest = txsToRequest,
txdTxsToMempool = txsToMempool
}
)
)
in
if requestedTxIdsInflight peerTxState'' > 0
then
( St { stInflight = stInflight',
stInflightSize = sizeInflightOther + requestedTxsInflightSize',
stAcknowledged = stAcknowledged' }
, ( (peeraddr, peerTxState'')
, TxDecision { txdTxIdsToAcknowledge = numTxIdsToAck,
txdPipelineTxIds = not
. StrictSeq.null
. unacknowledgedTxIds
$ peerTxState'',
txdTxIdsToRequest = numTxIdsToReq,
txdTxsToRequest = txsToRequest,
txdTxsToMempool = txsToMempool
}
)
)
else
( st { stInflight = stInflight',
stInflightSize = sizeInflightOther + requestedTxsInflightSize'
}
, ( (peeraddr, peerTxState'')
, emptyTxDecision { txdTxsToRequest = txsToRequest }
)
)

gn :: ( St peeraddr txid tx
, [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
Expand All @@ -361,8 +388,8 @@ pickTxsToDownload policy@TxDecisionPolicy { txsSizeInflightPerPeer,
)
gn
( St { stInflight,
stInflightSize,
stAcknowledged }
stInflightSize,
stAcknowledged }
, as
)
=
Expand Down Expand Up @@ -418,10 +445,10 @@ filterActivePeers
txsSizeInflightPerPeer,
maxTxsSizeInflight,
txInflightMultiplicity }
st@SharedTxState { peerTxStates,
bufferedTxs,
inflightTxs,
inflightTxsSize }
SharedTxState { peerTxStates,
bufferedTxs,
inflightTxs,
inflightTxsSize }
| overLimit
= Map.filter fn peerTxStates
| otherwise
Expand All @@ -432,22 +459,25 @@ filterActivePeers
<> Map.keysSet bufferedTxs

fn :: PeerTxState txid tx -> Bool
fn ps@PeerTxState { unacknowledgedTxIds,
requestedTxIdsInflight } =
hasTxIdsToAcknowledge st ps
|| requestedTxIdsInflight + numOfUnacked < maxUnacknowledgedTxIds
fn PeerTxState { unacknowledgedTxIds,
requestedTxIdsInflight } =
-- hasTxIdsToAcknowledge st ps ||
requestedTxIdsInflight == 0 -- document why it's not <= maxTxIdsInFlightPerPeer
&& requestedTxIdsInflight + numOfUnacked <= maxUnacknowledgedTxIds
where
numOfUnacked = fromIntegral (StrictSeq.length unacknowledgedTxIds)

gn :: PeerTxState txid tx -> Bool
gn ps@PeerTxState { unacknowledgedTxIds,
requestedTxIdsInflight,
requestedTxsInflight,
requestedTxsInflightSize,
availableTxIds,
unknownTxs } =
hasTxIdsToAcknowledge st ps
|| requestedTxIdsInflight + numOfUnacked < maxUnacknowledgedTxIds
gn PeerTxState { unacknowledgedTxIds,
requestedTxIdsInflight,
requestedTxsInflight,
requestedTxsInflightSize,
availableTxIds,
unknownTxs } =
-- hasTxIdsToAcknowledge st ps ||
( requestedTxIdsInflight == 0
&& requestedTxIdsInflight + numOfUnacked <= maxUnacknowledgedTxIds
)
|| (underSizeLimit && not (Map.null downloadable))
where
numOfUnacked = fromIntegral (StrictSeq.length unacknowledgedTxIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), traceWith)

import Data.Foldable (foldl', traverse_)
import Data.Map.Strict (Map)
Expand All @@ -31,6 +30,7 @@ import Data.Set (Set)
import Data.Set qualified as Set
import Data.Void (Void)

import Control.Tracer (Tracer, traceWith)
import Ouroboros.Network.DeltaQ (PeerGSV (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Decision
Expand Down Expand Up @@ -73,6 +73,10 @@ data PeerTxAPI m txid tx = PeerTxAPI {
}


data TraceDecision peeraddr txid tx =
TraceDecisions (Map peeraddr (TxDecision txid tx))
deriving (Eq, Show)

-- | A bracket function which registers / de-registers a new peer in
-- `SharedTxStateVar` and `PeerTxStateVar`s, which exposes `PeerTxStateAPI`.
-- `PeerTxStateAPI` is only safe inside the `withPeer` scope.
Expand Down Expand Up @@ -188,13 +192,10 @@ withPeer tracer
-> StrictSeq txid
-> Map txid SizeInBytes
-> m ()
handleReceivedTxIds numTxIdsToReq txidsSeq txidsMap = do
-- TODO: hide this inside `receivedTxIds` so it's run in the same STM
-- transaction.
mempoolSnapshot <- atomically mempoolGetSnapshot
handleReceivedTxIds numTxIdsToReq txidsSeq txidsMap =
receivedTxIds tracer
sharedStateVar
mempoolSnapshot
mempoolGetSnapshot
peeraddr
numTxIdsToReq
txidsSeq
Expand All @@ -215,6 +216,7 @@ decisionLogicThread
( MonadDelay m
, MonadMVar m
, MonadSTM m
, MonadMask m
, Ord peeraddr
, Ord txid
)
Expand Down Expand Up @@ -245,11 +247,22 @@ decisionLogicThread tracer policy gsvVar txChannelsVar sharedStateVar = go
let (sharedState, decisions) = makeDecisions policy sharedCtx activePeers
writeTVar sharedStateVar sharedState
return (decisions, sharedState)
traceWith tracer (DebugSharedTxState st)
traceWith tracer (DebugSharedTxState "decisionLogicThread" st)
TxChannels { txChannelMap } <- readMVar txChannelsVar
traverse_
(\(mvar, d) -> modifyMVar_ mvar (\d' -> pure (d' <> d)))
(\(mvar, d) -> modifyMVarWithDefault_ mvar d (\d' -> pure (d' <> d)))
(Map.intersectionWith (,)
txChannelMap
decisions)
go

-- Variant of modifyMVar_ that puts a default value if the MVar is empty.
modifyMVarWithDefault_ :: StrictMVar m a -> a -> (a -> m a) -> m ()
modifyMVarWithDefault_ m d io =
mask $ \restore -> do
mbA <- tryTakeMVar m
case mbA of
Just a -> do
a' <- restore (io a) `onException` putMVar m a
putMVar m a'
Nothing -> putMVar m d
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ txSubmissionInboundV2
-> TxDecision txid tx
-> m (ServerStIdle n txid tx m ())
serverReqTxIds
n TxDecision { txdTxIdsToAcknowledge = 0,
txdTxIdsToRequest = 0 }
n TxDecision { txdTxIdsToRequest = 0 }
=
case n of
Zero -> serverIdle
Expand Down
Loading

0 comments on commit e1c982d

Please sign in to comment.