diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 7d3c607e69bc..987859052269 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -39,6 +39,7 @@ import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.NIODataInputStream; @@ -187,8 +188,8 @@ private InetAddress receiveMessage(DataInputPlus input, int version) throws IOEx id = Integer.parseInt(input.readUTF()); else id = input.readInt(); - - MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input)); + long currentTime = ApproximateTime.currentTimeMillis(); + MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime)); if (message == null) { // callback expired; nothing to do diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index a25474199e6f..0de9520688bb 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -124,10 +124,8 @@ public static MessageIn read(DataInputPlus in, int version, int id, lon return MessageIn.create(from, payload, parameters, verb, version, constructionTime); } - public static long readConstructionTime(InetAddress from, DataInputPlus input) throws IOException + public static long readConstructionTime(InetAddress from, DataInputPlus input, long currentTime) throws IOException { - long currentTime = ApproximateTime.currentTimeMillis(); - // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the // higher 4 bytes wouldn't change between the sender and receiver) int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index ec27b7e23dde..11d17b842549 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -34,20 +34,18 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Iterables; - import com.codahale.metrics.Timer; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.caffinitas.ohc.histo.EstimatedHistogram; - import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.config.DatabaseDescriptor; - import static org.junit.Assert.*; public class MessagingServiceTest @@ -100,15 +98,13 @@ public void testDroppedMessages() public void testDCLatency() throws Exception { int latency = 100; - ConcurrentHashMap dcLatency = MessagingService.instance().metrics.dcLatency; dcLatency.clear(); - long now = System.currentTimeMillis(); + long now = ApproximateTime.currentTimeMillis(); long sentAt = now - latency; - assertNull(dcLatency.get("datacenter1")); - addDCLatency(sentAt); + addDCLatency(sentAt, now); assertNotNull(dcLatency.get("datacenter1")); assertEquals(1, dcLatency.get("datacenter1").getCount()); long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1]; @@ -124,11 +120,11 @@ public void testNegativeDCLatency() throws Exception ConcurrentHashMap dcLatency = MessagingService.instance().metrics.dcLatency; dcLatency.clear(); - long now = System.currentTimeMillis(); + long now = ApproximateTime.currentTimeMillis(); long sentAt = now - latency; assertNull(dcLatency.get("datacenter1")); - addDCLatency(sentAt); + addDCLatency(sentAt, now); assertNull(dcLatency.get("datacenter1")); } @@ -221,7 +217,7 @@ public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostEx assertFalse(MockBackPressureStrategy.applied); } - private static void addDCLatency(long sentAt) throws IOException + private static void addDCLatency(long sentAt, long nowTime) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) @@ -229,7 +225,7 @@ private static void addDCLatency(long sentAt) throws IOException out.writeInt((int) sentAt); } DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); - MessageIn.readConstructionTime(InetAddress.getLocalHost(), in); + MessageIn.readConstructionTime(InetAddress.getLocalHost(), in, nowTime); } public static class MockBackPressureStrategy implements BackPressureStrategy