Skip to content

Commit 7dc9efd

Browse files
ximinezvlntb
andcommitted
Reduce duplicate peer traffic for ledger data (#5126)
- Drop duplicate outgoing TMGetLedger messages per peer - Allow a retry after 30s in case of peer or network congestion. - Addresses RIPD-1870 - (Changes levelization. That is not desirable, and will need to be fixed.) - Drop duplicate incoming TMGetLedger messages per peer - Allow a retry after 15s in case of peer or network congestion. - The requestCookie is ignored when computing the hash, thus increasing the chances of detecting duplicate messages. - With duplicate messages, keep track of the different requestCookies (or lack of cookie). When work is finally done for a given request, send the response to all the peers that are waiting on the request, sending one message per peer, including all the cookies and a "directResponse" flag indicating the data is intended for the sender, too. - Addresses RIPD-1871 - Drop duplicate incoming TMLedgerData messages - Addresses RIPD-1869 - Improve logging related to ledger acquisition - Class "CanProcess" to keep track of processing of distinct items --------- Co-authored-by: Valentin Balaschenko <[email protected]>
1 parent 2216e5a commit 7dc9efd

27 files changed

+1013
-143
lines changed

Builds/levelization/results/loops.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Loop: xrpld.app xrpld.net
1414
xrpld.app > xrpld.net
1515

1616
Loop: xrpld.app xrpld.overlay
17-
xrpld.overlay == xrpld.app
17+
xrpld.overlay ~= xrpld.app
1818

1919
Loop: xrpld.app xrpld.peerfinder
2020
xrpld.app > xrpld.peerfinder

include/xrpl/basics/CanProcess.h

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of rippled: https://github.com/ripple/rippled
4+
Copyright (c) 2024 Ripple Labs Inc.
5+
6+
Permission to use, copy, modify, and/or distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
21+
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
22+
23+
#include <functional>
24+
#include <mutex>
25+
#include <set>
26+
27+
/** RAII class to check if an Item is already being processed on another thread,
28+
* as indicated by it's presence in a Collection.
29+
*
30+
* If the Item is not in the Collection, it will be added under lock in the
31+
* ctor, and removed under lock in the dtor. The object will be considered
32+
* "usable" and evaluate to `true`.
33+
*
34+
* If the Item is in the Collection, no changes will be made to the collection,
35+
* and the CanProcess object will be considered "unusable".
36+
*
37+
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
38+
* Process or skip a block of code, or set a flag.)
39+
*
40+
* The current use is to avoid lock contention that would be involved in
41+
* processing something associated with the Item.
42+
*
43+
* Examples:
44+
*
45+
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
46+
* {
47+
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
48+
* {
49+
* acquire(hash, ...);
50+
* }
51+
* }
52+
*
53+
* bool
54+
* NetworkOPsImp::recvValidation(
55+
* std::shared_ptr<STValidation> const& val,
56+
* std::string const& source)
57+
* {
58+
* CanProcess check(
59+
* validationsMutex_, pendingValidations_, val->getLedgerHash());
60+
* BypassAccept bypassAccept =
61+
* check ? BypassAccept::no : BypassAccept::yes;
62+
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
63+
* }
64+
*
65+
*/
66+
class CanProcess
67+
{
68+
public:
69+
template <class Mutex, class Collection, class Item>
70+
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
71+
: cleanup_(insert(mtx, collection, item))
72+
{
73+
}
74+
75+
~CanProcess()
76+
{
77+
if (cleanup_)
78+
cleanup_();
79+
}
80+
81+
explicit
82+
operator bool() const
83+
{
84+
return static_cast<bool>(cleanup_);
85+
}
86+
87+
private:
88+
template <bool useIterator, class Mutex, class Collection, class Item>
89+
std::function<void()>
90+
doInsert(Mutex& mtx, Collection& collection, Item const& item)
91+
{
92+
std::unique_lock<Mutex> lock(mtx);
93+
// TODO: Use structured binding once LLVM 16 is the minimum supported
94+
// version. See also: https://github.com/llvm/llvm-project/issues/48582
95+
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
96+
auto const insertResult = collection.insert(item);
97+
auto const it = insertResult.first;
98+
if (!insertResult.second)
99+
return {};
100+
if constexpr (useIterator)
101+
return [&, it]() {
102+
std::unique_lock<Mutex> lock(mtx);
103+
collection.erase(it);
104+
};
105+
else
106+
return [&]() {
107+
std::unique_lock<Mutex> lock(mtx);
108+
collection.erase(item);
109+
};
110+
}
111+
112+
// Generic insert() function doesn't use iterators because they may get
113+
// invalidated
114+
template <class Mutex, class Collection, class Item>
115+
std::function<void()>
116+
insert(Mutex& mtx, Collection& collection, Item const& item)
117+
{
118+
return doInsert<false>(mtx, collection, item);
119+
}
120+
121+
// Specialize insert() for std::set, which does not invalidate iterators for
122+
// insert and erase
123+
template <class Mutex, class Item>
124+
std::function<void()>
125+
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
126+
{
127+
return doInsert<true>(mtx, collection, item);
128+
}
129+
130+
// If set, then the item is "usable"
131+
std::function<void()> cleanup_;
132+
};
133+
134+
#endif

include/xrpl/basics/base_uint.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,13 @@ to_string(base_uint<Bits, Tag> const& a)
631631
return strHex(a.cbegin(), a.cend());
632632
}
633633

