Skip to content

Commit

Permalink
Gather cache message latency stats and periodically log a summary
Browse files Browse the repository at this point in the history
  • Loading branch information
Keith R. Gustafson committed Aug 24, 2022
1 parent 5afdeb8 commit 9a16dba
Showing 1 changed file with 184 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@

package com.techempower.gemini.cluster.jms;

import java.util.*;
import javax.jms.*;

import org.slf4j.*;

import com.techempower.cache.*;
import com.techempower.collection.relation.*;
import com.techempower.data.*;
Expand All @@ -51,6 +50,8 @@ public class CacheMessageManager
{
public static final String CACHE_TOPIC_DESTINATION = "CACHE.TOPIC";
public static final String MESSAGE_PROPERTY_UUID = "Gemini.CacheMgr.ClientUUID";
public static final long DEFAULT_STATS_PERIOD_MINUTES = 10;
public static final long DEFAULT_STATS_LOG_MAX_THRESHOLD_MS = 10;

//
// Variables.
Expand All @@ -68,6 +69,8 @@ public class CacheMessageManager
private String instanceID;
private int maximumRelationSize = 10000;
private int deliveryMode = DeliveryMode.PERSISTENT;
private long statsPeriodMinutes = DEFAULT_STATS_PERIOD_MINUTES;
private long statsLogMaxThresholdMs = DEFAULT_STATS_LOG_MAX_THRESHOLD_MS;

//
// Methods.
Expand All @@ -92,7 +95,13 @@ public void configure(EnhancedProperties props)
String propsPrefix = "CacheMessageManager.";
this.maximumRelationSize = props.getInt(
propsPrefix + "MaximumRelationSize", this.maximumRelationSize);
log.info("[CacheMessageManager.MaximumRelationSize: " + maximumRelationSize + "]");
this.deliveryMode = props.getInt(propsPrefix + "DeliveryMode", this.deliveryMode);
log.info("[CacheMessageManager.DeliveryMode: " + deliveryMode + "]");
this.statsPeriodMinutes = props.getLong("StatsPeriodMinutes", DEFAULT_STATS_PERIOD_MINUTES);
log.info("[CacheMessageManager.StatsPeriodMinutes: " + statsPeriodMinutes + "]");
this.statsLogMaxThresholdMs = props.getLong("StatsLogMaxThresholdMs", DEFAULT_STATS_LOG_MAX_THRESHOLD_MS);
log.info("[CacheMessageManager.StatsLogMaxThresholdMs: " + statsLogMaxThresholdMs + "]");
}

