diff --git a/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java new file mode 100644 index 0000000000..b94e506cce --- /dev/null +++ b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.coordinate; + +import accord.primitives.Ranges; +import accord.primitives.TxnId; + +public class RangeUnavailable extends Exhausted +{ + public final Ranges unavailable; + + public RangeUnavailable(Ranges unavailable, TxnId txnId) + { + super(txnId, null, buildMessage(unavailable)); + this.unavailable = unavailable; + } + + private static String buildMessage(Ranges unavailable) + { + return "The following ranges are unavailable to read: " + unavailable; + } +} diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java index 66fa96c242..da744f7d53 100644 --- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java +++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java @@ -131,7 +131,11 @@ public void onSuccess(Id from, Reply reply) break; case ApprovePartial: - handle(recordPartialReadSuccess(from, unavailable(reply))); + Ranges unavailable = unavailable(reply); + RequestStatus result = recordPartialReadSuccess(from, unavailable); + if (result == RequestStatus.Failed && failure == null) + failure = new RangeUnavailable(unavailable, txnId); + handle(result); break; } } diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index 62ed788c5a..a0c68a0b6d 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -66,7 +66,7 @@ class AwaitCommit extends AsyncResults.SettableResult implements Call AwaitCommit(Node node, TxnId txnId, Unseekables unseekables) { - Topology topology = node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables); + Topology topology = node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables, Topology.OnUnknown.REJECT); this.tracker = new QuorumTracker(new Topologies.Single(node.topology().sorter(), topology)); node.send(topology.nodes(), to -> new WaitOnCommit(to, topology, txnId, unseekables), this); } diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index cc7de494e8..36bc696b45 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -26,7 +26,7 @@ import java.util.function.Function; import accord.api.VisibleForImplementation; -import accord.impl.CommandsForKey.CommandLoader; +import accord.impl.CommandTimeseries.CommandLoader; import accord.local.Command; import accord.local.CommonAttributes; import accord.local.PreLoadContext; diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseries.java b/accord-core/src/main/java/accord/impl/CommandTimeseries.java new file mode 100644 index 0000000000..69d4dbcd27 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/CommandTimeseries.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.List; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableSortedMap; + +import accord.api.Key; +import accord.local.Command; +import accord.local.SafeCommandStore; +import accord.local.SaveStatus; +import accord.local.Status; +import accord.primitives.Seekable; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; + +import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; +import static accord.local.SafeCommandStore.TestDep.WITH; +import static accord.utils.Utils.ensureSortedImmutable; +import static accord.utils.Utils.ensureSortedMutable; + +public class CommandTimeseries +{ + public enum TestTimestamp + {BEFORE, AFTER} + + private final Seekable keyOrRange; + protected final CommandLoader loader; + public final ImmutableSortedMap commands; + + public CommandTimeseries(Update builder) + { + this.keyOrRange = builder.keyOrRange; + this.loader = builder.loader; + this.commands = ensureSortedImmutable(builder.commands); + } + + CommandTimeseries(Seekable keyOrRange, CommandLoader loader, ImmutableSortedMap commands) + { + this.keyOrRange = keyOrRange; + this.loader = loader; + this.commands = commands; + } + + public CommandTimeseries(Key keyOrRange, CommandLoader loader) + { + this(keyOrRange, loader, ImmutableSortedMap.of()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CommandTimeseries that = (CommandTimeseries) o; + return keyOrRange.equals(that.keyOrRange) && loader.equals(that.loader) && commands.equals(that.commands); + } + + @Override + public int hashCode() + { + int hash = 1; + hash = 31 * hash + Objects.hashCode(keyOrRange); + hash = 31 * hash + Objects.hashCode(loader); + hash = 31 * hash + Objects.hashCode(commands); + return hash; + } + + public D get(Timestamp key) + { + return commands.get(key); + } + + public boolean isEmpty() + { + return commands.isEmpty(); + } + + public Timestamp maxTimestamp() + { + return commands.isEmpty() ? Timestamp.NONE : commands.keySet().last(); + } + + /** + * All commands before/after (exclusive of) the given timestamp + *

+ * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any + * commands that do not know any deps will be ignored. + *

+ * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery + */ + public T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, + SafeCommandStore.TestDep testDep, @Nullable TxnId depId, + @Nullable Status minStatus, @Nullable Status maxStatus, + SafeCommandStore.CommandFunction map, T initialValue, T terminalValue) + { + + for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values()) + { + TxnId txnId = loader.txnId(data); + if (!testKind.test(txnId.rw())) continue; + SaveStatus status = loader.saveStatus(data); + if (minStatus != null && minStatus.compareTo(status.status) > 0) + continue; + if (maxStatus != null && maxStatus.compareTo(status.status) < 0) + continue; + List deps = loader.depsIds(data); + // If we don't have any dependencies, we treat a dependency filter as a mismatch + if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH)))) + continue; + Timestamp executeAt = loader.executeAt(data); + initialValue = map.apply(keyOrRange, txnId, executeAt, initialValue); + if (initialValue.equals(terminalValue)) + break; + } + return initialValue; + } + + Stream between(Timestamp min, Timestamp max, Predicate statusPredicate) + { + return commands.subMap(min, true, max, true).values().stream() + .filter(d -> statusPredicate.test(loader.status(d))).map(loader::txnId); + } + + public Stream all() + { + return commands.values().stream(); + } + + Update beginUpdate() + { + return new Update<>(this); + } + + public CommandLoader loader() + { + return loader; + } + + public interface CommandLoader + { + D saveForCFK(Command command); + + TxnId txnId(D data); + Timestamp executeAt(D data); + SaveStatus saveStatus(D data); + List depsIds(D data); + + default Status status(D data) + { + return saveStatus(data).status; + } + + default Status.Known known(D data) + { + return saveStatus(data).known; + } + } + + public static class Update + { + private final Seekable keyOrRange; + protected CommandLoader loader; + protected NavigableMap commands; + + public Update(Seekable keyOrRange, CommandLoader loader) + { + this.keyOrRange = keyOrRange; + this.loader = loader; + this.commands = new TreeMap<>(); + } + + public Update(CommandTimeseries timeseries) + { + this.keyOrRange = timeseries.keyOrRange; + this.loader = timeseries.loader; + this.commands = timeseries.commands; + } + + public Update add(Timestamp timestamp, Command command) + { + commands = ensureSortedMutable(commands); + commands.put(timestamp, loader.saveForCFK(command)); + return this; + } + + public Update add(Timestamp timestamp, D value) + { + commands = ensureSortedMutable(commands); + commands.put(timestamp, value); + return this; + } + + public Update remove(Timestamp timestamp) + { + commands = ensureSortedMutable(commands); + commands.remove(timestamp); + return this; + } + + public CommandTimeseries build() + { + return new CommandTimeseries<>(this); + } + } +} diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java new file mode 100644 index 0000000000..f0a88e7507 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import accord.primitives.Timestamp; + +public interface CommandTimeseriesHolder +{ + CommandTimeseries byId(); + CommandTimeseries byExecuteAt(); + + Timestamp max(); +} diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java index a85ac11413..97142d3abb 100644 --- a/accord-core/src/main/java/accord/impl/CommandsForKey.java +++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java @@ -19,24 +19,19 @@ package accord.impl; import accord.api.Key; +import accord.impl.CommandTimeseries.CommandLoader; import accord.local.*; import accord.primitives.*; import com.google.common.collect.ImmutableSortedMap; -import javax.annotation.Nullable; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.stream.Stream; -import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; -import static accord.local.SafeCommandStore.TestDep.WITH; import static accord.local.Status.PreAccepted; import static accord.local.Status.PreCommitted; -import static accord.utils.Utils.*; -public class CommandsForKey +public class CommandsForKey implements CommandTimeseriesHolder { public static class SerializerSupport { @@ -55,179 +50,6 @@ public static CommandsForKey create(Key key, Timestamp max, } } - public interface CommandLoader - { - D saveForCFK(Command command); - - TxnId txnId(D data); - Timestamp executeAt(D data); - SaveStatus saveStatus(D data); - List depsIds(D data); - - default Status status(D data) - { - return saveStatus(data).status; - } - - default Status.Known known(D data) - { - return saveStatus(data).known; - } - } - - public static class CommandTimeseries - { - public enum TestTimestamp {BEFORE, AFTER} - - private final Key key; - protected final CommandLoader loader; - public final ImmutableSortedMap commands; - - public CommandTimeseries(Update builder) - { - this.key = builder.key; - this.loader = builder.loader; - this.commands = ensureSortedImmutable(builder.commands); - } - - CommandTimeseries(Key key, CommandLoader loader, ImmutableSortedMap commands) - { - this.key = key; - this.loader = loader; - this.commands = commands; - } - - public CommandTimeseries(Key key, CommandLoader loader) - { - this(key, loader, ImmutableSortedMap.of()); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CommandTimeseries that = (CommandTimeseries) o; - return key.equals(that.key) && loader.equals(that.loader) && commands.equals(that.commands); - } - - @Override - public int hashCode() - { - int hash = 1; - hash = 31 * hash + Objects.hashCode(key); - hash = 31 * hash + Objects.hashCode(loader); - hash = 31 * hash + Objects.hashCode(commands); - return hash; - } - - public D get(Timestamp key) - { - return commands.get(key); - } - - public boolean isEmpty() - { - return commands.isEmpty(); - } - - /** - * All commands before/after (exclusive of) the given timestamp - *

- * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any - * commands that do not know any deps will be ignored. - *

- * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery - */ - public T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, - SafeCommandStore.TestDep testDep, @Nullable TxnId depId, - @Nullable Status minStatus, @Nullable Status maxStatus, - SafeCommandStore.CommandFunction map, T initialValue, T terminalValue) - { - - for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values()) - { - TxnId txnId = loader.txnId(data); - Timestamp executeAt = loader.executeAt(data); - SaveStatus status = loader.saveStatus(data); - List deps = loader.depsIds(data); - if (!testKind.test(txnId.rw())) continue; - // If we don't have any dependencies, we treat a dependency filter as a mismatch - if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH)))) - continue; - if (minStatus != null && minStatus.compareTo(status.status) > 0) - continue; - if (maxStatus != null && maxStatus.compareTo(status.status) < 0) - continue; - initialValue = map.apply(key, txnId, executeAt, initialValue); - if (initialValue.equals(terminalValue)) - break; - } - return initialValue; - } - - Stream between(Timestamp min, Timestamp max, Predicate statusPredicate) - { - return commands.subMap(min, true, max, true).values().stream() - .filter(d -> statusPredicate.test(loader.status(d))).map(loader::txnId); - } - - public Stream all() - { - return commands.values().stream(); - } - - Update beginUpdate() - { - return new Update<>(this); - } - - public CommandLoader loader() - { - return loader; - } - - public static class Update - { - private final Key key; - protected CommandLoader loader; - protected NavigableMap commands; - - public Update(Key key, CommandLoader loader) - { - this.key = key; - this.loader = loader; - this.commands = new TreeMap<>(); - } - - public Update(CommandTimeseries timeseries) - { - this.key = timeseries.key; - this.loader = timeseries.loader; - this.commands = timeseries.commands; - } - - public CommandsForKey.CommandTimeseries.Update add(Timestamp timestamp, Command command) - { - commands = ensureSortedMutable(commands); - commands.put(timestamp, loader.saveForCFK(command)); - return this; - } - - public CommandsForKey.CommandTimeseries.Update remove(Timestamp timestamp) - { - commands = ensureSortedMutable(commands); - commands.remove(timestamp); - return this; - } - - CommandTimeseries build() - { - return new CommandTimeseries<>(this); - } - } - } - public static class Listener implements Command.DurableAndIdempotentListener { protected final Key listenerKey; @@ -363,6 +185,7 @@ public Key key() return key; } + @Override public Timestamp max() { return max; @@ -383,11 +206,13 @@ public Timestamp lastWriteTimestamp() return lastWriteTimestamp; } + @Override public CommandTimeseries byId() { return byId; } + @Override public CommandTimeseries byExecuteAt() { return byExecuteAt; diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index aefdf4eb4e..a380bdec93 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -43,6 +43,7 @@ import accord.api.DataStore; import accord.api.Key; import accord.api.ProgressLog; +import accord.impl.CommandTimeseries.CommandLoader; import accord.local.Command; import accord.local.CommandStore; import accord.local.CommandStores.RangesForEpoch; @@ -109,6 +110,16 @@ public Agent agent() return agent; } + TreeMap historicalRangeCommands() + { + return historicalRangeCommands; + } + + TreeMap rangeCommands() + { + return rangeCommands; + } + public GlobalCommand ifPresent(TxnId txnId) { return commands.get(txnId); @@ -333,38 +344,6 @@ private void forEach(Routable keyOrRange, Ranges slice, Consumer fo } } - @Override - protected void registerHistoricalTransactions(Deps deps) - { - Ranges allRanges = rangesForEpochHolder.get().all(); - deps.keyDeps.keys().forEach(allRanges, key -> { - SafeCommandsForKey cfk = commandsForKey(key).createSafeReference(); - deps.keyDeps.forEach(key, txnId -> { - // TODO (desired, efficiency): this can be made more efficient by batching by epoch - if (rangesForEpochHolder.get().coordinates(txnId).contains(key)) - return; // already coordinates, no need to replicate - if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key)) - return; - - cfk.registerNotWitnessed(txnId); - }); - - }); - deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> { - - if (rangeCommands.containsKey(txnId)) - return; - - Ranges ranges = deps.rangeDeps.ranges(txnId); - if (rangesForEpochHolder.get().coordinates(txnId).intersects(ranges)) - return; // already coordinates, no need to replicate - if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges)) - return; - - historicalRangeCommands.merge(txnId, ranges.slice(allRanges), Ranges::with); - }); - } - protected InMemorySafeStore createSafeStore(PreLoadContext context, RangesForEpoch ranges, Map commands, Map commandsForKeys) { return new InMemorySafeStore(this, cfkLoader, ranges, context, commands, commandsForKeys); @@ -482,7 +461,7 @@ void update(Ranges add) } } - class CFKLoader implements CommandsForKey.CommandLoader + class CFKLoader implements CommandLoader { private Command loadForCFK(TxnId data) { @@ -702,6 +681,41 @@ public Timestamp maxConflict(Seekables keysOrRanges, Ranges slice) return timestamp; } + @Override + public void registerHistoricalTransactions(Deps deps) + { + RangesForEpochHolder rangesForEpochHolder = commandStore.rangesForEpochHolder(); + Ranges allRanges = rangesForEpochHolder.get().all(); + deps.keyDeps.keys().forEach(allRanges, key -> { + SafeCommandsForKey cfk = commandsForKey(key); + deps.keyDeps.forEach(key, txnId -> { + // TODO (desired, efficiency): this can be made more efficient by batching by epoch + if (rangesForEpochHolder.get().coordinates(txnId).contains(key)) + return; // already coordinates, no need to replicate + if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key)) + return; + + cfk.registerNotWitnessed(txnId); + }); + + }); + TreeMap rangeCommands = commandStore.rangeCommands(); + TreeMap historicalRangeCommands = commandStore.historicalRangeCommands(); + deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> { + + if (rangeCommands.containsKey(txnId)) + return; + + Ranges ranges = deps.rangeDeps.ranges(txnId); + if (rangesForEpochHolder.get().coordinates(txnId).intersects(ranges)) + return; // already coordinates, no need to replicate + if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges)) + return; + + historicalRangeCommands.merge(txnId, ranges.slice(allRanges), Ranges::with); + }); + } + public Timestamp maxApplied(Seekables keysOrRanges, Ranges slice) { Seekables sliced = keysOrRanges.slice(slice, Minimal); @@ -725,7 +739,7 @@ public NodeTimeService time() public T mapReduce(Seekables keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction map, T accumulate, T terminalValue) { accumulate = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> { - CommandsForKey.CommandTimeseries timeseries; + CommandTimeseries timeseries; switch (testTimestamp) { default: throw new AssertionError(); @@ -737,17 +751,17 @@ public T mapReduce(Seekables keysOrRanges, Ranges slice, TestKind test case MAY_EXECUTE_BEFORE: timeseries = forKey.byExecuteAt(); } - CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp; + CommandTimeseries.TestTimestamp remapTestTimestamp; switch (testTimestamp) { default: throw new AssertionError(); case STARTED_AFTER: case EXECUTES_AFTER: - remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.AFTER; + remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER; break; case STARTED_BEFORE: case MAY_EXECUTE_BEFORE: - remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE; + remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE; } return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue); }, accumulate, terminalValue); @@ -866,7 +880,8 @@ public CommonAttributes completeRegistration(Seekable keyOrRange, Ranges slice, return commandStore.register(this, keyOrRange, slice, command, attrs); } - public CommandsForKey.CommandLoader cfkLoader() + @Override + public CommandLoader cfkLoader() { return cfkLoader; } diff --git a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java index 527c12d1f9..6ba8d90087 100644 --- a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java +++ b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java @@ -20,8 +20,7 @@ import accord.api.Key; import accord.api.VisibleForImplementation; -import accord.impl.CommandsForKey.CommandLoader; -import accord.impl.CommandsForKey.CommandTimeseries; +import accord.impl.CommandTimeseries.CommandLoader; import accord.local.Command; import accord.primitives.Timestamp; import accord.primitives.TxnId; diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index 9fdd20447c..b66a0d9ab0 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -134,6 +134,8 @@ void start(SafeCommandStore safeStore0) Ranges commitRanges = valid; store.markBootstrapping(safeStore0, globalSyncId, valid); CoordinateSyncPoint.coordinate(node, globalSyncId, commitRanges) + // TODO (correcness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges! + // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys()), safeStore1 -> { if (valid.isEmpty()) // we've lost ownership of the range return AsyncResults.success(Ranges.EMPTY); @@ -141,7 +143,7 @@ void start(SafeCommandStore safeStore0) Commands.commitRecipientLocalSyncPoint(safeStore1, localSyncId, syncPoint, valid); // TODO (now): this should use a dedicated local id, distinct from the one we use to coordinate globally, as this may also be committed and applied locally // TODO (now): should we even be putting any partialDeps here? Doesn't seem like it, as they're handled on source nodes. - safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor); + safeStore1.registerHistoricalTransactions(syncPoint.waitFor); return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); }))) .flatMap(i -> i) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index c9b5443257..37046e4225 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -115,6 +115,11 @@ public Agent agent() return agent; } + public RangesForEpochHolder rangesForEpochHolder() + { + return rangesForEpochHolder; + } + public abstract boolean inStore(); public abstract AsyncChain execute(PreLoadContext context, Consumer consumer); @@ -141,8 +146,6 @@ protected void setBootstrapBeganAt(NavigableMap newBootstrapBegan this.bootstrapBeganAt = newBootstrapBeganAt; } - protected abstract void registerHistoricalTransactions(Deps deps); - /** * This method may be invoked on a non-CommandStore thread */ @@ -304,8 +307,10 @@ private void fetchMajorityDeps(AsyncResults.SettableResult coordination, N } else { - execute(contextFor(null, deps.txnIds()), safeStore -> { - registerHistoricalTransactions(deps); + // TODO (correcness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges! + // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed + execute(contextFor(null, deps.txnIds(), deps.keyDeps.keys()), safeStore -> { + safeStore.registerHistoricalTransactions(deps); }).begin((success, fail2) -> { if (fail2 != null) fetchMajorityDeps(coordination, node, epoch, ranges); else coordination.setSuccess(null); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index b8203a1fc0..c5dca54862 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; @@ -325,6 +326,11 @@ static class TopologyUpdate } } + protected boolean shouldBootstrap(Node node, Topology local, Topology newLocalTopology, Range add) + { + return newLocalTopology.epoch() != 1; + } + private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology) { checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology"); @@ -371,9 +377,12 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top RangesForEpochHolder rangesHolder = new RangesForEpochHolder(); ShardHolder shardHolder = new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder); rangesHolder.current = new RangesForEpoch(epoch, add, shardHolder.store); - // the first epoch we assume is either empty, or correctly initialised by whatever system is migrating - if (epoch == 1) bootstrapUpdates.add(() -> shardHolder.store.initialise(epoch, add)); - else bootstrapUpdates.add(shardHolder.store.bootstrapper(node, add, newLocalTopology.epoch())); + + Map partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.local, newLocalTopology, range)); + if (partitioned.containsKey(true)) + bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch())); + if (partitioned.containsKey(false)) + bootstrapUpdates.add(() -> shardHolder.store.initialise(epoch, partitioned.get(false))); result.add(shardHolder); } } @@ -554,6 +563,18 @@ public CommandStore forId(int id) return snapshot.byId.get(id); } + public int[] ids() + { + Snapshot snapshot = current; + Int2ObjectHashMap.KeySet set = snapshot.byId.keySet(); + int[] ids = new int[set.size()]; + int idx = 0; + for (int a : set) + ids[idx++] = a; + Arrays.sort(ids); + return ids; + } + public int count() { return current.shards.length; diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index dd2f799580..93532e851a 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -23,6 +23,7 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.ProgressLog; +import accord.primitives.Deps; import accord.primitives.Keys; import accord.primitives.Ranges; import accord.primitives.Seekable; @@ -155,6 +156,7 @@ T mapReduce(Seekables keys, Ranges slice, NodeTimeService time(); CommandStores.RangesForEpoch ranges(); Timestamp maxConflict(Seekables keys, Ranges slice); + void registerHistoricalTransactions(Deps deps); default long latestEpoch() { diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java index 1b2f5a313b..ec0b4eb956 100644 --- a/accord-core/src/main/java/accord/local/SaveStatus.java +++ b/accord-core/src/main/java/accord/local/SaveStatus.java @@ -74,6 +74,18 @@ public boolean hasBeen(Status status) return this.status.compareTo(status) >= 0; } + public boolean isComplete() + { + switch (this) + { + case Applied: + case Invalidated: + return true; + default: + return false; + } + } + // TODO (expected, testing): exhaustive testing, particularly around PreCommitted public static SaveStatus get(Status status, Known known) { diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java b/accord-core/src/main/java/accord/primitives/Ranges.java index 9a56031173..4badc91138 100644 --- a/accord-core/src/main/java/accord/primitives/Ranges.java +++ b/accord-core/src/main/java/accord/primitives/Ranges.java @@ -22,8 +22,15 @@ import accord.utils.ArrayBuffers.ObjectBuffers; import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Stream; +import com.google.common.collect.ImmutableMap; + import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING; import static accord.primitives.Routables.Slice.Overlapping; import static accord.utils.ArrayBuffers.cachedRanges; @@ -227,4 +234,17 @@ else if (count == result.length) return construct(cachedRanges.completeAndDiscard(result, count)); } + public Map partitioningBy(Predicate test) + { + if (isEmpty()) + return Collections.emptyMap(); + List trues = new ArrayList<>(); + List falses = new ArrayList<>(); + for (Range range : this) + (test.test(range) ? trues : falses).add(range); + if (trues.isEmpty()) return ImmutableMap.of(Boolean.FALSE, this); + if (falses.isEmpty()) return ImmutableMap.of(Boolean.TRUE, this); + return ImmutableMap.of(Boolean.TRUE, Ranges.ofSortedAndDeoverlapped(trues.toArray(new Range[0])), + Boolean.FALSE, Ranges.ofSortedAndDeoverlapped(falses.toArray(new Range[0]))); + } } diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index 13bc2a5536..1a5a44e4c3 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -345,4 +345,14 @@ public String toString() { return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']'; } + + public static Timestamp fromString(String string) + { + String[] split = string.replaceFirst("\\[", "").replaceFirst("\\]", "").split(","); + assert split.length == 4; + return Timestamp.fromValues(Long.parseLong(split[0]), + Long.parseLong(split[1]), + Integer.parseInt(split[2]), + new Id(Integer.parseInt(split[3]))); + } } diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java index fef9282951..5832cce003 100644 --- a/accord-core/src/main/java/accord/topology/Topologies.java +++ b/accord-core/src/main/java/accord/topology/Topologies.java @@ -51,6 +51,11 @@ default long currentEpoch() int size(); + default boolean isEmpty() + { + return size() == 0; + } + int totalShards(); boolean contains(Id to); diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index f0c5e79bed..728553c12d 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -193,24 +193,24 @@ public int indexForKey(RoutingKey key) return Arrays.binarySearch(supersetIndexes, i); } - public Topology forSelection(Unseekables select) + public Topology forSelection(Unseekables select, OnUnknown onUnknown) { - return forSelection(select, (ignore, index) -> true, null); + return forSelection(select, onUnknown, (ignore, index) -> true, null); } - public Topology forSelection(Unseekables select, IndexedPredicate predicate, P1 param) + public Topology forSelection(Unseekables select, OnUnknown onUnknown, IndexedPredicate predicate, P1 param) { - return forSubset(subsetFor(select, predicate, param)); + return forSubset(subsetFor(select, predicate, param, onUnknown)); } - public Topology forSelection(Unseekables select, Collection nodes) + public Topology forSelection(Unseekables select, OnUnknown onUnknown, Collection nodes) { - return forSelection(select, nodes, (ignore, index) -> true, null); + return forSelection(select, onUnknown, nodes, (ignore, index) -> true, null); } - public Topology forSelection(Unseekables select, Collection nodes, IndexedPredicate predicate, P1 param) + public Topology forSelection(Unseekables select, OnUnknown onUnknown, Collection nodes, IndexedPredicate predicate, P1 param) { - return forSubset(subsetFor(select, predicate, param), nodes); + return forSubset(subsetFor(select, predicate, param, onUnknown), nodes); } private Topology forSubset(int[] newSubset) @@ -236,7 +236,9 @@ private Topology forSubset(int[] newSubset, Collection nodes) return new Topology(epoch, shards, ranges, nodeLookup, rangeSubset, newSubset); } - private int[] subsetFor(Unseekables select, IndexedPredicate predicate, P1 param) + public enum OnUnknown { REJECT, IGNORE } + + private int[] subsetFor(Unseekables select, IndexedPredicate predicate, P1 param, OnUnknown onUnknown) { int count = 0; IntBuffers cachedInts = ArrayBuffers.cachedInts(); @@ -258,16 +260,35 @@ private int[] subsetFor(Unseekables select, IndexedPredicate pred if (abi < 0) { if (ailim < as.size()) - throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + { + switch (onUnknown) + { + case REJECT: throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + case IGNORE: + break; + default: + throw new IllegalArgumentException("Unknown option: " + onUnknown); + } + } break; } ai = (int)abi; + boolean skip = false; if (ailim < ai) - throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + { + switch (onUnknown) + { + default: + throw new IllegalArgumentException("Unknown option: " + onUnknown); + case REJECT: throw new IllegalArgumentException("Range not found for " + as.get(ailim)); + case IGNORE: + skip = true; + } + } bi = (int)(abi >>> 32); - if (predicate.test(param, bi)) + if (!skip && predicate.test(param, bi)) { if (count == newSubset.length) newSubset = cachedInts.resize(newSubset, count, count * 2); @@ -305,14 +326,14 @@ private int[] subsetFor(Unseekables select, IndexedPredicate pred return cachedInts.completeAndDiscard(newSubset, count); } - public void visitNodeForKeysOnceOrMore(Unseekables select, Consumer nodes) + public void visitNodeForKeysOnceOrMore(Unseekables select, OnUnknown onUnknown, Consumer nodes) { - visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, null, nodes); + visitNodeForKeysOnceOrMore(select, onUnknown, (i1, i2) -> true, null, nodes); } - public void visitNodeForKeysOnceOrMore(Unseekables select, IndexedPredicate predicate, P1 param, Consumer nodes) + public void visitNodeForKeysOnceOrMore(Unseekables select, OnUnknown onUnknown, IndexedPredicate predicate, P1 param, Consumer nodes) { - for (int shardIndex : subsetFor(select, predicate, param)) + for (int shardIndex : subsetFor(select, predicate, param, onUnknown)) { Shard shard = shards[shardIndex]; for (Id id : shard.nodes) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 0fcdc8fde1..aea8c535c7 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -230,16 +230,20 @@ public synchronized void onTopologyUpdate(Topology topology) { Epochs current = epochs; - checkArgument(topology.epoch == current.nextEpoch()); + checkArgument(topology.epoch == current.nextEpoch(), "Expected topology update %d to be %d", topology.epoch, current.nextEpoch()); EpochState[] nextEpochs = new EpochState[current.epochs.length + 1]; List> pendingSync = new ArrayList<>(current.pendingSyncComplete); Set alreadySyncd = Collections.emptySet(); if (!pendingSync.isEmpty()) { - EpochState currentEpoch = current.epochs[0]; - if (current.epochs[0].syncComplete()) - currentEpoch.markPrevSynced(); - alreadySyncd = pendingSync.remove(0); + // if empty, then notified about an epoch from a peer before first epoch seen + if (current.epochs.length != 0) + { + EpochState currentEpoch = current.epochs[0]; + if (currentEpoch.syncComplete()) + currentEpoch.markPrevSynced(); + alreadySyncd = pendingSync.remove(0); + } } System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length); @@ -308,15 +312,15 @@ public Topologies withUnsyncedEpochs(Unseekables select, Timestamp min, Ti public Topologies withUnsyncedEpochs(Unseekables select, long minEpoch, long maxEpoch) { - Invariants.checkArgument(minEpoch <= maxEpoch); + Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch); Epochs snapshot = epochs; if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch; - else Invariants.checkState(snapshot.currentEpoch >= maxEpoch); + else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch); EpochState maxEpochState = nonNull(snapshot.get(maxEpoch)); if (minEpoch == maxEpoch && maxEpochState.syncCompleteFor(select)) - return new Single(sorter, maxEpochState.global.forSelection(select)); + return new Single(sorter, maxEpochState.global.forSelection(select, Topology.OnUnknown.REJECT)); int start = (int)(snapshot.currentEpoch - maxEpoch); int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length)); @@ -334,20 +338,22 @@ public Topologies withUnsyncedEpochs(Unseekables select, long minEpoch, lo { EpochState epochState = snapshot.epochs[i]; if (epochState.epoch() < minEpoch) - epochState.global.visitNodeForKeysOnceOrMore(select, EpochState::shardIsUnsynced, epochState, nodes::add); + epochState.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, epochState, nodes::add); else - epochState.global.visitNodeForKeysOnceOrMore(select, nodes::add); + epochState.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, nodes::add); } + Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that contained %s", select); Topologies.Multi topologies = new Topologies.Multi(sorter, count); for (int i = start; i < limit ; ++i) { EpochState epochState = snapshot.epochs[i]; if (epochState.epoch() < minEpoch) - topologies.add(epochState.global.forSelection(select, nodes, EpochState::shardIsUnsynced, epochState)); + topologies.add(epochState.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, EpochState::shardIsUnsynced, epochState)); else - topologies.add(epochState.global.forSelection(select, nodes, (ignore, idx) -> true, null)); + topologies.add(epochState.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, (ignore, idx) -> true, null)); } + Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", select); return topologies; } @@ -357,16 +363,18 @@ public Topologies preciseEpochs(Unseekables keys, long minEpoch, long maxE Epochs snapshot = epochs; if (minEpoch == maxEpoch) - return new Single(sorter, snapshot.get(minEpoch).global.forSelection(keys)); + return new Single(sorter, snapshot.get(minEpoch).global.forSelection(keys, Topology.OnUnknown.REJECT)); Set nodes = new LinkedHashSet<>(); int count = (int)(1 + maxEpoch - minEpoch); for (int i = count - 1 ; i >= 0 ; --i) - snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, nodes::add); + snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, Topology.OnUnknown.IGNORE, nodes::add); + Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that contained %s", keys); Topologies.Multi topologies = new Topologies.Multi(sorter, count); for (int i = count - 1 ; i >= 0 ; --i) - topologies.add(snapshot.get(minEpoch + i).global.forSelection(keys, nodes)); + topologies.add(snapshot.get(minEpoch + i).global.forSelection(keys, Topology.OnUnknown.IGNORE, nodes)); + Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch that contained %s", keys); return topologies; } @@ -374,7 +382,7 @@ public Topologies preciseEpochs(Unseekables keys, long minEpoch, long maxE public Topologies forEpoch(Unseekables select, long epoch) { EpochState state = epochs.get(epoch); - return new Single(sorter, state.global.forSelection(select)); + return new Single(sorter, state.global.forSelection(select, Topology.OnUnknown.REJECT)); } public Shard forEpochIfKnown(RoutingKey key, long epoch) diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index 809ca1c76b..4c3e7eb47d 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -462,7 +462,7 @@ public static AsyncChain> all(List public static AsyncChain reduce(List> chains, BiFunction reducer) { - Invariants.checkArgument(!chains.isEmpty()); + Invariants.checkArgument(!chains.isEmpty(), "List of chains is empty"); if (chains.size() == 1) return (AsyncChain) chains.get(0); if (Reduce.canAppendTo(chains.get(0), reducer)) diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java new file mode 100644 index 0000000000..c87f12e701 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/async/Observable.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.utils.async; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +/** + * Stream like interface that is "pushed" results (to the {@link #onNext(Object)} method). This interface is similar, + * yet different from {@link AsyncChain} as that type works with a single element, whereas this type works with 0-n. + */ +public interface Observable +{ + void onNext(T value) throws Exception; + + void onError(Throwable t); + + void onCompleted(); + + default Observable map(Function mapper) + { + Observable self = this; + // since this project still targets jdk8, can't create private classes, so to avoid adding types to the public api, + // use ananomus classes. + return new Observable() + { + @Override + public void onNext(R value) throws Exception + { + self.onNext(mapper.apply(value)); + } + + @Override + public void onError(Throwable t) + { + self.onError(t); + } + + @Override + public void onCompleted() + { + self.onCompleted(); + } + }; + } + + static Observable distinct(Observable callback) + { + return new Observable() + { + Set keys = new HashSet<>(); + + @Override + public void onNext(T value) throws Exception + { + if (keys.add(value)) + callback.onNext(value); + } + + @Override + public void onError(Throwable t) + { + keys.clear(); + keys = null; + callback.onError(t); + } + + @Override + public void onCompleted() + { + keys.clear(); + keys = null; + callback.onCompleted(); + } + }; + } + + static Observable forCallback(BiConsumer, Throwable> callback) + { + return forCallback(callback, Collectors.toList()); + } + + static Observable forCallback(BiConsumer callback, + Collector collector) + { + return new Observable() + { + Accumulator values = collector.supplier().get(); + + @Override + public void onNext(T value) + { + collector.accumulator().accept(values, value); + } + + @Override + public void onError(Throwable t) + { + values = null; + callback.accept(null, t); + } + + @Override + public void onCompleted() + { + Result result = collector.finisher().apply(this.values); + this.values = null; + callback.accept(result, null); + } + }; + } + + static AsyncChain> asChain(Consumer> work) + { + return asChain(work, Function.identity(), Collectors.toList()); + } + + static AsyncChain asChain(Consumer> work, + Collector collector) + { + return asChain(work, Function.identity(), collector); + } + + static AsyncChain asChain(Consumer> work, + Function mapper, + Collector collector) + { + return new AsyncChains.Head() + { + @Override + protected void start(BiConsumer callback) + { + work.accept(forCallback(callback, collector).map(mapper)); + } + }; + } +} diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java b/accord-core/src/test/java/accord/impl/mock/EpochSync.java index 422c400c11..0b4e1a503a 100644 --- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java +++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java @@ -102,7 +102,7 @@ private static class CommandSync extends AsyncResults.SettableResult imple public CommandSync(Node node, Route route, SyncCommitted message, Topology topology) { - this.tracker = new QuorumTracker(new Single(node.topology().sorter(), topology.forSelection(route))); + this.tracker = new QuorumTracker(new Single(node.topology().sorter(), topology.forSelection(route, Topology.OnUnknown.REJECT))); node.send(tracker.nodes(), message, this); } diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index 3bb0e4884a..b3a6994e88 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -20,8 +20,8 @@ import accord.api.RoutingKey; import accord.impl.*; -import accord.impl.CommandsForKey.CommandLoader; -import accord.impl.CommandsForKey.CommandTimeseries; +import accord.impl.CommandTimeseries.CommandLoader; +import accord.impl.CommandTimeseries; import accord.impl.IntKey.Raw; import accord.impl.mock.*; import accord.local.Node; diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index 3ab88af087..ec85d1576d 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -162,12 +162,12 @@ void forKeys() Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete()); RoutingKeys keys = keys(150).toUnseekables(); - Assertions.assertEquals(topologies(topology2.forSelection(keys), topology1.forSelection(keys)), + Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.forSelection(keys, Topology.OnUnknown.REJECT)), service.withUnsyncedEpochs(keys, 2, 2)); service.onEpochSyncComplete(id(2), 2); service.onEpochSyncComplete(id(3), 2); - Assertions.assertEquals(topologies(topology2.forSelection(keys)), + Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT)), service.withUnsyncedEpochs(keys, 2, 2)); } diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java index 6631c15bac..24b663e185 100644 --- a/accord-core/src/test/java/accord/topology/TopologyTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyTest.java @@ -44,7 +44,7 @@ private static void assertRangeForKey(Topology topology, int key, int start, int Assertions.assertTrue(shard.range.contains(expectedKey)); Assertions.assertEquals(expectedRange, shard.range); - Topology subTopology = topology.forSelection(Keys.of(expectedKey).toUnseekables()); + Topology subTopology = topology.forSelection(Keys.of(expectedKey).toUnseekables(), Topology.OnUnknown.REJECT); shard = Iterables.getOnlyElement(subTopology.shards()); Assertions.assertTrue(shard.range.contains(expectedKey)); Assertions.assertEquals(expectedRange, shard.range); diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java index 70b120b76e..ab0053d84c 100644 --- a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java +++ b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java @@ -185,7 +185,14 @@ void run(String ... commands) throws IOException { logger.info("Seed {}; rerun with -D{}={}", seed, key, seed); RandomSource randomSource = new DefaultRandom(seed); - Runner.run(nodeCount, new StandardQueue.Factory(randomSource), randomSource::fork, factory, commands); + try + { + Runner.run(nodeCount, new StandardQueue.Factory(randomSource), randomSource::fork, factory, commands); + } + catch (Throwable t) + { + throw new AssertionError("Failure for seed " + seed, t); + } } } } diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java index e7772f757e..a65fd23480 100644 --- a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java +++ b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java @@ -24,6 +24,7 @@ import static accord.maelstrom.Runner.test; +// TODO (correctness) : if you run the tests with the same seed, you get different outcomes... this makes it hard to rerun a failure found from CI public class SimpleRandomTest { @Test