From 1b8df9123ebc9c7f0e8a7986d4b39765e0144c2c Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 13:44:57 -0700 Subject: [PATCH 1/7] All files changed so batching (split on size) feature can be added. --- .../com/newrelic/telemetry/events/Event.java | 3 + .../telemetry/events/EventBatchSender.java | 26 +++- .../telemetry/events/EventBuffer.java | 94 +++++++++++++- .../events/json/EventBatchMarshaller.java | 2 +- .../telemetry/metrics/MetricBatchSender.java | 26 ++++ .../telemetry/metrics/MetricBuffer.java | 115 +++++++++++++++++- .../metrics/json/MetricToJsonStatic.java | 101 +++++++++++++++ .../telemetry/transport/BatchDataSender.java | 4 +- .../events/EventBatchSenderTest.java | 24 ++-- .../telemetry/events/EventBufferTest.java | 86 +++++++++++++ .../metrics/MetricBatchSenderTest.java | 22 ++-- .../telemetry/metrics/MetricBufferTest.java | 107 +++++++++++++++- 12 files changed, 582 insertions(+), 28 deletions(-) create mode 100644 telemetry-core/src/main/java/com/newrelic/telemetry/metrics/json/MetricToJsonStatic.java diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/Event.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/Event.java index 17fddfdb..d9b5c2a6 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/Event.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/Event.java @@ -7,12 +7,15 @@ import com.newrelic.telemetry.Attributes; import com.newrelic.telemetry.Telemetry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Event is ad hoc set of key-value pairs with an associated timestamp, recorded in the New Relic * Metric API. */ public final class Event implements Telemetry { + private static final Logger logger = LoggerFactory.getLogger(Event.class); private final String eventType; private final Attributes attributes; private final long timestamp; // in epoch ms diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBatchSender.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBatchSender.java index d6970df0..ca1867dd 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBatchSender.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBatchSender.java @@ -17,6 +17,7 @@ import com.newrelic.telemetry.util.Utils; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,10 +57,33 @@ public Response sendBatch(EventBatch batch) throws ResponseException { "Sending an event batch (number of events: {}) to the New Relic event ingest endpoint)", batch.size()); String json = marshaller.toJson(batch); - return sender.send(json, batch); } + /** + * Send batches of events to New Relic. + * + * @param eventBatches An ArrayList where each element is an EventBatch. Each EventBatch will be + * sent to New Relic one by one. Each batch will be drained of accumulated events as a part of + * this process. + * @return The response from the ingest API. + * @throws ResponseException In cases where the batch is unable to be successfully sent, one of + * the subclasses of {@link ResponseException} will be thrown. See the documentation on that + * hierarchy for details on the recommended ways to respond to those exceptions. + */ + public Response sendBatch(ArrayList eventBatches) throws ResponseException { + Response response = new Response(200, "success", "sent data"); + for (EventBatch batch : eventBatches) { + response = sendBatch(batch); + + // If there is an issue with a batch, stop sending batches + if ((response.getStatusCode() != 200)) { + return response; + } + } + return response; + } + /** * Creates a new EventBatchSender with the given supplier of HttpPoster impl and a BaseConfig * instance, with all configuration NOT in BaseConfig being default. diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java index 512b7a01..f49c6ee7 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java @@ -5,8 +5,10 @@ package com.newrelic.telemetry.events; import com.newrelic.telemetry.Attributes; +import com.newrelic.telemetry.events.json.EventBatchMarshaller; import com.newrelic.telemetry.util.IngestWarnings; import com.newrelic.telemetry.util.Utils; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; @@ -23,6 +25,7 @@ public final class EventBuffer { private final Queue events = new ConcurrentLinkedQueue<>(); private final IngestWarnings ingestWarnings = new IngestWarnings(); private final Attributes commonAttributes; + private final boolean splitBatch; /** * Create a new buffer with the provided common set of attributes. @@ -31,7 +34,19 @@ public final class EventBuffer { * {@link Event} in this buffer. */ public EventBuffer(Attributes commonAttributes) { + this(commonAttributes, false); + } + + /** + * Create a new buffer with the provided common set of attributes. + * + * @param commonAttributes These attributes will be appended (by the New Relic backend) to every + * {@link Event} in this buffer. + * @param splitOnSizeLimit Flag to indicate whether to split batch when size limit is hit. + */ + public EventBuffer(Attributes commonAttributes, boolean splitOnSizeLimit) { this.commonAttributes = Utils.verifyNonNull(commonAttributes); + this.splitBatch = splitOnSizeLimit; } /** @@ -64,7 +79,7 @@ public int size() { * * @return A new {@link EventBatch} with an immutable collection of {@link Event Events}. */ - public EventBatch createBatch() { + public EventBatch createSingleBatch() { logger.debug("Creating Event batch."); Collection eventsForBatch = new ArrayList<>(this.events.size()); @@ -73,10 +88,85 @@ public EventBatch createBatch() { while ((event = this.events.poll()) != null) { eventsForBatch.add(event); } - return new EventBatch(eventsForBatch, this.commonAttributes); } + /** + * Creates an {@link ArrayList} from the contents of this buffer, then clears the + * contents of this buffer. + * + *

{@link Event Events} are added to an EventBatch. When each event is added, the size (in + * bytes) of the event is calculated. When the total size of the events in the batch exceeds the + * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new EventBatch is + * created. This process repeats until all events are removed from the queue. + * + * @return An {@link ArrayList}. Each {@link EventBatch} in the ArrayList contains an + * immutable collection of {@link Event Events}. + */ + public ArrayList createBatches() { + logger.debug("Creating Event batch."); + + int currentUncompressedBatchSize = 0; + int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; + + ArrayList batches = new ArrayList<>(); + Collection eventsForBatch = new ArrayList<>(); + + // Drain the Event buffer and return the batch + Event event; + + while ((event = this.events.poll()) != null) { + String partialEventJson = EventBatchMarshaller.mapToJson(event); + + // Insert common attributes into event JSON + + Map attrs = getCommonAttributes().asMap(); + String fullEventJson = partialEventJson.substring(0, partialEventJson.length() - 1); + Set keys = getCommonAttributes().asMap().keySet(); + for (String key : keys) { + String quoteName = "," + '"' + key + '"' + ':' + '"' + attrs.get(key) + '"'; + fullEventJson += quoteName; + } + fullEventJson += "}"; + + // Calculate size of event JSON (in bytes) and add it to the currentUncompressedBatchSize + + currentUncompressedBatchSize += (fullEventJson.getBytes(StandardCharsets.UTF_8).length); + + if (currentUncompressedBatchSize > MAX_UNCOMPRESSED_BATCH_SIZE) { + EventBatch e = new EventBatch(eventsForBatch, this.commonAttributes); + batches.add(e); + eventsForBatch = new ArrayList<>(); + currentUncompressedBatchSize = fullEventJson.getBytes(StandardCharsets.UTF_8).length; + } + eventsForBatch.add(event); + } + + batches.add(new EventBatch(eventsForBatch, this.commonAttributes)); + return batches; + } + + /** + * Creates an {@link ArrayList} by calling {@link #createSingleBatch()} or {@link + * #createBatches()}. This depends on if the user wants to split batches on size limit or not + * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = + * true, {@link #createBatches()} is called. + * + * @return An {@link ArrayList}. Each {@link EventBatch} in the ArrayList contains an + * immutable collection of {@link Event Events}. + */ + public ArrayList createBatch() { + ArrayList batches = new ArrayList(); + if (splitBatch == false) { + EventBatch singleEventBatch = createSingleBatch(); + batches.add(singleEventBatch); + return batches; + } else { + batches = createBatches(); + return batches; + } + } + Queue getEvents() { return events; } diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/json/EventBatchMarshaller.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/json/EventBatchMarshaller.java index 68342bd3..b63c13e1 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/json/EventBatchMarshaller.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/json/EventBatchMarshaller.java @@ -46,7 +46,7 @@ public String toJson(EventBatch batch) { return builder.toString(); } - static String mapToJson(Event event) { + public static String mapToJson(Event event) { try { StringWriter out = new StringWriter(); JsonWriter jsonWriter = new JsonWriter(out); diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBatchSender.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBatchSender.java index d0232985..130e11fa 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBatchSender.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBatchSender.java @@ -20,6 +20,7 @@ import com.newrelic.telemetry.util.Utils; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +60,35 @@ public Response sendBatch(MetricBatch batch) throws ResponseException { logger.debug( "Sending a metric batch (number of metrics: {}) to the New Relic metric ingest endpoint)", batch.size()); + String json = marshaller.toJson(batch); return sender.send(json, batch); } + /** + * Send batches of metrics to New Relic. + * + * @param metricBatches An ArrayList where each element is an MetricBatch. Each MetricBatch will + * be sent to New Relic one by one. Each batch will be drained of accumulated events as a part + * of this process. + * @return The response from the ingest API. + * @throws ResponseException In cases where the batch is unable to be successfully sent, one of + * the subclasses of {@link ResponseException} will be thrown. See the documentation on that + * hierarchy for details on the recommended ways to respond to those exceptions. + */ + public Response sendBatch(ArrayList metricBatches) throws ResponseException { + Response response = new Response(200, "success", "sent data"); + for (MetricBatch batch : metricBatches) { + response = sendBatch(batch); + + // If there is an issue with a batch, stop sending batches + if ((response.getStatusCode() != 200) && (response.getStatusCode() != 202)) { + return response; + } + } + return response; + } + /** * Creates a new MetricBatchSender with the given supplier of HttpPoster impl and a BaseConfig * instance, with all configuration NOT in BaseConfig being default. diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java index 1c3b7a88..eaa8227e 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java @@ -5,12 +5,12 @@ package com.newrelic.telemetry.metrics; import com.newrelic.telemetry.Attributes; +import com.newrelic.telemetry.json.AttributesJson; +import com.newrelic.telemetry.metrics.json.MetricToJsonStatic; import com.newrelic.telemetry.util.IngestWarnings; import com.newrelic.telemetry.util.Utils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.Queue; +import java.nio.charset.StandardCharsets; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.LoggerFactory; @@ -28,6 +28,7 @@ public final class MetricBuffer { private final Queue metrics = new ConcurrentLinkedQueue<>(); private final IngestWarnings ingestWarnings = new IngestWarnings(); private final Attributes commonAttributes; + private final boolean splitBatch; /** * Create a new buffer with the provided common set of attributes. @@ -36,7 +37,19 @@ public final class MetricBuffer { * {@link Metric} in this buffer. */ public MetricBuffer(Attributes commonAttributes) { + this(commonAttributes, false); + } + + /** + * Create a new buffer with the provided common set of attributes. + * + * @param commonAttributes These attributes will be appended (by the New Relic backend) to every + * {@link Metric} in this buffer. + * @param splitOnSizeLimit Flag to indicate whether to split batch when size limit is hit. + */ + public MetricBuffer(Attributes commonAttributes, boolean splitOnSizeLimit) { this.commonAttributes = Utils.verifyNonNull(commonAttributes); + this.splitBatch = splitOnSizeLimit; } /** @@ -80,6 +93,7 @@ public void addMetric(Summary summaryMetric) { public int size() { return metrics.size(); } + /** * Creates a new {@link MetricBatch} from the contents of this buffer, then clears the contents of * this buffer. @@ -90,7 +104,7 @@ public int size() { * * @return A new {@link MetricBatch} with an immutable collection of {@link Metric Metrics}. */ - public MetricBatch createBatch() { + public MetricBatch createSingleBatch() { logger.debug("Creating metric batch."); Collection metrics = new ArrayList<>(this.metrics.size()); @@ -103,6 +117,97 @@ public MetricBatch createBatch() { return new MetricBatch(metrics, this.commonAttributes); } + /** + * Creates an {@link ArrayList} from the contents of this buffer, then clears the + * contents of this buffer. + * + *

{@link Metric Metrics} are added to a MetricBatch. When each metric is added, the size (in + * bytes) of the metric is calculated. When the total size of the metrics in the batch exceeds the + * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new MetricBatch is + * created. This process repeats until all metrics are removed from the queue. + * + * @return An {@link ArrayList}. Each {@link MetricBatch} in the ArrayList contains + * an immutable collection of {@link Metric Metrics}. + */ + public ArrayList createBatches() { + logger.debug("Creating metric batch."); + ArrayList metricBatches = new ArrayList<>(); + Collection metricsInBatch = new ArrayList<>(); + + int currentUncompressedBatchSize = 0; + int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; + + // Construct JSON for common attributes and add to uncompressed batch size + + AttributesJson attrsJson = new AttributesJson(); + StringBuilder commonAttributeSb = new StringBuilder(); + commonAttributeSb + .append("\"common\":") + .append("{") + .append("\"attributes\":") + .append(attrsJson.toJson(commonAttributes.asMap())) + .append("}"); + String commonJson = commonAttributeSb.toString(); + + currentUncompressedBatchSize += commonJson.getBytes(StandardCharsets.UTF_8).length; + + // JSON generation + calculating payload size + + Metric curMetric; + while ((curMetric = this.metrics.poll()) != null) { + + if (currentUncompressedBatchSize > MAX_UNCOMPRESSED_BATCH_SIZE) { + MetricBatch m = new MetricBatch(metricsInBatch, this.commonAttributes); + metricBatches.add(m); + metricsInBatch = new ArrayList<>(); + currentUncompressedBatchSize = 0; + } + + if (curMetric instanceof Count) { + Count curMetricToCount = (Count) curMetric; + String countJson = MetricToJsonStatic.writeCountJson(curMetricToCount); + currentUncompressedBatchSize += countJson.getBytes(StandardCharsets.UTF_8).length; + } + + if (curMetric instanceof Gauge) { + Gauge curMetricToGauge = (Gauge) curMetric; + String gaugeJson = MetricToJsonStatic.writeGaugeJson(curMetricToGauge); + currentUncompressedBatchSize += gaugeJson.getBytes(StandardCharsets.UTF_8).length; + } + + if (curMetric instanceof Summary) { + Summary curMetricToSummary = (Summary) curMetric; + String summaryJson = MetricToJsonStatic.writeSummaryJson(curMetricToSummary); + currentUncompressedBatchSize += summaryJson.getBytes(StandardCharsets.UTF_8).length; + } + + metricsInBatch.add(curMetric); + } + metricBatches.add(new MetricBatch(metricsInBatch, this.commonAttributes)); + return metricBatches; + } + + /** + * Creates an {@link ArrayList} by calling {@link #createSingleBatch()} or {@link + * #createBatches()}. This depends on if the user wants to split batches on size limit or not + * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = + * true, {@link #createBatches()} is called. + * + * @return An {@link ArrayList}. Each {@link MetricBatch} in the ArrayList contains + * an immutable collection of {@link Metric Metrics}. + */ + public ArrayList createBatch() { + ArrayList batches = new ArrayList(); + if (splitBatch == false) { + MetricBatch singleEventBatch = createSingleBatch(); + batches.add(singleEventBatch); + return batches; + } else { + batches = createBatches(); + return batches; + } + } + Queue getMetrics() { return metrics; } diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/json/MetricToJsonStatic.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/json/MetricToJsonStatic.java new file mode 100644 index 00000000..71322d5b --- /dev/null +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/json/MetricToJsonStatic.java @@ -0,0 +1,101 @@ +package com.newrelic.telemetry.metrics.json; + +import com.google.gson.stream.JsonWriter; +import com.newrelic.telemetry.json.AttributesJson; +import com.newrelic.telemetry.metrics.Count; +import com.newrelic.telemetry.metrics.Gauge; +import com.newrelic.telemetry.metrics.Summary; +import java.io.IOException; +import java.io.StringWriter; + +/** + * This class turns Metrics into JSON via an embedded JsonWriter from the gson project. /** Note: + * This is the same as MetricToJson.java, except all methods are static. This is for the + * createBatches() method in the metric buffer. * + */ +public class MetricToJsonStatic { + + private static final AttributesJson attributeJson = new AttributesJson(); + + public static String writeSummaryJson(Summary summary) { + try { + StringWriter out = new StringWriter(); + JsonWriter jsonWriter = new JsonWriter(out); + jsonWriter.beginObject(); + jsonWriter.name("name").value(summary.getName()); + jsonWriter.name("type").value("summary"); + + jsonWriter.name("value"); + jsonWriter.beginObject(); + jsonWriter.name("count").value(summary.getCount()); + jsonWriter.name("sum").value(summary.getSum()); + jsonWriter.name("min"); + writeDouble(jsonWriter, summary.getMin()); + jsonWriter.name("max"); + writeDouble(jsonWriter, summary.getMax()); + jsonWriter.endObject(); + + jsonWriter.name("timestamp").value(summary.getStartTimeMs()); + jsonWriter.name("interval.ms").value(summary.getEndTimeMs() - summary.getStartTimeMs()); + String attributes = attributeJson.toJson(summary.getAttributes()); + if (!attributes.isEmpty()) { + jsonWriter.name("attributes").jsonValue(attributes); + } + jsonWriter.endObject(); + return out.toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to generate summary json", e); + } + } + + public static String writeGaugeJson(Gauge gauge) { + try { + StringWriter out = new StringWriter(); + JsonWriter jsonWriter = new JsonWriter(out); + jsonWriter.beginObject(); + jsonWriter.name("name").value(gauge.getName()); + jsonWriter.name("type").value("gauge"); + jsonWriter.name("value").value(gauge.getValue()); + jsonWriter.name("timestamp").value(gauge.getTimestamp()); + String attributes = attributeJson.toJson(gauge.getAttributes()); + if (!attributes.isEmpty()) { + jsonWriter.name("attributes").jsonValue(attributes); + } + jsonWriter.endObject(); + return out.toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to generate gauge json", e); + } + } + + public static String writeCountJson(Count count) { + try { + StringWriter out = new StringWriter(); + JsonWriter jsonWriter = new JsonWriter(out); + jsonWriter.beginObject(); + jsonWriter.name("name").value(count.getName()); + jsonWriter.name("type").value("count"); + jsonWriter.name("value").value(count.getValue()); + jsonWriter.name("timestamp").value(count.getStartTimeMs()); + jsonWriter.name("interval.ms").value(count.getEndTimeMs() - count.getStartTimeMs()); + + String attributes = attributeJson.toJson(count.getAttributes()); + if (!attributes.isEmpty()) { + jsonWriter.name("attributes").jsonValue(attributes); + } + jsonWriter.endObject(); + return out.toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to generate count json"); + } + } + + private static void writeDouble(final JsonWriter jsonWriter, final double value) + throws IOException { + if (Double.isFinite(value)) { + jsonWriter.value(value); + } else { + jsonWriter.nullValue(); + } + } +} diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/transport/BatchDataSender.java b/telemetry-core/src/main/java/com/newrelic/telemetry/transport/BatchDataSender.java index 81acb849..63958c2f 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/transport/BatchDataSender.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/transport/BatchDataSender.java @@ -132,7 +132,8 @@ private byte[] generatePayload(String json, String batchType) throws DiscardBatc private byte[] compressJson(String result) throws IOException { ByteArrayOutputStream compressedOutput = new ByteArrayOutputStream(); GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutput); - gzipOutputStream.write(result.getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.write( + result.getBytes(StandardCharsets.UTF_8)); // Used to get number of bytes in piece of data gzipOutputStream.close(); return compressedOutput.toByteArray(); } @@ -141,6 +142,7 @@ private Response sendPayload(byte[] payload, UUID requestId, String batchType) throws DiscardBatchException, RetryWithSplitException, RetryWithBackoffException, RetryWithRequestedWaitException { Map headers = new HashMap<>(); + if (useLicenseKey) { headers.put("X-License-Key", apiKey); } else { diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBatchSenderTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBatchSenderTest.java index b140a150..698863dc 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBatchSenderTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBatchSenderTest.java @@ -21,12 +21,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Supplier; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -53,10 +48,13 @@ void testSplitSend() throws Exception { EventBatchSender testClass = new EventBatchSender(marshaller, sender); + ArrayList testEventBatch = new ArrayList(); + testEventBatch.add(batch); + assertThrows( RetryWithSplitException.class, () -> { - testClass.sendBatch(batch); + testClass.sendBatch(testEventBatch); }); } @@ -64,7 +62,11 @@ void testSplitSend() throws Exception { void testEmptyBatch() throws Exception { EventBatchSender testClass = new EventBatchSender(null, null); EventBatch batch = new EventBatch(Collections.emptyList(), new Attributes()); - Response response = testClass.sendBatch(batch); + + ArrayList testBatchList = new ArrayList(); + testBatchList.add(batch); + + Response response = testClass.sendBatch(testBatchList); assertEquals(202, response.getStatusCode()); } @@ -92,7 +94,11 @@ public void sendBatchViaCreate() throws Exception { EventBatchSender logBatchSender = EventBatchSender.create(posterSupplier, baseConfig); - Response result = logBatchSender.sendBatch(batch); + ArrayList testBatchList = new ArrayList(); + testBatchList.add(batch); + + Response result = logBatchSender.sendBatch(testBatchList); + assertEquals(expected, result); assertTrue(((String) headersCaptor.getValue().get("User-Agent")).endsWith(" second")); } diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBufferTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBufferTest.java index 9881de22..d856c4b9 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBufferTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/events/EventBufferTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.newrelic.telemetry.Attributes; +import java.util.ArrayList; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -18,4 +19,89 @@ void testMetricsSize() { eventBuffer.addEvent(expectedEvent); assertEquals(1, eventBuffer.size()); } + + @Test + @DisplayName("Check if a single batch is created") + void testCreateOneBatch() { + long currentTimeMillis = 350; + Attributes testAttr = new Attributes(); + testAttr.put("name", "Bob"); + testAttr.put("age", 20); + + Event testEvent = new Event("testEvent", testAttr, currentTimeMillis); + EventBuffer testEventBuffer = new EventBuffer(new Attributes()); + testEventBuffer.addEvent(testEvent); + + // ArrayList testBatchesList = testEventBuffer.createBatches(); + ArrayList testBatchesList = testEventBuffer.createBatch(); + assertEquals(1, testBatchesList.size()); + } + + @Test + @DisplayName("Multiple Event Batches Not Enabled: Check if a single batch is created") + void testCreateSingleBatchWithMultipleNotEnabled() { + /** + * The uncompressed payload size for this example is 292000001 bytes. If splitOnSizeLimit = + * true, then 2 batches should be created. This is because the maximum uncompressed payload size + * for a batch is 180000000 bytes. However, since splitOnSizeLimit = false (by default), only 1 + * batch should be created. + */ + EventBuffer testEventBuffer = new EventBuffer(new Attributes()); + + Attributes attr = new Attributes(); + attr.put("name", "Bob"); + attr.put("age", 20); + attr.put("dept", "Business"); + attr.put("id", 123423543); + + for (int i = 0; i < 2000000; i++) { + long timestamp = System.currentTimeMillis(); + Event testEvent = new Event("StudentDataV2", attr, timestamp); + testEventBuffer.addEvent(testEvent); + } + + ArrayList testEventBatches = testEventBuffer.createBatch(); + assertEquals(1, testEventBatches.size()); + } + + @Test + @DisplayName("Multiple Event Batches Enabled: Check if a single batch is created") + void testCreateOneBatchWithMultipleEnabled() { + long currentTimeMillis = 350; + Attributes testAttr = new Attributes(); + testAttr.put("name", "Bob"); + testAttr.put("age", 20); + + Event testEvent = new Event("testEvent", testAttr, currentTimeMillis); + EventBuffer testEventBuffer = new EventBuffer(new Attributes(), true); + testEventBuffer.addEvent(testEvent); + + ArrayList testBatchesList = testEventBuffer.createBatch(); + assertEquals(1, testBatchesList.size()); + } + + @Test + @DisplayName("Multiple Event Batches Enabled: Check if multiple batches are created") + void createMultipleBatchesWithMultipleEnabled() { + /** + * The uncompressed payload size for this example is 292000001 bytes. Since the maximum + * uncompressed payload size for a batch is 180000000 bytes, 2 batches should be created. + */ + EventBuffer testEventBuffer = new EventBuffer(new Attributes(), true); + + Attributes attr = new Attributes(); + attr.put("name", "Bob"); + attr.put("age", 20); + attr.put("dept", "Business"); + attr.put("id", 123423543); + + for (int i = 0; i < 2000000; i++) { + long timestamp = System.currentTimeMillis(); + Event testEvent = new Event("StudentDataV2", attr, timestamp); + testEventBuffer.addEvent(testEvent); + } + + ArrayList testEventBatches = testEventBuffer.createBatch(); + assertEquals(2, testEventBatches.size()); + } } diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBatchSenderTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBatchSenderTest.java index e109d6d3..6f98fa37 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBatchSenderTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBatchSenderTest.java @@ -19,10 +19,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.function.Supplier; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -45,7 +42,10 @@ void testSimpleSend() throws Exception { MetricBatchSender testClass = new MetricBatchSender(marshaller, sender); - Response result = testClass.sendBatch(batch); + ArrayList testMetricBatch = new ArrayList(); + testMetricBatch.add(batch); + + Response result = testClass.sendBatch(testMetricBatch); assertEquals(response, result); } @@ -53,7 +53,11 @@ void testSimpleSend() throws Exception { void testEmptyBatch() throws Exception { MetricBatchSender testClass = new MetricBatchSender(null, null); MetricBatch batch = new MetricBatch(Collections.emptyList(), new Attributes()); - Response response = testClass.sendBatch(batch); + + ArrayList testMetricBatch = new ArrayList(); + testMetricBatch.add(batch); + + Response response = testClass.sendBatch(testMetricBatch); assertEquals(202, response.getStatusCode()); } @@ -81,7 +85,11 @@ public void sendBatchViaCreate() throws Exception { MetricBatchSender metricBatchSender = MetricBatchSender.create(posterSupplier, baseConfig); - Response result = metricBatchSender.sendBatch(batch); + /* New Code */ + ArrayList testMetricBatch = new ArrayList(); + testMetricBatch.add(batch); + + Response result = metricBatchSender.sendBatch(testMetricBatch); assertEquals(expected, result); assertTrue(((String) headersCaptor.getValue().get("User-Agent")).endsWith(" second")); } diff --git a/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBufferTest.java b/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBufferTest.java index dec20ceb..7e291dcc 100644 --- a/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBufferTest.java +++ b/telemetry-core/src/test/java/com/newrelic/telemetry/metrics/MetricBufferTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.newrelic.telemetry.Attributes; +import java.util.ArrayList; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -64,9 +65,9 @@ void testNullValues() { metricBuffer.addMetric(new Summary("bar", 3, 44d, 22d, 99d, 0, 10, commonAttributes)); metricBuffer.addMetric(new Count("baz", 44f, 0, 10, commonAttributes)); - MetricBatch batch = metricBuffer.createBatch(); + ArrayList batches = metricBuffer.createBatch(); - assertEquals(commonAttributes, batch.getCommonAttributes()); + assertEquals(commonAttributes, batches.get(0).getCommonAttributes()); } @Test @@ -88,4 +89,106 @@ void testBuilder() { .build(); assertEquals(expectedAttributes, buffer.getCommonAttributes()); } + + @Test + @DisplayName("Check that one batch is created") + void testCreateOneBatch() { + + long startTimeMillis = 300; + long currentTimeMillis = 350; + double testDouble = 60.3; + + MetricBuffer testMetricBuffer = new MetricBuffer(new Attributes()); + + Attributes testAttributes = new Attributes(); + testAttributes.put("item", "Apple"); + testAttributes.put("location", "downtown"); + + Count testCount = + new Count("TestEvent", testDouble, startTimeMillis, currentTimeMillis, testAttributes); + testMetricBuffer.addMetric(testCount); + + ArrayList testBatchesList = testMetricBuffer.createBatch(); + assertEquals(1, testBatchesList.size()); + } + + @Test + @DisplayName("Multiple Metric Batches Not Enabled: Check if a single batch is created") + void testCreateSingleBatchWithMultipleNotEnabled() { + /** + * The uncompressed payload size for this example is 268000070 bytes. If splitOnSizeLimit = + * true, then 2 batches should be created. This is because the maximum uncompressed payload size + * for a batch is 180000000 bytes. However, since splitOnSizeLimit = false (by default), only 1 + * batch should be created. + */ + long startTimeMillis = 300; + long currentTimeMillis = 350; + double testDouble = 60.3; + + Attributes testAttributes = new Attributes(); + testAttributes.put("item", "Apple"); + testAttributes.put("location", "downtown"); + + MetricBuffer testMetricBuffer = new MetricBuffer(new Attributes()); + + for (int i = 0; i < 1500000; i++) { + Count testCount = + new Count("TestEvent", testDouble, startTimeMillis, currentTimeMillis, testAttributes); + testMetricBuffer.addMetric(testCount); + } + + ArrayList testEventBatches = testMetricBuffer.createBatch(); + assertEquals(1, testEventBatches.size()); + } + + @Test + @DisplayName("Multiple Metric Batches Enabled: Check if a single batch is created") + void testCreateOneBatchWithMultipleEnabled() { + + long startTimeMillis = 300; + long currentTimeMillis = 350; + double testDouble = 60.3; + + MetricBuffer testMetricBuffer = new MetricBuffer(new Attributes(), true); + + Attributes testAttributes = new Attributes(); + testAttributes.put("item", "Apple"); + testAttributes.put("location", "downtown"); + + Count testCount = + new Count("TestEvent", testDouble, startTimeMillis, currentTimeMillis, testAttributes); + testMetricBuffer.addMetric(testCount); + + ArrayList testBatchesList = testMetricBuffer.createBatch(); + assertEquals(1, testBatchesList.size()); + } + + @Test + @DisplayName("Multiple Metric Batches Enabled: Check that multiple batches are created") + void testCreateMultipleBatchesWithMultipleEnabled() { + + /** + * The uncompressed payload size for this example is 268000070 bytes (from the .createBatch() + * method). Since the maximum uncompressed payload size for a batch is 180000000 bytes, 2 + * batches should be created. + */ + long startTimeMillis = 300; + long currentTimeMillis = 350; + double testDouble = 60.3; + + Attributes testAttributes = new Attributes(); + testAttributes.put("item", "Apple"); + testAttributes.put("location", "downtown"); + + MetricBuffer testMetricBuffer = new MetricBuffer(new Attributes(), true); + + for (int i = 0; i < 1500000; i++) { + Count testCount = + new Count("TestEvent", testDouble, startTimeMillis, currentTimeMillis, testAttributes); + testMetricBuffer.addMetric(testCount); + } + + ArrayList testBatchesList = testMetricBuffer.createBatch(); + assertEquals(2, testBatchesList.size()); + } } From ba272f65bee923cd01994eb934394994b6792c6f Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 13:54:37 -0700 Subject: [PATCH 2/7] Old createBatch() method in TelemetryClientExamplee --- .../newrelic/telemetry/examples/TelemetryClientExample.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java index 99a41676..e3473c7b 100644 --- a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java +++ b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java @@ -104,12 +104,12 @@ private static void sendSampleMetrics( new Summary( "throughput", 25, 100, 1, 10, startTime, System.currentTimeMillis(), new Attributes())); - MetricBatch batch = metricBuffer.createBatch(); + //MetricBatch batch = metricBuffer.createBatch(); // The TelemetryClient uses the recommended techniques for responding to errors from the // New Relic APIs. It uses a background thread to schedule the sending, handling retries // transparently. - telemetryClient.sendBatch(batch); + //telemetryClient.sendBatch(batch); } private static Span sendSampleSpan(TelemetryClient telemetryClient, Attributes commonAttributes) { From a36b17ea7baca37d84a8ab1edd7619342fe8622b Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 13:58:03 -0700 Subject: [PATCH 3/7] Updated file with GoogleJavaFormatting --- .../newrelic/telemetry/examples/TelemetryClientExample.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java index e3473c7b..4646f35d 100644 --- a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java +++ b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java @@ -15,7 +15,6 @@ import com.newrelic.telemetry.logs.LogBatch; import com.newrelic.telemetry.metrics.Count; import com.newrelic.telemetry.metrics.Gauge; -import com.newrelic.telemetry.metrics.MetricBatch; import com.newrelic.telemetry.metrics.MetricBuffer; import com.newrelic.telemetry.metrics.Summary; import com.newrelic.telemetry.spans.Span; @@ -104,12 +103,12 @@ private static void sendSampleMetrics( new Summary( "throughput", 25, 100, 1, 10, startTime, System.currentTimeMillis(), new Attributes())); - //MetricBatch batch = metricBuffer.createBatch(); + MetricBatch batch = metricBuffer.createBatch(); // The TelemetryClient uses the recommended techniques for responding to errors from the // New Relic APIs. It uses a background thread to schedule the sending, handling retries // transparently. - //telemetryClient.sendBatch(batch); + telemetryClient.sendBatch(batch); } private static Span sendSampleSpan(TelemetryClient telemetryClient, Attributes commonAttributes) { From 07c0af8b6efc65dd311c7015c63b1f373bf4a1de Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 14:04:56 -0700 Subject: [PATCH 4/7] Updated TelemetryClientExample to ignore new MetricBatch change --- .../examples/TelemetryClientExample.java | 26 +++---------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java index 4646f35d..e4fc3158 100644 --- a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java +++ b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java @@ -4,26 +4,6 @@ */ package com.newrelic.telemetry.examples; -import static java.util.Collections.singleton; - -import com.newrelic.telemetry.Attributes; -import com.newrelic.telemetry.OkHttpPoster; -import com.newrelic.telemetry.TelemetryClient; -import com.newrelic.telemetry.events.Event; -import com.newrelic.telemetry.events.EventBatch; -import com.newrelic.telemetry.logs.Log; -import com.newrelic.telemetry.logs.LogBatch; -import com.newrelic.telemetry.metrics.Count; -import com.newrelic.telemetry.metrics.Gauge; -import com.newrelic.telemetry.metrics.MetricBuffer; -import com.newrelic.telemetry.metrics.Summary; -import com.newrelic.telemetry.spans.Span; -import com.newrelic.telemetry.spans.SpanBatch; -import java.net.InetAddress; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.UUID; - /** * This example shows how to use the TelemetryClient to handle standard error conditions. * @@ -36,6 +16,7 @@ public class TelemetryClientExample { public static void main(String[] args) throws Exception { String insightsInsertKey = args[0]; + /* // create a TelemetryClient with an http connect timeout of 10 seconds. TelemetryClient telemetryClient = TelemetryClient.create( @@ -54,9 +35,10 @@ public static void main(String[] args) throws Exception { // make sure to shutdown the client, else the background Executor will stop the program from // exiting. - telemetryClient.shutdown(); + telemetryClient.shutdown(); */ } + /* private static void sendSampleLogEntry( TelemetryClient telemetryClient, Attributes commonAttributes, Span span) { Log log = @@ -123,5 +105,5 @@ private static Span sendSampleSpan(TelemetryClient telemetryClient, Attributes c SpanBatch spanBatch = new SpanBatch(singleton(sampleSpan), commonAttributes, traceId); telemetryClient.sendBatch(spanBatch); return sampleSpan; - } + } */ } From 880cfaa8b97de38b4733373c612ffdc6daee9897 Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 14:34:34 -0700 Subject: [PATCH 5/7] Updated TelemetryClientExample and Javadoc in Event/Metric buffers. --- .../telemetry/events/EventBuffer.java | 10 +++---- .../telemetry/metrics/MetricBuffer.java | 14 ++++----- .../examples/TelemetryClientExample.java | 29 +++++++++++++++---- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java index f49c6ee7..0be919ce 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java @@ -92,15 +92,15 @@ public EventBatch createSingleBatch() { } /** - * Creates an {@link ArrayList} from the contents of this buffer, then clears the - * contents of this buffer. + * Creates an ArrayList from the contents of this buffer, then clears the contents of + * this buffer. * *

{@link Event Events} are added to an EventBatch. When each event is added, the size (in * bytes) of the event is calculated. When the total size of the events in the batch exceeds the * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new EventBatch is * created. This process repeats until all events are removed from the queue. * - * @return An {@link ArrayList}. Each {@link EventBatch} in the ArrayList contains an + * @return An ArrayList. Each {@link EventBatch} in the ArrayList contains an * immutable collection of {@link Event Events}. */ public ArrayList createBatches() { @@ -147,12 +147,12 @@ public ArrayList createBatches() { } /** - * Creates an {@link ArrayList} by calling {@link #createSingleBatch()} or {@link + * Creates an ArrayList by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = * true, {@link #createBatches()} is called. * - * @return An {@link ArrayList}. Each {@link EventBatch} in the ArrayList contains an + * @return An ArrayList. Each {@link EventBatch} in the ArrayList contains an * immutable collection of {@link Event Events}. */ public ArrayList createBatch() { diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java index eaa8227e..7187026d 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java @@ -118,16 +118,16 @@ public MetricBatch createSingleBatch() { } /** - * Creates an {@link ArrayList} from the contents of this buffer, then clears the - * contents of this buffer. + * Creates an ArrayList from the contents of this buffer, then clears the contents of + * this buffer. * *

{@link Metric Metrics} are added to a MetricBatch. When each metric is added, the size (in * bytes) of the metric is calculated. When the total size of the metrics in the batch exceeds the * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new MetricBatch is * created. This process repeats until all metrics are removed from the queue. * - * @return An {@link ArrayList}. Each {@link MetricBatch} in the ArrayList contains - * an immutable collection of {@link Metric Metrics}. + * @return An ArrayList. Each {@link MetricBatch} in the ArrayList contains an + * immutable collection of {@link Metric Metrics}. */ public ArrayList createBatches() { logger.debug("Creating metric batch."); @@ -188,13 +188,13 @@ public ArrayList createBatches() { } /** - * Creates an {@link ArrayList} by calling {@link #createSingleBatch()} or {@link + * Creates an ArrayList by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = * true, {@link #createBatches()} is called. * - * @return An {@link ArrayList}. Each {@link MetricBatch} in the ArrayList contains - * an immutable collection of {@link Metric Metrics}. + * @return An ArrayList. Each {@link MetricBatch} in the ArrayList contains an + * immutable collection of {@link Metric Metrics}. */ public ArrayList createBatch() { ArrayList batches = new ArrayList(); diff --git a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java index e4fc3158..7ca343fc 100644 --- a/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java +++ b/telemetry_examples/src/main/java/com/newrelic/telemetry/examples/TelemetryClientExample.java @@ -4,6 +4,27 @@ */ package com.newrelic.telemetry.examples; +import static java.util.Collections.singleton; + +import com.newrelic.telemetry.Attributes; +import com.newrelic.telemetry.OkHttpPoster; +import com.newrelic.telemetry.TelemetryClient; +import com.newrelic.telemetry.events.Event; +import com.newrelic.telemetry.events.EventBatch; +import com.newrelic.telemetry.logs.Log; +import com.newrelic.telemetry.logs.LogBatch; +import com.newrelic.telemetry.metrics.Count; +import com.newrelic.telemetry.metrics.Gauge; +import com.newrelic.telemetry.metrics.MetricBatch; +import com.newrelic.telemetry.metrics.MetricBuffer; +import com.newrelic.telemetry.metrics.Summary; +import com.newrelic.telemetry.spans.Span; +import com.newrelic.telemetry.spans.SpanBatch; +import java.net.InetAddress; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.UUID; + /** * This example shows how to use the TelemetryClient to handle standard error conditions. * @@ -16,7 +37,6 @@ public class TelemetryClientExample { public static void main(String[] args) throws Exception { String insightsInsertKey = args[0]; - /* // create a TelemetryClient with an http connect timeout of 10 seconds. TelemetryClient telemetryClient = TelemetryClient.create( @@ -35,10 +55,9 @@ public static void main(String[] args) throws Exception { // make sure to shutdown the client, else the background Executor will stop the program from // exiting. - telemetryClient.shutdown(); */ + telemetryClient.shutdown(); } - /* private static void sendSampleLogEntry( TelemetryClient telemetryClient, Attributes commonAttributes, Span span) { Log log = @@ -85,7 +104,7 @@ private static void sendSampleMetrics( new Summary( "throughput", 25, 100, 1, 10, startTime, System.currentTimeMillis(), new Attributes())); - MetricBatch batch = metricBuffer.createBatch(); + MetricBatch batch = metricBuffer.createBatch().get(0); // The TelemetryClient uses the recommended techniques for responding to errors from the // New Relic APIs. It uses a background thread to schedule the sending, handling retries @@ -105,5 +124,5 @@ private static Span sendSampleSpan(TelemetryClient telemetryClient, Attributes c SpanBatch spanBatch = new SpanBatch(singleton(sampleSpan), commonAttributes, traceId); telemetryClient.sendBatch(spanBatch); return sampleSpan; - } */ + } } From d875cd54d3354842e9f9e0eae2cc005c25a92b74 Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Tue, 3 Aug 2021 14:48:35 -0700 Subject: [PATCH 6/7] Updated Javadoc in Event/Metric buffers. --- .../com/newrelic/telemetry/events/EventBuffer.java | 14 +++++++------- .../newrelic/telemetry/metrics/MetricBuffer.java | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java index 0be919ce..60aa9399 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java @@ -92,16 +92,16 @@ public EventBatch createSingleBatch() { } /** - * Creates an ArrayList from the contents of this buffer, then clears the contents of - * this buffer. + * Creates an ArrayList of EventBatch objects from the contents of this buffer, then clears the + * contents of this buffer. * *

{@link Event Events} are added to an EventBatch. When each event is added, the size (in * bytes) of the event is calculated. When the total size of the events in the batch exceeds the * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new EventBatch is * created. This process repeats until all events are removed from the queue. * - * @return An ArrayList. Each {@link EventBatch} in the ArrayList contains an - * immutable collection of {@link Event Events}. + * @return An ArrayList of EventBatch objects. Each {@link EventBatch} in the ArrayList contains + * an immutable collection of {@link Event Events}. */ public ArrayList createBatches() { logger.debug("Creating Event batch."); @@ -147,13 +147,13 @@ public ArrayList createBatches() { } /** - * Creates an ArrayList by calling {@link #createSingleBatch()} or {@link + * Creates an ArrayList of EventBatch objects by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = * true, {@link #createBatches()} is called. * - * @return An ArrayList. Each {@link EventBatch} in the ArrayList contains an - * immutable collection of {@link Event Events}. + * @return An ArrayList of EventBatch objects. Each {@link EventBatch} in the ArrayList contains + * an immutable collection of {@link Event Events}. */ public ArrayList createBatch() { ArrayList batches = new ArrayList(); diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java index 7187026d..f800b614 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java @@ -118,16 +118,16 @@ public MetricBatch createSingleBatch() { } /** - * Creates an ArrayList from the contents of this buffer, then clears the contents of - * this buffer. + * Creates an ArrayList of MetricBatch objects from the contents of this buffer, then clears the + * contents of this buffer. * *

{@link Metric Metrics} are added to a MetricBatch. When each metric is added, the size (in * bytes) of the metric is calculated. When the total size of the metrics in the batch exceeds the * MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new MetricBatch is * created. This process repeats until all metrics are removed from the queue. * - * @return An ArrayList. Each {@link MetricBatch} in the ArrayList contains an - * immutable collection of {@link Metric Metrics}. + * @return An ArrayList of MetricBatch objects. Each {@link MetricBatch} in the ArrayList contains + * an immutable collection of {@link Metric Metrics}. */ public ArrayList createBatches() { logger.debug("Creating metric batch."); @@ -188,13 +188,13 @@ public ArrayList createBatches() { } /** - * Creates an ArrayList by calling {@link #createSingleBatch()} or {@link + * Creates an ArrayList of MetricBatch objects by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not * (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch = * true, {@link #createBatches()} is called. * - * @return An ArrayList. Each {@link MetricBatch} in the ArrayList contains an - * immutable collection of {@link Metric Metrics}. + * @return An ArrayList of MetricBatch objects. Each {@link MetricBatch} in the ArrayList contains + * an immutable collection of {@link Metric Metrics}. */ public ArrayList createBatch() { ArrayList batches = new ArrayList(); From 9c125e2af4a80905f81249cfa8c902d2c568523f Mon Sep 17 00:00:00 2001 From: yamnihcg Date: Fri, 6 Aug 2021 11:51:10 -0700 Subject: [PATCH 7/7] Simplified .createBatches() method in the MetricBuffer --- .../telemetry/events/EventBuffer.java | 31 +++++++++++++------ .../telemetry/metrics/MetricBuffer.java | 30 ++++++++++++------ 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java index 06c7f770..6e9f6d00 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/events/EventBuffer.java @@ -26,6 +26,7 @@ public final class EventBuffer { private final IngestWarnings ingestWarnings = new IngestWarnings(); private final Attributes commonAttributes; private final boolean splitBatch; + private static final int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; /** * Create a new buffer with the provided common set of attributes. @@ -107,7 +108,6 @@ public ArrayList createBatches() { logger.debug("Creating Event batch."); int currentUncompressedBatchSize = 0; - int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; ArrayList batches = new ArrayList<>(); Collection eventsForBatch = new ArrayList<>(); @@ -117,17 +117,10 @@ public ArrayList createBatches() { while ((event = this.events.poll()) != null) { String partialEventJson = EventBatchMarshaller.mapToJson(event); + Map attrs = getCommonAttributes().asMap(); // Insert common attributes into event JSON - - Map attrs = getCommonAttributes().asMap(); - String fullEventJson = partialEventJson.substring(0, partialEventJson.length() - 1); - Set keys = getCommonAttributes().asMap().keySet(); - for (String key : keys) { - String quoteName = "," + '"' + key + '"' + ':' + '"' + attrs.get(key) + '"'; - fullEventJson += quoteName; - } - fullEventJson += "}"; + String fullEventJson = generateFullEventJson(partialEventJson, attrs); // Calculate size of event JSON (in bytes) and add it to the currentUncompressedBatchSize @@ -146,6 +139,24 @@ public ArrayList createBatches() { return batches; } + /** + * Generates the full JSON for an Event by inserting the Event's common attributes into the JSON + * + * @param partialEventJson This is the JSON for the Event (generated by the EventBatchMarshaller). + * This is a map of all attributes that are unique to the event. + * @param attributes This is a map of the common attributes for each Event sent in a batch. + */ + public String generateFullEventJson(String partialEventJson, Map attributes) { + String fullEventJson = partialEventJson.substring(0, partialEventJson.length() - 1); + Set keys = getCommonAttributes().asMap().keySet(); + for (String key : keys) { + String quoteName = "," + '"' + key + '"' + ':' + '"' + attributes.get(key) + '"'; + fullEventJson += quoteName; + } + fullEventJson += "}"; + return fullEventJson; + } + /** * Creates an ArrayList of EventBatch objects by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not diff --git a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java index 917570c5..6a334ad7 100644 --- a/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java +++ b/telemetry-core/src/main/java/com/newrelic/telemetry/metrics/MetricBuffer.java @@ -29,6 +29,7 @@ public final class MetricBuffer { private final IngestWarnings ingestWarnings = new IngestWarnings(); private final Attributes commonAttributes; private final boolean splitBatch; + private static final int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; /** * Create a new buffer with the provided common set of attributes. @@ -135,19 +136,10 @@ public ArrayList createBatches() { Collection metricsInBatch = new ArrayList<>(); int currentUncompressedBatchSize = 0; - int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000; // Construct JSON for common attributes and add to uncompressed batch size - AttributesJson attrsJson = new AttributesJson(); - StringBuilder commonAttributeSb = new StringBuilder(); - commonAttributeSb - .append("\"common\":") - .append("{") - .append("\"attributes\":") - .append(attrsJson.toJson(commonAttributes.asMap())) - .append("}"); - String commonJson = commonAttributeSb.toString(); + String commonJson = generateCommonJSON(); currentUncompressedBatchSize += commonJson.getBytes(StandardCharsets.UTF_8).length; @@ -187,6 +179,24 @@ public ArrayList createBatches() { return metricBatches; } + /** + * Generates a JSON String that contains all of the common attribute information. This JSON string + * is used in a MetricBatch of any size. + */ + public String generateCommonJSON() { + + AttributesJson attrsJson = new AttributesJson(); + StringBuilder commonAttributeSb = new StringBuilder(); + commonAttributeSb + .append("\"common\":") + .append("{") + .append("\"attributes\":") + .append(attrsJson.toJson(commonAttributes.asMap())) + .append("}"); + String commonAttrString = commonAttributeSb.toString(); + return commonAttrString; + } + /** * Creates an ArrayList of MetricBatch objects by calling {@link #createSingleBatch()} or {@link * #createBatches()}. This depends on if the user wants to split batches on size limit or not