Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 42 additions & 27 deletions src/instantsend/instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,14 @@ MessageProcessingResult CInstantSendManager::ProcessMessage(NodeId from, std::st
}

LOCK(cs_pendingLocks);
pendingInstantSendLocks.emplace(hash, std::make_pair(from, islock));
pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{from, islock});
NotifyWorker();
return ret;
}

instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks()
{
decltype(pendingInstantSendLocks) pend;
std::vector<std::pair<uint256, instantsend::PendingISLockFromPeer>> pend;
instantsend::PendingState ret;

if (!IsInstantSendEnabled()) {
Expand All @@ -183,14 +184,15 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks()
// The keys of the removed values are temporarily stored here to avoid invalidating an iterator
std::vector<uint256> removed;
removed.reserve(maxCount);
pend.reserve(maxCount);

for (const auto& [islockHash, nodeid_islptr_pair] : pendingInstantSendLocks) {
// Check if we've reached max count
if (pend.size() >= maxCount) {
ret.m_pending_work = true;
break;
}
pend.emplace(islockHash, std::move(nodeid_islptr_pair));
pend.emplace_back(islockHash, std::move(nodeid_islptr_pair));
removed.emplace_back(islockHash);
}

Expand All @@ -216,24 +218,25 @@ instantsend::PendingState CInstantSendManager::ProcessPendingInstantSendLocks()
if (!badISLocks.empty()) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);

// filter out valid IS locks from "pend"
for (auto it = pend.begin(); it != pend.end();) {
if (!badISLocks.count(it->first)) {
it = pend.erase(it);
} else {
++it;
// filter out valid IS locks from "pend" - keep only bad ones
std::vector<std::pair<uint256, instantsend::PendingISLockFromPeer>> filteredPend;
filteredPend.reserve(badISLocks.size());
for (auto& p : pend) {
if (badISLocks.contains(p.first)) {
filteredPend.push_back(std::move(p));
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linter complains about extra whitespaces here

// Now check against the previous active set and perform banning if this fails
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, pend, ret.m_peer_activity);
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, /*ban=*/true, filteredPend, ret.m_peer_activity);
}

return ret;
}

Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
const Uint256HashMap<std::pair<NodeId, instantsend::InstantSendLockPtr>>& pend,
const std::vector<std::pair<uint256, instantsend::PendingISLockFromPeer>>& pend,
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
{
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
Expand All @@ -243,8 +246,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks(
size_t alreadyVerified = 0;
for (const auto& p : pend) {
const auto& hash = p.first;
auto nodeId = p.second.first;
const auto& islock = p.second.second;
auto nodeId = p.second.node_id;
const auto& islock = p.second.islock;

if (batchVerifier.badSources.count(nodeId)) {
continue;
Expand Down Expand Up @@ -315,8 +318,8 @@ Uint256HashSet CInstantSendManager::ProcessPendingInstantSendLocks(
}
for (const auto& p : pend) {
const auto& hash = p.first;
auto nodeId = p.second.first;
const auto& islock = p.second.second;
auto nodeId = p.second.node_id;
const auto& islock = p.second.islock;

if (batchVerifier.badMessages.count(hash)) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: invalid sig in islock, peer=%d\n",
Expand Down Expand Up @@ -393,7 +396,7 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from,
} else {
// put it in a separate pending map and try again later
LOCK(cs_pendingLocks);
pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock));
pendingNoTxInstantSendLocks.try_emplace(hash, instantsend::PendingISLockFromPeer{from, islock});
}

// This will also add children TXs to pendingRetryTxs
Expand Down Expand Up @@ -436,11 +439,11 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
if (it->second.islock->txid == tx->GetHash()) {
// we received an islock earlier
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
islock = it->second.second;
islock = it->second.islock;
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
Expand All @@ -458,6 +461,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
} else {
RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock);
}
NotifyWorker();
}

void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& tx)
Expand All @@ -475,6 +479,7 @@ void CInstantSendManager::TransactionRemovedFromMempool(const CTransactionRef& t
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- transaction %s was removed from mempool\n", __func__,
tx->GetHash().ToString());
RemoveConflictingLock(::SerializeHash(*islock), *islock);
NotifyWorker();
}

void CInstantSendManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
Expand Down Expand Up @@ -505,12 +510,14 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr<const CBlock>& pb
}

db.WriteBlockInstantSendLocks(pblock, pindex);
NotifyWorker();
}

