Skip to content

Commit

Permalink
WIP: Working ranking of peers.
Browse files Browse the repository at this point in the history
DeltaQ metrics is only available for our warm and hot peers that also
have us as hot. So a fraction of all downstream clients will have a
metric.

This change the ranking of peers to use simple scoring system. Deliver a
new TX before in time before it gets into the block gives you one point.
Delivering a TXs thats already in the mempool, is invalid, or fail
because it was included in a recent blocks gives you a penalty.
  • Loading branch information
karknu committed Oct 7, 2024
1 parent b87ebf6 commit 4a99a6e
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
requestedTxIdsInflight,
requestedTxsInflight,
requestedTxsInflightSize,
unknownTxs }
unknownTxs,
rejectedTxs = 0 }
(Set.fromList $ Map.elems inflightMap)
bufferedMap
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

Expand Down Expand Up @@ -314,13 +315,18 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
traceWith tracer $
TraceTxSubmissionCollected collected

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs txsReady

!end <- getMonotonicTime
let duration = diffTime end start
traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
let !accepted = length txidsAccepted

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = collected - accepted
, ptxcScore = 0 -- This implementatin does not track score
}

continueWithStateM (serverIdle n) st {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,40 @@ makeDecisions
, Map peeraddr (TxDecision txid tx)
)
makeDecisions policy SharedDecisionContext {
sdcPeerGSV = peerGSV,
sdcPeerGSV = _peerGSV,
sdcSharedTxState = st
}
= fn
. pickTxsToDownload policy st
. orderByDeltaQ peerGSV
. orderByRejections
where
fn :: forall a.
(a, [(peeraddr, TxDecision txid tx)])
-> (a, Map peeraddr (TxDecision txid tx))
fn (a, as) = (a, Map.fromList as)


-- | Order peers by how useful the TXs they have provided are.
--
-- TXs delivered late will fail to apply because they where included in
-- a recently adopted block. Peers can race against each other by setting
-- `txInflightMultiplicity` to > 1.
--
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
orderByRejections :: Map peeraddr (PeerTxState txid tx)
-> [ (peeraddr, PeerTxState txid tx)]
orderByRejections =
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
. Map.toList

-- | Order peers by `DeltaQ`.
--
orderByDeltaQ :: forall peeraddr txid tx.
_orderByDeltaQ :: forall peeraddr txid tx.
Ord peeraddr
=> Map peeraddr PeerGSV
-> Map peeraddr (PeerTxState txid tx)
-> [(peeraddr, PeerTxState txid tx)]
orderByDeltaQ dq =
_orderByDeltaQ dq =
sortOn (\(peeraddr, _) ->
gsvRequestResponseDuration
(Map.findWithDefault defaultGSV peeraddr dq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ defaultTxDecisionPolicy =
maxUnacknowledgedTxIds = 10, -- must be the same as txSubmissionMaxUnacked
txsSizeInflightPerPeer = max_TX_SIZE * 6,
maxTxsSizeInflight = max_TX_SIZE * 20,
txInflightMultiplicity = 1
txInflightMultiplicity = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ data PeerTxAPI m txid tx = PeerTxAPI {
-- ^ requested txids
-> Map txid tx
-- ^ received txs
-> m ()
-> m (),
-- ^ handle received txs

countRejectedTxs :: Int
-> m Int
}


Expand Down Expand Up @@ -123,7 +126,8 @@ withPeer tracer
( TxChannels { txChannelMap = txChannelMap' }
, PeerTxAPI { readTxDecision = takeMVar chann',
handleReceivedTxIds,
handleReceivedTxs }
handleReceivedTxs,
countRejectedTxs }
)

atomically $ modifyTVar sharedStateVar registerPeer
Expand Down Expand Up @@ -151,7 +155,8 @@ withPeer tracer
requestedTxsInflightSize = 0,
requestedTxsInflight = Set.empty,
unacknowledgedTxIds = StrictSeq.empty,
unknownTxs = Set.empty }
unknownTxs = Set.empty,
rejectedTxs = 0 }
peerTxStates
}

Expand Down Expand Up @@ -213,6 +218,22 @@ withPeer tracer
handleReceivedTxs txids txs =
collectTxs tracer sharedStateVar peeraddr txids txs

countRejectedTxs :: Int
-> m Int
countRejectedTxs n = atomically $ do
modifyTVar sharedStateVar cntRejects
st <- readTVar sharedStateVar
case Map.lookup peeraddr (peerTxStates st) of
Nothing -> error "missing peer updated"
Just ps -> return $ rejectedTxs ps
where
cntRejects :: SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
cntRejects st@SharedTxState { peerTxStates } =
let peerTxStates' = Map.update (\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) }) peeraddr peerTxStates in
st {peerTxStates = peerTxStates'}



decisionLogicThread
:: forall m peeraddr txid tx.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined

import Control.Monad (when, unless)
import Control.Monad (unless)
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader

-- | Flag to enable/disable the usage of the new tx submission protocol
--
Expand All @@ -48,19 +49,24 @@ txSubmissionInboundV2
, Ord txid
)
=> Tracer m (TraceTxSubmissionInbound txid tx)
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> PeerTxAPI m txid tx
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
tracer
TxSubmissionMempoolReader{
mempoolGetSnapshot
}
TxSubmissionMempoolWriter {
txId,
mempoolAddTxs
}
PeerTxAPI {
readTxDecision,
handleReceivedTxIds,
handleReceivedTxs
handleReceivedTxs,
countRejectedTxs
}
=
TxSubmissionServerPipelined serverIdle
Expand Down Expand Up @@ -186,21 +192,32 @@ txSubmissionInboundV2
handleReceivedTxs requested received

let !collected = Map.size received
when (collected > 0) $ do
!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs $ Map.elems received
!end <- getMonotonicTime
let duration = diffTime end start

traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
let !accepted = length txidsAccepted
let !rejected = collected - accepted
traceWith tracer $
TraceTxSubmissionCollected collected

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = rejected
}
mpSnapshot <- atomically mempoolGetSnapshot

-- Note that checking if the mempool contains a TX before
-- spending several ms attempting to add it to the pool has
-- been judged immoral.
let fresh = Map.filterWithKey
(\txid _ -> not $ mempoolHasTx mpSnapshot txid)
received

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs $ Map.elems fresh
!end <- getMonotonicTime
let duration = diffTime end start

traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
let !accepted = length txidsAccepted
let !rejected = collected - accepted
traceWith tracer $
TraceTxSubmissionCollected collected

s <- countRejectedTxs (rejected - accepted) -- accepted TXs are discounted
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = rejected
, ptxcScore = s
}

k
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ data PeerTxState txid tx = PeerTxState {
-- since that could potentially lead to corrupting the node, not being
-- able to download a `tx` which is needed & available from other nodes.
--
unknownTxs :: !(Set txid)
unknownTxs :: !(Set txid),

rejectedTxs :: Int
}
deriving (Eq, Show, Generic)

Expand Down Expand Up @@ -253,6 +255,7 @@ data ProcessedTxCount = ProcessedTxCount {
ptxcAccepted :: Int
-- | Just rejected this many transactions.
, ptxcRejected :: Int
, ptxcScore :: Int
}
deriving (Eq, Show)

Expand Down

0 comments on commit 4a99a6e

Please sign in to comment.