diff --git a/gemini/src/main/java/com/techempower/gemini/cluster/jms/CacheMessageManager.java b/gemini/src/main/java/com/techempower/gemini/cluster/jms/CacheMessageManager.java index fa0ec71..d1e68a4 100755 --- a/gemini/src/main/java/com/techempower/gemini/cluster/jms/CacheMessageManager.java +++ b/gemini/src/main/java/com/techempower/gemini/cluster/jms/CacheMessageManager.java @@ -27,10 +27,9 @@ package com.techempower.gemini.cluster.jms; +import java.util.*; import javax.jms.*; - import org.slf4j.*; - import com.techempower.cache.*; import com.techempower.collection.relation.*; import com.techempower.data.*; @@ -51,6 +50,8 @@ public class CacheMessageManager { public static final String CACHE_TOPIC_DESTINATION = "CACHE.TOPIC"; public static final String MESSAGE_PROPERTY_UUID = "Gemini.CacheMgr.ClientUUID"; + public static final long DEFAULT_STATS_PERIOD_MINUTES = 10; + public static final long DEFAULT_STATS_LOG_MAX_THRESHOLD_MS = 10; // // Variables. @@ -68,6 +69,8 @@ public class CacheMessageManager private String instanceID; private int maximumRelationSize = 10000; private int deliveryMode = DeliveryMode.PERSISTENT; + private long statsPeriodMinutes = DEFAULT_STATS_PERIOD_MINUTES; + private long statsLogMaxThresholdMs = DEFAULT_STATS_LOG_MAX_THRESHOLD_MS; // // Methods. @@ -92,7 +95,13 @@ public void configure(EnhancedProperties props) String propsPrefix = "CacheMessageManager."; this.maximumRelationSize = props.getInt( propsPrefix + "MaximumRelationSize", this.maximumRelationSize); + log.info("[CacheMessageManager.MaximumRelationSize: " + maximumRelationSize + "]"); this.deliveryMode = props.getInt(propsPrefix + "DeliveryMode", this.deliveryMode); + log.info("[CacheMessageManager.DeliveryMode: " + deliveryMode + "]"); + this.statsPeriodMinutes = props.getLong("StatsPeriodMinutes", DEFAULT_STATS_PERIOD_MINUTES); + log.info("[CacheMessageManager.StatsPeriodMinutes: " + statsPeriodMinutes + "]"); + this.statsLogMaxThresholdMs = props.getLong("StatsLogMaxThresholdMs", DEFAULT_STATS_LOG_MAX_THRESHOLD_MS); + log.info("[CacheMessageManager.StatsLogMaxThresholdMs: " + statsLogMaxThresholdMs + "]"); } /** @@ -383,6 +392,17 @@ public void reset(long relationID) private class CacheSignalListener implements MessageListener { + private long statsCollectionStart = System.currentTimeMillis(); + // No need for concurrent data structures since a listener will only get + // one message at a time. + private Map statsCount = new HashMap<>(); + private Map statsTxMin = new HashMap<>(); + private Map statsTxMax = new HashMap<>(); + private Map statsTxSum = new HashMap<>(); + private Map statsPxMin = new HashMap<>(); + private Map statsPxMax = new HashMap<>(); + private Map statsPxSum = new HashMap<>(); + private long statsCollectionMs = 0L; public CacheSignalListener(GeminiApplication application) { @@ -399,6 +419,8 @@ public void onMessage(javax.jms.Message message) log.debug("EntityStore is not yet initialized. Ignoring message."); return; } + long start = System.currentTimeMillis(); + String statsKey = null; BroadcastMessage broadcastMessage = null; // cast object to BroadcastMessage if (message instanceof ObjectMessage) @@ -441,6 +463,8 @@ else if (senderUuid.equals(instanceID)) if (broadcastMessage instanceof CacheMessage) { final CacheMessage cacheMessage = (CacheMessage)broadcastMessage; + statsKey = "g" + cacheMessage.getGroupId(); + // handle the message switch (cacheMessage.getAction()) { @@ -540,6 +564,7 @@ else if (broadcastMessage instanceof CachedRelationMessage) { final CachedRelationMessage cachedRelationMessage = (CachedRelationMessage)broadcastMessage; final CachedRelation relation = store.getCachedRelation(cachedRelationMessage.getRelationId()); + statsKey = "r" + cachedRelationMessage.getRelationId(); switch (cachedRelationMessage.getAction()) { case (CachedRelationMessage.ACTION_ADD): @@ -610,6 +635,163 @@ else if (broadcastMessage instanceof CachedRelationMessage) } } } + + if (statsKey != null) { + // Gather statistics on transmission and receiver processing timings and periodically log a + // summary. + try { + // Track how long it takes to gather the stats. + long startStats = System.currentTimeMillis(); + + // Processing time: How long it took to take action on the message. + long pxTime = System.currentTimeMillis() - start; + + // Transmission time: How long it took to receive the message. + long txTime = start - message.getJMSTimestamp(); + + // Count: How many messages have been received during the reporting period. + statsCount.merge(statsKey, 1L, Long::sum); + + // Transmission sum: Sum of transmission times during the reporting period. + statsTxSum.merge(statsKey, txTime, Long::sum); + + // Transmission max: Longest recorded transmission time during the reporting period. + long txMax = statsTxMax.getOrDefault(statsKey, -1L); + if (txTime > txMax) { + statsTxMax.put(statsKey, txTime); + // Log the new max to help with investigating performance problems. + if (txTime > statsLogMaxThresholdMs) { + // To reduce logging noise, only log new max values that exceed the configured + // threshold. + log.info("Stats for " + statsKey + ": New transmission max of " + txTime + ": " + message); + } + } + + // Transmission min: Shortest recorded transmission time during the reporting period. + long txMin = statsTxMin.getOrDefault(statsKey, -1L); + if (txMin == -1L || txTime < txMin) { + statsTxMin.put(statsKey, txTime); + } + + // Processing sum: Sum of processing times during the reporting period. + statsPxSum.merge(statsKey, pxTime, Long::sum); + + // Processing max: Longest recorded processing time during the reporting period. + long pxMax = statsPxMax.getOrDefault(statsKey, -1L); + if (pxTime > pxMax) { + statsPxMax.put(statsKey, pxTime); + // Log the new max to help with investigating performance problems. + if (pxTime > statsLogMaxThresholdMs) { + // To reduce logging noise, only log new max values that exceed the configured + // threshold. + log.info("Stats for " + statsKey + ": New receiver processing max of " + pxTime + ": " + message); + } + } + + // Processing min: Shortest recorded processing time during the reporting period. + long pxMin = statsPxMin.getOrDefault(statsKey, -1L); + if (pxMin == -1L || pxTime < pxMin) { + statsPxMin.put(statsKey, pxTime); + } + + // Record time spent gathering stats. + statsCollectionMs += System.currentTimeMillis() - startStats; + + // Periodically log collected stats and reset accumulators. + if ((System.currentTimeMillis() - statsCollectionStart) > + (statsPeriodMinutes * UtilityConstants.MINUTE)) { + + // Log stats for each CacheGroup/CachedRelation that was received during the last period. + for (String key : statsCount.keySet()) { + // Count: How many messages have been received during the reporting period. + long keyCount = statsCount.getOrDefault(key, -1L); + + // Transmission sum: Sum of transmission times during the reporting period. + long keyTxSum = statsTxSum.getOrDefault(key, -1L); + + // Transmission avg: Average of transmission times during the reporting period. + long keyTxAvg = keyCount > 0 ? keyTxSum / keyCount : 0; + + // Transmission max: Longest recorded transmission time during the reporting period. + long keyTxMax = statsTxMax.getOrDefault(key, -1L); + + // Transmission min: Shortest recorded transmission time during the reporting period. + long keyTxMin = statsTxMin.getOrDefault(key, -1L);; + + // Processing sum: Sum of processing times during the reporting period. + long keyPxSum = statsPxSum.getOrDefault(key, -1L);; + + // Processing avg: Average of processing times during the reporting period. + long keyPxAvg = keyCount > 0 ? keyPxSum / keyCount : 0; + + // Processing max: Longest recorded processing time during the reporting period. + long keyPxMax = statsPxMax.getOrDefault(key, -1L);; + + // Processing min: Shortest recorded processing time during the reporting period. + long keyPxMin = statsPxMin.getOrDefault(key, -1L); + + // Log summary of collected statistics. + log.info("Stats summary for " + key + ": count " + keyCount + + " transmission (max/min/avg) " + keyTxMax + + " " + keyTxMin + + " " + keyTxAvg + + " receiver processing (max/min/avg) " + keyPxMax + + " " + keyPxMin + + " " + keyPxAvg); + } + + // Count: How many messages have been received since last reset. + long overallCount = statsCount.values().stream().mapToLong(Long::longValue).sum(); + + // Transmission sum: Sum of transmission times during the reporting period. + long overallTxSum = statsTxSum.values().stream().mapToLong(Long::longValue).sum(); + + // Transmission avg: Average of transmission times during the reporting period. + long overallTxAvg = overallCount > 0 ? overallTxSum / overallCount : 0; + + // Transmission max: Longest recorded transmission time during the reporting period. + long overallTxMax = statsTxMax.values().stream().mapToLong(Long::longValue).max().orElse(-1); + + // Transmission min: Shortest recorded transmission time during the reporting period. + long overallTxMin = statsTxMin.values().stream().mapToLong(Long::longValue).min().orElse(-1); + + // Processing sum: Sum of processing times during the reporting period. + long overallPxSum = statsPxSum.values().stream().mapToLong(Long::longValue).sum(); + + // Processing avg: Average of processing times during the reporting period. + long overallPxAvg = overallCount > 0 ? overallPxSum / overallCount : 0; + + // Processing max: Longest recorded processing time during the reporting period. + long overallPxMax = statsPxMax.values().stream().mapToLong(Long::longValue).max().orElse(-1); + + // Processing min: Shortest recorded processing time during the reporting period. + long overallPxMin = statsPxMin.values().stream().mapToLong(Long::longValue).min().orElse(-1); + + // Log summary of collected statistics. + log.info("Stats summary overall: count " + overallCount + + " transmission (max/min/avg) " + overallTxMax + + " " + overallTxMin + + " " + overallTxAvg + + " receiver processing (max/min/avg) " + overallPxMax + + " " + overallPxMin + + " " + overallPxAvg + + " stats collection time: " + statsCollectionMs); + + // Reset all accumulators. + statsCount.clear(); + statsTxMin.clear(); + statsTxMax.clear(); + statsTxSum.clear(); + statsPxMin.clear(); + statsPxMax.clear(); + statsPxSum.clear(); + statsCollectionMs = 0L; + statsCollectionStart = System.currentTimeMillis(); + } + } catch (Exception e) { + log.error("Unable to gather statistics", e); + } + } } } }