Skip to content

Commit 12efa00

Browse files
committed
Add:
- InMemoryRangeIndex Fix: - DefaultLocalListeners iterators do not handle multiple waiters on the same transaction correctly - Minor burn test edge case when cancelling bootstrap and resuming - NotifyWaitingOn stillExecutes->stillWaitsOn - Privileged fast path commit failure should not send stable message - Erroneously populating executeAt with txnId in CommandSummaries for Accepted commands Improve: - Improve Catchup logging - KeyDeps.forEach - Timestamp.tryParse - Simulate CFR loading/listening
1 parent 93d78be commit 12efa00

76 files changed

Lines changed: 2837 additions & 975 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

accord-core/src/main/java/accord/api/ProtocolModifiers.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ static Spec parse(String description)
127127
case "accept": accept = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for accept"); break;
128128
case "commit": commit = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for commit"); break;
129129
case "stable": stable = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for stable"); break;
130-
case "recover": recover = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for stable"); break;
130+
case "recover": recover = include; Invariants.require(cfp == DoNotChase, "Invalid to specify ChaseFixedPoint.Chase for recover"); break;
131131
}
132132
}
133133

@@ -194,6 +194,32 @@ public static boolean discardPreAcceptDeps(TxnId txnId)
194194
}
195195
}
196196

