Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 33 additions & 30 deletions src/java/org/apache/cassandra/service/DataResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.schema.ColumnMetadata;
Expand All @@ -44,10 +45,10 @@

public class DataResolver extends ResponseResolver
{
private final Map<AsyncOneResponse, Pair<MessageOut, InetAddress>> repairResponseRequestMap = new HashMap<>();
private final FastThreadLocal<Map<AsyncOneResponse, Pair<MessageOut, InetAddress>>> repairResponseRequestMap = new FastThreadLocal<>();
private final long queryStartNanoTime;
private Optional<InetAddress> spareReadRepairNode;
private int responseCntSnapshot;
private final FastThreadLocal<Integer> responseCntSnapshot = new FastThreadLocal<>();

public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
Expand Down Expand Up @@ -78,7 +79,7 @@ public PartitionIterator resolve()
// reponse count and below "responseCntSnapshot" is used to calculate whether read repair is ok or not
// since response list can get more response later, but we only iterator "responseCntSnapshot" for any future
// operations including read repair retry. So we have to save this count in current state.
responseCntSnapshot = count;
responseCntSnapshot.set(count);
List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
InetAddress[] sources = new InetAddress[count];
for (int i = 0; i < count; i++)
Expand Down Expand Up @@ -180,9 +181,14 @@ public void close()
int blockFor = consistency.blockFor(keyspace);
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses with exception {]", blockFor, ex);
logger.debug("Timeout while read-repairing after receiving all {} data and digest responses with exception {}", blockFor, ex);
throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
}
finally
{
repairResponseRequestMap.remove();
responseCntSnapshot.remove();
}
}

/**
Expand All @@ -201,7 +207,7 @@ public void close()

private void waitRepairToFinishWithPossibleRetry() throws TimeoutException
{
if (repairResponseRequestMap.isEmpty()) return;
if (!repairResponseRequestMap.isSet()) return;

if (!consistency.satisfiesQuorumFor(keyspace))
{
Expand All @@ -210,21 +216,13 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException
return;
}

ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
long waitTimeNanos = cfs.sampleLatencyNanos;

// no latency information, or we're overloaded
if (waitTimeNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()) || waitTimeNanos == 0)
{
// try to choose a default value
waitTimeNanos = TimeUnit.MILLISECONDS.toNanos((long)(DatabaseDescriptor.getReadRpcTimeout() / 4.0));
}
long waitTimeMs = DatabaseDescriptor.getWriteRpcTimeout();

List<AsyncOneResponse> timeOuts = awaitRepairResponses(repairResponseRequestMap.keySet(), waitTimeNanos);
List<AsyncOneResponse> timeOuts = awaitRepairResponses(repairResponseRequestMap.get().keySet(), waitTimeMs);

int blockFor = consistency.blockFor(keyspace);
long timeOutHostsCnt = distinctHostNum(timeOuts);
if (responseCntSnapshot - timeOutHostsCnt >= blockFor)
if (responseCntSnapshot.get() - timeOutHostsCnt >= blockFor)
{
//it's guaranteed to have at least blockFor replicas succeeded, then we don't need to worry about whether
//read repair is done or not. "monotonic read" is already guaranteed
Expand All @@ -237,29 +235,29 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException
}

// we only do extra read repair if we only need to do one more read repair to satisfy blockFor
if (responseCntSnapshot - timeOutHostsCnt == blockFor - 1)
if (responseCntSnapshot.get() - timeOutHostsCnt == blockFor - 1)
{
List<AsyncOneResponse> repairRetryResponses = new ArrayList<>();
for(AsyncOneResponse response : timeOuts)
{
Tracing.trace("retry read-repair-mutation to {}", spareReadRepairNode.get());
repairRetryResponses.add(MessagingService.instance().sendRR(repairResponseRequestMap.get(response).left,
repairRetryResponses.add(MessagingService.instance().sendRR(repairResponseRequestMap.get().get(response).left,
spareReadRepairNode.get()));
}

List<AsyncOneResponse> retryTimeOut = awaitRepairResponses(repairRetryResponses, waitTimeNanos);
List<AsyncOneResponse> retryTimeOut = awaitRepairResponses(repairRetryResponses, waitTimeMs);
if (retryTimeOut.isEmpty())
return;

//retry can not help, let's try previous repairResponse again to see whether there is one more done.
if (timeOutHostsCnt - distinctHostNum(awaitRepairResponses(repairResponseRequestMap.keySet(), waitTimeNanos)) >= 1)
if (timeOutHostsCnt - distinctHostNum(awaitRepairResponses(repairResponseRequestMap.get().keySet(), waitTimeMs)) >= 1)
return;

throw new TimeoutException("one more read repair can not help and check previous repair response again can not help either");
}
else
{
throw new TimeoutException("read repair timeout and will not retry read repair, diff count = " + (responseCntSnapshot - timeOutHostsCnt));
throw new TimeoutException("read repair timeout and will not retry read repair, diff count = " + (responseCntSnapshot.get() - timeOutHostsCnt));
}
}

Expand All @@ -279,29 +277,26 @@ private void waitRepairToFinishWithPossibleRetry() throws TimeoutException
*/
private long distinctHostNum(final List<AsyncOneResponse> responses)
{
return responses.stream().map(response -> repairResponseRequestMap.get(response).right).distinct().count();
return responses.stream().map(response -> repairResponseRequestMap.get().get(response).right).distinct().count();
}

/**
*
* @param responses
* @param timeToWaitNanos
* @param timeToWaitMs
* @return a list of response which have not responded in timeToWaitNanos window
*/
private List<AsyncOneResponse> awaitRepairResponses(final Collection<AsyncOneResponse> responses, final long timeToWaitNanos)
private List<AsyncOneResponse> awaitRepairResponses(final Collection<AsyncOneResponse> responses, final long timeToWaitMs)
{
if (responses.isEmpty())
return Collections.emptyList();

List<AsyncOneResponse> ret = new ArrayList<>();
long start = System.nanoTime();
for (final AsyncOneResponse repairResponse : responses)
{
try
{
long alreadyPassedNanos = System.nanoTime() - start ;
repairResponse.get(timeToWaitNanos - alreadyPassedNanos > 0 ? timeToWaitNanos - alreadyPassedNanos : 0,
TimeUnit.NANOSECONDS);
repairResponse.get(timeToWaitMs, TimeUnit.MILLISECONDS);
}
catch (final TimeoutException e)
{
Expand Down Expand Up @@ -535,7 +530,15 @@ public void close()
// on-timeout behavior that a "real" mutation gets
Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
repairResponseRequestMap.put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i]));
if (!repairResponseRequestMap.isSet()) {
Map<AsyncOneResponse, Pair<MessageOut, InetAddress>> map = new HashMap<>();
map.put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i]));
repairResponseRequestMap.set(map);
}
else
{
repairResponseRequestMap.get().put(MessagingService.instance().sendRR(msg, sources[i]), Pair.create(msg, sources[i]));
}
}
}
}
Expand Down Expand Up @@ -688,6 +691,6 @@ public boolean isDataPresent()

public Map<AsyncOneResponse, Pair<MessageOut, InetAddress>> getRepairResponseRequestMap()
{
return new HashMap<>(repairResponseRequestMap);
return new HashMap<>(repairResponseRequestMap.get());
}
}