Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.NIODataInputStream;
Expand Down Expand Up @@ -187,8 +188,8 @@ private InetAddress receiveMessage(DataInputPlus input, int version) throws IOEx
id = Integer.parseInt(input.readUTF());
else
id = input.readInt();

MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input));
long currentTime = ApproximateTime.currentTimeMillis();
MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime));
if (message == null)
{
// callback expired; nothing to do
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/net/MessageIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, lon
return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
}

public static long readConstructionTime(InetAddress from, DataInputPlus input) throws IOException
public static long readConstructionTime(InetAddress from, DataInputPlus input, long currentTime) throws IOException
{
long currentTime = ApproximateTime.currentTimeMillis();

// Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
// higher 4 bytes wouldn't change between the sender and receiver)
int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled
Expand Down
20 changes: 8 additions & 12 deletions test/unit/org/apache/cassandra/net/MessagingServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Iterables;

import com.codahale.metrics.Timer;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.caffinitas.ohc.histo.EstimatedHistogram;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;

import static org.junit.Assert.*;

public class MessagingServiceTest
Expand Down Expand Up @@ -100,15 +98,13 @@ public void testDroppedMessages()
public void testDCLatency() throws Exception
{
int latency = 100;

ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency;
dcLatency.clear();

long now = System.currentTimeMillis();
long now = ApproximateTime.currentTimeMillis();
long sentAt = now - latency;

assertNull(dcLatency.get("datacenter1"));
addDCLatency(sentAt);
addDCLatency(sentAt, now);
assertNotNull(dcLatency.get("datacenter1"));
assertEquals(1, dcLatency.get("datacenter1").getCount());
long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
Expand All @@ -124,11 +120,11 @@ public void testNegativeDCLatency() throws Exception
ConcurrentHashMap<String, Timer> dcLatency = MessagingService.instance().metrics.dcLatency;
dcLatency.clear();

long now = System.currentTimeMillis();
long now = ApproximateTime.currentTimeMillis();
long sentAt = now - latency;

assertNull(dcLatency.get("datacenter1"));
addDCLatency(sentAt);
addDCLatency(sentAt, now);
assertNull(dcLatency.get("datacenter1"));
}

Expand Down Expand Up @@ -221,15 +217,15 @@ public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostEx
assertFalse(MockBackPressureStrategy.applied);
}

private static void addDCLatency(long sentAt) throws IOException
private static void addDCLatency(long sentAt, long nowTime) throws IOException
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
{
out.writeInt((int) sentAt);
}
DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
MessageIn.readConstructionTime(InetAddress.getLocalHost(), in);
MessageIn.readConstructionTime(InetAddress.getLocalHost(), in, nowTime);
}

public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
Expand Down