Skip to content

Commit 45c199e

Browse files
jimwwalker authored and daverigby committed
MB-34910: Warmup DurabilityMonitor
Following a restart, the DurabilityMonitor should be in a state logically equivalent to its pre-warmup state. Of interest in this patch is the restoration of: 1. High Prepared Seqno; 2. High Completed Seqno; 3. Highest Tracked Seqno (ActiveDurabilityMonitor only). To restore 1 and 2, and with the inclusion of the previous patch, warmup can simply reset the variables to their pre-warmup values. For 3, the Highest Tracked Seqno is simply assigned the same value as item 1 (the High Prepared Seqno). In addition to these changes, extending the existing warmup tests showed that, following warmup, any loaded prepares lose the 'ack' count for the current node, whereas all prepares found on disk must have at least an 'ack' from the node itself. This is addressed when constructing the ADM by calling updateHighPreparedSeqno after loading all tracked writes. Change-Id: If3033397a331bbcef06b0fe22d93185ba9993489 Reviewed-on: http://review.couchbase.org/111701 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 13f946a commit 45c199e

11 files changed

+223
-52
lines changed

engines/ep/src/durability/active_durability_monitor.cc

+18-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "stats.h"
2424
#include "statwriter.h"
2525
#include "vbucket.h"
26+
#include "vbucket_state.h"
2627

2728
#include <boost/algorithm/string/join.hpp>
2829
#include <folly/concurrency/UnboundedQueue.h>
@@ -356,8 +357,12 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats, VBucket& vb)
356357
ActiveDurabilityMonitor::ActiveDurabilityMonitor(
357358
EPStats& stats,
358359
VBucket& vb,
360+
const vbucket_state& vbs,
359361
std::vector<queued_item>&& outstandingPrepares)
360362
: ActiveDurabilityMonitor(stats, vb) {
363+
if (!vbs.replicationTopology.is_null()) {
364+
setReplicationTopology(vbs.replicationTopology);
365+
}
361366
auto s = state.wlock();
362367
for (auto& prepare : outstandingPrepares) {
363368
auto seqno = prepare->getBySeqno();
@@ -367,10 +372,18 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(
367372
s->trackedWrites.emplace_back(nullptr,
368373
std::move(prepare),
369374
std::chrono::milliseconds{},
370-
nullptr,
371-
nullptr);
375+
s->firstChain.get(),
376+
s->secondChain.get());
372377
s->lastTrackedSeqno = seqno;
373378
}
379+
380+
// If we did load sync writes we should get them at least acked for this
381+
// node, which is achieved by attempting to move the HPS
382+
s->updateHighPreparedSeqno(*completedQueue);
383+
384+
s->lastTrackedSeqno.reset(vbs.highPreparedSeqno);
385+
s->highPreparedSeqno.reset(vbs.highPreparedSeqno);
386+
s->highCompletedSeqno.reset(vbs.highCompletedSeqno);
374387
}
375388

376389
ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats,
@@ -386,6 +399,8 @@ ActiveDurabilityMonitor::ActiveDurabilityMonitor(EPStats& stats,
386399
s->lastTrackedSeqno.reset(
387400
pdm.state.wlock()->highCompletedSeqno.lastWriteSeqno);
388401
}
402+
s->highPreparedSeqno.reset(pdm.getHighPreparedSeqno());
403+
s->highCompletedSeqno.reset(pdm.getHighCompletedSeqno());
389404
}
390405

391406
ActiveDurabilityMonitor::~ActiveDurabilityMonitor() = default;
@@ -426,11 +441,7 @@ void ActiveDurabilityMonitor::setReplicationTopology(
426441
}
427442

428443
int64_t ActiveDurabilityMonitor::getHighPreparedSeqno() const {
429-
const auto s = state.rlock();
430-
if (!s->firstChain) {
431-
return 0;
432-
}
433-
return s->highPreparedSeqno;
444+
return state.rlock()->highPreparedSeqno;
434445
}
435446