197+
public static class RangeSpecParam
198+
{
199+
// Implementations may set this prior to creating any Range object to specify whether the start or end bound of its ranges are inclusive.
200+
public static boolean END_INCLUSIVE = true;
201+
}
202+
public static class RangeSpec
203+
{
204+
public static final boolean END_INCLUSIVE = RangeSpecParam.END_INCLUSIVE;
205+
public static boolean isEndInclusive()
206+
{
207+
return END_INCLUSIVE;
208+
}
209+
public static boolean isEndExclusive()
210+
{
211+
return !END_INCLUSIVE;
212+
}
213+
public static boolean isStartInclusive()
214+
{
215+
return !END_INCLUSIVE;
216+
}
217+
public static boolean isStartExclusive()
218+
{
219+
return END_INCLUSIVE;
220+
}
221+
}
222+
197223
public static class Toggles
198224
{
199225
private static FastPaths permittedFastPaths = new FastPaths(FastPath.values());
@@ -205,10 +231,25 @@ public static class Toggles
205231
public static MediumPath defaultMediumPath() { return defaultMediumPath; }
206232
public static void setDefaultMediumPath(MediumPath newDefaultMediumPath) { defaultMediumPath = newDefaultMediumPath; }
207233

234+
private static boolean recoveryAwaitsSupersedingSyncPoints = true;
235+
public static boolean recoveryAwaitsSupersedingSyncPoints() { return recoveryAwaitsSupersedingSyncPoints; }
236+
public static void setRecoveryAwaitsSupersedingSyncPoints(boolean newRecoveryAwaitsSupersedingSyncPoints) { recoveryAwaitsSupersedingSyncPoints = newRecoveryAwaitsSupersedingSyncPoints; }
237+
238+
// TODO (required): default this to false once released support via recoveryAwaitsSupersedingSyncPoints
239+
private static boolean syncPointsTrackUnstableMediumPathDependencies = true;
240+
public static boolean syncPointsTrackUnstableMediumPathDependencies() { return syncPointsTrackUnstableMediumPathDependencies; }
241+
public static void setSyncPointsTrackUnstableMediumPathDependencies(boolean newSyncPointsTrackUnstableMediumPathDependencies) { syncPointsTrackUnstableMediumPathDependencies = newSyncPointsTrackUnstableMediumPathDependencies; }
242+
243+
private static boolean recoverPartialAcceptPhaseIfNoFastPath = false;
244+
public static boolean recoverPartialAcceptPhaseIfNoFastPath() { return recoverPartialAcceptPhaseIfNoFastPath; }
245+
public static void setRecoverPartialAcceptPhaseIfNoFastPath(boolean newSyncPointsRecoverPartialAcceptPhase) {recoverPartialAcceptPhaseIfNoFastPath = newSyncPointsRecoverPartialAcceptPhase; }
246+
208247
private static boolean filterDuplicateDependenciesFromAcceptReply = true;
209248
public static boolean filterDuplicateDependenciesFromAcceptReply() { return filterDuplicateDependenciesFromAcceptReply; }
210249
public static void setFilterDuplicateDependenciesFromAcceptReply(boolean newFilterDuplicateDependenciesFromAcceptReply) { filterDuplicateDependenciesFromAcceptReply = newFilterDuplicateDependenciesFromAcceptReply; }
211250

251+
252+
212253
public enum SendStableMessages { TO_ALL, FOR_READS, FOR_READS_OR_NONE_IF_FASTEXEC}
213254
private static SendStableMessages sendStableMessages = FOR_READS_OR_NONE_IF_FASTEXEC;
214255
public static void setSendStableMessages(SendStableMessages newSendStableMessages) { sendStableMessages = newSendStableMessages; }

accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454

5555
import static accord.api.ProtocolModifiers.Toggles.permitLocalExecution;
5656
import static accord.coordinate.ExecutePath.EPHEMERAL;
57-
import static accord.coordinate.ExecutePath.FAST;
5857
import static accord.coordinate.ReadCoordinator.Action.Approve;
5958
import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
6059
import static accord.primitives.Status.Phase.Execute;

accord-core/src/main/java/accord/coordinate/ExecuteTxn.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -399,20 +399,23 @@ protected void onDone(Success success, Throwable failure)
399399
}
400400
else
401401
{
402-
stable.informStableOnceQuorum();
403-
if (sendOnlyReadStableMessages())
402+
if (!isPrivilegedVoteCommitting)
404403
{
405-
// send additional stable messages to record the transaction outcome
406-
Commit.Kind kind = commitKind();
407-
if (!candidates.isEmpty())
404+
stable.informStableOnceQuorum();
405+
if (sendOnlyReadStableMessages())
408406
{
409-
for (int i = 0, size = candidates.size() ; i < size ; ++i)
410-
sendStableOnly(candidates.get(i), kind);
411-
}
412-
if (unstableFastReads != null)
413-
{
414-
for (Node.Id to : unstableFastReads)
415-
sendStableOnly(to, kind);
407+
// send additional stable messages to record the transaction outcome
408+
Commit.Kind kind = commitKind();
409+
if (!candidates.isEmpty())
410+
{
411+
for (int i = 0, size = candidates.size() ; i < size ; ++i)
412+
sendStableOnly(candidates.get(i), kind);
413+
}
414+
if (unstableFastReads != null)
415+
{
416+
for (Node.Id to : unstableFastReads)
417+
sendStableOnly(to, kind);
418+
}
416419
}
417420
}
418421
invokeCallback(null, failure);
@@ -430,9 +433,8 @@ public void onSlowResponse(Id from)
430433
@Override
431434
public void onFailure(Id from, Throwable failure)
432435
{
433-
super.onFailure(from, failure);
434-
if (isPrivilegedVoteCommitting && from.id == node.id().id)
435-
tryFinishOnFailure();
436+
if (isPrivilegedVoteCommitting && from.id == node.id().id) finishWithFailure(failure);
437+
else super.onFailure(from, failure);
436438
}
437439

438440
protected CoordinationAdapter<Result> adapter()

accord-core/src/main/java/accord/coordinate/FetchDurableBefore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ void start()
5252
{
5353
super.start();
5454
contact(ignore -> new GetDurableBefore(), id -> !node.id().equals(id));
55-
markSelfContacted();
56-
onSuccess(node.id(), new DurableBeforeReply(node.durableBefore()));
55+
executor.executeMaybeImmediately(() -> {
56+
markSelfContacted();
57+
onSuccess(node.id(), new DurableBeforeReply(node.durableBefore()));
58+
});
5759
}
5860

5961
public static AsyncChain<DurableBefore> catchup(Node node)

accord-core/src/main/java/accord/coordinate/KeyBarriers.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525

2626
import accord.api.RoutingKey;
27+
import accord.api.VisibleForImplementation;
2728
import accord.local.CommandSummaries;
2829
import accord.local.MapReduceConsumeCommandStores;
2930
import accord.local.Node;
@@ -48,8 +49,6 @@
4849
import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
4950
import static accord.local.CommandSummaries.SummaryStatus.COMMITTED;
5051
import static accord.local.CommandSummaries.SummaryStatus.INVALIDATED;
51-
import static accord.local.CommandSummaries.ComputeIsDep.IGNORE;
52-
import static accord.local.CommandSummaries.TestStartedAt.STARTED_AFTER;
5352
import static accord.local.durability.DurabilityService.SyncLocal.NoLocal;
5453
import static accord.local.durability.DurabilityService.SyncLocal.Self;
5554
import static accord.local.durability.DurabilityService.SyncRemote.NoRemote;
@@ -63,6 +62,7 @@
6362
* Facility for finding existing key transactions that can serve as a barrier transaction,
6463
* ensuring all reads/writes after some point in the txn log have been executed.
6564
*/
65+
@VisibleForImplementation
6666
public class KeyBarriers
6767
{
6868
@SuppressWarnings("unused")
@@ -84,7 +84,7 @@ public Found(TxnId txnId, RoutingKey key, SyncLocal knownLocal, SyncRemote known
8484
}
8585
}
8686

87-
public static AsyncResult<Found> find(Node node, Timestamp min, RoutingKey key, SyncLocal syncLocal, SyncRemote syncRemote)
87+
public static AsyncResult<Found> find(Node node, TxnId min, RoutingKey key, SyncLocal syncLocal, SyncRemote syncRemote)
8888
{
8989
Find find = new Find(min, key, syncLocal, syncRemote);
9090
node.commandStores().mapReduceConsume(min.epoch(), Long.MAX_VALUE, find);
@@ -98,16 +98,16 @@ public static AsyncResult<Found> find(Node node, Timestamp min, RoutingKey key,
9898
* For Applied we can return success immediately with the executeAt epoch. For PreApplied we can add
9999
* a listener for when it transitions to Applied and then return success.
100100
*/
101-
static class Find extends MapReduceConsumeCommandStores<RoutingKeys, Found> implements CommandSummaries.AllCommandVisitor
101+
static class Find extends MapReduceConsumeCommandStores<RoutingKeys, Found> implements CommandSummaries.SupersedingCommandVisitor
102102
{
103103
final AsyncResults.SettableByCallback<Found> result = new AsyncResults.SettableByCallback<>();
104-
final Timestamp min;
104+
final TxnId min;
105105
final RoutingKey find;
106106
final SyncLocal syncLocal;
107107
final SyncRemote syncRemote;
108108
Found found;
109109

110-
Find(Timestamp min, RoutingKey find, SyncLocal syncLocal, SyncRemote syncRemote)
110+
Find(TxnId min, RoutingKey find, SyncLocal syncLocal, SyncRemote syncRemote)
111111
{
112112
super(RoutingKeys.of(find));
113113
this.min = min;
@@ -122,7 +122,7 @@ public Found applyInternal(SafeCommandStore safeStore)
122122
// Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
123123
// so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
124124
// this barrier or the transaction we listen on and that is fine
125-
safeStore.visit(scope, TxnId.NONE, Ws, STARTED_AFTER, min, IGNORE, this);
125+
safeStore.visit(scope, min, Ws, this);
126126
return found;
127127
}
128128

accord-core/src/main/java/accord/coordinate/Propose.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import accord.local.Node.Id;
3232
import accord.local.SequentialAsyncExecutor;
3333
import accord.messages.Accept;
34+
import accord.messages.Accept.AcceptFlags;
3435
import accord.messages.Accept.AcceptReply;
3536
import accord.messages.Callback;
3637
import accord.primitives.Ballot;
@@ -47,9 +48,9 @@
4748
import accord.utils.SortedListMap;
4849
import accord.utils.Rethrowable;
4950

50-
import static accord.api.ProtocolModifiers.Toggles.filterDuplicateDependenciesFromAcceptReply;
5151
import static accord.coordinate.tracking.RequestStatus.Failed;
5252
import static accord.coordinate.tracking.RequestStatus.Success;
53+
import static accord.messages.Accept.AcceptFlags.filterDeps;
5354
import static accord.messages.Commit.Invalidate.commitInvalidate;
5455
import static accord.primitives.Routables.Slice.Minimal;
5556
import static accord.primitives.Status.AcceptedInvalidate;
@@ -66,6 +67,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe
6667

6768
final Timestamp executeAt;
6869
final QuorumTracker tracker;
70+
final int acceptFlags;
6971

7072
Propose(Node node, SequentialAsyncExecutor executor, Topologies topologies, Accept.Kind kind, Ballot ballot, TxnId txnId, Txn txn, Route<?> require, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super R, Throwable> callback)
7173
{
@@ -77,6 +79,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe
7779
this.deps = deps;
7880
this.executeAt = executeAt;
7981
this.tracker = new QuorumTracker(topologies);
82+
this.acceptFlags = AcceptFlags.encode(txnId, require != scope);
8083
Invariants.require(txnId.isSyncPoint() || deps.maxTxnId(txnId).compareTo(executeAt) <= 0,
8184
"Attempted to propose %s with an earlier executeAt than a conflicting transaction it witnessed: %s vs executeAt: %s", txnId, deps, executeAt);
8285
Invariants.require(topologies.currentEpoch() == executeAt.epoch());
@@ -86,7 +89,7 @@ abstract class Propose<R> extends AbstractCoordination<FullRoute<?>, R, AcceptRe
8689
void start()
8790
{
8891
super.start();
89-
contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, require != scope));
92+
contact(to -> new Accept(to, tracker.topologies(), kind, ballot, txnId, scope, executeAt, deps, acceptFlags));
9093
}
9194

9295
@Override
@@ -163,15 +166,22 @@ Deps mergeDeps(Deps newDeps)
163166
Deps mergeNewDeps()
164167
{
165168
SortedListMap<Node.Id, AcceptReply> oks = finishOks();
169+
if (!AcceptFlags.calculateDeps(acceptFlags))
170+
return Deps.NONE;
171+
166172
Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue, ok -> ok.deps);
167173
if (Faults.discardPreAcceptDeps(txnId))
168174
return deps;
169175

170176
if (txnId.is(TrackStable))
171177
{
172178
// we must not propose as stable any dep < txnId that we did not propose as part of this phase
173-
if (filterDuplicateDependenciesFromAcceptReply())
179+
if (!filterDeps(acceptFlags))
180+
{
181+
// if the replicas haven't filtered their responses, we need to remove
182+
// anything we sent to ensure we don't mark them as unstable
174183
deps = deps.without(this.deps);
184+
}
175185

176186
deps = deps.markUnstableBefore(txnId);
177187
}
@@ -181,7 +191,6 @@ Deps mergeNewDeps()
181191

182192
abstract CoordinationAdapter<R> adapter();
183193

184-
185194
@Override
186195
public CoordinationKind kind()
187196
{

0 commit comments

Comments
 (0)