// Chain-reorg hook: called when a block is disconnected from the active chain.
// Removes the per-block InstantSend-lock records for that block from the DB,
// then wakes the worker thread so it can re-evaluate pending state.
void CInstantSendManager::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock,
                                            const CBlockIndex* pindexDisconnected)
{
    db.RemoveBlockInstantSendLocks(pblock, pindexDisconnected);
    // Wake the worker: the reorg may have created work to process
    NotifyWorker();
}

void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
Expand All @@ -533,7 +540,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
if (it->second.islock->txid == tx->GetHash()) {
// we received an islock earlier, let's put it back into pending and verify/lock
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
Expand All @@ -553,6 +560,7 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock

LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__,
tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : "");
NotifyWorker();
}

void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChildren)
Expand Down Expand Up @@ -593,6 +601,7 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild

LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, retryChildren=%d, retryChildrenCount=%d\n",
__func__, txid.ToString(), retryChildren, retryChildrenCount);
NotifyWorker();
}

void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx)
Expand All @@ -601,6 +610,7 @@ void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx)
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
signer->ClearInputsFromQueue(GetIdsFromLockable(tx.vin));
}
NotifyWorker();
}

void CInstantSendManager::TruncateRecoveredSigsForInputs(const instantsend::InstantSendLock& islock)
Expand All @@ -620,13 +630,15 @@ void CInstantSendManager::TryEmplacePendingLock(const uint256& hash, const NodeI
if (db.KnownInstantSendLock(hash)) return;
LOCK(cs_pendingLocks);
if (!pendingInstantSendLocks.count(hash)) {
pendingInstantSendLocks.emplace(hash, std::make_pair(id, islock));
pendingInstantSendLocks.emplace(hash, instantsend::PendingISLockFromPeer{id, islock});
}
NotifyWorker();
}

// ChainLock notification hook. Treats the ChainLocked block as fully
// confirmed (cleanup of now-superseded IS state) and wakes the worker
// thread to pick up any follow-up work.
void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindexChainLock)
{
    HandleFullyConfirmedBlock(pindexChainLock);
    NotifyWorker();
}

void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew)
Expand All @@ -644,6 +656,7 @@ void CInstantSendManager::UpdatedBlockTip(const CBlockIndex* pindexNew)
if (pindex) {
HandleFullyConfirmedBlock(pindex);
}
NotifyWorker();
}

