diff --git a/.trivyignore b/.trivyignore
index 223846f..82ae41f 100644
--- a/.trivyignore
+++ b/.trivyignore
@@ -5,5 +5,9 @@
# UID2-6097
CVE-2025-59375 exp:2025-12-15
-# UID2-6128
-CVE-2025-55163 exp:2025-11-30
+# UID2-6340
+CVE-2025-64720 exp:2026-06-05
+
+# UID2-6340
+CVE-2025-65018 exp:2026-06-05
+
diff --git a/pom.xml b/pom.xml
index 2d85c1f..7e3305b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
4.5.21
1.1.0
- 11.1.13
+ 11.1.91
${project.version}
5.10.1
5.10.1
diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java
new file mode 100644
index 0000000..f0c7a7c
--- /dev/null
+++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java
@@ -0,0 +1,574 @@
+package com.uid2.optout.vertx;
+
+import com.uid2.shared.cloud.ICloudStorage;
+import com.uid2.shared.optout.OptOutCollection;
+import com.uid2.shared.optout.OptOutEntry;
+import com.uid2.shared.optout.OptOutUtils;
+import com.uid2.optout.Const;
+import io.vertx.core.json.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+
+import java.nio.charset.StandardCharsets;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates opt-out traffic patterns to determine DEFAULT or DELAYED_PROCESSING status.
+ *
+ * Compares recent ~24h traffic (sumCurrent) against a configurable baseline (baselineTraffic) of expected traffic in 24 hours.
+ * The baseline is multiplied by (thresholdMultiplier) to determine the threshold.
+ * sumCurrent excludes records in allowlist ranges (surge windows determined by engineers).
+ *
+ * Returns DELAYED_PROCESSING if sumCurrent >= thresholdMultiplier * baselineTraffic, indicating abnormal traffic spike.
+ */
+public class OptOutTrafficCalculator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OptOutTrafficCalculator.class);
+
+ private static final int HOURS_24 = 24 * 3600; // 24 hours in seconds
+
+ private final Map deltaFileCache = new ConcurrentHashMap<>();
+ private final ICloudStorage cloudStorage;
+ private final String s3DeltaPrefix; // (e.g. "optout-v2/delta/")
+ private final String trafficCalcConfigPath;
+ private int baselineTraffic;
+ private int thresholdMultiplier;
+ private int evaluationWindowSeconds;
+ private List> allowlistRanges;
+
+ public enum TrafficStatus {
+ DELAYED_PROCESSING,
+ DEFAULT
+ }
+
+ /**
+ * Cache entry for a delta file containing all record timestamps.
+ *
+ * Memory usage: ~8 bytes per timestamp (long)
+ * 1GB of memory can store ~130 million timestamps (1024^3)/8
+ */
+ private static class FileRecordCache {
+ final List timestamps; // All non-sentinel record timestamps
+ final long newestTimestamp; // evict delta from cache based on oldest record timestamp
+
+ FileRecordCache(List timestamps) {
+ this.timestamps = timestamps;
+ this.newestTimestamp = timestamps.isEmpty() ? 0 : Collections.max(timestamps);
+ }
+ }
+
+ /**
+ * Exception thrown by malformed traffic calculator config
+ */
+ public static class MalformedTrafficCalcConfigException extends Exception {
+ public MalformedTrafficCalcConfigException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Constructor for OptOutTrafficCalculator
+ *
+ * @param cloudStorage Cloud storage for reading delta files
+ * @param s3DeltaPrefix S3 prefix for delta files
+ * @param trafficCalcConfigS3Path S3 path for traffic calc config
+ */
+ public OptOutTrafficCalculator(ICloudStorage cloudStorage, String s3DeltaPrefix, String trafficCalcConfigPath) throws MalformedTrafficCalcConfigException {
+ this.cloudStorage = cloudStorage;
+ this.s3DeltaPrefix = s3DeltaPrefix;
+ this.trafficCalcConfigPath = trafficCalcConfigPath;
+ reloadTrafficCalcConfig(); // Load ConfigMap
+
+ LOGGER.info("OptOutTrafficCalculator initialized: s3DeltaPrefix={}, threshold={}x",
+ s3DeltaPrefix, thresholdMultiplier);
+ }
+
+ /**
+ * Reload traffic calc config from ConfigMap.
+ * Expected format:
+ * {
+ * "traffic_calc_evaluation_window_seconds": 86400,
+ * "traffic_calc_baseline_traffic": 100,
+ * "traffic_calc_threshold_multiplier": 5,
+ * "traffic_calc_allowlist_ranges": [
+ * [startTimestamp1, endTimestamp1],
+ * [startTimestamp2, endTimestamp2]
+ * ],
+ * }
+ *
+ * Can be called periodically to pick up config changes without restarting.
+ */
+ public void reloadTrafficCalcConfig() throws MalformedTrafficCalcConfigException {
+ LOGGER.info("Loading traffic calc config from ConfigMap");
+ try (InputStream is = Files.newInputStream(Paths.get(trafficCalcConfigPath))) {
+ String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
+ JsonObject trafficCalcConfig = new JsonObject(content);
+
+ // Validate required fields exist
+ if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp)) {
+ throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_evaluation_window_seconds");
+ }
+ if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcBaselineTrafficProp)) {
+ throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_baseline_traffic");
+ }
+ if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcThresholdMultiplierProp)) {
+ throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_threshold_multiplier");
+ }
+ if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcAllowlistRangesProp)) {
+ throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_allowlist_ranges");
+ }
+
+ this.evaluationWindowSeconds = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp);
+ this.baselineTraffic = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcBaselineTrafficProp);
+ this.thresholdMultiplier = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcThresholdMultiplierProp);
+
+ List> ranges = parseAllowlistRanges(trafficCalcConfig);
+ this.allowlistRanges = ranges;
+
+ LOGGER.info("Successfully loaded traffic calc config from ConfigMap: evaluationWindowSeconds={}, baselineTraffic={}, thresholdMultiplier={}, allowlistRanges={}",
+ this.evaluationWindowSeconds, this.baselineTraffic, this.thresholdMultiplier, ranges.size());
+
+ } catch (MalformedTrafficCalcConfigException e) {
+ LOGGER.warn("Failed to load traffic calc config. Config is malformed: {}", trafficCalcConfigPath, e);
+ throw e;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to load traffic calc config. Config is malformed or missing: {}", trafficCalcConfigPath, e);
+ throw new MalformedTrafficCalcConfigException("Failed to load traffic calc config: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Parse allowlist ranges from JSON config
+ */
+ List> parseAllowlistRanges(JsonObject config) throws MalformedTrafficCalcConfigException {
+ List> ranges = new ArrayList<>();
+
+ try {
+ var rangesArray = config.getJsonArray(Const.Config.OptOutTrafficCalcAllowlistRangesProp);
+ if (rangesArray != null) {
+ for (int i = 0; i < rangesArray.size(); i++) {
+ var rangeArray = rangesArray.getJsonArray(i);
+ if (rangeArray != null && rangeArray.size() >= 2) {
+ long start = rangeArray.getLong(0);
+ long end = rangeArray.getLong(1);
+
+ if(start >= end) {
+ LOGGER.error("Invalid allowlist range: start must be less than end: [{}, {}]", start, end);
+ throw new MalformedTrafficCalcConfigException("Invalid allowlist range at index " + i + ": start must be less than end");
+ }
+
+ if (end - start > 86400) {
+ LOGGER.error("Invalid allowlist range: range must be less than 24 hours: [{}, {}]", start, end);
+ throw new MalformedTrafficCalcConfigException("Invalid allowlist range at index " + i + ": range must be less than 24 hours");
+ }
+
+ List range = Arrays.asList(start, end);
+ ranges.add(range);
+ LOGGER.info("Loaded allowlist range: [{}, {}]", start, end);
+ }
+ }
+ }
+
+ ranges.sort(Comparator.comparing(range -> range.get(0)));
+
+ // Validate no overlapping ranges
+ for (int i = 0; i < ranges.size() - 1; i++) {
+ long currentEnd = ranges.get(i).get(1);
+ long nextStart = ranges.get(i + 1).get(0);
+ if (currentEnd >= nextStart) {
+ LOGGER.error("Overlapping allowlist ranges detected: [{}, {}] overlaps with [{}, {}]",
+ ranges.get(i).get(0), currentEnd, nextStart, ranges.get(i + 1).get(1));
+ throw new MalformedTrafficCalcConfigException(
+ "Overlapping allowlist ranges detected at indices " + i + " and " + (i + 1));
+ }
+ }
+
+ } catch (MalformedTrafficCalcConfigException e) {
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse allowlist ranges", e);
+ throw new MalformedTrafficCalcConfigException("Failed to parse allowlist ranges: " + e.getMessage());
+ }
+
+ return ranges;
+ }
+
+ /**
+ * Calculate traffic status based on delta files and SQS queue messages.
+ *
+ * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window,
+ * and the oldest queue timestamp to anchor the 5-minute queue window.
+ *
+ * @param sqsMessages List of SQS messages
+ * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT)
+ */
+ public TrafficStatus calculateStatus(List sqsMessages) {
+
+ try {
+ // Get list of delta files from S3 (sorted newest to oldest)
+ List deltaS3Paths = listDeltaFiles();
+
+ if (deltaS3Paths.isEmpty()) {
+ LOGGER.warn("No delta files found in S3 with prefix: {}", s3DeltaPrefix);
+ return TrafficStatus.DEFAULT;
+ }
+
+ // Find newest delta file timestamp for delta traffic window
+ long newestDeltaTs = findNewestDeltaTimestamp(deltaS3Paths);
+ LOGGER.info("Traffic calculation: newestDeltaTs={}", newestDeltaTs);
+
+ // Find oldest SQS queue message timestamp for queue window
+ long oldestQueueTs = findOldestQueueTimestamp(sqsMessages);
+ LOGGER.info("Traffic calculation: oldestQueueTs={}", oldestQueueTs);
+
+ // Define start time of the delta evaluation window
+ // We need evaluationWindowSeconds of non-allowlisted time, so we iteratively extend
+ // the window to account for any allowlist ranges in the extended portion
+ long deltaWindowStart = calculateWindowStartWithAllowlist(newestDeltaTs, this.evaluationWindowSeconds);
+
+ // Evict old cache entries (older than delta window start)
+ evictOldCacheEntries(deltaWindowStart);
+
+ // Process delta files and count records in [deltaWindowStart, newestDeltaTs]
+ int sum = 0;
+
+ for (String s3Path : deltaS3Paths) {
+ List timestamps = getTimestampsFromFile(s3Path);
+
+ boolean shouldStop = false;
+ for (long ts : timestamps) {
+ // Stop condition: record is older than our window
+ if (ts < deltaWindowStart) {
+ LOGGER.debug("Stopping delta file processing at timestamp {} (older than window start {})", ts, deltaWindowStart);
+ break;
+ }
+
+ // skip records in allowlisted ranges
+ if (isInAllowlist(ts)) {
+ continue;
+ }
+
+ // increment sum if record is in delta window
+ if (ts >= deltaWindowStart) {
+ sum++;
+ }
+
+ }
+
+ if (shouldStop) {
+ break;
+ }
+ }
+
+ // Count SQS messages in [oldestQueueTs, oldestQueueTs + 5m]
+ if (sqsMessages != null && !sqsMessages.isEmpty()) {
+ int sqsCount = countSqsMessages(sqsMessages, oldestQueueTs);
+ sum += sqsCount;
+ }
+
+ // Determine status
+ TrafficStatus status = determineStatus(sum, this.baselineTraffic);
+
+ LOGGER.info("Traffic calculation complete: sum={}, baselineTraffic={}, thresholdMultiplier={}, status={}",
+ sum, this.baselineTraffic, this.thresholdMultiplier, status);
+
+ return status;
+
+ } catch (Exception e) {
+ LOGGER.error("Error calculating traffic status", e);
+ return TrafficStatus.DEFAULT;
+ }
+ }
+
+ /**
+ * Find the newest timestamp from delta files.
+ * Reads the newest delta file and returns its maximum timestamp.
+ */
+ private long findNewestDeltaTimestamp(List deltaS3Paths) throws IOException {
+ if (deltaS3Paths == null || deltaS3Paths.isEmpty()) {
+ return System.currentTimeMillis() / 1000;
+ }
+
+ // Delta files are sorted (ISO 8601 format, lexicographically sortable) so first file is newest
+ String newestDeltaPath = deltaS3Paths.get(0);
+ List timestamps = getTimestampsFromFile(newestDeltaPath);
+
+ if (timestamps.isEmpty()) {
+ LOGGER.warn("Newest delta file has no timestamps: {}", newestDeltaPath);
+ return System.currentTimeMillis() / 1000;
+ }
+
+ long newestTs = Collections.max(timestamps);
+ LOGGER.debug("Found newest delta timestamp {} from file {}", newestTs, newestDeltaPath);
+ return newestTs;
+ }
+
+ /**
+ * List all delta files from S3, sorted newest to oldest
+ */
+ private List listDeltaFiles() {
+ try {
+ // List all objects with the delta prefix
+ List allFiles = cloudStorage.list(s3DeltaPrefix);
+
+ // Filter to only .dat delta files and sort newest to oldest
+ return allFiles.stream()
+ .filter(OptOutUtils::isDeltaFile)
+ .sorted(OptOutUtils.DeltaFilenameComparatorDescending)
+ .collect(Collectors.toList());
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to list delta files from S3 with prefix: {}", s3DeltaPrefix, e);
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Get timestamps from a delta file (S3 path), using cache if available
+ */
+ private List getTimestampsFromFile(String s3Path) throws IOException {
+ // Extract filename from S3 path for cache key
+ String filename = s3Path.substring(s3Path.lastIndexOf('/') + 1);
+
+ // Check cache first
+ FileRecordCache cached = deltaFileCache.get(filename);
+ if (cached != null) {
+ LOGGER.debug("Using cached timestamps for file: {}", filename);
+ return cached.timestamps;
+ }
+
+ // Cache miss - download from S3
+ LOGGER.debug("Downloading and reading timestamps from S3: {}", s3Path);
+ List timestamps = readTimestampsFromS3(s3Path);
+
+ // Store in cache
+ deltaFileCache.put(filename, new FileRecordCache(timestamps));
+
+ return timestamps;
+ }
+
+ /**
+ * Read all non-sentinel record timestamps from a delta file in S3
+ */
+ private List readTimestampsFromS3(String s3Path) throws IOException {
+ try (InputStream is = cloudStorage.download(s3Path)) {
+ byte[] data = is.readAllBytes();
+ OptOutCollection collection = new OptOutCollection(data);
+
+ List timestamps = new ArrayList<>();
+ for (int i = 0; i < collection.size(); i++) {
+ OptOutEntry entry = collection.get(i);
+
+ // Skip sentinel entries
+ if (entry.isSpecialHash()) {
+ continue;
+ }
+
+ timestamps.add(entry.timestamp);
+ }
+
+ return timestamps;
+ } catch (Exception e) {
+ LOGGER.error("Failed to read delta file from S3: {}", s3Path, e);
+ throw new IOException("Failed to read delta file from S3: " + s3Path, e);
+ }
+ }
+
+ /**
+ * Calculate total duration of allowlist ranges that overlap with the given time window.
+ */
+ long getAllowlistDuration(long t, long windowStart) {
+ long totalDuration = 0;
+ for (List range : this.allowlistRanges) {
+ long start = range.get(0);
+ long end = range.get(1);
+
+ // Clip range to window boundaries
+ if (start < windowStart) {
+ start = windowStart;
+ }
+ if (end > t) {
+ end = t;
+ }
+
+ // Only add duration if there's actual overlap (start < end)
+ if (start < end) {
+ totalDuration += end - start;
+ }
+ }
+ return totalDuration;
+ }
+
+ /**
+ * Calculate the window start time that provides evaluationWindowSeconds of non-allowlisted time.
+ * Iteratively extends the window to account for allowlist ranges that may fall in extended portions.
+ */
+ long calculateWindowStartWithAllowlist(long newestDeltaTs, int evaluationWindowSeconds) {
+ long allowlistDuration = getAllowlistDuration(newestDeltaTs, newestDeltaTs - evaluationWindowSeconds);
+
+ // Each iteration discovers at least one new allowlist range, so max iterations = number of ranges
+ int maxIterations = this.allowlistRanges.size() + 1;
+
+ for (int i = 0; i < maxIterations && allowlistDuration > 0; i++) {
+ long newWindowStart = newestDeltaTs - evaluationWindowSeconds - allowlistDuration;
+ long newAllowlistDuration = getAllowlistDuration(newestDeltaTs, newWindowStart);
+
+ if (newAllowlistDuration == allowlistDuration) {
+ // No new allowlist time in extended portion, we've converged
+ break;
+ }
+
+ allowlistDuration = newAllowlistDuration;
+ }
+
+ return newestDeltaTs - evaluationWindowSeconds - allowlistDuration;
+ }
+
+ /**
+ * Find the oldest SQS queue message timestamp
+ */
+ private long findOldestQueueTimestamp(List sqsMessages) throws IOException {
+ long oldest = System.currentTimeMillis() / 1000;
+
+ if (sqsMessages != null && !sqsMessages.isEmpty()) {
+ for (Message msg : sqsMessages) {
+ Long ts = extractTimestampFromMessage(msg);
+ if (ts != null && ts < oldest) {
+ oldest = ts;
+ }
+ }
+ }
+
+ return oldest;
+ }
+
+ /**
+ * Extract timestamp from SQS message (from SentTimestamp attribute)
+ */
+ private Long extractTimestampFromMessage(Message msg) {
+ // Get SentTimestamp attribute (milliseconds)
+ String sentTimestamp = msg.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP);
+ if (sentTimestamp != null) {
+ try {
+ return Long.parseLong(sentTimestamp) / 1000; // Convert ms to seconds
+ } catch (NumberFormatException e) {
+ LOGGER.debug("Invalid SentTimestamp: {}", sentTimestamp);
+ }
+ }
+
+ // Fallback: use current time
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * Count SQS messages from oldestQueueTs to oldestQueueTs + 5 minutes
+ */
+ private int countSqsMessages(List sqsMessages, long oldestQueueTs) {
+
+ int count = 0;
+ long windowEnd = oldestQueueTs + 5 * 60;
+
+ for (Message msg : sqsMessages) {
+ Long ts = extractTimestampFromMessage(msg);
+
+ if (ts < oldestQueueTs || ts > windowEnd) {
+ continue;
+ }
+
+ if (isInAllowlist(ts)) {
+ continue;
+ }
+ count++;
+
+ }
+
+ LOGGER.info("SQS messages: {} in window [oldestQueueTs={}, oldestQueueTs+5m={}]", count, oldestQueueTs, windowEnd);
+ return count;
+ }
+
+ /**
+ * Check if a timestamp falls within any allowlist range
+ */
+ boolean isInAllowlist(long timestamp) {
+ if (allowlistRanges == null || allowlistRanges.isEmpty()) {
+ return false;
+ }
+
+ for (List range : allowlistRanges) {
+ if (range.size() < 2) {
+ continue;
+ }
+
+ long start = range.get(0);
+ long end = range.get(1);
+
+ if (timestamp >= start && timestamp <= end) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Evict cache entries with data older than the cutoff timestamp
+ */
+ private void evictOldCacheEntries(long cutoffTimestamp) {
+ int beforeSize = deltaFileCache.size();
+
+ deltaFileCache.entrySet().removeIf(entry ->
+ entry.getValue().newestTimestamp < cutoffTimestamp
+ );
+
+ int afterSize = deltaFileCache.size();
+ if (beforeSize != afterSize) {
+ LOGGER.info("Evicted {} old cache entries (before={}, after={})",
+ beforeSize - afterSize, beforeSize, afterSize);
+ }
+ }
+
+ /**
+ * Determine traffic status based on current vs past counts
+ */
+ TrafficStatus determineStatus(int sumCurrent, int baselineTraffic) {
+ if (baselineTraffic == 0 || thresholdMultiplier == 0) {
+ // Avoid division by zero - if no baseline traffic, return DEFAULT status
+ LOGGER.warn("baselineTraffic is 0 or thresholdMultiplier is 0 returning DEFAULT status.");
+ return TrafficStatus.DEFAULT;
+ }
+
+ if (sumCurrent >= thresholdMultiplier * baselineTraffic) {
+ LOGGER.warn("DELAYED_PROCESSING threshold breached: sumCurrent={} >= {}×baselineTraffic={}",
+ sumCurrent, thresholdMultiplier, baselineTraffic);
+ return TrafficStatus.DELAYED_PROCESSING;
+ }
+
+ LOGGER.info("Traffic within normal range: sumCurrent={} < {}×baselineTraffic={}",
+ sumCurrent, thresholdMultiplier, baselineTraffic);
+ return TrafficStatus.DEFAULT;
+ }
+
+ /**
+ * Get cache statistics for monitoring
+ */
+ public Map getCacheStats() {
+ Map stats = new HashMap<>();
+ stats.put("cached_files", deltaFileCache.size());
+
+ int totalTimestamps = deltaFileCache.values().stream()
+ .mapToInt(cache -> cache.timestamps.size())
+ .sum();
+ stats.put("total_cached_timestamps", totalTimestamps);
+
+ return stats;
+ }
+
+}
diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java
new file mode 100644
index 0000000..e8bd04b
--- /dev/null
+++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java
@@ -0,0 +1,172 @@
+package com.uid2.optout.vertx;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.charset.StandardCharsets;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.json.JsonArray;
+
+public class OptOutTrafficFilter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OptOutTrafficFilter.class);
+
+ private final String trafficFilterConfigPath;
+ List filterRules;
+
+ /**
+ * Traffic filter rule defining a time range and a list of IP addresses to exclude
+ */
+ private static class TrafficFilterRule {
+ private final List range;
+ private final List ipAddresses;
+
+ TrafficFilterRule(List range, List ipAddresses) {
+ this.range = range;
+ this.ipAddresses = ipAddresses;
+ }
+
+ public long getRangeStart() {
+ return range.get(0);
+ }
+ public long getRangeEnd() {
+ return range.get(1);
+ }
+ public List getIpAddresses() {
+ return ipAddresses;
+ }
+ }
+
+ public static class MalformedTrafficFilterConfigException extends Exception {
+ public MalformedTrafficFilterConfigException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Constructor for OptOutTrafficFilter
+ *
+ * @param trafficFilterConfigPath S3 path for traffic filter config
+ * @throws MalformedTrafficFilterConfigException if the traffic filter config is invalid
+ */
+ public OptOutTrafficFilter(String trafficFilterConfigPath) throws MalformedTrafficFilterConfigException {
+ this.trafficFilterConfigPath = trafficFilterConfigPath;
+ // Initial filter rules load
+ this.filterRules = Collections.emptyList(); // start empty
+ reloadTrafficFilterConfig(); // load ConfigMap
+
+ LOGGER.info("OptOutTrafficFilter initialized: filterRules={}",
+ filterRules.size());
+ }
+
+ /**
+ * Reload traffic filter config from ConfigMap.
+ * Expected format:
+ * {
+ * "denylist_requests": [
+ * {range: [startTimestamp, endTimestamp], IPs: ["ip1"]},
+ * {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip2"]},
+ * {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip3"]},
+ * ]
+ * }
+ *
+ * Can be called periodically to pick up config changes without restarting.
+ */
+ public void reloadTrafficFilterConfig() throws MalformedTrafficFilterConfigException {
+ LOGGER.info("Loading traffic filter config from ConfigMap");
+ try (InputStream is = Files.newInputStream(Paths.get(trafficFilterConfigPath))) {
+ String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
+ JsonObject filterConfigJson = new JsonObject(content);
+
+ this.filterRules = parseFilterRules(filterConfigJson);
+
+ LOGGER.info("Successfully loaded traffic filter config from ConfigMap: filterRules={}",
+ filterRules.size());
+
+ } catch (Exception e) {
+ LOGGER.warn("No traffic filter config found at: {}", trafficFilterConfigPath, e);
+ throw new MalformedTrafficFilterConfigException(e.getMessage());
+ }
+ }
+
+ /**
+ * Parse request filtering rules from JSON config
+ */
+ List parseFilterRules(JsonObject config) throws MalformedTrafficFilterConfigException {
+ List rules = new ArrayList<>();
+ try {
+ JsonArray denylistRequests = config.getJsonArray("denylist_requests");
+ if (denylistRequests == null) {
+ LOGGER.error("Invalid traffic filter config: denylist_requests is null");
+ throw new MalformedTrafficFilterConfigException("Invalid traffic filter config: denylist_requests is null");
+ }
+ for (int i = 0; i < denylistRequests.size(); i++) {
+ JsonObject ruleJson = denylistRequests.getJsonObject(i);
+
+ // parse range
+ var rangeJson = ruleJson.getJsonArray("range");
+ List range = new ArrayList<>();
+ if (rangeJson != null && rangeJson.size() == 2) {
+ long start = rangeJson.getLong(0);
+ long end = rangeJson.getLong(1);
+
+ if (start >= end) {
+ LOGGER.error("Invalid traffic filter rule: range start must be less than end: {}", ruleJson.encode());
+ throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: range start must be less than end");
+ }
+ range.add(start);
+ range.add(end);
+ }
+
+ // parse IPs
+ var ipAddressesJson = ruleJson.getJsonArray("IPs");
+ List ipAddresses = new ArrayList<>();
+ if (ipAddressesJson != null) {
+ for (int j = 0; j < ipAddressesJson.size(); j++) {
+ ipAddresses.add(ipAddressesJson.getString(j));
+ }
+ }
+
+ // log error and throw exception if rule is invalid
+ if (range.size() != 2 || ipAddresses.size() == 0 || range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less
+ LOGGER.error("Invalid traffic filter rule, range must be 24 hours or less: {}", ruleJson.encode());
+ throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule, range must be 24 hours or less");
+ }
+
+ TrafficFilterRule rule = new TrafficFilterRule(range, ipAddresses);
+
+ LOGGER.info("Loaded traffic filter rule: range=[{}, {}], IPs={}", rule.getRangeStart(), rule.getRangeEnd(), rule.getIpAddresses());
+ rules.add(rule);
+ }
+ return rules;
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse traffic filter rules: config={}, error={}", config.encode(), e.getMessage());
+ throw new MalformedTrafficFilterConfigException(e.getMessage());
+ }
+ }
+
+ public boolean isDenylisted(SqsParsedMessage message) {
+ long timestamp = message.getTimestamp();
+ String clientIp = message.getClientIp();
+
+ if (clientIp == null || clientIp.isEmpty()) {
+ LOGGER.error("Request does not contain client IP, timestamp={}", timestamp);
+ return false;
+ }
+
+ for (TrafficFilterRule rule : filterRules) {
+ if(timestamp >= rule.getRangeStart() && timestamp <= rule.getRangeEnd()) {
+ if(rule.getIpAddresses().contains(clientIp)) {
+ return true;
+ }
+ };
+ }
+ return false;
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java
new file mode 100644
index 0000000..f977233
--- /dev/null
+++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java
@@ -0,0 +1,1510 @@
+package com.uid2.optout.vertx;
+
+import com.uid2.shared.cloud.CloudStorageException;
+import com.uid2.shared.cloud.ICloudStorage;
+import com.uid2.shared.optout.OptOutCollection;
+import com.uid2.shared.optout.OptOutEntry;
+import com.uid2.optout.Const;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+
+import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException;
+import java.io.ByteArrayInputStream;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class OptOutTrafficCalculatorTest {
+
+ @Mock
+ private ICloudStorage cloudStorage;
+
+ private static final String S3_DELTA_PREFIX = "optout-v2/delta/";
+ private static final String TRAFFIC_CONFIG_PATH = "./traffic-config.json";
+ private static final int BASELINE_TRAFFIC = 100;
+ private static final int THRESHOLD_MULTIPLIER = 5;
+ private static final int EVALUATION_WINDOW_SECONDS = 24 * 3600;
+
+ @BeforeEach
+ void setUp() {
+ // default config
+ JsonObject config = new JsonObject();
+ config.put(Const.Config.OptOutTrafficCalcBaselineTrafficProp, BASELINE_TRAFFIC);
+ config.put(Const.Config.OptOutTrafficCalcThresholdMultiplierProp, THRESHOLD_MULTIPLIER);
+ config.put(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp, EVALUATION_WINDOW_SECONDS);
+ config.put(Const.Config.OptOutTrafficCalcAllowlistRangesProp, new JsonArray());
+ try {
+ createTrafficConfigFile(config.toString());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (Files.exists(Path.of(TRAFFIC_CONFIG_PATH))) {
+ try {
+ Files.delete(Path.of(TRAFFIC_CONFIG_PATH));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void createTrafficConfigFile(String content) {
+ try {
+ Path configPath = Path.of(TRAFFIC_CONFIG_PATH);
+ Files.writeString(configPath, content);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Helper to create config by merging partial JSON with defaults
+ */
+ private void createConfigFromPartialJson(String partialJson) {
+ JsonObject partial = new JsonObject(partialJson);
+ JsonObject config = new JsonObject();
+
+ // Set defaults
+ if (!partial.containsKey(Const.Config.OptOutTrafficCalcBaselineTrafficProp)) {
+ config.put(Const.Config.OptOutTrafficCalcBaselineTrafficProp, BASELINE_TRAFFIC);
+ }
+ if (!partial.containsKey(Const.Config.OptOutTrafficCalcThresholdMultiplierProp)) {
+ config.put(Const.Config.OptOutTrafficCalcThresholdMultiplierProp, THRESHOLD_MULTIPLIER);
+ }
+ if (!partial.containsKey(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp)) {
+ config.put(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp, EVALUATION_WINDOW_SECONDS);
+ }
+ if (!partial.containsKey(Const.Config.OptOutTrafficCalcAllowlistRangesProp)) {
+ config.put(Const.Config.OptOutTrafficCalcAllowlistRangesProp, new JsonArray());
+ }
+
+ // Merge in partial config (overrides defaults)
+ partial.forEach(entry -> config.put(entry.getKey(), entry.getValue()));
+
+ createTrafficConfigFile(config.toString());
+ }
+
+ /**
+ * Helper to create config with custom threshold
+ */
+ private void createConfigWithThreshold(int threshold) {
+ createConfigFromPartialJson("{\"" + Const.Config.OptOutTrafficCalcThresholdMultiplierProp + "\": " + threshold + "}");
+ }
+
+ // ============================================================================
+ // SECTION 1: Constructor & Initialization Tests
+ // ============================================================================
+
+ @Test
+ void testConstructor_defaultThreshold() throws Exception {
+ // Setup - default threshold of 5
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - DEFAULT when below threshold, DELAYED_PROCESSING when above threshold
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(10, 3);
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); // 10 < 5*3
+
+ status = calculator.determineStatus(15, 3);
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); // 15 >= 5*3
+ }
+
+ @Test
+ void testConstructor_customThreshold() throws Exception {
+ // Setup - custom threshold of 10
+ createConfigWithThreshold(10);
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - DEFAULT when below threshold, DELAYED_PROCESSING when above threshold
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(49, 5);
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); // 49 < 10*5
+ status = calculator.determineStatus(50, 5);
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); // 50 >= 10*5
+ }
+
+ @Test
+ void testConstructor_trafficCalcConfigLoadFailure() throws Exception {
+ // Setup - traffic calc config load failure
+ createTrafficConfigFile("Invalid JSON");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+ });
+
+ // Create valid config to test reload failure
+ createConfigFromPartialJson("{}");
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ createTrafficConfigFile("Invalid JSON");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+ }
+
+ // ============================================================================
+ // SECTION 2: parseTrafficCalcConfigRanges()
+ // ============================================================================
+
+ @Test
+ void testParseTrafficCalcConfigRanges_emptyConfig() throws Exception {
+ // Setup - no config
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+ JsonObject emptyConfig = new JsonObject();
+
+ // Act
+ List> ranges = calculator.parseAllowlistRanges(emptyConfig);
+
+ // Assert - empty ranges
+ assertTrue(ranges.isEmpty());
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_singleRange() throws Exception {
+ // Setup - single range
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L).add(2000L));
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - single range
+ assertEquals(1, result.size());
+ assertEquals(1000L, result.get(0).get(0));
+ assertEquals(2000L, result.get(0).get(1));
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_multipleRanges() throws Exception {
+ // Setup - multiple ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L).add(2000L))
+ .add(new JsonArray().add(3000L).add(4000L))
+ .add(new JsonArray().add(5000L).add(6000L));
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - multiple ranges
+ assertEquals(3, result.size());
+ assertEquals(1000L, result.get(0).get(0));
+ assertEquals(3000L, result.get(1).get(0));
+ assertEquals(5000L, result.get(2).get(0));
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_misorderedRange() throws Exception {
+ // Setup - range with end < start is malformed
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(2000L).add(1000L)); // End before start
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.parseAllowlistRanges(configWithRanges);
+ });
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_rangeTooLong() throws Exception {
+ // Setup - range longer than 24 hours is malformed
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(2000L).add(200000L)); // Longer than 24 hours
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.parseAllowlistRanges(configWithRanges);
+ });
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_sortsByStartTime() throws Exception {
+ // Setup - ranges added out of order
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(5000L).add(6000L))
+ .add(new JsonArray().add(1000L).add(2000L))
+ .add(new JsonArray().add(3000L).add(4000L));
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - should be sorted by start time
+ assertEquals(3, result.size());
+ assertEquals(1000L, result.get(0).get(0));
+ assertEquals(3000L, result.get(1).get(0));
+ assertEquals(5000L, result.get(2).get(0));
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_invalidRangeTooFewElements() throws Exception {
+ // Setup - invalid range with only 1 element;
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L)) // Only 1 element
+ .add(new JsonArray().add(2000L).add(3000L)); // Valid
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - should skip invalid range
+ assertEquals(1, result.size());
+ assertEquals(2000L, result.get(0).get(0));
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_nullArray() throws Exception {
+ // Setup - null array
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ configWithRanges.put("traffic_calc_allowlist_ranges", (JsonArray) null);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - empty ranges
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_overlappingRanges() throws Exception {
+ // Setup - overlapping ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L).add(2000L))
+ .add(new JsonArray().add(1500L).add(2500L)); // Overlaps with first range
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act & Assert - should throw exception due to overlap
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.parseAllowlistRanges(configWithRanges);
+ });
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_adjacentRangesWithSameBoundary() throws Exception {
+ // Setup - ranges where end of first equals start of second (touching but not overlapping semantically, but we treat as overlap)
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L).add(2000L))
+ .add(new JsonArray().add(2000L).add(3000L)); // Starts exactly where first ends
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act & Assert - should throw exception because ranges touch at boundary
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.parseAllowlistRanges(configWithRanges);
+ });
+ }
+
+ @Test
+ void testParseTrafficCalcConfigRanges_nonOverlappingRanges() throws Exception {
+ // Setup - ranges that don't overlap (with gap between them)
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ JsonObject configWithRanges = new JsonObject();
+ JsonArray ranges = new JsonArray()
+ .add(new JsonArray().add(1000L).add(2000L))
+ .add(new JsonArray().add(2001L).add(3000L)); // Starts after first ends
+ configWithRanges.put("traffic_calc_allowlist_ranges", ranges);
+
+ // Act
+ List> result = calculator.parseAllowlistRanges(configWithRanges);
+
+ // Assert - should succeed with 2 ranges
+ assertEquals(2, result.size());
+ }
+
+ // ============================================================================
+ // SECTION 3: isInTrafficCalcConfig()
+ // ============================================================================
+
+ @Test
+ void testIsInTrafficCalcConfig_withinSingleRange() throws Exception {
+ // Setup - load traffic calc config with single range [1000, 2000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - true when within range
+ assertTrue(calculator.isInAllowlist(1500L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_exactlyAtStart() throws Exception {
+ // Setup - load traffic calc config with single range [1000, 2000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - true when exactly at start of range
+ assertTrue(calculator.isInAllowlist(1000L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_exactlyAtEnd() throws Exception {
+ // Setup - load traffic calc config with single range [1000, 2000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - true when exactly at end of range
+ assertTrue(calculator.isInAllowlist(2000L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_beforeRange() throws Exception {
+ // Setup - load traffic calc config with single range [1000, 2000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - false when before range
+ assertFalse(calculator.isInAllowlist(999L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_afterRange() throws Exception {
+ // Setup - load traffic calc config with single range [1000, 2000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - false when after range
+ assertFalse(calculator.isInAllowlist(2001L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_betweenRanges() throws Exception {
+ // Setup - load traffic calc config with two ranges [1000, 2000] and [3000, 4000]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000],
+ [3000, 4000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - false when between ranges
+ assertFalse(calculator.isInAllowlist(2500L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_emptyRanges() throws Exception {
+ // Setup uses default config from setUp() which has empty traffic calc config ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - false when empty ranges
+ assertFalse(calculator.isInAllowlist(1500L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_nullRanges() throws Exception {
+ // Setup - no traffic calc config ranges loaded (will fail and set empty)
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": null
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert - false when null/empty ranges
+ assertFalse(calculator.isInAllowlist(1500L));
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_invalidRangeSize() throws Exception {
+ // Setup - load traffic calc config with invalid range (only 1 element) and valid range
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000],
+ [2000, 3000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert
+ assertFalse(calculator.isInAllowlist(1500L)); // Should not match invalid range
+ assertTrue(calculator.isInAllowlist(2500L)); // Should match valid range
+ }
+
+ @Test
+ void testIsInTrafficCalcConfig_multipleRanges() throws Exception {
+ // Setup - load traffic calc config with multiple ranges
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000],
+ [3000, 4000],
+ [5000, 6000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert
+ assertTrue(calculator.isInAllowlist(1500L)); // In first range
+ assertTrue(calculator.isInAllowlist(3500L)); // In second range
+ assertTrue(calculator.isInAllowlist(5500L)); // In third range
+ assertFalse(calculator.isInAllowlist(2500L)); // Between first and second
+ }
+
+ // ============================================================================
+ // SECTION 4: getTrafficCalcConfigDuration()
+ // ============================================================================
+
+ @Test
+ void testGetTrafficCalcConfigDuration_noRanges() throws Exception {
+ // Setup - no ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Assert
+ assertEquals(0L, calculator.getAllowlistDuration(10000L, 5000L)); // 0 duration when no ranges
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_rangeFullyWithinWindow() throws Exception {
+ // Setup - range fully within window
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [6000, 7000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], range [6000, 7000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - full range duration
+ assertEquals(1000L, duration);
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_rangePartiallyOverlapsStart() throws Exception {
+ // Setup - range partially overlaps start of window
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [3000, 7000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], range [3000, 7000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - should clip to [5000, 7000] = 2000
+ assertEquals(2000L, duration);
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_rangePartiallyOverlapsEnd() throws Exception {
+ // Setup - range partially overlaps end of window
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [8000, 12000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], range [8000, 12000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - should clip to [8000, 10000] = 2000
+ assertEquals(2000L, duration);
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_rangeCompletelyOutsideWindow() throws Exception {
+ // Setup - range completely outside window
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], range [1000, 2000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - 0 duration when range completely outside window
+ assertEquals(0L, duration);
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_multipleRanges() throws Exception {
+ // Setup - multiple ranges
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [6000, 7000],
+ [8000, 9000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], ranges [6000, 7000] and [8000, 9000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - 1000 + 1000 = 2000
+ assertEquals(2000L, duration);
+ }
+
+ @Test
+ void testGetTrafficCalcConfigDuration_rangeSpansEntireWindow() throws Exception {
+ // Setup - range spans entire window
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [3000, 12000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window [5000, 10000], range [3000, 12000]
+ long duration = calculator.getAllowlistDuration(10000L, 5000L);
+
+ // Assert - entire window is in traffic calc config ranges = 5000
+ assertEquals(5000L, duration);
+ }
+
+ // ============================================================================
+ // SECTION 4.5: calculateWindowStartWithAllowlist()
+ // ============================================================================
+
+ @Test
+ void testCalculateWindowStartWithAllowlist_noAllowlist() throws Exception {
+ // Setup - no allowlist ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - window should be [3, 8] with no extension
+ long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5);
+
+ // Assert - no allowlist, so window start is simply newestDeltaTs - evaluationWindowSeconds
+ assertEquals(3L, windowStart);
+ }
+
+ @Test
+ void testCalculateWindowStartWithAllowlist_allowlistInOriginalWindowOnly() throws Exception {
+ // Setup - allowlist range only in original window, not in extended portion
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [6, 7]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - newestDeltaTs=8, evaluationWindow=5
+ // Original window [3, 8] has [6,7] allowlisted (1 hour)
+ // Extended portion [2, 3] has no allowlist
+ // So window start should be 8 - 5 - 1 = 2
+ long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5);
+
+ assertEquals(2L, windowStart);
+ }
+
+ @Test
+ void testCalculateWindowStartWithAllowlist_allowlistInExtendedPortion() throws Exception {
+ // Setup - allowlist ranges in both original window AND extended portion
+ // This is the user's example: evaluationWindow=5, newestDeltaTs=8, allowlist={[2,3], [6,7]}
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [2, 3],
+ [6, 7]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ // Original window [3, 8]: [6,7] allowlisted = 1 hour
+ // First extension to [2, 8]: [2,3] and [6,7] allowlisted = 2 hours total
+ // Second extension to [1, 8]: still [2,3] and [6,7] = 2 hours (no new allowlist)
+ // Final: windowStart = 8 - 5 - 2 = 1
+ long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5);
+
+ assertEquals(1L, windowStart);
+ }
+
+ @Test
+ void testCalculateWindowStartWithAllowlist_allowlistBeforeWindow() throws Exception {
+ // Setup - allowlist range entirely before the initial window
+ // This tests that we don't over-extend when allowlist is old
+ // evaluationWindow=5, newestDeltaTs=20, allowlist=[10,13]
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [10, 13]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ // Initial window [15, 20]: no allowlist overlap, allowlistDuration = 0
+ // No extension needed
+ // Final: windowStart = 20 - 5 - 0 = 15
+ long windowStart = calculator.calculateWindowStartWithAllowlist(20L, 5);
+
+ // Verify: window [15, 20] has 5 hours, 0 allowlisted = 5 non-allowlisted
+ assertEquals(15L, windowStart);
+ }
+
+ // ============================================================================
+ // SECTION 5: determineStatus()
+ // ============================================================================
+
+ @Test
+ void testDetermineStatus_belowThreshold() throws Exception {
+ // Setup - below threshold
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - 10 < 5 * 3
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(10, 3);
+
+ // Assert - DEFAULT when below threshold
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testDetermineStatus_atThreshold() throws Exception {
+ // Setup - at threshold
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - 15 == 5 * 3
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(15, 3);
+
+ // Assert - DELAYED_PROCESSING when at threshold
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ @Test
+ void testDetermineStatus_aboveThreshold() throws Exception {
+ // Setup - above threshold
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - 20 > 5 * 3
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(20, 3);
+
+ // Assert - DELAYED_PROCESSING when above threshold
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ @Test
+ void testDetermineStatus_sumPastZero() throws Exception {
+ // Setup - sumPast is 0
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - should return DEFAULT to avoid crash
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(100, 0);
+
+ // Assert
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testDetermineStatus_bothZero() throws Exception {
+ // Setup - both sumCurrent and sumPast are 0;
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - should return DEFAULT to avoid crash
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(0, 0);
+
+ // Assert
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testDetermineStatus_sumCurrentZero() throws Exception {
+ // Setup - sumCurrent is 0
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - 0 < 5 * 10
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(0, 10);
+
+ // Assert - DEFAULT when sumCurrent is 0
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "1, 1, 1, DELAYED_PROCESSING", // threshold=1: 1 >= 1*1
+ "2, 4, 2, DELAYED_PROCESSING", // threshold=2: 4 >= 2*2
+ "5, 10, 2, DELAYED_PROCESSING", // threshold=5: 10 >= 5*2
+ "10, 100, 10, DELAYED_PROCESSING", // threshold=10: 100 >= 10*10
+ "5, 24, 5, DEFAULT", // threshold=5: 24 < 5*5
+ "100, 1000, 11, DEFAULT" // threshold=100: 1000 < 100*11
+ })
+ void testDetermineStatus_variousThresholds(int threshold, int sumCurrent, int sumPast, String expectedStatus) throws Exception {
+ // Setup - various thresholds
+ createConfigWithThreshold(threshold);
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(sumCurrent, sumPast);
+
+ // Assert
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.valueOf(expectedStatus), status);
+ }
+
+ @Test
+ void testDetermineStatus_largeNumbers() throws Exception {
+ // Setup - test with large numbers
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(1_000_000, 200_000);
+
+ // Assert - 1M >= 5 * 200K = 1M
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ // ============================================================================
+ // SECTION 6: S3 Config Reload Tests
+ // ============================================================================
+
+ @Test
+ void testReloadTrafficCalcConfig_success() throws Exception {
+ // Setup - initial traffic calc config
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000],
+ [3000, 4000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Change the traffic calc config to a new range
+ String newTrafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [5000, 6000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(newTrafficCalcConfigJson);
+
+ // Act - reload the traffic calc config
+ calculator.reloadTrafficCalcConfig();
+
+ // Assert - verify new traffic calc config is loaded
+ assertTrue(calculator.isInAllowlist(5500L));
+ }
+
+ @Test
+ void testReloadTrafficCalcConfig_failure() throws Exception {
+ // Setup - initial traffic calc config
+ String trafficCalcConfigJson = """
+ {
+ "traffic_calc_allowlist_ranges": [
+ [1000, 2000]
+ ]
+ }
+ """;
+ createConfigFromPartialJson(trafficCalcConfigJson);
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Now make it fail
+ createTrafficConfigFile("Invalid JSON");
+
+ // Act - should not throw exception
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+
+ }
+
+ @Test
+ public void testReloadTrafficCalcConfig_failure_missingKeys() throws Exception {
+ // Setup
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act & Assert missing threshold multiplier
+ createTrafficConfigFile("{\"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_baseline_traffic\": 100, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+
+ // Act & Assert missing evaluation window seconds
+ createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_baseline_traffic\": 100, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+
+ // Act & Assert missing baseline traffic
+ createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+
+ // Act & Assert missing traffic calc config ranges
+ createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_baseline_traffic\": 100}");
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+ }
+
+ @Test
+ public void testReloadTrafficCalcConfig_failure_misorderedRanges() throws Exception {
+ // Setup - misordered ranges
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+ createConfigFromPartialJson("{\"traffic_calc_allowlist_ranges\": [ [2000, 1000] ]}");
+
+ // Act & Assert
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+ }
+
+ @Test
+ public void testReloadTrafficCalcConfig_failure_rangeTooLong() throws Exception {
+ // Setup - range greater than 24 hours
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+ createConfigFromPartialJson("{\"traffic_calc_allowlist_ranges\": [ [1000, 200000] ]}");
+
+ // Act & Assert
+ assertThrows(MalformedTrafficCalcConfigException.class, () -> {
+ calculator.reloadTrafficCalcConfig();
+ });
+ }
+
+ // ============================================================================
+ // SECTION 7: Cache Management Tests (also tested in section 9)
+ // ============================================================================
+
+ @Test
+ void testGetCacheStats_emptyCache() throws Exception {
+ // Setup
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ Map stats = calculator.getCacheStats();
+
+ // Assert - should return empty stats
+ assertEquals(0, stats.get("cached_files"));
+ assertEquals(0, stats.get("total_cached_timestamps"));
+ }
+
+ // ============================================================================
+ // SECTION 8: Helper Methods for Test Data Creation
+ // ============================================================================
+
+ /**
+ * Create a mock SQS message with specified timestamp
+ */
+ private Message createSqsMessage(long timestampSeconds) {
+ Map attributes = new HashMap<>();
+ attributes.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampSeconds * 1000));
+
+ return Message.builder()
+ .messageId("test-msg-" + timestampSeconds)
+ .body("{\"test\": \"data\"}")
+ .attributes(attributes)
+ .build();
+ }
+
+ /**
+ * Create a mock SQS message without timestamp
+ */
+ private Message createSqsMessageWithoutTimestamp() {
+ return Message.builder()
+ .messageId("test-msg-no-timestamp")
+ .body("{\"test\": \"data\"}")
+ .attributes(new HashMap<>())
+ .build();
+ }
+
+ /**
+ * Create delta file bytes with specified timestamps
+ */
+ private byte[] createDeltaFileBytes(List timestamps) throws Exception {
+ // Create OptOutEntry objects using newTestEntry
+ List entries = new ArrayList<>();
+
+ long idCounter = 1000; // Use incrementing IDs for test entries
+ for (long timestamp : timestamps) {
+ entries.add(OptOutEntry.newTestEntry(idCounter++, timestamp));
+ }
+
+ // Create OptOutCollection
+ OptOutCollection collection = new OptOutCollection(entries.toArray(new OptOutEntry[0]));
+ return collection.getStore();
+ }
+
+
+ // ============================================================================
+ // SECTION 9: Tests for calculateStatus()
+ // ============================================================================
+
+ @Test
+ void testCalculateStatus_noDeltaFiles() throws Exception {
+ // Setup - no delta files
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Collections.emptyList());
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList());
+
+ // Assert - should return DEFAULT when no delta files
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_normalTraffic() throws Exception {
+ // Setup - setup time: current time
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ // Create delta files with timestamps distributed over 48 hours
+ List timestamps = new ArrayList<>();
+
+ // add 499 entries in current window
+ for (int i = 0; i < 49; i++) {
+ timestamps.add(t - 23*3600 + i * 60);
+ }
+
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - 100+1 < 5 * 50 = 250, so should be DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_delayedProcessing() throws Exception {
+ // Setup - create delta files with spike in current window
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ // Create delta files with spike in current window
+ List timestamps = new ArrayList<>();
+
+ // add 500 entries in current window
+ for (int i = 0; i < 500; i++) {
+ timestamps.add(t - 23*3600 + i * 60);
+ }
+
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - 100+1 >= 5 * 10 = 50, DELAYED_PROCESSING
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ @Test
+ void testCalculateStatus_noSqsMessages() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = Arrays.asList(t - 3600, t - 7200); // Some entries
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - null SQS messages
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(null);
+
+ // Assert - should still calculate based on delta files, DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_emptySqsMessages() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = Arrays.asList(t - 3600);
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - empty SQS messages
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList());
+
+ // Assert - should still calculate based on delta files, DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_multipleSqsMessages() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = new ArrayList<>();
+ // add 470 entries in window
+ for (int i = 0; i < 470; i++) {
+ timestamps.add(t - 24*3600 + i * 60);
+ }
+
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Add 30 SQS entries in [t, t+5min]
+ List sqsMessages = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ sqsMessages.add(createSqsMessage(t - i * 10));
+ }
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - DELAYED_PROCESSING
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ @Test
+ void testCalculateStatus_withTrafficCalcConfig() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ // Traffic calc config that covers part of window
+ String trafficCalcConfigJson = String.format("""
+ {
+ "traffic_calc_allowlist_ranges": [
+ [%d, %d]
+ ]
+ }
+ """, t - 12*3600, t - 6*3600);
+
+ List timestamps = new ArrayList<>();
+
+ // window - 600 entries (300 in traffic calc config range, 300 outside)
+ for (int i = 0; i < 300; i++) {
+ timestamps.add(t - 12*3600 + i);
+ }
+ for (int i = 0; i < 300; i++) {
+ timestamps.add(t - 3600 + i);
+ }
+
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ createConfigFromPartialJson(trafficCalcConfigJson);
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/delta-001.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - should filter out entries in traffic calc config ranges
+ // Only 300 from window count (not in traffic calc config ranges) + 1 SQS = 301
+ // 301 < 5*100, so DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_cacheUtilization() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = Arrays.asList(t - 3600, t - 7200);
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - first call should populate cache
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ calculator.calculateStatus(sqsMessages);
+
+ Map stats = calculator.getCacheStats();
+ int cachedFiles = (Integer) stats.get("cached_files");
+
+ // Second call should use cache (no additional S3 download)
+ calculator.calculateStatus(sqsMessages);
+
+ Map stats2 = calculator.getCacheStats();
+ int cachedFiles2 = (Integer) stats2.get("cached_files");
+
+ // Assert - cache should be populated and remain consistent
+ assertEquals(1, cachedFiles);
+ assertEquals(cachedFiles, cachedFiles2);
+
+ // Verify S3 download was called only once per file
+ verify(cloudStorage, times(1)).download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat");
+ }
+
+ @Test
+ void testCalculateStatus_s3Exception() throws Exception {
+ // Setup - S3 list error
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenThrow(new RuntimeException("S3 error"));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - should not throw exception
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList());
+
+ // Assert - DEFAULT on error
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_deltaFileReadException() throws Exception {
+ // Setup - S3 download error
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenThrow(new CloudStorageException("Failed to download"));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - empty SQS messages
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList());
+
+ // Assert - DEFAULT on error
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_invalidSqsMessageTimestamp() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = Arrays.asList(t - 3600);
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act - SQS message without timestamp (should use current time)
+ List sqsMessages = Arrays.asList(createSqsMessageWithoutTimestamp());
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+ }
+
+ @Test
+ void testCalculateStatus_multipleDeltaFiles() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ // File 1 - recent entries
+ List timestamps1 = new ArrayList<>();
+ for (int i = 0; i < 50; i++) {
+ timestamps1.add(t - 12*3600 + i * 1000);
+ }
+ byte[] deltaFileBytes1 = createDeltaFileBytes(timestamps1);
+
+ // File 2 - older entries
+ List timestamps2 = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ timestamps2.add(t - 36*3600 + i * 1000);
+ }
+ byte[] deltaFileBytes2 = createDeltaFileBytes(timestamps2);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList(
+ "optout-v2/delta/optout-delta--01_2025-11-13T02.00.00Z_bbbbbbbb.dat",
+ "optout-v2/delta/optout-delta--01_2025-11-13T01.00.00Z_aaaaaaaa.dat"
+ ));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T02.00.00Z_bbbbbbbb.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes1));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T01.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes2));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+
+ // Verify cache has both files
+ Map stats = calculator.getCacheStats();
+ assertEquals(2, stats.get("cached_files"));
+ }
+
+ @Test
+ void testCalculateStatus_windowBoundaryTimestamp() throws Exception {
+ // Setup - create delta file with timestamps at window boundary
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+ long currentWindowStart = t - 24*3600;
+ List timestamps = new ArrayList<>();
+ for (int i = 0; i < 250; i++) {
+ timestamps.add(t);
+ }
+ for (int i = 0; i < 250; i++) {
+ timestamps.add(currentWindowStart);
+ }
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert - DEFAULT
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
+ }
+
+ @Test
+ void testCalculateStatus_timestampsCached() throws Exception {
+ // Setup - create delta files with some entries
+ long currentTime = System.currentTimeMillis() / 1000;
+ long t = currentTime;
+
+ List timestamps = Arrays.asList(t - 3600, t - 7200);
+ byte[] deltaFileBytes = createDeltaFileBytes(timestamps);
+
+ when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"));
+ when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"))
+ .thenReturn(new ByteArrayInputStream(deltaFileBytes));
+
+ OptOutTrafficCalculator calculator = new OptOutTrafficCalculator(
+ cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
+
+ // Act
+ List sqsMessages = Arrays.asList(createSqsMessage(t));
+ OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages);
+
+ // Assert
+ assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status);
+
+ // Cache should contain the timestamps
+ Map stats = calculator.getCacheStats();
+ assertEquals(2, stats.get("total_cached_timestamps"));
+ }
+
+}
diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java
new file mode 100644
index 0000000..63f6807
--- /dev/null
+++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java
@@ -0,0 +1,424 @@
+package com.uid2.optout.vertx;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.Assert.*;
+
+public class OptOutTrafficFilterTest {
+
+ private static final String TEST_CONFIG_PATH = "./traffic-config.json";
+
+ @Before
+ public void setUp() {
+ try {
+ Files.deleteIfExists(Path.of(TEST_CONFIG_PATH));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ Files.deleteIfExists(Path.of(TEST_CONFIG_PATH));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testParseFilterRules_emptyRules() throws Exception {
+ // Setup - empty denylist
+ String config = """
+ {
+ "denylist_requests": []
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - no rules
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertEquals(0, filter.filterRules.size());
+ }
+
+ @Test
+ public void testParseFilterRules_singleRule() throws Exception {
+ // Setup - config with one rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - one rule
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertEquals(1, filter.filterRules.size());
+ }
+
+ @Test
+ public void testParseFilterRules_multipleRules() throws Exception {
+ // Setup - config with multiple rules
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ },
+ {
+ "range": [1700010000, 1700013600],
+ "IPs": ["10.0.0.1", "10.0.0.2"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - two rules
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertEquals(2, filter.filterRules.size());
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_missingDenylistRequests() throws Exception {
+ // Setup - config without denylist_requests field
+ String config = """
+ {
+ "other_field": "value"
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_invalidRange_startAfterEnd() throws Exception {
+ // Setup - range where start > end
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700003600, 1700000000],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_invalidRange_startEqualsEnd() throws Exception {
+ // Setup - range where start == end
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700000000],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_rangeExceeds24Hours() throws Exception {
+ // Setup - range longer than 24 hours (86400 seconds)
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700086401],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_emptyIPs() throws Exception {
+ // Setup - rule with empty IP list
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": []
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testParseFilterRules_missingIPs() throws Exception {
+ // Setup - rule without IPs field
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - throws exception
+ new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ }
+
+ @Test
+ public void testIsDenylisted_matchingIPAndTimestamp() throws Exception {
+ // Setup - filter with denylist rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1", "10.0.0.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+ SqsParsedMessage message = createTestMessage(1700001800, "192.168.1.1");
+
+ // Act & Assert - denylisted
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertTrue(filter.isDenylisted(message));
+ }
+
+ @Test
+ public void testIsDenylisted_matchingIPOutsideTimeRange() throws Exception {
+ // Setup - filter with denylist rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+
+ // Act & Assert - message before range not denylisted
+ SqsParsedMessage messageBefore = createTestMessage(1699999999, "192.168.1.1");
+ assertFalse(filter.isDenylisted(messageBefore));
+ // Act & Assert - message after range not denylisted
+ SqsParsedMessage messageAfter = createTestMessage(1700003601, "192.168.1.1");
+ assertFalse(filter.isDenylisted(messageAfter));
+ }
+
+ @Test
+ public void testIsDenylisted_nonMatchingIP() throws Exception {
+ // Setup - filter with denylist rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+
+ // Act & Assert - non-matching IP not denylisted
+ SqsParsedMessage message = createTestMessage(1700001800, "10.0.0.1");
+ assertFalse(filter.isDenylisted(message));
+ }
+
+ @Test
+ public void testIsDenylisted_atRangeBoundaries() throws Exception {
+ // Setup - filter with denylist rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+
+ // Act & Assert - message at start boundary (inclusive) denylisted
+ SqsParsedMessage messageAtStart = createTestMessage(1700000000, "192.168.1.1");
+ assertTrue(filter.isDenylisted(messageAtStart));
+
+ // Act & Assert - message at end boundary (inclusive) denylisted
+ SqsParsedMessage messageAtEnd = createTestMessage(1700003600, "192.168.1.1");
+ assertTrue(filter.isDenylisted(messageAtEnd));
+ }
+
+ @Test
+ public void testIsDenylisted_multipleRules() throws Exception {
+ // Setup - multiple denylist rules
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ },
+ {
+ "range": [1700010000, 1700013600],
+ "IPs": ["10.0.0.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+
+ // Act & Assert - message matches first rule
+ SqsParsedMessage msg1 = createTestMessage(1700001800, "192.168.1.1");
+ assertTrue(filter.isDenylisted(msg1));
+
+ // Act & Assert - message matches second rule
+ SqsParsedMessage msg2 = createTestMessage(1700011800, "10.0.0.1");
+ assertTrue(filter.isDenylisted(msg2));
+
+ // Act & Assert - message matches neither rule
+ SqsParsedMessage msg3 = createTestMessage(1700005000, "172.16.0.1");
+ assertFalse(filter.isDenylisted(msg3));
+ }
+
+ @Test
+ public void testIsDenylisted_nullClientIp() throws Exception {
+ // Setup - filter with denylist rule
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+
+ // Act & Assert - message with null IP not denylisted
+ SqsParsedMessage message = createTestMessage(1700001800, null);
+ assertFalse(filter.isDenylisted(message));
+ }
+
+ @Test
+ public void testReloadTrafficFilterConfig_success() throws Exception {
+ // Setup - config with one rule
+ String initialConfig = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), initialConfig);
+
+ // Act & Assert - one rule
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertEquals(1, filter.filterRules.size());
+
+ // Setup - update config
+ String updatedConfig = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700003600],
+ "IPs": ["192.168.1.1"]
+ },
+ {
+ "range": [1700010000, 1700013600],
+ "IPs": ["10.0.0.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), updatedConfig);
+
+ // Act & Assert - two rules
+ filter.reloadTrafficFilterConfig();
+ assertEquals(2, filter.filterRules.size());
+ }
+
+ @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class)
+ public void testReloadTrafficFilterConfig_fileNotFound() throws Exception {
+ // Setup, Act & Assert - try to create filter with non-existent config
+ new OptOutTrafficFilter("./non-existent-file.json");
+ }
+
+ @Test
+ public void testParseFilterRules_maxValidRange() throws Exception {
+ // Setup - range exactly 24 hours (86400 seconds) - should be valid
+ String config = """
+ {
+ "denylist_requests": [
+ {
+ "range": [1700000000, 1700086400],
+ "IPs": ["192.168.1.1"]
+ }
+ ]
+ }
+ """;
+ Files.writeString(Path.of(TEST_CONFIG_PATH), config);
+
+ // Act & Assert - one rule
+ OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH);
+ assertEquals(1, filter.filterRules.size());
+ }
+
+ /**
+ * Helper method to create test SqsParsedMessage
+ */
+ private SqsParsedMessage createTestMessage(long timestamp, String clientIp) {
+ Message mockMessage = Message.builder().build();
+ byte[] hash = new byte[32];
+ byte[] id = new byte[32];
+ return new SqsParsedMessage(mockMessage, hash, id, timestamp, null, null, clientIp, null);
+ }
+}