Skip to content

Commit c1215d0

Browse files
ximinezvlntb
andcommitted
Reduce duplicate peer traffic for ledger data (#5126)
- Drop duplicate outgoing TMGetLedger messages per peer - Allow a retry after 30s in case of peer or network congestion. - Addresses RIPD-1870 - (Changes levelization. That is not desirable, and will need to be fixed.) - Drop duplicate incoming TMGetLedger messages per peer - Allow a retry after 15s in case of peer or network congestion. - The requestCookie is ignored when computing the hash, thus increasing the chances of detecting duplicate messages. - With duplicate messages, keep track of the different requestCookies (or lack of cookie). When work is finally done for a given request, send the response to all the peers that are waiting on the request, sending one message per peer, including all the cookies and a "directResponse" flag indicating the data is intended for the sender, too. - Addresses RIPD-1871 - Drop duplicate incoming TMLedgerData messages - Addresses RIPD-1869 - Improve logging related to ledger acquisition - Class "CanProcess" to keep track of processing of distinct items --------- Co-authored-by: Valentin Balaschenko <[email protected]>
1 parent 6cf37c4 commit c1215d0

26 files changed

+1012
-142
lines changed

include/xrpl/basics/CanProcess.h

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of rippled: https://github.com/ripple/rippled
4+
Copyright (c) 2024 Ripple Labs Inc.
5+
6+
Permission to use, copy, modify, and/or distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
21+
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
22+
23+
#include <functional>
24+
#include <mutex>
25+
#include <set>
26+
27+
/** RAII class to check if an Item is already being processed on another thread,
28+
* as indicated by it's presence in a Collection.
29+
*
30+
* If the Item is not in the Collection, it will be added under lock in the
31+
* ctor, and removed under lock in the dtor. The object will be considered
32+
* "usable" and evaluate to `true`.
33+
*
34+
* If the Item is in the Collection, no changes will be made to the collection,
35+
* and the CanProcess object will be considered "unusable".
36+
*
37+
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
38+
* Process or skip a block of code, or set a flag.)
39+
*
40+
* The current use is to avoid lock contention that would be involved in
41+
* processing something associated with the Item.
42+
*
43+
* Examples:
44+
*
45+
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
46+
* {
47+
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
48+
* {
49+
* acquire(hash, ...);
50+
* }
51+
* }
52+
*
53+
* bool
54+
* NetworkOPsImp::recvValidation(
55+
* std::shared_ptr<STValidation> const& val,
56+
* std::string const& source)
57+
* {
58+
* CanProcess check(
59+
* validationsMutex_, pendingValidations_, val->getLedgerHash());
60+
* BypassAccept bypassAccept =
61+
* check ? BypassAccept::no : BypassAccept::yes;
62+
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
63+
* }
64+
*
65+
*/
66+
class CanProcess
67+
{
68+
public:
69+
template <class Mutex, class Collection, class Item>
70+
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
71+
: cleanup_(insert(mtx, collection, item))
72+
{
73+
}
74+
75+
~CanProcess()
76+
{
77+
if (cleanup_)
78+
cleanup_();
79+
}
80+
81+
explicit
82+
operator bool() const
83+
{
84+
return static_cast<bool>(cleanup_);
85+
}
86+
87+
private:
88+
template <bool useIterator, class Mutex, class Collection, class Item>
89+
std::function<void()>
90+
doInsert(Mutex& mtx, Collection& collection, Item const& item)
91+
{
92+
std::unique_lock<Mutex> lock(mtx);
93+
// TODO: Use structured binding once LLVM 16 is the minimum supported
94+
// version. See also: https://github.com/llvm/llvm-project/issues/48582
95+
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
96+
auto const insertResult = collection.insert(item);
97+
auto const it = insertResult.first;
98+
if (!insertResult.second)
99+
return {};
100+
if constexpr (useIterator)
101+
return [&, it]() {
102+
std::unique_lock<Mutex> lock(mtx);
103+
collection.erase(it);
104+
};
105+
else
106+
return [&]() {
107+
std::unique_lock<Mutex> lock(mtx);
108+
collection.erase(item);
109+
};
110+
}
111+
112+
// Generic insert() function doesn't use iterators because they may get
113+
// invalidated
114+
template <class Mutex, class Collection, class Item>
115+
std::function<void()>
116+
insert(Mutex& mtx, Collection& collection, Item const& item)
117+
{
118+
return doInsert<false>(mtx, collection, item);
119+
}
120+
121+
// Specialize insert() for std::set, which does not invalidate iterators for
122+
// insert and erase
123+
template <class Mutex, class Item>
124+
std::function<void()>
125+
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
126+
{
127+
return doInsert<true>(mtx, collection, item);
128+
}
129+
130+
// If set, then the item is "usable"
131+
std::function<void()> cleanup_;
132+
};
133+
134+
#endif

include/xrpl/basics/base_uint.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,13 @@ to_string(base_uint<Bits, Tag> const& a)
632632
return strHex(a.cbegin(), a.cend());
633633
}
634634

635+
template <std::size_t Bits, class Tag>
636+
inline std::string
637+
to_short_string(base_uint<Bits, Tag> const& a)
638+
{
639+
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
640+
}
641+
635642
template <std::size_t Bits, class Tag>
636643
inline std::ostream&
637644
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)

