Skip to content

Commit cc4c805

Browse files
committed
- DefaultRemoteListeners not correctly synchronised
- Topology.cloneEquivalentWithEpoch should also share nodeLookup and ranges, especially to accelerate computeWaitForEpoch/computeScope
1 parent fb693f5 commit cc4c805

10 files changed

Lines changed: 82 additions & 47 deletions

File tree

accord-core/src/main/java/accord/impl/DefaultRemoteListeners.java

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ static class Listeners
297297
this.end = 1;
298298
}
299299

300-
Listeners merge(Listeners that)
300+
synchronized Listeners merge(Listeners that)
301301
{
302302
int count = 0;
303303
int i = this.start, j = that.start;
@@ -344,6 +344,41 @@ else if (count < i)
344344
end = count;
345345
return this;
346346
}
347+
348+
synchronized Listeners notify(NotifySink notifySink, SafeCommandStore safeStore, SafeCommand safeCommand, Command prev)
349+
{
350+
int storeId = safeStore.commandStore().id();
351+
SaveStatus newStatus = safeCommand.current().saveStatus();
352+
Durability newDurability = safeCommand.current().durability();
353+
354+
StatusListeners[] listeners = statusListeners;
355+
for (int i = start ; i < end ; ++i)
356+
{
357+
StatusListeners listener = listeners[i];
358+
if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0)
359+
return this;
360+
361+
if (awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 0)
362+
continue;
363+
364+
listener.removeWaitingOn(storeId);
365+
if (listener.waitingOnCount == 0)
366+
{
367+
// if we get invalidated we don't save the route, so we take the combined route of before and after the new status
368+
Route<?> route = Route.merge(safeCommand.current().route(), prev == null ? null : (Route)prev.route());
369+
notifySink.notify(safeCommand.txnId(), newStatus, route, listener.listeners, listener.listenerCount);
370+
if (i != start)
371+
System.arraycopy(listeners, start, listeners, start + 1, i - start);
372+
listeners[start] = null;
373+
++start;
374+
}
375+
}
376+
377+
if (start == end)
378+
return null;
379+
380+
return this;
381+
}
347382
}
348383

349384
class Register implements Registration
@@ -396,7 +431,7 @@ private void trim()
396431
}
397432

398433
@Override
399-
public int done()
434+
public synchronized int done()
400435
{
401436
if (count == 0)
402437
return 0;
@@ -435,36 +470,14 @@ public Registration register(TxnId txnId, SaveStatus awaitSaveStatus, HasDecisio
435470
public void notify(SafeCommandStore safeStore, SafeCommand safeCommand, Command prev)
436471
{
437472
TxnId txnId = safeCommand.txnId();
438-
Listeners entry = this.listeners.get(txnId);
439-
if (entry == null)
440-
return;
441-
442-
int storeId = safeStore.commandStore().id();
443-
SaveStatus newStatus = safeCommand.current().saveStatus();
444-
Durability newDurability = safeCommand.current().durability();
445-
446-
int start = entry.start, end = entry.end;
447-
StatusListeners[] listeners = entry.statusListeners;
448-
for (int i = start ; i < end ; ++i)
473+
Listeners entry = listeners.get(txnId);
474+
if (entry != null && null == entry.notify(notifySink, safeStore, safeCommand, prev))
449475
{
450-
StatusListeners listener = listeners[i];
451-
if (awaitSaveStatus(listener.await).compareTo(newStatus) > 0)
452-
return;
453-
454-
if (awaitDurability(listener.await).compareTo(newDurability.decisionOrOutcome()) > 0)
455-
continue;
456-
457-
listener.removeWaitingOn(storeId);
458-
if (listener.waitingOnCount == 0)
459-
{
460-
// if we get invalidated we don't save the route, so we take the combined route of before and after the new status
461-
Route<?> route = Route.merge(safeCommand.current().route(), prev == null ? null : (Route)prev.route());
462-
notifySink.notify(txnId, newStatus, route, listener.listeners, listener.listenerCount);
463-
if (i != start)
464-
System.arraycopy(listeners, start, listeners, start + 1, i - start);
465-
listeners[start] = null;
466-
start = ++entry.start;
467-
}
476+
listeners.compute(txnId, (ignore, e) -> {
477+
if (e == entry && e.start == e.end)
478+
return null;
479+
return e;
480+
});
468481
}
469482
}
470483

accord-core/src/main/java/accord/impl/InMemoryCommandStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1234,7 +1234,7 @@ public void unsafeClearForTesting()
12341234
hasResumedBootstraps = false;
12351235
}
12361236

1237-
public Journal.Replayer replayer()
1237+
public Journal.Replayer replayer(AbstractReplayer.Mode mode)
12381238
{
12391239
return new CommandReplayer(this);
12401240
}

accord-core/src/main/java/accord/local/CommandStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.function.Supplier;
3636
import javax.annotation.Nullable;
3737

38+
import accord.impl.AbstractReplayer;
3839
import accord.primitives.*;
3940
import com.google.common.annotations.VisibleForTesting;
4041
import com.google.common.collect.ImmutableSortedMap;
@@ -222,7 +223,7 @@ public final int id()
222223
return id;
223224
}
224225