436447
int64_t ActiveDurabilityMonitor::getHighCompletedSeqno() const {

engines/ep/src/durability/active_durability_monitor.h

+4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
class EPStats;
2929
class PassiveDurabilityMonitor;
30+
struct vbucket_state;
3031
class VBucket;
3132

3233
/*
@@ -47,14 +48,17 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
4748
* Construct an ActiveDM for the given vBucket, with the specified
4849
* outstanding prepares as the initial state of the tracked SyncWrites. Used
4950
* by warmup to restore the state as it was before restart.
51+
* @param stats EPStats object for the associated Bucket.
5052
* @param vb VBucket which owns this Durability Monitor.
53+
* @param vbs reference to the vbucket_state found at warmup
5154
* @param outstandingPrepares In-flight prepares which the DM should take
5255
* responsibility for.
5356
* These must be ordered by ascending seqno, otherwise
5457
* std::invalid_argument will be thrown.
5558
*/
5659
ActiveDurabilityMonitor(EPStats& stats,
5760
VBucket& vb,
61+
const vbucket_state& vbs,
5862
std::vector<queued_item>&& outstandingPrepares);
5963

6064
/**

engines/ep/src/durability/passive_durability_monitor.cc

+15-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "statwriter.h"
2525
#include "stored-value.h"
2626
#include "vbucket.h"
27+
#include "vbucket_state.h"
2728

2829
#include <boost/range/adaptor/reversed.hpp>
2930
#include <gsl.h>
@@ -38,10 +39,22 @@ PassiveDurabilityMonitor::PassiveDurabilityMonitor(VBucket& vb)
3839
s->highCompletedSeqno = Position(s->trackedWrites.end());
3940
}
4041

41-
PassiveDurabilityMonitor::PassiveDurabilityMonitor(
42-
VBucket& vb, std::vector<queued_item>&& outstandingPrepares)
42+
PassiveDurabilityMonitor::PassiveDurabilityMonitor(VBucket& vb,
43+
int64_t highPreparedSeqno,
44+
int64_t highCompletedSeqno)
4345
: PassiveDurabilityMonitor(vb) {
4446
auto s = state.wlock();
47+
s->highPreparedSeqno.lastWriteSeqno.reset(highPreparedSeqno);
48+
s->highCompletedSeqno.lastWriteSeqno.reset(highCompletedSeqno);
49+
}
50+
51+
PassiveDurabilityMonitor::PassiveDurabilityMonitor(
52+
VBucket& vb,
53+
int64_t highPreparedSeqno,
54+
int64_t highCompletedSeqno,
55+
std::vector<queued_item>&& outstandingPrepares)
56+
: PassiveDurabilityMonitor(vb, highPreparedSeqno, highCompletedSeqno) {
57+
auto s = state.wlock();
4558
for (auto& prepare : outstandingPrepares) {
4659
// Any outstanding prepares "grandfathered" into the DM should have
4760
// already specified a non-default timeout.

engines/ep/src/durability/passive_durability_monitor.h

+15
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <vector>
2323

2424
class RollbackResult;
25+
struct vbucket_state;
2526
class VBucket;
2627
class StoredDocKey;
2728

@@ -38,17 +39,31 @@ class PassiveDurabilityMonitor : public DurabilityMonitor {
3839
public:
3940
PassiveDurabilityMonitor(VBucket& vb);
4041

42+
/**
43+
* Construct a PassiveDM for the given vBucket, with the specified hps/hcs
44+
* @param vb VBucket which owns this Durability Monitor.
45+
* @param highPreparedSeqno seqno to use as the initial highPreparedSeqno
46+
* @param highCompletedSeqno seqno to use as the initial highCompletedSeqno
47+
*/
48+
PassiveDurabilityMonitor(VBucket& vb,
49+
int64_t highPreparedSeqno,
50+
int64_t highCompletedSeqno);
51+
4152
/**
4253
* Construct a PassiveDM for the given vBucket, with the specified
4354
* outstanding prepares as the initial state of the tracked SyncWrites. Used
4455
* by warmup to restore the state as it was before restart.
4556
* @param vb VBucket which owns this Durability Monitor.
57+
* @param highPreparedSeqno seqno to use as the initial highPreparedSeqno
58+
* @param highCompletedSeqno seqno to use as the initial highCompletedSeqno
4659
* @param outstandingPrepares In-flight prepares which the DM should take
4760
* responsibility for.
4861
* These must be ordered by ascending seqno, otherwise
4962
* std::invalid_argument will be thrown.
5063
*/
5164
PassiveDurabilityMonitor(VBucket& vb,
65+
int64_t highPreparedSeqno,
66+
int64_t highCompletedSeqno,
5267
std::vector<queued_item>&& outstandingPrepares);
5368

5469
~PassiveDurabilityMonitor();

engines/ep/src/ep_vb.cc

+7-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "stored_value_factories.h"
3232
#include "tasks.h"
3333
#include "vbucket_bgfetch_item.h"
34+
#include "vbucket_state.h"
3435
#include "vbucketdeletiontask.h"
3536
#include <folly/lang/Assume.h>
3637

@@ -743,6 +744,7 @@ MutationStatus EPVBucket::insertFromWarmup(Item& itm,
743744
}
744745

745746
void EPVBucket::restoreOutstandingPreparesFromWarmup(
747+
const vbucket_state& vbs,
746748
std::vector<queued_item>&& outstandingPrepares) {
747749
// About to change the durabilityMonitor object, which is guarded by
748750
// stateLock.
@@ -763,19 +765,17 @@ void EPVBucket::restoreOutstandingPreparesFromWarmup(
763765
switch (getState()) {
764766
case vbucket_state_active: {
765767
durabilityMonitor = std::make_unique<ActiveDurabilityMonitor>(
766-
stats, *this, std::move(outstandingPrepares));
767-
auto topology = getReplicationTopology();
768-
if (!topology.is_null()) {
769-
dynamic_cast<ActiveDurabilityMonitor*>(durabilityMonitor.get())
770-
->setReplicationTopology(topology);
771-
}
768+
stats, *this, vbs, std::move(outstandingPrepares));
772769
return;
773770
}
774771
case vbucket_state_replica:
775772
case vbucket_state_pending:
776773
case vbucket_state_dead:
777774
durabilityMonitor = std::make_unique<PassiveDurabilityMonitor>(
778-
*this, std::move(outstandingPrepares));
775+
*this,
776+
vbs.highPreparedSeqno,
777+
vbs.highCompletedSeqno,
778+
std::move(outstandingPrepares));
779779
return;
780780
}
781781
}