include/xrpl/proto/ripple.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,18 @@ message TMLedgerData
321321
required uint32 ledgerSeq = 2;
322322
required TMLedgerInfoType type = 3;
323323
repeated TMLedgerNode nodes = 4;
324+
// If the peer supports "responseCookies", this field will
325+
// never be populated.
324326
optional uint32 requestCookie = 5;
325327
optional TMReplyError error = 6;
328+
// The old field is called "requestCookie", but this is
329+
// a response, so this name makes more sense
330+
repeated uint32 responseCookies = 7;
331+
// If a TMGetLedger request was received without a "requestCookie",
332+
// and the peer supports it, this flag will be set to true to
333+
// indicate that the receiver should process the result in addition
334+
// to forwarding it to its "responseCookies" peers.
335+
optional bool directResponse = 8;
326336
}
327337

328338
message TMPing

include/xrpl/protocol/LedgerHeader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ struct LedgerHeader
5555

5656
// If validated is false, it means "not yet validated."
5757
// Once validated is true, it will never be set false at a later time.
58+
// NOTE: If you are accessing this directly, you are probably doing it
59+
// wrong. Use LedgerMaster::isValidated().
5860
// VFALCO TODO Make this not mutable
5961
bool mutable validated = false;
6062
bool accepted = false;

src/test/app/HashRouter_test.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,33 @@ class HashRouter_test : public beast::unit_test::suite
243243
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
244244
}
245245

246+
void
247+
testProcessPeer()
248+
{
249+
using namespace std::chrono_literals;
250+
TestStopwatch stopwatch;
251+
HashRouter router(stopwatch, 5s);
252+
uint256 const key(1);
253+
HashRouter::PeerShortID peer1 = 1;
254+
HashRouter::PeerShortID peer2 = 2;
255+
auto const timeout = 2s;
256+
257+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
258+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
259+
++stopwatch;
260+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
261+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
262+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
263+
++stopwatch;
264+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
265+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
266+
++stopwatch;
267+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
268+
++stopwatch;
269+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
270+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
271+
}
272+
246273
public:
247274
void
248275
run() override
@@ -253,6 +280,7 @@ class HashRouter_test : public beast::unit_test::suite
253280
testSetFlags();
254281
testRelay();
255282
testProcess();
283+
testProcessPeer();
256284
}
257285
};
258286

src/test/app/LedgerReplay_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,11 @@ class TestPeer : public Peer
324324
{
325325
return false;
326326
}
327+
std::set<std::optional<uint64_t>>
328+
releaseRequestCookies(uint256 const& requestHash) override
329+
{
330+
return {};
331+
}
327332

328333
bool ledgerReplayEnabled_;
329334
PublicKey nodePublicKey_;

src/test/basics/base_uint_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ struct base_uint_test : beast::unit_test::suite
152152
uset.insert(u);
153153
BEAST_EXPECT(raw.size() == u.size());
154154
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
155+
BEAST_EXPECT(to_short_string(u) == "01020304...");
155156
BEAST_EXPECT(*u.data() == 1);
156157
BEAST_EXPECT(u.signum() == 1);
157158
BEAST_EXPECT(!!u);
@@ -174,6 +175,7 @@ struct base_uint_test : beast::unit_test::suite
174175
test96 v{~u};
175176
uset.insert(v);
176177
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
178+
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
177179
BEAST_EXPECT(*v.data() == 0xfe);
178180
BEAST_EXPECT(v.signum() == 1);
179181
BEAST_EXPECT(!!v);
@@ -194,6 +196,7 @@ struct base_uint_test : beast::unit_test::suite
194196
test96 z{beast::zero};
195197
uset.insert(z);
196198
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
199+
BEAST_EXPECT(to_short_string(z) == "00000000...");
197200
BEAST_EXPECT(*z.data() == 0);
198201
BEAST_EXPECT(*z.begin() == 0);
199202
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
@@ -214,6 +217,7 @@ struct base_uint_test : beast::unit_test::suite
214217
BEAST_EXPECT(n == z);
215218
n--;
216219
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
220+
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
217221
n = beast::zero;
218222
BEAST_EXPECT(n == z);
219223

@@ -224,6 +228,7 @@ struct base_uint_test : beast::unit_test::suite
224228
test96 x{zm1 ^ zp1};
225229
uset.insert(x);
226230
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
231+
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));
227232

228233
BEAST_EXPECT(uset.size() == 4);
229234

src/test/overlay/ProtocolVersion_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
8888
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
8989
BEAST_EXPECT(
9090
negotiateProtocolVersion(
91-
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
92-
make_protocol(2, 2));
91+
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
92+
make_protocol(2, 3));
9393
BEAST_EXPECT(
9494
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
9595
std::nullopt);

src/test/overlay/reduce_relay_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ class PeerPartial : public Peer
186186
removeTxQueue(const uint256&) override
187187
{
188188
}
189+
std::set<std::optional<uint64_t>>
190+
releaseRequestCookies(uint256 const& requestHash) override
191+
{
192+
return {};
193+
}
189194
};
190195

191196
/** Manually advanced clock. */

src/xrpld/app/consensus/RCLConsensus.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1082,7 +1082,8 @@ void
10821082
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
10831083
{
10841084
if (!positions && app_.getOPs().isFull())
1085-
app_.getOPs().setMode(OperatingMode::CONNECTED);
1085+
app_.getOPs().setMode(
1086+
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
10861087
}
10871088

10881089
void

0 commit comments

Comments
 (0)