-
Notifications
You must be signed in to change notification settings - Fork 23
CASSANDRA-18519: (CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*) #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8c99d1c
2a34e82
aa3611b
bed28e1
856127f
722ce53
c7b03c4
c054e1d
cfabfda
4d029dc
223beeb
9429814
e1835da
745c281
16be4e9
37b2e2b
5e958b2
692d55d
b56f424
0d70e37
8a5a56d
fb87624
cd3c65c
60740c3
e9132d7
a2175b5
03fbc29
8d87f6e
7cdf2f5
56da122
583a800
b86652d
a7e9cd9
7c82fff
0df6349
56216a9
0e7e8f4
9ca49ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package accord.coordinate; | ||
|
|
||
| import accord.primitives.Ranges; | ||
| import accord.primitives.TxnId; | ||
|
|
||
| public class RangeUnavailable extends Exhausted | ||
| { | ||
| public final Ranges unavailable; | ||
|
|
||
| public RangeUnavailable(Ranges unavailable, TxnId txnId) | ||
| { | ||
| super(txnId, null, buildMessage(unavailable)); | ||
| this.unavailable = unavailable; | ||
| } | ||
|
|
||
| private static String buildMessage(Ranges unavailable) | ||
| { | ||
| return "The following ranges are unavailable to read: " + unavailable; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package accord.impl; | ||
|
|
||
| import java.util.List; | ||
| import java.util.NavigableMap; | ||
| import java.util.Objects; | ||
| import java.util.TreeMap; | ||
| import java.util.function.Predicate; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| import com.google.common.collect.ImmutableSortedMap; | ||
|
|
||
| import accord.api.Key; | ||
| import accord.local.Command; | ||
| import accord.local.SafeCommandStore; | ||
| import accord.local.SaveStatus; | ||
| import accord.local.Status; | ||
| import accord.primitives.Seekable; | ||
| import accord.primitives.Timestamp; | ||
| import accord.primitives.TxnId; | ||
|
|
||
| import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; | ||
| import static accord.local.SafeCommandStore.TestDep.WITH; | ||
| import static accord.utils.Utils.ensureSortedImmutable; | ||
| import static accord.utils.Utils.ensureSortedMutable; | ||
|
|
||
| public class CommandTimeseries<D> | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just moved this out of |
||
| { | ||
| public enum TestTimestamp | ||
| {BEFORE, AFTER} | ||
|
|
||
| private final Seekable keyOrRange; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this type change is the only real change to this class |
||
| protected final CommandLoader<D> loader; | ||
| public final ImmutableSortedMap<Timestamp, D> commands; | ||
|
|
||
| public CommandTimeseries(Update<D> builder) | ||
| { | ||
| this.keyOrRange = builder.keyOrRange; | ||
| this.loader = builder.loader; | ||
| this.commands = ensureSortedImmutable(builder.commands); | ||
| } | ||
|
|
||
| CommandTimeseries(Seekable keyOrRange, CommandLoader<D> loader, ImmutableSortedMap<Timestamp, D> commands) | ||
| { | ||
| this.keyOrRange = keyOrRange; | ||
| this.loader = loader; | ||
| this.commands = commands; | ||
| } | ||
|
|
||
| public CommandTimeseries(Key keyOrRange, CommandLoader<D> loader) | ||
| { | ||
| this(keyOrRange, loader, ImmutableSortedMap.of()); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) | ||
| { | ||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| CommandTimeseries<?> that = (CommandTimeseries<?>) o; | ||
| return keyOrRange.equals(that.keyOrRange) && loader.equals(that.loader) && commands.equals(that.commands); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() | ||
| { | ||
| int hash = 1; | ||
| hash = 31 * hash + Objects.hashCode(keyOrRange); | ||
| hash = 31 * hash + Objects.hashCode(loader); | ||
| hash = 31 * hash + Objects.hashCode(commands); | ||
| return hash; | ||
| } | ||
|
|
||
| public D get(Timestamp key) | ||
| { | ||
| return commands.get(key); | ||
| } | ||
|
|
||
| public boolean isEmpty() | ||
| { | ||
| return commands.isEmpty(); | ||
| } | ||
|
|
||
| public Timestamp maxTimestamp() | ||
| { | ||
| return commands.isEmpty() ? Timestamp.NONE : commands.keySet().last(); | ||
| } | ||
|
|
||
| /** | ||
| * All commands before/after (exclusive of) the given timestamp | ||
| * <p> | ||
| * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any | ||
| * commands that do not know any deps will be ignored. | ||
| * <p> | ||
| * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery | ||
| */ | ||
| public <T> T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, | ||
| SafeCommandStore.TestDep testDep, @Nullable TxnId depId, | ||
| @Nullable Status minStatus, @Nullable Status maxStatus, | ||
| SafeCommandStore.CommandFunction<T, T> map, T initialValue, T terminalValue) | ||
| { | ||
|
|
||
| for (D data : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values()) | ||
| { | ||
| TxnId txnId = loader.txnId(data); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed the ordering of this method due to cost to deserialize for |
||
| if (!testKind.test(txnId.rw())) continue; | ||
| SaveStatus status = loader.saveStatus(data); | ||
| if (minStatus != null && minStatus.compareTo(status.status) > 0) | ||
| continue; | ||
| if (maxStatus != null && maxStatus.compareTo(status.status) < 0) | ||
| continue; | ||
| List<TxnId> deps = loader.depsIds(data); | ||
| // If we don't have any dependencies, we treat a dependency filter as a mismatch | ||
| if (testDep != ANY_DEPS && (!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) != (testDep == WITH)))) | ||
| continue; | ||
| Timestamp executeAt = loader.executeAt(data); | ||
| initialValue = map.apply(keyOrRange, txnId, executeAt, initialValue); | ||
| if (initialValue.equals(terminalValue)) | ||
| break; | ||
| } | ||
| return initialValue; | ||
| } | ||
|
|
||
| Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status> statusPredicate) | ||
| { | ||
| return commands.subMap(min, true, max, true).values().stream() | ||
| .filter(d -> statusPredicate.test(loader.status(d))).map(loader::txnId); | ||
| } | ||
|
|
||
| public Stream<D> all() | ||
| { | ||
| return commands.values().stream(); | ||
| } | ||
|
|
||
| Update<D> beginUpdate() | ||
| { | ||
| return new Update<>(this); | ||
| } | ||
|
|
||
| public CommandLoader<D> loader() | ||
| { | ||
| return loader; | ||
| } | ||
|
|
||
| public interface CommandLoader<D> | ||
| { | ||
| D saveForCFK(Command command); | ||
|
|
||
| TxnId txnId(D data); | ||
| Timestamp executeAt(D data); | ||
| SaveStatus saveStatus(D data); | ||
| List<TxnId> depsIds(D data); | ||
|
|
||
| default Status status(D data) | ||
| { | ||
| return saveStatus(data).status; | ||
| } | ||
|
|
||
| default Status.Known known(D data) | ||
| { | ||
| return saveStatus(data).known; | ||
| } | ||
| } | ||
|
|
||
| public static class Update<D> | ||
| { | ||
| private final Seekable keyOrRange; | ||
| protected CommandLoader<D> loader; | ||
| protected NavigableMap<Timestamp, D> commands; | ||
|
|
||
| public Update(Seekable keyOrRange, CommandLoader<D> loader) | ||
| { | ||
| this.keyOrRange = keyOrRange; | ||
| this.loader = loader; | ||
| this.commands = new TreeMap<>(); | ||
| } | ||
|
|
||
| public Update(CommandTimeseries<D> timeseries) | ||
| { | ||
| this.keyOrRange = timeseries.keyOrRange; | ||
| this.loader = timeseries.loader; | ||
| this.commands = timeseries.commands; | ||
| } | ||
|
|
||
| public Update<D> add(Timestamp timestamp, Command command) | ||
| { | ||
| commands = ensureSortedMutable(commands); | ||
| commands.put(timestamp, loader.saveForCFK(command)); | ||
| return this; | ||
| } | ||
|
|
||
| public Update<D> add(Timestamp timestamp, D value) | ||
| { | ||
| commands = ensureSortedMutable(commands); | ||
| commands.put(timestamp, value); | ||
| return this; | ||
| } | ||
|
|
||
| public Update<D> remove(Timestamp timestamp) | ||
| { | ||
| commands = ensureSortedMutable(commands); | ||
| commands.remove(timestamp); | ||
| return this; | ||
| } | ||
|
|
||
| public CommandTimeseries<D> build() | ||
| { | ||
| return new CommandTimeseries<>(this); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package accord.impl; | ||
|
|
||
| import accord.primitives.Timestamp; | ||
|
|
||
| public interface CommandTimeseriesHolder | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not a fan of the name, so any recommendations welcome =) |
||
| { | ||
| CommandTimeseries<?> byId(); | ||
| CommandTimeseries<?> byExecuteAt(); | ||
|
|
||
| Timestamp max(); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exhaustedcould be different things, so this makes it easier to see what happened; what table ranges could not be read?