engines/ep/src/ep_vb.h

+2
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,12 @@ class EPVBucket : public VBucket {
180180
* Populates the HashTable and the DurabilityMonitor with the given
181181
* set of queued_items.
182182
*
183+
* @param vbs The vbucket_state read during warmup
183184
* @param outstandingPrepares Sequence of prepared_sync_writes, sorted by
184185
* seqno in ascending order.
185186
*/
186187
void restoreOutstandingPreparesFromWarmup(
188+
const vbucket_state& vbs,
187189
std::vector<queued_item>&& outstandingPrepares);
188190

189191
size_t getNumPersistedDeletes() const override;

engines/ep/src/vbucket.cc

+9-1
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,15 @@ void VBucket::setupSyncReplication(const nlohmann::json& topology) {
662662
return;
663663
}
664664
// Current DM (if exists) is not Passive; replace it with a Passive one.
665-
durabilityMonitor = std::make_unique<PassiveDurabilityMonitor>(*this);
665+
if (durabilityMonitor) {
666+
durabilityMonitor = std::make_unique<PassiveDurabilityMonitor>(
667+
*this,
668+
durabilityMonitor->getHighPreparedSeqno(),
669+
durabilityMonitor->getHighCompletedSeqno());
670+
} else {
671+
durabilityMonitor =
672+
std::make_unique<PassiveDurabilityMonitor>(*this);
673+
}
666674
return;
667675
case vbucket_state_dead:
668676
// No DM in dead state.

engines/ep/src/vbucket_state.cc

+7
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,10 @@ void from_json(const nlohmann::json& j, vbucket_state& vbs) {
139139
// Note: We don't track on disk prepares pre-6.5
140140
vbs.onDiskPrepares = std::stoll(j.value("on_disk_prepares", "0"));
141141
}
142+
143+
std::ostream& operator<<(std::ostream& os, const vbucket_state& vbs) {
144+
nlohmann::json j;
145+
to_json(j, vbs);
146+
os << j;
147+
return os;
148+
}

engines/ep/src/vbucket_state.h

+2
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,5 @@ void to_json(nlohmann::json& json, const vbucket_state& vbs);
149149

150150
/// Method to allow nlohmann::json to convert from JSON to vbucket_state.
151151
void from_json(const nlohmann::json& j, vbucket_state& vbs);
152+
153+
std::ostream& operator<<(std::ostream& os, const vbucket_state& vbs);

engines/ep/src/warmup.cc

+12-2
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,7 @@ void Warmup::createVBuckets(uint16_t shardId) {
10111011
// objects if they do not already exist.
10121012
for (const auto& itr : shardVbStates[shardId]) {
10131013
Vbid vbid = itr.first;
1014-
vbucket_state vbs = itr.second;
1014+
const vbucket_state& vbs = itr.second;
10151015

10161016
// Collections and sync-repl requires that the VBucket datafiles have
10171017
// 'namespacing' applied to the key space
@@ -1322,7 +1322,17 @@ void Warmup::loadPreparedSyncWrites(uint16_t shardId) {
13221322
[](const auto& a, const auto& b) {
13231323
return a->getBySeqno() < b->getBySeqno();
13241324
});
1325-
epVb.restoreOutstandingPreparesFromWarmup(std::move(prepares));
1325+
1326+
// Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1327+
auto vbState = shardVbStates[shardId].find(vbid);
1328+
if (vbState == shardVbStates[shardId].end()) {
1329+
throw std::logic_error(
1330+
"Warmup::loadPreparedSyncWrites: processing " +
1331+
vbid.to_string() + ", but found no vbucket_state");
1332+
}
1333+
const vbucket_state& vbs = vbState->second;
1334+
1335+
epVb.restoreOutstandingPreparesFromWarmup(vbs, std::move(prepares));
13261336
}
13271337

13281338
if (++threadtask_count == store.vbMap.getNumShards()) {

0 commit comments

Comments
 (0)