diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 194fcdf81768..8de8ca84ebfd 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.schema.ColumnMetadata; @@ -44,10 +45,10 @@ public class DataResolver extends ResponseResolver { - private final Map> repairResponseRequestMap = new HashMap<>(); + private final FastThreadLocal>> repairResponseRequestMap = new FastThreadLocal<>(); private final long queryStartNanoTime; private Optional spareReadRepairNode; - private int responseCntSnapshot; + private final FastThreadLocal responseCntSnapshot = new FastThreadLocal<>(); public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) { @@ -78,7 +79,7 @@ public PartitionIterator resolve() // reponse count and below "responseCntSnapshot" is used to calculate whether read repair is ok or not // since response list can get more response later, but we only iterator "responseCntSnapshot" for any future // operations including read repair retry. So we have to save this count in current state. - responseCntSnapshot = count; + responseCntSnapshot.set(count); List iters = new ArrayList<>(count); InetAddress[] sources = new InetAddress[count]; for (int i = 0; i < count; i++) @@ -180,9 +181,14 @@ public void close() int blockFor = consistency.blockFor(keyspace); if (Tracing.isTracing()) Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - logger.debug("Timeout while read-repairing after receiving all {} data and digest responses with exception {]", blockFor, ex); + logger.debug("Timeout while read-repairing after receiving all {} data and digest responses with exception {}", blockFor, ex); throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); } + finally + { + repairResponseRequestMap.remove(); + responseCntSnapshot.remove(); + } } /** @@ -201,7 +207,7 @@ public void close() private void waitRepairToFinishWithPossibleRetry() throws TimeoutException { - if (repairResponseRequestMap.isEmpty()) return; + if (!repairResponseRequestMap.isSet()) return; if (!consistency.satisfiesQuorumFor(keyspace)) { @@ -210,21 +216,13 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException return; } - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); - long waitTimeNanos = cfs.sampleLatencyNanos; - - // no latency information, or we're overloaded - if (waitTimeNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()) || waitTimeNanos == 0) - { - // try to choose a default value - waitTimeNanos = TimeUnit.MILLISECONDS.toNanos((long)(DatabaseDescriptor.getReadRpcTimeout() / 4.0)); - } + long waitTimeMs = DatabaseDescriptor.getWriteRpcTimeout(); - List timeOuts = awaitRepairResponses(repairResponseRequestMap.keySet(), waitTimeNanos); + List timeOuts = awaitRepairResponses(repairResponseRequestMap.get().keySet(), waitTimeMs); int blockFor = consistency.blockFor(keyspace); long timeOutHostsCnt = distinctHostNum(timeOuts); - if (responseCntSnapshot - timeOutHostsCnt >= blockFor) + if (responseCntSnapshot.get() - timeOutHostsCnt >= blockFor) { //it's guaranteed to have at least blockFor replicas succeeded, then we don't need to worry about whether //read repair is done or not. "monotonic read" is already guaranteed @@ -237,29 +235,29 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException } // we only do extra read repair if we only need to do one more read repair to satisfy blockFor - if (responseCntSnapshot - timeOutHostsCnt == blockFor - 1) + if (responseCntSnapshot.get() - timeOutHostsCnt == blockFor - 1) { List repairRetryResponses = new ArrayList<>(); for(AsyncOneResponse response : timeOuts) { Tracing.trace("retry read-repair-mutation to {}", spareReadRepairNode.get()); - repairRetryResponses.add(MessagingService.instance().sendRR(repairResponseRequestMap.get(response).left, + repairRetryResponses.add(MessagingService.instance().sendRR(repairResponseRequestMap.get().get(response).left, spareReadRepairNode.get())); } - List retryTimeOut = awaitRepairResponses(repairRetryResponses, waitTimeNanos); + List retryTimeOut = awaitRepairResponses(repairRetryResponses, waitTimeMs); if (retryTimeOut.isEmpty()) return; //retry can not help, let's try previous repairResponse again to see whether there is one more done. - if (timeOutHostsCnt - distinctHostNum(awaitRepairResponses(repairResponseRequestMap.keySet(), waitTimeNanos)) >= 1) + if (timeOutHostsCnt - distinctHostNum(awaitRepairResponses(repairResponseRequestMap.get().keySet(), waitTimeMs)) >= 1) return; throw new TimeoutException("one more read repair can not help and check previous repair response again can not help either"); } else { - throw new TimeoutException("read repair timeout and will not retry read repair, diff count = " + (responseCntSnapshot - timeOutHostsCnt)); + throw new TimeoutException("read repair timeout and will not retry read repair, diff count = " + (responseCntSnapshot.get() - timeOutHostsCnt)); } } @@ -279,29 +277,26 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException */ private long distinctHostNum(final List responses) { - return responses.stream().map(response -> repairResponseRequestMap.get(response).right).distinct().count(); + return responses.stream().map(response -> repairResponseRequestMap.get().get(response).right).distinct().count(); } /** * * @param responses - * @param timeToWaitNanos + * @param timeToWaitMs * @return a list of response which have not responded in timeToWaitNanos window */ - private List awaitRepairResponses(final Collection responses, final long timeToWaitNanos) + private List awaitRepairResponses(final Collection responses, final long timeToWaitMs) { if (responses.isEmpty()) return Collections.emptyList(); List ret = new ArrayList<>(); - long start = System.nanoTime(); for (final AsyncOneResponse repairResponse : responses) { try { - long alreadyPassedNanos = System.nanoTime() - start ; - repairResponse.get(timeToWaitNanos - alreadyPassedNanos > 0 ? timeToWaitNanos - alreadyPassedNanos : 0, - TimeUnit.NANOSECONDS); + repairResponse.get(timeToWaitMs, TimeUnit.MILLISECONDS); } catch (final TimeoutException e) { @@ -535,7 +530,15 @@ public void close() // on-timeout behavior that a "real" mutation gets Tracing.trace("Sending read-repair-mutation to {}", sources[i]); MessageOut msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR); - repairResponseRequestMap.put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i])); + if (!repairResponseRequestMap.isSet()) { + Map> map = new HashMap<>(); + map.put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i])); + repairResponseRequestMap.set(map); + } + else + { + repairResponseRequestMap.get().put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i])); + } } } } @@ -688,6 +691,6 @@ public boolean isDataPresent() public Map> getRepairResponseRequestMap() { - return new HashMap<>(repairResponseRequestMap); + return new HashMap<>(repairResponseRequestMap.get()); } }