From d2c1674067d2854338657743e9547508df3824a1 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Fri, 13 Jan 2023 07:59:31 +0000 Subject: [PATCH] Refactor to permit asynchronous calculateDeps, so range transactions can be implemented by e.g. scanning sstables --- .../main/java/accord/impl/CommandsForKey.java | 4 +- .../accord/impl/InMemoryCommandStore.java | 195 +++++++++++++----- .../accord/impl/InMemoryCommandStores.java | 7 +- .../accord/impl/InMemoryCommandsForKey.java | 4 +- .../java/accord/local/AsyncCommandStores.java | 62 +++++- .../src/main/java/accord/local/Command.java | 3 + .../main/java/accord/local/CommandStores.java | 52 +++-- .../src/main/java/accord/local/Node.java | 8 +- .../java/accord/local/PreLoadContext.java | 7 +- .../java/accord/local/SafeCommandStore.java | 47 ++++- .../java/accord/local/SyncCommandStores.java | 70 ++++++- .../accord/messages/AbstractEpochRequest.java | 6 - .../src/main/java/accord/messages/Accept.java | 31 ++- .../src/main/java/accord/messages/Apply.java | 9 +- .../accord/messages/BeginInvalidation.java | 6 - .../java/accord/messages/BeginRecovery.java | 135 +++++++----- .../java/accord/messages/CheckStatus.java | 6 - .../src/main/java/accord/messages/Commit.java | 15 +- .../src/main/java/accord/messages/Defer.java | 6 +- .../main/java/accord/messages/GetDeps.java | 12 +- .../java/accord/messages/InformDurable.java | 9 +- .../main/java/accord/messages/PreAccept.java | 51 +++-- .../main/java/accord/messages/ReadData.java | 6 - .../main/java/accord/messages/TxnRequest.java | 4 +- .../java/accord/primitives/AbstractKeys.java | 7 + .../java/accord/primitives/PartialDeps.java | 1 + .../java/accord/primitives/Routables.java | 1 + .../java/accord/utils/AsyncMapReduce.java | 12 ++ .../accord/utils/AsyncMapReduceConsume.java | 6 + .../src/main/java/accord/utils/MapReduce.java | 2 +- .../java/accord/utils/MapReduceConsume.java | 2 +- .../src/main/java/accord/utils/Reduce.java | 6 + .../main/java/accord/utils/ReduceConsume.java | 8 + .../java/accord/utils/ReducingFuture.java | 8 +- .../java/accord/utils/RelationMultiMap.java | 17 +- .../src/test/java/accord/burn/BurnTest.java | 2 +- 36 files changed, 560 insertions(+), 267 deletions(-) create mode 100644 accord-core/src/main/java/accord/utils/AsyncMapReduce.java create mode 100644 accord-core/src/main/java/accord/utils/AsyncMapReduceConsume.java create mode 100644 accord-core/src/main/java/accord/utils/Reduce.java create mode 100644 accord-core/src/main/java/accord/utils/ReduceConsume.java diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java index 91a77ca0b4..bc70f5ff24 100644 --- a/accord-core/src/main/java/accord/impl/CommandsForKey.java +++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java @@ -20,7 +20,7 @@ import accord.api.Key; import accord.local.*; -import accord.local.SafeCommandStore.CommandFunction; +import accord.local.SafeCommandStore.SearchFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestKind; import accord.primitives.Keys; @@ -55,7 +55,7 @@ enum TestTimestamp { BEFORE, AFTER } T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, - CommandFunction map, T initialValue, T terminalValue); + SearchFunction map, T initialValue, T terminalValue); } public abstract Key key(); diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 66a3439b8d..43a58f6030 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -18,14 +18,21 @@ package accord.impl; -import accord.local.*; import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position +import accord.local.CommandStore; +import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState; +import accord.impl.InMemoryCommandStore.SingleThread.AsyncState; +import accord.local.SafeCommandStore; +import accord.local.Command; +import accord.local.CommandListener; +import accord.local.NodeTimeService; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.local.SyncCommandStores; import accord.api.Agent; import accord.api.DataStore; import accord.api.Key; import accord.api.ProgressLog; -import accord.impl.InMemoryCommandStore.SingleThread.AsyncState; -import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState; import accord.local.CommandStores.RangesForEpochHolder; import accord.local.CommandStores.RangesForEpoch; import accord.impl.CommandsForKey.CommandTimeseries; @@ -35,7 +42,9 @@ import accord.utils.Invariants; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.*; import java.util.concurrent.ExecutorService; @@ -65,22 +74,25 @@ public static abstract class State implements SafeCommandStore private final NavigableMap commands = new TreeMap<>(); private final NavigableMap commandsForKey = new TreeMap<>(); // TODO (find library, efficiency): this is obviously super inefficient, need some range map - private final TreeMap rangeCommands = new TreeMap<>(); + private final TreeMap rangeCommands = new TreeMap<>(); - static class RangeCommand + static class RangeAndTxnId implements Comparable { - final Command command; - Ranges ranges; + final Range range; + final TxnId txnId; - RangeCommand(Command command) - { - this.command = command; + + RangeAndTxnId(Range range, TxnId txnId) { + this.range = range; + this.txnId = txnId; } - void update(Ranges add) + @Override + public int compareTo(@Nonnull RangeAndTxnId that) { - if (ranges == null) ranges = add; - else ranges = ranges.with(add); + int c = this.range.compare(that.range); + if (c == 0) c = this.txnId.compareTo(that.txnId); + return c; } } @@ -202,10 +214,10 @@ private Timestamp maxConflict(Seekables keysOrRanges, Ranges slice) { Timestamp timestamp = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> Timestamp.max(forKey.max(), prev), Timestamp.NONE, null); Seekables sliced = keysOrRanges.slice(slice, Minimal); - for (RangeCommand command : rangeCommands.values()) + for (Map.Entry e : rangeCommands.entrySet()) { - if (command.ranges.intersects(sliced)) - timestamp = Timestamp.max(timestamp, command.command.executeAt()); + if (sliced.intersects(e.getKey().range)) + timestamp = Timestamp.max(timestamp, e.getValue().executeAt()); } return timestamp; } @@ -249,7 +261,104 @@ public void forCommittedInEpoch(Ranges ranges, long epoch, Consumer con } @Override - public T mapReduce(Seekables keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction map, T accumulate, T terminalValue) + public Future slowFold(Seekables keysOrRanges, Ranges slice, SlowSearchFunction fold, T accumulate, T terminalValue) + { + accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, acc) -> + fold.apply(new SlowSearcher() { + @Override + public T2 fold(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction apply, T2 acc, T2 term) + { + CommandTimeseries timeseries; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case STARTED_BEFORE: + timeseries = forKey.byId(); + break; + case EXECUTES_AFTER: + case MAY_EXECUTE_BEFORE: + timeseries = forKey.byExecuteAt(); + } + CommandTimeseries.TestTimestamp remapTestTimestamp; + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + case EXECUTES_AFTER: + remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER; + break; + case STARTED_BEFORE: + case MAY_EXECUTE_BEFORE: + remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE; + } + return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, apply, acc, term); + } + }, forKey.key(), acc), accumulate, terminalValue); + + if (accumulate.equals(terminalValue)) + return ImmediateFuture.success(accumulate); + + Seekables sliced = keysOrRanges.slice(slice, Minimal); + for (Map.Entry e : rangeCommands.entrySet()) + { + if (!sliced.intersects(e.getKey().range)) + continue; + + accumulate = fold.apply(new SlowSearcher() + { + @Override + public T2 fold(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction fold, T2 accumulate, T2 terminalValue) + { + Command command = e.getValue(); + switch (testTimestamp) + { + default: throw new AssertionError(); + case STARTED_AFTER: + if (command.txnId().compareTo(timestamp) < 0) return accumulate; + else break; + case STARTED_BEFORE: + if (command.txnId().compareTo(timestamp) > 0) return accumulate; + else break; + case EXECUTES_AFTER: + if (command.executeAt().compareTo(timestamp) < 0) return accumulate; + else break; + case MAY_EXECUTE_BEFORE: + Timestamp compareTo = command.known().executeAt.hasDecidedExecuteAt() ? command.executeAt() : command.txnId(); + if (compareTo.compareTo(timestamp) > 0) return accumulate; + else break; + } + + if (minStatus != null && command.status().compareTo(minStatus) < 0) + return accumulate; + + if (maxStatus != null && command.status().compareTo(maxStatus) > 0) + return accumulate; + + if (testKind == Ws && command.txnId().rw().isRead()) + return accumulate; + + if (testDep != ANY_DEPS) + { + if (!command.known().deps.hasProposedOrDecidedDeps()) + return accumulate; + + if ((testDep == WITH) == !command.partialDeps().contains(depId)) + return accumulate; + } + + return fold.apply(e.getKey().range, command.txnId(), command.executeAt(), accumulate); + } + }, e.getKey().range, accumulate); + + if (accumulate.equals(terminalValue)) + break; + } + return ImmediateFuture.success(accumulate); + } + + @Override + public Future fold(Seekables keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction fold, T accumulate, T terminalValue) { accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> { CommandTimeseries timeseries; @@ -276,18 +385,20 @@ public T mapReduce(Seekables keysOrRanges, Ranges slice, TestKind test case MAY_EXECUTE_BEFORE: remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE; } - return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue); + return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, ANY_DEPS, null, minStatus, maxStatus, fold, prev, terminalValue); }, accumulate, terminalValue); if (accumulate.equals(terminalValue)) - return accumulate; + return ImmediateFuture.success(accumulate); // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable Seekables sliced = keysOrRanges.slice(slice, Minimal); - Map> collect = new TreeMap<>(Range::compare); - for (RangeCommand rangeCommand : rangeCommands.values()) + for (Map.Entry e : rangeCommands.entrySet()) { - Command command = rangeCommand.command; + if (!sliced.intersects(e.getKey().range)) + continue; + + Command command = e.getValue(); switch (testTimestamp) { default: throw new AssertionError(); @@ -315,34 +426,13 @@ public T mapReduce(Seekables keysOrRanges, Ranges slice, TestKind test if (testKind == Ws && command.txnId().rw().isRead()) continue; - if (testDep != ANY_DEPS) - { - if (!command.known().deps.hasProposedOrDecidedDeps()) - continue; - - if ((testDep == WITH) == !command.partialDeps().contains(depId)) - continue; - } - - if (!rangeCommand.ranges.intersects(sliced)) - continue; - - Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> { - // TODO (easy, efficiency): pass command as a parameter to Fold - List list = in.computeIfAbsent(r, ignore -> new ArrayList<>()); - if (list.isEmpty() || list.get(list.size() - 1) != command) - list.add(command); - return in; - }, collect); - } - - for (Map.Entry> e : collect.entrySet()) - { - for (Command command : e.getValue()) - accumulate = map.apply(e.getKey(), command.txnId(), command.executeAt(), accumulate); + // TODO (easy, efficiency): pass command as a parameter to Fold + accumulate = fold.apply(e.getKey().range, command.txnId(), command.executeAt(), accumulate); + if (accumulate.equals(terminalValue)) + break; } - return accumulate; + return ImmediateFuture.success(accumulate); } @Override @@ -355,8 +445,8 @@ public void register(Seekables keysOrRanges, Ranges slice, Command command forEach(keysOrRanges, slice, forKey -> forKey.register(command)); break; case Range: - rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command)) - .update((Ranges)keysOrRanges); + for (Range range : (Ranges)keysOrRanges) + rangeCommands.putIfAbsent(new RangeAndTxnId(range, command.txnId()), command); } } @@ -370,8 +460,7 @@ public void register(Seekable keyOrRange, Ranges slice, Command command) forEach(keyOrRange, slice, forKey -> forKey.register(command)); break; case Range: - rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command)) - .update(Ranges.of((Range)keyOrRange)); + rangeCommands.putIfAbsent(new RangeAndTxnId((Range)keyOrRange, command.txnId()), command); } } @@ -447,7 +536,7 @@ private void forEach(Routable keyOrRange, Ranges slice, Consumer public static class Synchronized extends SyncCommandStore { - public static class SynchronizedState extends State implements SyncCommandStores.SafeSyncCommandStore + public static class SynchronizedState extends InMemoryCommandStore.State implements SyncCommandStores.SafeSyncCommandStore { public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore) { diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java index 9ab4219f13..b11c6a70f9 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java @@ -18,11 +18,16 @@ package accord.impl; -import accord.local.*; import accord.api.Agent; import accord.api.DataStore; import accord.api.ProgressLog; +import accord.local.AsyncCommandStores; import accord.local.CommandStore; +import accord.local.NodeTimeService; +import accord.local.PreLoadContext; +import accord.local.SafeCommandStore; +import accord.local.ShardDistributor; +import accord.local.SyncCommandStores; import accord.primitives.Routables; import accord.utils.MapReduce; diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java index b2a1a65c48..7f2180337c 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java @@ -20,7 +20,7 @@ import accord.api.Key; import accord.local.Command; -import accord.local.SafeCommandStore.CommandFunction; +import accord.local.SafeCommandStore.SearchFunction; import accord.local.SafeCommandStore.TestDep; import accord.local.SafeCommandStore.TestKind; import accord.local.Status; @@ -77,7 +77,7 @@ public boolean isEmpty() public T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, - CommandFunction map, T initialValue, T terminalValue) + SearchFunction map, T initialValue, T terminalValue) { for (Command cmd : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values()) diff --git a/accord-core/src/main/java/accord/local/AsyncCommandStores.java b/accord-core/src/main/java/accord/local/AsyncCommandStores.java index f586ba75d4..c07eeced00 100644 --- a/accord-core/src/main/java/accord/local/AsyncCommandStores.java +++ b/accord-core/src/main/java/accord/local/AsyncCommandStores.java @@ -4,19 +4,18 @@ import accord.api.DataStore; import accord.api.ProgressLog; import accord.primitives.Routables; -import accord.utils.MapReduce; -import accord.utils.MapReduceConsume; -import accord.utils.ReducingFuture; +import accord.utils.*; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.ImmediateFuture; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import java.util.stream.IntStream; public class AsyncCommandStores extends CommandStores { - static class AsyncMapReduceAdapter implements MapReduceAdapter, List>, O> + static class AsyncMapReduceAdapter implements MapReduceAdapter, List>, O, O> { private static final AsyncMapReduceAdapter INSTANCE = new AsyncMapReduceAdapter<>(); public static AsyncMapReduceAdapter instance() { return INSTANCE; } @@ -28,30 +27,69 @@ public List> allocate() } @Override - public Future apply(MapReduce map, CommandStore commandStore, PreLoadContext context) + public Future apply(Function map, CommandStore commandStore, PreLoadContext context) { return commandStore.submit(context, map); } @Override - public List> reduce(MapReduce reduce, List> futures, Future next) + public List> reduce(Reduce reduce, List> futures, Future next) { futures.add(next); return futures; } @Override - public void consume(MapReduceConsume reduceAndConsume, Future future) + public void consume(ReduceConsume reduceAndConsume, Future future) { future.addCallback(reduceAndConsume); } @Override - public Future reduce(MapReduce reduce, List> futures) + public Future reduce(Reduce reduce, List> futures) { if (futures.isEmpty()) return ImmediateFuture.success(null); - return ReducingFuture.reduce(futures, reduce::reduce); + return ReducingFuture.reduce(futures, reduce); + } + } + + static class AsyncFlatMapReduceAdapter implements MapReduceAdapter, List>, Future, O> + { + private static final AsyncFlatMapReduceAdapter INSTANCE = new AsyncFlatMapReduceAdapter<>(); + public static AsyncFlatMapReduceAdapter instance() { return INSTANCE; } + + @Override + public List> allocate() + { + return new ArrayList<>(); + } + + @Override + public Future apply(Function> map, CommandStore commandStore, PreLoadContext context) + { + return commandStore.submit(context, map).flatMap(i -> i); + } + + @Override + public List> reduce(Reduce reduce, List> futures, Future next) + { + futures.add(next); + return futures; + } + + @Override + public void consume(ReduceConsume reduceAndConsume, Future future) + { + future.addCallback(reduceAndConsume); + } + + @Override + public Future reduce(Reduce reduce, List> futures) + { + if (futures.isEmpty()) + return ImmediateFuture.success(null); + return ReducingFuture.reduce(futures, reduce); } } @@ -71,4 +109,10 @@ public void mapReduceConsume(PreLoadContext context, IntStream commandStoreI { mapReduceConsume(context, commandStoreIds, mapReduceConsume, AsyncMapReduceAdapter.INSTANCE); } + + @Override + public void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, AsyncMapReduceConsume mapReduceConsume) + { + mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume, AsyncFlatMapReduceAdapter.INSTANCE); + } } diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index ef3709fb9a..74f7f72a96 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -191,6 +191,9 @@ public Iterable txnIds() public Seekables keys() { // TODO (expected, consider): when do we need this, and will it always be sufficient? + // this is used for Apply; should probably have its own context for clarity, but since + // only used for local timestamp derivation in C* integration can probably remove it + // when we improve that return partialTxn().keys(); } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 09ad0fc6cc..5652b11137 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -22,10 +22,8 @@ import accord.primitives.*; import accord.api.RoutingKey; import accord.topology.Topology; -import accord.utils.MapReduce; -import accord.utils.MapReduceConsume; +import accord.utils.*; -import accord.utils.ReducingFuture; import com.carrotsearch.hppc.IntObjectMap; import com.carrotsearch.hppc.IntObjectScatterMap; import com.google.common.annotations.VisibleForTesting; @@ -36,6 +34,7 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.IntStream; import static accord.local.PreLoadContext.empty; @@ -287,13 +286,13 @@ private synchronized Snapshot updateTopology(Snapshot prev, Topology newTopology return new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology); } - interface MapReduceAdapter + interface MapReduceAdapter { Accumulator allocate(); - Intermediate apply(MapReduce map, S commandStore, PreLoadContext context); - Accumulator reduce(MapReduce reduce, Accumulator accumulator, Intermediate next); - void consume(MapReduceConsume consume, Intermediate reduced); - Intermediate reduce(MapReduce reduce, Accumulator accumulator); + Accumulate apply(Function map, S commandStore, PreLoadContext context); + Accumulator reduce(Reduce reduce, Accumulator accumulator, Accumulate next); + void consume(ReduceConsume consume, Accumulate reduced); + Accumulate reduce(Reduce reduce, Accumulator accumulator); } public Future forEach(Consumer forEach) @@ -363,30 +362,57 @@ public void mapReduceConsume(PreLoadContext context, RoutingKey key, long mi */ public abstract void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, MapReduceConsume mapReduceConsume); public abstract void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume mapReduceConsume); + public abstract void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, AsyncMapReduceConsume mapReduceConsume); protected void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, MapReduceConsume mapReduceConsume, - MapReduceAdapter adapter) + MapReduceAdapter adapter) + { + T1 reduced = mapReduce(context, keys, minEpoch, maxEpoch, mapReduceConsume, adapter); + adapter.consume(mapReduceConsume, reduced); + } + + protected void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, AsyncMapReduceConsume mapReduceConsume, + MapReduceAdapter, O> adapter) { T1 reduced = mapReduce(context, keys, minEpoch, maxEpoch, mapReduceConsume, adapter); adapter.consume(mapReduceConsume, reduced); } protected void mapReduceConsume(PreLoadContext context, IntStream commandStoreIds, MapReduceConsume mapReduceConsume, - MapReduceAdapter adapter) + MapReduceAdapter adapter) { T1 reduced = mapReduce(context, commandStoreIds, mapReduceConsume, adapter); adapter.consume(mapReduceConsume, reduced); } protected T1 mapReduce(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, MapReduce mapReduce, - MapReduceAdapter adapter) + MapReduceAdapter adapter) + { + T2 accumulator = adapter.allocate(); + Snapshot snapshot = current; + ShardHolder[] shards = snapshot.shards; + for (ShardHolder shard : shards) + { + // TODO (required, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies) + Ranges shardRanges = shard.ranges().between(minEpoch, maxEpoch); + if (!shardRanges.intersects(keys)) + continue; + + T1 next = adapter.apply(mapReduce, (S)shard.store, context); + accumulator = adapter.reduce(mapReduce, accumulator, next); + } + return adapter.reduce(mapReduce, accumulator); + } + + protected T1 mapReduce(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, AsyncMapReduce mapReduce, + MapReduceAdapter, O> adapter) { T2 accumulator = adapter.allocate(); Snapshot snapshot = current; ShardHolder[] shards = snapshot.shards; for (ShardHolder shard : shards) { - // TODO (urgent, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies) + // TODO (required, efficiency): range map for intersecting ranges (e.g. that to be introduced for range dependencies) Ranges shardRanges = shard.ranges().between(minEpoch, maxEpoch); if (!shardRanges.intersects(keys)) continue; @@ -398,7 +424,7 @@ protected T1 mapReduce(PreLoadContext context, Routables keys, } protected T1 mapReduce(PreLoadContext context, IntStream commandStoreIds, MapReduce mapReduce, - MapReduceAdapter adapter) + MapReduceAdapter adapter) { // TODO (low priority, efficiency): avoid using an array, or use a scratch buffer int[] ids = commandStoreIds.toArray(); diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 4ccc05c1ca..84331519d3 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -31,6 +31,7 @@ import accord.messages.*; import accord.primitives.*; import accord.primitives.Routable.Domain; +import accord.utils.AsyncMapReduceConsume; import accord.utils.MapReduceConsume; import com.google.common.annotations.VisibleForTesting; @@ -275,7 +276,12 @@ public Future ifLocalSince(PreLoadContext context, RoutingKey key, Timesta return commandStores.ifLocal(context, key, since.epoch(), Long.MAX_VALUE, ifLocal); } - public void mapReduceConsumeLocal(TxnRequest request, long minEpoch, long maxEpoch, MapReduceConsume mapReduceConsume) + public void mapReduceConsumeLocal(TxnRequest request, long minEpoch, long maxEpoch, MapReduceConsume mapReduceConsume) + { + commandStores.mapReduceConsume(request, request.scope(), minEpoch, maxEpoch, mapReduceConsume); + } + + public void mapReduceConsumeLocal(TxnRequest request, long minEpoch, long maxEpoch, AsyncMapReduceConsume mapReduceConsume) { commandStores.mapReduceConsume(request, request.scope(), minEpoch, maxEpoch, mapReduceConsume); } diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java index 8a56c9ff10..3c20f7dd9c 100644 --- a/accord-core/src/main/java/accord/local/PreLoadContext.java +++ b/accord-core/src/main/java/accord/local/PreLoadContext.java @@ -50,7 +50,7 @@ public interface PreLoadContext * Both can be done without. For range transactions calculateDeps needs to be asynchronous anyway to support * potentially large scans, and for register we do not need to load into memory, we can perform a blind write. */ - Seekables keys(); + default Seekables keys() { return Keys.EMPTY; } static PreLoadContext contextFor(Iterable txnIds, Seekables keys) { @@ -89,6 +89,11 @@ static PreLoadContext contextFor(Key key) return contextFor(Collections.emptyList(), Keys.of(key)); } + static PreLoadContext contextFor(Seekables keys) + { + return contextFor(Collections.emptyList(), keys); + } + static PreLoadContext empty() { return contextFor(Collections.emptyList(), Keys.EMPTY); diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 8f5b2f3b32..f025e65819 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -53,11 +53,6 @@ public interface SafeCommandStore */ void addAndInvokeListener(TxnId txnId, CommandListener listener); - interface CommandFunction - { - O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in); - } - enum TestTimestamp { STARTED_BEFORE, @@ -69,14 +64,46 @@ enum TestDep { WITH, WITHOUT, ANY_DEPS } enum TestKind { Ws, RorWs } /** - * Visits keys first and then ranges, both in ascending order. - * Within each key or range visits TxnId in ascending order of queried timestamp. + * Is able to search based on dependency contents, which may require reading from storage + */ + interface SlowSearcher + { + T fold(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, + TestDep testDep, @Nullable TxnId depId, + @Nullable Status minStatus, @Nullable Status maxStatus, + SearchFunction mapReduce, T initialValue, T terminalValue); + } + + interface SlowSearchFunction + { + O apply(SlowSearcher slowSearcher, Seekable keyOrRange, I in); + } + + interface SearchFunction + { + O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in); + } + + /** + * A slow fold that permits applying multiple searches to the data for an intersecting + * key or range. TODO (expected): we might want some pre-filter? + * + * Visits keys in unspecified order, hence neither foldl/foldr, though we aim for foldl + * (ascending order in keys/ranges then txnId). + * This may be applied asynchronously, though it is expected to normally respond immediately. + */ + Future slowFold(Seekables keys, Ranges slice, + SlowSearchFunction fold, T initialValue, T terminalValue); + + /** + * Visits keys in unspecified order, hence neither foldl/foldr, though we aim for foldl + * (ascending order in keys/ranges then txnId). + * This may be applied asynchronously, though it is expected to normally respond immediately. */ - T mapReduce(Seekables keys, Ranges slice, + Future fold(Seekables keys, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, - TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, - CommandFunction map, T initialValue, T terminalValue); + SearchFunction fold, T initialValue, T terminalValue); void register(Seekables keysOrRanges, Ranges slice, Command command); void register(Seekable keyOrRange, Ranges slice, Command command); diff --git a/accord-core/src/main/java/accord/local/SyncCommandStores.java b/accord-core/src/main/java/accord/local/SyncCommandStores.java index 93476f738e..dd62fbc4cd 100644 --- a/accord-core/src/main/java/accord/local/SyncCommandStores.java +++ b/accord-core/src/main/java/accord/local/SyncCommandStores.java @@ -4,9 +4,12 @@ import accord.api.DataStore; import accord.api.ProgressLog; import accord.primitives.Routables; -import accord.utils.MapReduce; -import accord.utils.MapReduceConsume; +import accord.utils.*; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import java.util.stream.IntStream; @@ -31,7 +34,7 @@ public SyncCommandStores(NodeTimeService time, Agent agent, DataStore store, Sha super(time, agent, store, shardDistributor, progressLogFactory, shardFactory); } - protected static class SyncMapReduceAdapter implements MapReduceAdapter + protected static class SyncMapReduceAdapter implements MapReduceAdapter { private static final SyncMapReduceAdapter INSTANCE = new SyncMapReduceAdapter<>(); public static SyncMapReduceAdapter instance() { return INSTANCE; } @@ -44,30 +47,70 @@ public O allocate() } @Override - public O apply(MapReduce map, SyncCommandStore commandStore, PreLoadContext context) + public O apply(Function map, SyncCommandStore commandStore, PreLoadContext context) { return commandStore.executeSync(context, map); } @Override - public O reduce(MapReduce reduce, O prev, O next) + public O reduce(Reduce reduce, O prev, O next) { return prev == SENTINEL ? next : reduce.reduce(prev, next); } @Override - public void consume(MapReduceConsume reduceAndConsume, O result) + public void consume(ReduceConsume reduceAndConsume, O result) { reduceAndConsume.accept(result, null); } @Override - public O reduce(MapReduce reduce, O result) + public O reduce(Reduce reduce, O result) { return result == SENTINEL ? null : result; } } + protected static class SyncFutureMapReduceAdapter implements MapReduceAdapter, List>, Future, O> + { + private static final SyncFutureMapReduceAdapter INSTANCE = new SyncFutureMapReduceAdapter<>(); + public static SyncFutureMapReduceAdapter instance() { return INSTANCE; } + + @Override + public List> allocate() + { + return new ArrayList<>(); + } + + @Override + public Future apply(Function> map, SyncCommandStore commandStore, PreLoadContext context) + { + return commandStore.executeSync(context, map); + } + + @Override + public List> reduce(Reduce reduce, List> prev, Future next) + { + prev.add(next); + return prev; + } + + @Override + public void consume(ReduceConsume consume, Future reduced) + { + reduced.addCallback(consume); + } + + @Override + public Future reduce(Reduce reduce, List> futures) + { + if (futures.isEmpty()) + return ImmediateFuture.success(null); + + return ReducingFuture.reduce(futures, reduce); + } + } + @Override public void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, MapReduceConsume mapReduceConsume) { @@ -93,4 +136,17 @@ public void mapReduceConsume(PreLoadContext context, IntStream commandStoreI mapReduceConsume.accept(null, t); } } + + @Override + public void mapReduceConsume(PreLoadContext context, Routables keys, long minEpoch, long maxEpoch, AsyncMapReduceConsume mapReduceConsume) + { + try + { + mapReduceConsume(context, keys, minEpoch, maxEpoch, mapReduceConsume, SyncFutureMapReduceAdapter.INSTANCE); + } + catch (Throwable t) + { + mapReduceConsume.accept(null, t); + } + } } diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java index 4cc753861b..4b79e3f04e 100644 --- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java +++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java @@ -50,10 +50,4 @@ public Iterable txnIds() { return Collections.singleton(txnId); } - - @Override - public Seekables keys() - { - return Keys.EMPTY; - } } diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java index f155ffe08c..ef064781ee 100644 --- a/accord-core/src/main/java/accord/messages/Accept.java +++ b/accord-core/src/main/java/accord/messages/Accept.java @@ -31,8 +31,13 @@ import accord.local.Command; import java.util.Collections; +import java.util.function.BiFunction; + import accord.primitives.Deps; import accord.primitives.TxnId; +import accord.utils.AsyncMapReduceConsume; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -41,7 +46,7 @@ // TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient // (e.g. serialize without slicing, and without unnecessary fields) -public class Accept extends TxnRequest.WithUnsynced +public class Accept extends TxnRequest.WithUnsynced implements AsyncMapReduceConsume { public static class SerializerSupport { @@ -75,7 +80,7 @@ private Accept(TxnId txnId, PartialRoute scope, long waitForEpoch, long minEp } @Override - public synchronized AcceptReply apply(SafeCommandStore safeStore) + public synchronized Future apply(SafeCommandStore safeStore) { if (minUnsyncedEpoch < txnId.epoch()) { @@ -83,7 +88,7 @@ public synchronized AcceptReply apply(SafeCommandStore safeStore) // if not, we're just providing dependencies, and we can do that without updating our state Ranges acceptRanges = safeStore.ranges().between(txnId.epoch(), executeAt.epoch()); if (!acceptRanges.intersects(scope)) - return new AcceptReply(calculatePartialDeps(safeStore)); + return calculatePartialDeps(safeStore).map(AcceptReply::new); } // only accept if we actually participate in the ranges - otherwise we're just looking @@ -94,14 +99,14 @@ public synchronized AcceptReply apply(SafeCommandStore safeStore) case Redundant: return AcceptReply.REDUNDANT; case RejectedBallot: - return new AcceptReply(command.promised()); + return ImmediateFuture.success(new AcceptReply(command.promised())); case Success: // TODO (desirable, efficiency): we don't need to calculate deps if executeAt == txnId - return new AcceptReply(calculatePartialDeps(safeStore)); + return calculatePartialDeps(safeStore).map(AcceptReply::new); } } - private PartialDeps calculatePartialDeps(SafeCommandStore safeStore) + private Future calculatePartialDeps(SafeCommandStore safeStore) { return PreAccept.calculatePartialDeps(safeStore, txnId, keys, executeAt, safeStore.ranges().between(minUnsyncedEpoch, executeAt.epoch())); } @@ -136,12 +141,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return keys; - } - @Override public MessageType type() { @@ -159,8 +158,8 @@ public String toString() { public static final class AcceptReply implements Reply { - public static final AcceptReply ACCEPT_INVALIDATE = new AcceptReply(Success); - public static final AcceptReply REDUNDANT = new AcceptReply(Redundant); + public static final Future ACCEPT_INVALIDATE = ImmediateFuture.success(new AcceptReply(Success)); + public static final Future REDUNDANT = ImmediateFuture.success(new AcceptReply(Redundant)); public final AcceptOutcome outcome; public final Ballot supersededBy; @@ -245,9 +244,9 @@ public AcceptReply apply(SafeCommandStore safeStore) { default: case Redundant: - return AcceptReply.REDUNDANT; + return AcceptReply.REDUNDANT.getNow(); case Success: - return AcceptReply.ACCEPT_INVALIDATE; + return AcceptReply.ACCEPT_INVALIDATE.getNow(); case RejectedBallot: return new AcceptReply(command.promised()); } diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index 7dac5605e8..2a1d21f92d 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -24,6 +24,7 @@ import accord.local.Node.Id; import accord.api.Result; import accord.topology.Topologies; +import accord.utils.MapReduceConsume; import com.google.common.collect.Iterables; import java.util.Collections; @@ -33,7 +34,7 @@ import static accord.messages.MessageType.APPLY_REQ; import static accord.messages.MessageType.APPLY_RSP; -public class Apply extends TxnRequest +public class Apply extends TxnRequest implements MapReduceConsume { public static class SerializationSupport { @@ -117,12 +118,6 @@ public Iterable txnIds() return Iterables.concat(Collections.singleton(txnId), deps.txnIds()); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - @Override public MessageType type() { diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java b/accord-core/src/main/java/accord/messages/BeginInvalidation.java index 57f48f7602..79d63eb2c4 100644 --- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java +++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java @@ -71,12 +71,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - @Override public long waitForEpoch() { diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index 3d699c8eb4..5a9a36558b 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -20,6 +20,7 @@ import accord.api.Result; import accord.local.SafeCommandStore; +import accord.local.SafeCommandStore.SlowSearcher; import accord.local.Status.Phase; import accord.primitives.*; import accord.topology.Topologies; @@ -29,11 +30,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.utils.AsyncMapReduceConsume; import accord.utils.Invariants; import accord.local.Node.Id; import accord.local.Command; import accord.local.Status; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import java.util.Collections; @@ -42,9 +46,9 @@ import static accord.local.SafeCommandStore.TestKind.RorWs; import static accord.local.SafeCommandStore.TestTimestamp.*; import static accord.local.Status.*; -import static accord.messages.PreAccept.calculatePartialDeps; +import static accord.messages.PreAccept.addDeps; -public class BeginRecovery extends TxnRequest +public class BeginRecovery extends TxnRequest implements AsyncMapReduceConsume { public static class SerializationSupport { @@ -81,8 +85,7 @@ protected void process() } @Override - - public RecoverReply apply(SafeCommandStore safeStore) + public Future apply(SafeCommandStore safeStore) { Command command = safeStore.command(txnId); @@ -95,40 +98,70 @@ public RecoverReply apply(SafeCommandStore safeStore) throw new IllegalStateException("Invalid Outcome"); case RejectedBallot: - return new RecoverNack(command.promised()); + return ImmediateFuture.success(new RecoverNack(command.promised())); case Success: } - PartialDeps deps = command.partialDeps(); - if (!command.known().deps.hasProposedOrDecidedDeps()) - { - deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().at(txnId.epoch())); - } + Ranges ranges = safeStore.ranges().at(txnId.epoch()); + RecoverOk basic = new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), + command.partialDeps() == null ? PartialDeps.NONE : command.partialDeps(), Deps.NONE, Deps.NONE, + false, command.writes(), command.result()); - boolean rejectsFastPath; - Deps earlierCommittedWitness, earlierAcceptedNoWitness; + if (command.hasBeen(Committed)) + return ImmediateFuture.success(basic); - if (command.hasBeen(PreCommitted)) + class Collector implements AutoCloseable { - rejectsFastPath = false; - earlierCommittedWitness = earlierAcceptedNoWitness = Deps.NONE; + PartialDeps.Builder deps; + Deps.Builder earlierAcceptedNoWitness; + Deps.Builder earlierCommittedWitness; + boolean rejectsFastPath; + + @Override + public void close() + { + if (deps != null) deps.close(); + if (earlierAcceptedNoWitness != null) earlierAcceptedNoWitness.close(); + if (earlierCommittedWitness != null) earlierCommittedWitness.close(); + } } - else - { - Ranges ranges = safeStore.ranges().at(txnId.epoch()); - rejectsFastPath = hasAcceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()); - if (!rejectsFastPath) - rejectsFastPath = hasCommittedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()); - // TODO (expected, testing): introduce some good unit tests for verifying these two functions in a real repair scenario - // committed txns with an earlier txnid and have our txnid as a dependency - earlierCommittedWitness = committedStartedBeforeAndWitnessed(safeStore, txnId, ranges, partialTxn.keys()); + Collector collector = new Collector(); + if (!command.known().deps.hasProposedOrDecidedDeps()) + { + collector.deps = PartialDeps.builder(ranges); + } - // accepted txns with an earlier txnid that don't have our txnid as a dependency - earlierAcceptedNoWitness = acceptedStartedBeforeWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()); + if (!basic.status.hasBeen(PreCommitted)) + { + collector.earlierCommittedWitness = Deps.builder(); + collector.earlierAcceptedNoWitness = Deps.builder(); } - return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result()); + + Future collected = safeStore.slowFold(partialTxn.keys(), ranges, (searcher, keyOrRange, c) -> { + if (c.deps != null) + addDeps(searcher, txnId, txnId, c.deps); + + if (!basic.status.hasBeen(PreCommitted)) + { + addAcceptedStartedBeforeWithoutWitnessing(searcher, txnId, c.earlierAcceptedNoWitness); + addCommittedStartedBeforeAndWitnessed(searcher, txnId, c.earlierCommittedWitness); + if (!c.rejectsFastPath) + { + c.rejectsFastPath = hasAcceptedStartedAfterWithoutWitnessing(searcher, txnId); + if (!c.rejectsFastPath) + c.rejectsFastPath = hasCommittedExecutesAfterWithoutWitnessing(searcher, txnId); + } + } + return c; + }, collector, null); + + return collected.map(c -> { + RecoverOk ok = basic.merge(c.deps, c.earlierCommittedWitness, c.earlierAcceptedNoWitness, c.rejectsFastPath); + c.close(); + return ok; + }); } @Override @@ -179,6 +212,8 @@ public Iterable txnIds() @Override public Seekables keys() { + // TODO (expected): we shouldn't need this; it's currently used only for maxConflict IF we haven't preaccepted, + // which we can always answer pessimistically, so should always be answered from cache return partialTxn.keys(); } @@ -268,6 +303,16 @@ public static RecoverOk maxAcceptedOrLater(List recoverOks) { return Status.max(recoverOks, r -> r.status, r -> r.accepted, r -> r.status.phase.compareTo(Phase.Accept) >= 0); } + + RecoverOk merge(@Nullable PartialDeps.Builder deps, @Nullable Deps.Builder earlierCommittedWitness, @Nullable Deps.Builder earlierAcceptedNoWitness, boolean rejectsFastPath) + { + return new RecoverOk(txnId, status, accepted, executeAt, + deps != null ? deps.build() : this.deps, + earlierCommittedWitness != null ? earlierCommittedWitness.build() : this.earlierCommittedWitness, + earlierAcceptedNoWitness != null ? earlierAcceptedNoWitness.build() : this.earlierAcceptedNoWitness, + rejectsFastPath | this.rejectsFastPath, + writes, result); + } } public static class RecoverNack extends RecoverReply @@ -293,31 +338,23 @@ public String toString() } } - private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables keys) + private static void addAcceptedStartedBeforeWithoutWitnessing(SlowSearcher searcher, TxnId startedBefore, Deps.Builder builder) { - try (Deps.Builder builder = Deps.builder()) - { - commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted, - (keyOrRange, txnId, executeAt, prev) -> { - if (executeAt.compareTo(startedBefore) > 0) - builder.add(keyOrRange, txnId); - return builder; - }, builder, null); - return builder.build(); - } + searcher.fold(RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted, + (keyOrRange, txnId, executeAt, prev) -> { + if (executeAt.compareTo(startedBefore) > 0) + builder.add(keyOrRange, txnId); + return builder; + }, builder, null); } - private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables keys) + private static void addCommittedStartedBeforeAndWitnessed(SlowSearcher searcher, TxnId startedBefore, Deps.Builder builder) { - try (Deps.Builder builder = Deps.builder()) - { - commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null, - (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder)builder, null); - return builder.build(); - } + searcher.fold(RorWs, STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null, + (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder)builder, null); } - private static boolean hasAcceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables keys) + private static boolean hasAcceptedStartedAfterWithoutWitnessing(SlowSearcher searcher, TxnId startedAfter) { /* * The idea here is to discover those transactions that were started after us and have been Accepted @@ -326,11 +363,11 @@ private static boolean hasAcceptedStartedAfterWithoutWitnessing(SafeCommandStore * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction * has not witnessed us we can safely invalidate (us). */ - return commandStore.mapReduce(keys, ranges, RorWs, STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted, + return searcher.fold(RorWs, STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted, (keyOrRange, txnId, executeAt, prev) -> true, false, true); } - private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables keys) + private static boolean hasCommittedExecutesAfterWithoutWitnessing(SlowSearcher searcher, TxnId startedAfter) { /* * The idea here is to discover those transactions that have been decided to execute after us @@ -339,7 +376,7 @@ private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandSto * witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction * has not witnessed us we can safely invalidate it. */ - return commandStore.mapReduce(keys, ranges, RorWs, EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null, + return searcher.fold(RorWs, EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null, (keyOrRange, txnId, executeAt, prev) -> true,false, true); } } diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java index 5f03522648..21b1ce72c2 100644 --- a/accord-core/src/main/java/accord/messages/CheckStatus.java +++ b/accord-core/src/main/java/accord/messages/CheckStatus.java @@ -85,12 +85,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - public CheckStatus(Id to, Topologies topologies, TxnId txnId, Unseekables query, IncludeInfo includeInfo) { super(txnId); diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index d053862394..9880ccae3f 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -33,11 +33,12 @@ import accord.utils.Invariants; import accord.topology.Topology; +import accord.utils.MapReduceConsume; import static accord.local.Status.Committed; import static accord.local.Status.Known.DefinitionOnly; -public class Commit extends TxnRequest +public class Commit extends TxnRequest implements MapReduceConsume { public static class SerializerSupport { @@ -132,12 +133,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - @Override public void process() { @@ -257,12 +252,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - @Override public long waitForEpoch() { diff --git a/accord-core/src/main/java/accord/messages/Defer.java b/accord-core/src/main/java/accord/messages/Defer.java index 6fa9579568..6534c259e3 100644 --- a/accord-core/src/main/java/accord/messages/Defer.java +++ b/accord-core/src/main/java/accord/messages/Defer.java @@ -17,12 +17,12 @@ class Defer implements CommandListener public enum Ready { No, Yes, Expired } final Function waitUntil; - final TxnRequest request; + final TxnRequest request; final IntHashSet waitingOn = new IntHashSet(); // TODO (easy): use Agrona when available int waitingOnCount; boolean isDone; - Defer(Known waitUntil, Known expireAt, TxnRequest request) + Defer(Known waitUntil, Known expireAt, TxnRequest request) { this(command -> { if (!waitUntil.isSatisfiedBy(command.known())) @@ -33,7 +33,7 @@ public enum Ready { No, Yes, Expired } }, request); } - Defer(Function waitUntil, TxnRequest request) + Defer(Function waitUntil, TxnRequest request) { this.waitUntil = waitUntil; this.request = request; diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java index 51ee9e9a85..cf24b3364c 100644 --- a/accord-core/src/main/java/accord/messages/GetDeps.java +++ b/accord-core/src/main/java/accord/messages/GetDeps.java @@ -2,17 +2,19 @@ import accord.local.SafeCommandStore; import accord.primitives.*; +import accord.utils.AsyncMapReduceConsume; import accord.utils.Invariants; import accord.local.Node.Id; import accord.topology.Topologies; +import org.apache.cassandra.utils.concurrent.Future; import javax.annotation.Nonnull; import java.util.Collections; import static accord.messages.PreAccept.calculatePartialDeps; -public class GetDeps extends TxnRequest.WithUnsynced +public class GetDeps extends TxnRequest.WithUnsynced implements AsyncMapReduceConsume { public static final class SerializationSupport { @@ -49,7 +51,7 @@ public void process() } @Override - public PartialDeps apply(SafeCommandStore instance) + public Future apply(SafeCommandStore instance) { Ranges ranges = instance.ranges().between(minUnsyncedEpoch, executeAt.epoch()); return calculatePartialDeps(instance, txnId, keys, executeAt, ranges); @@ -89,12 +91,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return keys; - } - public static class GetDepsOk implements Reply { public final PartialDeps deps; diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index ba5c91b27b..266124d2a8 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -8,6 +8,7 @@ import accord.local.Status.Durability; import accord.primitives.*; import accord.topology.Topologies; +import accord.utils.MapReduceConsume; import java.util.Collections; @@ -17,7 +18,7 @@ import static accord.local.PreLoadContext.contextFor; import static accord.messages.SimpleReply.Ok; -public class InformDurable extends TxnRequest implements PreLoadContext +public class InformDurable extends TxnRequest implements PreLoadContext, MapReduceConsume { public static class SerializationSupport { @@ -115,10 +116,4 @@ public Iterable txnIds() { return Collections.singleton(txnId); } - - @Override - public Seekables keys() - { - return Keys.EMPTY; - } } diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index 9b171f054f..a233c4bf32 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -22,6 +22,7 @@ import java.util.Objects; import accord.local.*; +import accord.local.SafeCommandStore.SlowSearcher; import accord.local.SafeCommandStore.TestKind; import accord.local.Node.Id; @@ -33,13 +34,17 @@ import accord.primitives.*; import accord.primitives.TxnId; +import accord.utils.AsyncMapReduceConsume; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestKind.RorWs; import static accord.local.SafeCommandStore.TestKind.Ws; +import static accord.local.SafeCommandStore.TestTimestamp.MAY_EXECUTE_BEFORE; import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE; -public class PreAccept extends WithUnsynced +public class PreAccept extends WithUnsynced implements AsyncMapReduceConsume { public static class SerializerSupport { @@ -78,6 +83,8 @@ public Iterable txnIds() @Override public Seekables keys() { + // TODO (expected): we shouldn't need this; it's currently used only for maxConflict, which we can always + // answer pessimistically, so should always be answered from cache return partialTxn.keys(); } @@ -88,7 +95,8 @@ protected void process() } @Override - public PreAcceptReply apply(SafeCommandStore safeStore) + // TODO (required): use AsyncChain + public Future apply(SafeCommandStore safeStore) { // note: this diverges from the paper, in that instead of waiting for JoinShard, // we PreAccept to both old and new topologies and require quorums in both. @@ -96,8 +104,8 @@ public PreAcceptReply apply(SafeCommandStore safeStore) if (minUnsyncedEpoch < txnId.epoch() && !safeStore.ranges().at(txnId.epoch()).intersects(scope)) { // we only preaccept in the coordination epoch, but we might contact other epochs for dependencies - return new PreAcceptOk(txnId, txnId, - calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch()))); + return calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())) + .map(deps -> new PreAcceptOk(txnId, txnId, deps)); } Command command = safeStore.command(txnId); @@ -106,11 +114,11 @@ public PreAcceptReply apply(SafeCommandStore safeStore) default: case Success: case Redundant: - return new PreAcceptOk(txnId, command.executeAt(), - calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch()))); + return calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())) + .map(deps -> new PreAcceptOk(txnId, command.executeAt(), deps)); case RejectedBallot: - return PreAcceptNack.INSTANCE; + return ImmediateFuture.success(PreAcceptNack.INSTANCE); } } @@ -129,10 +137,10 @@ public PreAcceptReply reduce(PreAcceptReply r1, PreAcceptReply r2) } @Override - public void accept(PreAcceptReply reply, Throwable failure) + public void accept(PreAcceptReply success, Throwable failure) { // TODO (required, error handling): communicate back the failure - node.reply(replyTo, replyContext, reply); + node.reply(replyTo, replyContext, success); } @Override @@ -216,27 +224,34 @@ public String toString() } } - static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables keys, Timestamp executeAt, Ranges ranges) + public static Future calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables keys, Timestamp executeAt, Ranges ranges) { - try (PartialDeps.Builder builder = PartialDeps.builder(ranges)) - { - return calculateDeps(commandStore, txnId, keys, executeAt, ranges, builder); - } + PartialDeps.Builder builder = PartialDeps.builder(ranges); + TestKind testKind = txnId.isWrite() ? RorWs : Ws; + Future result = commandStore.fold(keys, ranges, testKind, MAY_EXECUTE_BEFORE, executeAt, null, null, + (keyOrRange, testTxnId, testExecuteAt, in) -> { + // TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations + if (!testTxnId.equals(txnId)) + in.add(keyOrRange, testTxnId); + return in; + }, builder, null) + .map(Deps.AbstractBuilder::build); + result.addListener(builder::close); + return result; } - private static T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables keys, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder builder) + public static void addDeps(SlowSearcher searcher, TxnId txnId, Timestamp executeAt, Deps.AbstractBuilder builder) { - TestKind testKind = txnId.rw().isWrite() ? RorWs : Ws; + TestKind testKind = txnId.isWrite() ? RorWs : Ws; // could use MAY_EXECUTE_BEFORE to prune those we know execute later, but shouldn't usually be of much help // and would need to supply !hasOrderedTxnId - commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null, + searcher.fold(testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null, (keyOrRange, testTxnId, testExecuteAt, in) -> { // TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations if (testTxnId != txnId) in.add(keyOrRange, testTxnId); return in; }, builder, null); - return builder.build(); } @Override diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 3da177189d..6a012788b1 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -253,12 +253,6 @@ public Iterable txnIds() return Collections.singleton(txnId); } - @Override - public Seekables keys() - { - return Keys.EMPTY; - } - @Override public MessageType type() { diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java index ab2a6cadc9..8b2bbcad03 100644 --- a/accord-core/src/main/java/accord/messages/TxnRequest.java +++ b/accord-core/src/main/java/accord/messages/TxnRequest.java @@ -38,9 +38,9 @@ import static java.lang.Long.min; -public abstract class TxnRequest implements Request, PreLoadContext, MapReduceConsume +public abstract class TxnRequest implements Request, PreLoadContext { - public static abstract class WithUnsynced extends TxnRequest + public static abstract class WithUnsynced extends TxnRequest { public final long minUnsyncedEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch? public final boolean doNotComputeProgressKey; diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java index 1cb1a3f4da..348314648e 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java +++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java @@ -14,6 +14,7 @@ import net.nicoulaj.compilecommand.annotations.Inline; import static accord.primitives.Routable.Domain.Key; +import static accord.utils.SortedArrays.Search.FAST; @SuppressWarnings("rawtypes") // TODO (desired, efficiency): check that foldl call-sites are inlined and optimised by HotSpot @@ -94,6 +95,12 @@ public final boolean intersects(AbstractRanges ranges) return findNextIntersection(0, ranges, 0) >= 0; } + @Override + public final boolean intersects(Range range) + { + return findNext(0, range, FAST) >= 0; + } + @Override public final int findNext(int thisIndex, RoutableKey key, SortedArrays.Search search) { diff --git a/accord-core/src/main/java/accord/primitives/PartialDeps.java b/accord-core/src/main/java/accord/primitives/PartialDeps.java index 72a86b1cf0..b5627acf6c 100644 --- a/accord-core/src/main/java/accord/primitives/PartialDeps.java +++ b/accord-core/src/main/java/accord/primitives/PartialDeps.java @@ -10,6 +10,7 @@ public static Builder builder(Ranges covering) { return new Builder(covering); } + public static class Builder extends AbstractBuilder { final Ranges covering; diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java index 27645f9288..c940cd06fc 100644 --- a/accord-core/src/main/java/accord/primitives/Routables.java +++ b/accord-core/src/main/java/accord/primitives/Routables.java @@ -36,6 +36,7 @@ enum Slice boolean isEmpty(); boolean intersects(AbstractRanges ranges); boolean intersects(AbstractKeys keys); + boolean intersects(Range range); default boolean intersects(Routables routables) { switch (routables.domain()) diff --git a/accord-core/src/main/java/accord/utils/AsyncMapReduce.java b/accord-core/src/main/java/accord/utils/AsyncMapReduce.java new file mode 100644 index 0000000000..cf7b5083a4 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/AsyncMapReduce.java @@ -0,0 +1,12 @@ +package accord.utils; + +import org.apache.cassandra.utils.concurrent.Future; + +import java.util.function.Function; + +public interface AsyncMapReduce extends Function>, Reduce +{ + // TODO (desired, safety): ensure mutual exclusivity when calling each of these methods + Future apply(I in); + O reduce(O o1, O o2); +} diff --git a/accord-core/src/main/java/accord/utils/AsyncMapReduceConsume.java b/accord-core/src/main/java/accord/utils/AsyncMapReduceConsume.java new file mode 100644 index 0000000000..838b408a1a --- /dev/null +++ b/accord-core/src/main/java/accord/utils/AsyncMapReduceConsume.java @@ -0,0 +1,6 @@ +package accord.utils; + +public interface AsyncMapReduceConsume extends AsyncMapReduce, ReduceConsume +{ + void accept(O result, Throwable failure); +} diff --git a/accord-core/src/main/java/accord/utils/MapReduce.java b/accord-core/src/main/java/accord/utils/MapReduce.java index ac6ddf8d4c..b564d637d7 100644 --- a/accord-core/src/main/java/accord/utils/MapReduce.java +++ b/accord-core/src/main/java/accord/utils/MapReduce.java @@ -2,7 +2,7 @@ import java.util.function.Function; -public interface MapReduce extends Function +public interface MapReduce extends Function, Reduce { // TODO (desired, safety): ensure mutual exclusivity when calling each of these methods @Override diff --git a/accord-core/src/main/java/accord/utils/MapReduceConsume.java b/accord-core/src/main/java/accord/utils/MapReduceConsume.java index 5ec710b82f..35071b78a0 100644 --- a/accord-core/src/main/java/accord/utils/MapReduceConsume.java +++ b/accord-core/src/main/java/accord/utils/MapReduceConsume.java @@ -3,7 +3,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; -public interface MapReduceConsume extends MapReduce, BiConsumer +public interface MapReduceConsume extends MapReduce, ReduceConsume { @Override void accept(O result, Throwable failure); diff --git a/accord-core/src/main/java/accord/utils/Reduce.java b/accord-core/src/main/java/accord/utils/Reduce.java new file mode 100644 index 0000000000..4c4ecdbb4e --- /dev/null +++ b/accord-core/src/main/java/accord/utils/Reduce.java @@ -0,0 +1,6 @@ +package accord.utils; + +public interface Reduce +{ + O reduce(O o1, O o2); +} diff --git a/accord-core/src/main/java/accord/utils/ReduceConsume.java b/accord-core/src/main/java/accord/utils/ReduceConsume.java new file mode 100644 index 0000000000..528176c7a3 --- /dev/null +++ b/accord-core/src/main/java/accord/utils/ReduceConsume.java @@ -0,0 +1,8 @@ +package accord.utils; + +import java.util.function.BiConsumer; + +public interface ReduceConsume extends Reduce, BiConsumer +{ + void accept(O result, Throwable failure); +} diff --git a/accord-core/src/main/java/accord/utils/ReducingFuture.java b/accord-core/src/main/java/accord/utils/ReducingFuture.java index 4e02cba52c..db5566c278 100644 --- a/accord-core/src/main/java/accord/utils/ReducingFuture.java +++ b/accord-core/src/main/java/accord/utils/ReducingFuture.java @@ -12,10 +12,10 @@ public class ReducingFuture extends AsyncPromise { private static final AtomicIntegerFieldUpdater PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, "pending"); private final List> futures; - private final BiFunction reducer; + private final Reduce reducer; private volatile int pending; - private ReducingFuture(List> futures, BiFunction reducer) + private ReducingFuture(List> futures, Reduce reducer) { this.futures = futures; this.reducer = reducer; @@ -36,13 +36,13 @@ else if (PENDING_UPDATER.decrementAndGet(this) == 0) { V result = futures.get(0).getNow(); for (int i=1, mi=futures.size(); i Future reduce(List> futures, BiFunction reducer) + public static Future reduce(List> futures, Reduce reducer) { Preconditions.checkArgument(!futures.isEmpty(), "future list is empty"); diff --git a/accord-core/src/main/java/accord/utils/RelationMultiMap.java b/accord-core/src/main/java/accord/utils/RelationMultiMap.java index cd3c8fb12f..ce2618a23a 100644 --- a/accord-core/src/main/java/accord/utils/RelationMultiMap.java +++ b/accord-core/src/main/java/accord/utils/RelationMultiMap.java @@ -125,14 +125,8 @@ public void nextKey(K key) if (keyCount == keys.length) { - K[] newKeys = cachedKeys.get(keyCount * 2); - System.arraycopy(keys, 0, newKeys, 0, keyCount); - cachedKeys.forceDiscard(keys, keyCount); - keys = newKeys; - int[] newKeyLimits = cachedInts.getInts(keyCount * 2); - System.arraycopy(keyLimits, 0, newKeyLimits, 0, keyCount); - cachedInts.forceDiscard(keyLimits); - keyLimits = newKeyLimits; + keys = cachedKeys.resize(keys, keyCount, keyCount * 2); + keyLimits = cachedInts.resize(keyLimits, keyCount, keyCount * 2); } keys[keyCount++] = key; hasOrderedValues = true; @@ -180,12 +174,7 @@ public void add(V value) hasOrderedValues = false; if (totalCount >= keysToValues.length) - { - V[] newValues = cachedValues.get(keysToValues.length * 2); - System.arraycopy(keysToValues, 0, newValues, 0, totalCount); - cachedValues.forceDiscard(keysToValues, totalCount); - keysToValues = newValues; - } + keysToValues = cachedValues.resize(keysToValues, keysToValues.length, keysToValues.length * 2); keysToValues[totalCount++] = value; } diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index b458e1dc28..46b90b774b 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -298,7 +298,7 @@ public static void main(String[] args) throws Exception { // Long overrideSeed = null; int count = 1; - Long overrideSeed = 8602265915508619975L; + Long overrideSeed = -5041252069608453510L; LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong; for (int i = 0 ; i < args.length ; i += 2) {