/**
Expand Down Expand Up @@ -383,6 +392,17 @@ public void reset(long relationID)
private class CacheSignalListener
implements MessageListener
{
private long statsCollectionStart = System.currentTimeMillis();
// No need for concurrent data structures since a listener will only get
// one message at a time.
private Map<String, Long> statsCount = new HashMap<>();
private Map<String, Long> statsTxMin = new HashMap<>();
private Map<String, Long> statsTxMax = new HashMap<>();
private Map<String, Long> statsTxSum = new HashMap<>();
private Map<String, Long> statsPxMin = new HashMap<>();
private Map<String, Long> statsPxMax = new HashMap<>();
private Map<String, Long> statsPxSum = new HashMap<>();
private long statsCollectionMs = 0L;

public CacheSignalListener(GeminiApplication application)
{
Expand All @@ -399,6 +419,8 @@ public void onMessage(javax.jms.Message message)
log.debug("EntityStore is not yet initialized. Ignoring message.");
return;
}
long start = System.currentTimeMillis();
String statsKey = null;
BroadcastMessage broadcastMessage = null;
// cast object to BroadcastMessage
if (message instanceof ObjectMessage)
Expand Down Expand Up @@ -441,6 +463,8 @@ else if (senderUuid.equals(instanceID))
if (broadcastMessage instanceof CacheMessage)
{
final CacheMessage cacheMessage = (CacheMessage)broadcastMessage;
statsKey = "g" + cacheMessage.getGroupId();

// handle the message
switch (cacheMessage.getAction())
{
Expand Down Expand Up @@ -540,6 +564,7 @@ else if (broadcastMessage instanceof CachedRelationMessage)
{
final CachedRelationMessage cachedRelationMessage = (CachedRelationMessage)broadcastMessage;
final CachedRelation<?, ?> relation = store.getCachedRelation(cachedRelationMessage.getRelationId());
statsKey = "r" + cachedRelationMessage.getRelationId();
switch (cachedRelationMessage.getAction())
{
case (CachedRelationMessage.ACTION_ADD):
Expand Down Expand Up @@ -610,6 +635,163 @@ else if (broadcastMessage instanceof CachedRelationMessage)
}
}
}

if (statsKey != null) {
// Gather statistics on transmission and receiver processing timings and periodically log a
// summary.
try {
// Track how long it takes to gather the stats.
long startStats = System.currentTimeMillis();

// Processing time: How long it took to take action on the message.
long pxTime = System.currentTimeMillis() - start;

// Transmission time: How long it took to receive the message.
long txTime = start - message.getJMSTimestamp();

// Count: How many messages have been received during the reporting period.
statsCount.merge(statsKey, 1L, Long::sum);

// Transmission sum: Sum of transmission times during the reporting period.
statsTxSum.merge(statsKey, txTime, Long::sum);

// Transmission max: Longest recorded transmission time during the reporting period.
long txMax = statsTxMax.getOrDefault(statsKey, -1L);
if (txTime > txMax) {
statsTxMax.put(statsKey, txTime);
// Log the new max to help with investigating performance problems.
if (txTime > statsLogMaxThresholdMs) {
// To reduce logging noise, only log new max values that exceed the configured
// threshold.
log.info("Stats for " + statsKey + ": New transmission max of " + txTime + ": " + message);
}
}

// Transmission min: Shortest recorded transmission time during the reporting period.
long txMin = statsTxMin.getOrDefault(statsKey, -1L);
if (txMin == -1L || txTime < txMin) {
statsTxMin.put(statsKey, txTime);
}

// Processing sum: Sum of processing times during the reporting period.
statsPxSum.merge(statsKey, pxTime, Long::sum);

// Processing max: Longest recorded processing time during the reporting period.
long pxMax = statsPxMax.getOrDefault(statsKey, -1L);
if (pxTime > pxMax) {
statsPxMax.put(statsKey, pxTime);
// Log the new max to help with investigating performance problems.
if (pxTime > statsLogMaxThresholdMs) {
// To reduce logging noise, only log new max values that exceed the configured
// threshold.
log.info("Stats for " + statsKey + ": New receiver processing max of " + pxTime + ": " + message);
}
}

// Processing min: Shortest recorded processing time during the reporting period.
long pxMin = statsPxMin.getOrDefault(statsKey, -1L);
if (pxMin == -1L || pxTime < pxMin) {
statsPxMin.put(statsKey, pxTime);
}

// Record time spent gathering stats.
statsCollectionMs += System.currentTimeMillis() - startStats;

// Periodically log collected stats and reset accumulators.
if ((System.currentTimeMillis() - statsCollectionStart) >
(statsPeriodMinutes * UtilityConstants.MINUTE)) {

// Log stats for each CacheGroup/CachedRelation that was received during the last period.
for (String key : statsCount.keySet()) {
// Count: How many messages have been received during the reporting period.
long keyCount = statsCount.getOrDefault(key, -1L);

// Transmission sum: Sum of transmission times during the reporting period.
long keyTxSum = statsTxSum.getOrDefault(key, -1L);

// Transmission avg: Average of transmission times during the reporting period.
long keyTxAvg = keyCount > 0 ? keyTxSum / keyCount : 0;

// Transmission max: Longest recorded transmission time during the reporting period.
long keyTxMax = statsTxMax.getOrDefault(key, -1L);

// Transmission min: Shortest recorded transmission time during the reporting period.
long keyTxMin = statsTxMin.getOrDefault(key, -1L);;

// Processing sum: Sum of processing times during the reporting period.
long keyPxSum = statsPxSum.getOrDefault(key, -1L);;

// Processing avg: Average of processing times during the reporting period.
long keyPxAvg = keyCount > 0 ? keyPxSum / keyCount : 0;

// Processing max: Longest recorded processing time during the reporting period.
long keyPxMax = statsPxMax.getOrDefault(key, -1L);;

// Processing min: Shortest recorded processing time during the reporting period.
long keyPxMin = statsPxMin.getOrDefault(key, -1L);

// Log summary of collected statistics.
log.info("Stats summary for " + key + ": count " + keyCount
+ " transmission (max/min/avg) " + keyTxMax
+ " " + keyTxMin
+ " " + keyTxAvg
+ " receiver processing (max/min/avg) " + keyPxMax
+ " " + keyPxMin
+ " " + keyPxAvg);
}

// Count: How many messages have been received since last reset.
long overallCount = statsCount.values().stream().mapToLong(Long::longValue).sum();

// Transmission sum: Sum of transmission times during the reporting period.
long overallTxSum = statsTxSum.values().stream().mapToLong(Long::longValue).sum();

// Transmission avg: Average of transmission times during the reporting period.
long overallTxAvg = overallCount > 0 ? overallTxSum / overallCount : 0;

// Transmission max: Longest recorded transmission time during the reporting period.
long overallTxMax = statsTxMax.values().stream().mapToLong(Long::longValue).max().orElse(-1);

// Transmission min: Shortest recorded transmission time during the reporting period.
long overallTxMin = statsTxMin.values().stream().mapToLong(Long::longValue).min().orElse(-1);

// Processing sum: Sum of processing times during the reporting period.
long overallPxSum = statsPxSum.values().stream().mapToLong(Long::longValue).sum();

// Processing avg: Average of processing times during the reporting period.
long overallPxAvg = overallCount > 0 ? overallPxSum / overallCount : 0;

// Processing max: Longest recorded processing time during the reporting period.
long overallPxMax = statsPxMax.values().stream().mapToLong(Long::longValue).max().orElse(-1);

// Processing min: Shortest recorded processing time during the reporting period.
long overallPxMin = statsPxMin.values().stream().mapToLong(Long::longValue).min().orElse(-1);

// Log summary of collected statistics.
log.info("Stats summary overall: count " + overallCount
+ " transmission (max/min/avg) " + overallTxMax
+ " " + overallTxMin
+ " " + overallTxAvg
+ " receiver processing (max/min/avg) " + overallPxMax
+ " " + overallPxMin
+ " " + overallPxAvg
+ " stats collection time: " + statsCollectionMs);

// Reset all accumulators.
statsCount.clear();
statsTxMin.clear();
statsTxMax.clear();
statsTxSum.clear();
statsPxMin.clear();
statsPxMax.clear();
statsPxSum.clear();
statsCollectionMs = 0L;
statsCollectionStart = System.currentTimeMillis();
}
} catch (Exception e) {
log.error("Unable to gather statistics", e);
}
}
}
}
}

0 comments on commit 9a16dba

Please sign in to comment.