225-
public abstract Journal.Replayer replayer();
226+
public abstract Journal.Replayer replayer(AbstractReplayer.Mode mode);
226227
// expected to invoke safeStore.upsertRedundantBefore at some future point, when the commandStore state is durably persisted
227228
protected abstract void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurable);
228229

accord-core/src/main/java/accord/local/Commands.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,17 +1236,22 @@ public final void accept(SafeCommandStore safeStore)
12361236
boolean acceptInternal(SafeCommandStore safeStore)
12371237
{
12381238
SafeCommand waitingSafe = safeStore.get(waitingId);
1239+
PartialDeps partialDeps;
1240+
{
1241+
Command waiting = waitingSafe.current();
1242+
if (waiting.saveStatus().compareTo(Applying) >= 0)
1243+
return false;
1244+
1245+
partialDeps = waiting.partialDeps();
1246+
Invariants.require(partialDeps != null, "Trying to execute command without partialDeps: %s", waiting);
1247+
}
1248+
12391249
SafeCommand depSafe = null;
12401250
if (loadDepId != null)
12411251
{
12421252
depSafe = safeStore.ifInitialised(loadDepId);
12431253
if (depSafe == null) // TODO (required): slice to waiting.participants().waitsOn? can simplify method
1244-
{
1245-
Command waiting = waitingSafe.current();
1246-
if (waiting.saveStatus().compareTo(Applying) >= 0)
1247-
return false; // nothing to do
1248-
depSafe = initialiseOrRemoveDependency(safeStore, waitingSafe, loadDepId, waiting.partialDeps().participants(loadDepId));
1249-
}
1254+
depSafe = initialiseOrRemoveDependency(safeStore, waitingSafe, loadDepId, partialDeps.participants(loadDepId));
12501255
}
12511256

12521257
while (true)
@@ -1257,7 +1262,7 @@ boolean acceptInternal(SafeCommandStore safeStore)
12571262

12581263
if (depSafe == null)
12591264
{
1260-
WaitingOn waitingOn = waiting.asCommitted().waitingOn();
1265+
WaitingOn waitingOn = waiting.waitingOn();
12611266
TxnId directlyBlockedOn = waitingOn.nextWaitingOn();
12621267
if (directlyBlockedOn == null)
12631268
{
@@ -1300,7 +1305,7 @@ boolean acceptInternal(SafeCommandStore safeStore)
13001305
if (depExecution.compareTo(WaitingToExecute) < 0 && dep.participants().owns().isEmpty())
13011306
{
13021307
// TODO (desired): slightly costly to invert a large partialDeps collection
1303-
participants = waiting.partialDeps().participants(dep.txnId());
1308+
participants = partialDeps.participants(dep.txnId());
13041309
Participants<?> waitsOn = participants.intersecting(waiting.participants().stillWaitsOn(), Minimal);
13051310

13061311
depSafe = maybeCleanupRedundantDependency(safeStore, waitingSafe, depSafe, waitsOn);
@@ -1352,7 +1357,7 @@ boolean acceptInternal(SafeCommandStore safeStore)
13521357
case CleaningUp:
13531358
updateDependencyAndMaybeExecute(safeStore, waitingSafe, depSafe, false);
13541359
waiting = waitingSafe.current();
1355-
Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || !waiting.asCommitted().waitingOn().isWaitingOn(dep.txnId()));
1360+
Invariants.require(waiting.saveStatus().compareTo(Applying) >= 0 || !waiting.waitingOn().isWaitingOn(dep.txnId()));
13561361
depSafe = null;
13571362
}
13581363
}

accord-core/src/main/java/accord/messages/ParticipantsRequest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ public static long computeWaitForEpoch(Id node, Topologies topologies, int start
131131
if (i == mi)
132132
return topologies.oldestEpoch();
133133

134-
135134
Ranges latest;
136135
{
137136
Topology mostRecent = topologies.get(i - 1);

accord-core/src/main/java/accord/topology/Topology.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public boolean isEquivalent(Topology topology)
269269

270270
public Topology cloneEquivalentWithEpoch(long epoch)
271271
{
272-
return new Topology(epoch, removed, hardRemoved, stale, shards);
272+
return new Topology(null, epoch, shards, ranges, removed, hardRemoved, stale, nodes, nodeLookup, ranges, supersetIndexes);
273273
}
274274

275275
@Override

accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,27 @@ protected boolean contains(N node)
8686
return i >= 0 && i < size && heap[i] == node;
8787
}
8888

89+
protected boolean removeIfContains(N node)
90+
{
91+
int i = node.heapIndex;
92+
if (i < 0 || i >= heap.length || heap[i] != node)
93+
return false;
94+
removeInternal(i, node);
95+
return true;
96+
}
97+
8998
/**
9099
* remove; can be used as a simple list
91100
*/
92101
protected void remove(N node)
93102
{
94103
int i = node.heapIndex;
95104
Invariants.requireArgument(i >= 0 && i < heap.length && heap[i] == node);
105+
removeInternal(i, node);
106+
}
107+
108+
private void removeInternal(int i, N node)
109+
{
96110
if (size > 1)
97111
{
98112
N tail = (N) heap[--size];

accord-core/src/test/java/accord/impl/RemoteListenersTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ protected TestCommandStore(int id)
404404
}
405405

406406
@Override
407-
public Journal.Replayer replayer()
407+
public Journal.Replayer replayer(AbstractReplayer.Mode mode)
408408
{
409409
throw new UnsupportedOperationException();
410410
}

accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import accord.api.Journal;
4040
import accord.api.Result;
41+
import accord.impl.AbstractReplayer;
4142
import accord.impl.CommandChange;
4243
import accord.impl.InMemoryCommandStore;
4344
import accord.local.Cleanup;
@@ -70,6 +71,7 @@
7071
import static accord.api.Journal.Load.ALL;
7172
import static accord.api.Journal.Load.MINIMAL;
7273
import static accord.api.Journal.Load.MINIMAL_WITH_DEPS;
74+
import static accord.impl.AbstractReplayer.Mode.PART_NON_DURABLE;
7375
import static accord.impl.CommandChange.Field;
7476
import static accord.impl.CommandChange.Field.ACCEPTED;
7577
import static accord.impl.CommandChange.Field.CLEANUP;
@@ -632,7 +634,7 @@ public boolean replay(CommandStores commandStores, Object param)
632634
Map<TxnId, List<Diff>> diffs = new TreeMap<>();
633635

634636
InMemoryCommandStore commandStore = (InMemoryCommandStore) commandStores.forId(commandStoreId);
635-
Replayer replayer = commandStore.replayer();
637+
Replayer replayer = commandStore.replayer(PART_NON_DURABLE);
636638

637639
for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
638640
diffs.put(e.getKey(), e.getValue().sorted(true));

accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import accord.api.Result;
5050
import accord.api.RoutingKey;
5151
import accord.api.Update;
52+
import accord.impl.AbstractReplayer;
5253
import accord.impl.DefaultLocalListeners;
5354
import accord.impl.DefaultLocalListeners.DefaultNotifySink;
5455
import accord.impl.DefaultRemoteListeners;
@@ -993,7 +994,7 @@ boolean runOneTask(SafeCommandStore safeStore)
993994
return true;
994995
}
995996

996-
public Journal.Replayer replayer()
997+
public Journal.Replayer replayer(AbstractReplayer.Mode mode)
997998
{
998999
throw new UnsupportedOperationException();
9991000
}

0 commit comments

Comments
 (0)