void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
Expand Down Expand Up @@ -842,11 +855,11 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, instants
LOCK(cs_pendingLocks);
auto it = pendingInstantSendLocks.find(hash);
if (it != pendingInstantSendLocks.end()) {
islock = it->second.second;
islock = it->second.islock;
} else {
auto itNoTx = pendingNoTxInstantSendLocks.find(hash);
if (itNoTx != pendingNoTxInstantSendLocks.end()) {
islock = itNoTx->second.second;
islock = itNoTx->second.islock;
} else {
return false;
}
Expand Down Expand Up @@ -883,7 +896,7 @@ bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == txHash) {
if (it->second.islock->txid == txHash) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__, txHash.ToString(),
it->first.ToString());
return true;
Expand Down Expand Up @@ -920,6 +933,7 @@ size_t CInstantSendManager::GetInstantSendLockCount() const
void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
{
while (!workInterrupt) {
uint64_t startEpoch = workEpoch.load(std::memory_order_acquire);
bool fMoreWork = [&]() -> bool {
if (!IsInstantSendEnabled()) return false;
auto [more_work, peer_activity] = ProcessPendingInstantSendLocks();
Expand Down Expand Up @@ -947,10 +961,11 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
signer->ProcessPendingRetryLockTxs(txns);
return more_work;
}();

if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
if (fMoreWork) continue;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | πŸ”΄ Critical

Critical: Busy-loop causes 100% CPU utilization.

When fMoreWork is true, the thread immediately continues without any throttling, causing a busy-loop. This explains the reported CPU increase from 5-10% to 100% in the PR comments.

Under sustained load with >32 pending locks, ProcessPendingInstantSendLocks will consistently return m_pending_work=true (line 192), causing continuous spinning.

Apply this diff to add throttling even when there's more work:

         return more_work;
     }();
-    if (fMoreWork) continue;
     std::unique_lock<Mutex> l(workMutex);
-    workCv.wait(l, [this, startEpoch]{
+    workCv.wait_for(l, std::chrono::milliseconds(fMoreWork ? 1 : 100), [this, startEpoch]{
         return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch;
     });

This change uses a 1ms timeout when there's more work (allowing rapid processing) and 100ms when idle (reducing unnecessary wakeups).

Committable suggestion skipped: line range outside the PR's diff.

πŸ€– Prompt for AI Agents
In src/instantsend/instantsend.cpp around line 964, the loop currently continues
immediately when fMoreWork is true causing a busy-spin and 100% CPU; modify the
control flow so that instead of an immediate continue you call a short
sleep/condition wait (e.g., 1ms) when fMoreWork is true to allow rapid
processing without spinning, and use a longer sleep/condition wait (e.g., 100ms)
when idle to reduce wakeups; implement this by replacing the unconditional
continue with a timed wait or SleepFor that uses 1ms if fMoreWork is set,
otherwise 100ms, preserving existing wakeup semantics.

std::unique_lock<Mutex> l(workMutex);
workCv.wait(l, [this, startEpoch]{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any chance that event could be missing and thread will wait forever? For example, when app is terminating.

return bool(workInterrupt) || workEpoch.load(std::memory_order_acquire) != startEpoch;
});
}
}

Expand Down
22 changes: 18 additions & 4 deletions src/instantsend/instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <atomic>
#include <unordered_map>
#include <unordered_set>
#include <vector>

class CBlockIndex;
class CChainState;
Expand All @@ -35,6 +36,11 @@ struct LLMQParams;
namespace instantsend {
class InstantSendSigner;

// An InstantSend lock received from the network, paired with the peer that
// relayed it. Queued (keyed by islock hash) until the worker thread can
// verify it; node_id lets verification failures be attributed to the sender
// (presumably for scoring/banning — see the `ban` parameter of
// ProcessPendingInstantSendLocks).
struct PendingISLockFromPeer {
    NodeId node_id;            // peer we received the islock from
    InstantSendLockPtr islock; // the not-yet-verified lock payload
};

struct PendingState {
bool m_pending_work{false};
std::vector<std::pair<NodeId, MessageProcessingResult>> m_peer_activity{};
Expand Down Expand Up @@ -63,12 +69,16 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent

std::thread workThread;
CThreadInterrupt workInterrupt;
// Event to wake the worker thread when new work arrives
Mutex workMutex;
std::condition_variable_any workCv;
std::atomic<uint64_t> workEpoch{0};

mutable Mutex cs_pendingLocks;
// Incoming and not verified yet
Uint256HashMap<std::pair<NodeId, instantsend::InstantSendLockPtr>> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks);
Uint256HashMap<instantsend::PendingISLockFromPeer> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks);
// Tried to verify but there is no tx yet
Uint256HashMap<std::pair<NodeId, instantsend::InstantSendLockPtr>> pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks);
Uint256HashMap<instantsend::PendingISLockFromPeer> pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks);

// TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS
// locking of child TXs
Expand Down Expand Up @@ -104,14 +114,14 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent

void Start(PeerManager& peerman);
void Stop();
void InterruptWorkerThread() { workInterrupt(); };
void InterruptWorkerThread() { workInterrupt(); workCv.notify_all(); };

private:
instantsend::PendingState ProcessPendingInstantSendLocks()
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

Uint256HashSet ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, bool ban,
const Uint256HashMap<std::pair<NodeId, instantsend::InstantSendLockPtr>>& pend,
const std::vector<std::pair<uint256, instantsend::PendingISLockFromPeer>>& pend,
std::vector<std::pair<NodeId, MessageProcessingResult>>& peer_activity)
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
MessageProcessingResult ProcessInstantSendLock(NodeId from, const uint256& hash,
Expand All @@ -133,6 +143,10 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent

void WorkThreadMain(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
    // Wake the worker thread because new work may have arrived.
    // The epoch is bumped before notifying so that a worker which sampled
    // workEpoch at the top of its iteration (startEpoch in WorkThreadMain)
    // sees a changed value in its wait predicate and does not sleep through
    // a notification sent while it was still processing (lost-wakeup guard).
    // acq_rel ordering pairs with the acquire loads in the wait predicate.
    void NotifyWorker() {
        workEpoch.fetch_add(1, std::memory_order_acq_rel);
        workCv.notify_one();
    }

void HandleFullyConfirmedBlock(const CBlockIndex* pindex)
EXCLUSIVE_LOCKS_REQUIRED(!cs_nonLocked, !cs_pendingRetry);
Expand Down
Loading
Loading