634+
template <std::size_t Bits, class Tag>
635+
inline std::string
636+
to_short_string(base_uint<Bits, Tag> const& a)
637+
{
638+
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
639+
}
640+
634641
template <std::size_t Bits, class Tag>
635642
inline std::ostream&
636643
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)

include/xrpl/proto/ripple.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,18 @@ message TMLedgerData
321321
required uint32 ledgerSeq = 2;
322322
required TMLedgerInfoType type = 3;
323323
repeated TMLedgerNode nodes = 4;
324+
// If the peer supports "responseCookies", this field will
325+
// never be populated.
324326
optional uint32 requestCookie = 5;
325327
optional TMReplyError error = 6;
328+
// The old field is called "requestCookie", but this is
329+
// a response, so this name makes more sense
330+
repeated uint32 responseCookies = 7;
331+
// If a TMGetLedger request was received without a "requestCookie",
332+
// and the peer supports it, this flag will be set to true to
333+
// indicate that the receiver should process the result in addition
334+
// to forwarding it to its "responseCookies" peers.
335+
optional bool directResponse = 8;
326336
}
327337

328338
message TMPing

include/xrpl/protocol/LedgerHeader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ struct LedgerHeader
5555

5656
// If validated is false, it means "not yet validated."
5757
// Once validated is true, it will never be set false at a later time.
58+
// NOTE: If you are accessing this directly, you are probably doing it
59+
// wrong. Use LedgerMaster::isValidated().
5860
// VFALCO TODO Make this not mutable
5961
bool mutable validated = false;
6062
bool accepted = false;

src/test/app/HashRouter_test.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
242242
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
243243
}
244244

245+
void
246+
testProcessPeer()
247+
{
248+
using namespace std::chrono_literals;
249+
TestStopwatch stopwatch;
250+
HashRouter router(stopwatch, 5s);
251+
uint256 const key(1);
252+
HashRouter::PeerShortID peer1 = 1;
253+
HashRouter::PeerShortID peer2 = 2;
254+
auto const timeout = 2s;
255+
256+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
257+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
258+
++stopwatch;
259+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
260+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
261+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
262+
++stopwatch;
263+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
264+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
265+
++stopwatch;
266+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
267+
++stopwatch;
268+
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
269+
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
270+
}
271+
245272
public:
246273
void
247274
run() override
@@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite
252279
testSetFlags();
253280
testRelay();
254281
testProcess();
282+
testProcessPeer();
255283
}
256284
};
257285

src/test/app/LedgerReplay_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,11 @@ class TestPeer : public Peer
322322
{
323323
return false;
324324
}
325+
std::set<std::optional<uint64_t>>
326+
releaseRequestCookies(uint256 const& requestHash) override
327+
{
328+
return {};
329+
}
325330

326331
bool ledgerReplayEnabled_;
327332
PublicKey nodePublicKey_;

src/test/basics/base_uint_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite
151151
uset.insert(u);
152152
BEAST_EXPECT(raw.size() == u.size());
153153
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
154+
BEAST_EXPECT(to_short_string(u) == "01020304...");
154155
BEAST_EXPECT(*u.data() == 1);
155156
BEAST_EXPECT(u.signum() == 1);
156157
BEAST_EXPECT(!!u);
@@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite
173174
test96 v{~u};
174175
uset.insert(v);
175176
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
177+
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
176178
BEAST_EXPECT(*v.data() == 0xfe);
177179
BEAST_EXPECT(v.signum() == 1);
178180
BEAST_EXPECT(!!v);
@@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite
193195
test96 z{beast::zero};
194196
uset.insert(z);
195197
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
198+
BEAST_EXPECT(to_short_string(z) == "00000000...");
196199
BEAST_EXPECT(*z.data() == 0);
197200
BEAST_EXPECT(*z.begin() == 0);
198201
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
@@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite
213216
BEAST_EXPECT(n == z);
214217
n--;
215218
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
219+
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
216220
n = beast::zero;
217221
BEAST_EXPECT(n == z);
218222

@@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite
223227
test96 x{zm1 ^ zp1};
224228
uset.insert(x);
225229
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
230+
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));
226231

227232
BEAST_EXPECT(uset.size() == 4);
228233

src/test/overlay/ProtocolVersion_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
8787
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
8888
BEAST_EXPECT(
8989
negotiateProtocolVersion(
90-
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
91-
make_protocol(2, 2));
90+
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
91+
make_protocol(2, 3));
9292
BEAST_EXPECT(
9393
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
9494
std::nullopt);

src/test/overlay/reduce_relay_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ class PeerPartial : public Peer
182182
removeTxQueue(const uint256&) override
183183
{
184184
}
185+
std::set<std::optional<uint64_t>>
186+
releaseRequestCookies(uint256 const& requestHash) override
187+
{
188+
return {};
189+
}
185190
};
186191

187192
/** Manually advanced clock. */

0 commit comments

Comments
 (0)