Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Batching Feature / Configuration to the New Relic Telemetry SDK #278

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

import com.newrelic.telemetry.Attributes;
import com.newrelic.telemetry.Telemetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An Event is ad hoc set of key-value pairs with an associated timestamp, recorded in the New Relic
* Metric API.
*/
public final class Event implements Telemetry {
private static final Logger logger = LoggerFactory.getLogger(Event.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets remove this since it is not being used

private final String eventType;
private final Attributes attributes;
private final long timestamp; // in epoch ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.newrelic.telemetry.util.Utils;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,10 +57,33 @@ public Response sendBatch(EventBatch batch) throws ResponseException {
"Sending an event batch (number of events: {}) to the New Relic event ingest endpoint)",
batch.size());
String json = marshaller.toJson(batch);

return sender.send(json, batch);
}

/**
* Send batches of events to New Relic.
*
* @param eventBatches An ArrayList where each element is an EventBatch. Each EventBatch will be
* sent to New Relic one by one. Each batch will be drained of accumulated events as a part of
* this process.
* @return The response from the ingest API.
* @throws ResponseException In cases where the batch is unable to be successfully sent, one of
* the subclasses of {@link ResponseException} will be thrown. See the documentation on that
* hierarchy for details on the recommended ways to respond to those exceptions.
*/
public Response sendBatch(ArrayList<EventBatch> eventBatches) throws ResponseException {
Response response = new Response(200, "success", "sent data");
for (EventBatch batch : eventBatches) {
response = sendBatch(batch);

// If there is an issue with a batch, stop sending batches
if ((response.getStatusCode() != 200)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking the respond code line won't be reached if there is an exception. The underlying exception from sendPayload will bubble up to the sendBatch(one batch) method here and then this whole method of sending batches will exit.

I think what we might need to do is create a pool of threads and have each batch be sent async on its own thread.

Or, we can try catch the exception and log that something went wrong. The rest of the batches in this list will still get sent (at least attempted to). I would prefer the async approach

return response;
}
}
return response;
}

/**
* Creates a new EventBatchSender with the given supplier of HttpPoster impl and a BaseConfig
* instance, with all configuration NOT in BaseConfig being default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package com.newrelic.telemetry.events;

import com.newrelic.telemetry.Attributes;
import com.newrelic.telemetry.events.json.EventBatchMarshaller;
import com.newrelic.telemetry.util.IngestWarnings;
import com.newrelic.telemetry.util.Utils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
Expand All @@ -23,6 +25,8 @@ public final class EventBuffer {
private final Queue<Event> events = new ConcurrentLinkedQueue<>();
private final IngestWarnings ingestWarnings = new IngestWarnings();
private final Attributes commonAttributes;
private final boolean splitBatch;
private static final int MAX_UNCOMPRESSED_BATCH_SIZE = 180000000;

/**
* Create a new buffer with the provided common set of attributes.
Expand All @@ -31,7 +35,19 @@ public final class EventBuffer {
* {@link Event} in this buffer.
*/
public EventBuffer(Attributes commonAttributes) {
this(commonAttributes, false);
}

/**
* Create a new buffer with the provided common set of attributes.
*
* @param commonAttributes These attributes will be appended (by the New Relic backend) to every
* {@link Event} in this buffer.
* @param splitOnSizeLimit Flag to indicate whether to split batch when size limit is hit.
*/
public EventBuffer(Attributes commonAttributes, boolean splitOnSizeLimit) {
this.commonAttributes = Utils.verifyNonNull(commonAttributes);
this.splitBatch = splitOnSizeLimit;
}

/**
Expand Down Expand Up @@ -64,7 +80,7 @@ public int size() {
*
* @return A new {@link EventBatch} with an immutable collection of {@link Event Events}.
*/
public EventBatch createBatch() {
public EventBatch createSingleBatch() {
logger.debug("Creating Event batch.");
Collection<Event> eventsForBatch = new ArrayList<>(this.events.size());

Expand All @@ -73,10 +89,95 @@ public EventBatch createBatch() {
while ((event = this.events.poll()) != null) {
eventsForBatch.add(event);
}

return new EventBatch(eventsForBatch, this.commonAttributes);
}

/**
* Creates an ArrayList of EventBatch objects from the contents of this buffer, then clears the
* contents of this buffer.
*
* <p>{@link Event Events} are added to an EventBatch. When each event is added, the size (in
* bytes) of the event is calculated. When the total size of the events in the batch exceeds the
* MAX_UNCOMPRESSED_BATCH_SIZE, the current batch is sent to New Relic, and a new EventBatch is
* created. This process repeats until all events are removed from the queue.
*
* @return An ArrayList of EventBatch objects. Each {@link EventBatch} in the ArrayList contains
* an immutable collection of {@link Event Events}.
*/
public ArrayList<EventBatch> createBatches() {
logger.debug("Creating Event batch.");

int currentUncompressedBatchSize = 0;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: line can be removed

ArrayList<EventBatch> batches = new ArrayList<>();
Collection<Event> eventsForBatch = new ArrayList<>();

// Drain the Event buffer and return the batch
Event event;

while ((event = this.events.poll()) != null) {
String partialEventJson = EventBatchMarshaller.mapToJson(event);
Map<String, Object> attrs = getCommonAttributes().asMap();

// Insert common attributes into event JSON
String fullEventJson = generateFullEventJson(partialEventJson, attrs);

// Calculate size of event JSON (in bytes) and add it to the currentUncompressedBatchSize

currentUncompressedBatchSize += (fullEventJson.getBytes(StandardCharsets.UTF_8).length);

if (currentUncompressedBatchSize > MAX_UNCOMPRESSED_BATCH_SIZE) {
EventBatch e = new EventBatch(eventsForBatch, this.commonAttributes);
batches.add(e);
eventsForBatch = new ArrayList<>();
currentUncompressedBatchSize = fullEventJson.getBytes(StandardCharsets.UTF_8).length;
}
eventsForBatch.add(event);
}

batches.add(new EventBatch(eventsForBatch, this.commonAttributes));
return batches;
}

/**
* Generates the full JSON for an Event by inserting the Event's common attributes into the JSON
*
* @param partialEventJson This is the JSON for the Event (generated by the EventBatchMarshaller).
* This is a map of all attributes that are unique to the event.
* @param attributes This is a map of the common attributes for each Event sent in a batch.
*/
public String generateFullEventJson(String partialEventJson, Map<String, Object> attributes) {
String fullEventJson = partialEventJson.substring(0, partialEventJson.length() - 1);
Set<String> keys = getCommonAttributes().asMap().keySet();
for (String key : keys) {
String quoteName = "," + '"' + key + '"' + ':' + '"' + attributes.get(key) + '"';
fullEventJson += quoteName;
}
fullEventJson += "}";
return fullEventJson;
}

/**
* Creates an ArrayList of EventBatch objects by calling {@link #createSingleBatch()} or {@link
* #createBatches()}. This depends on if the user wants to split batches on size limit or not
* (splitBatch). If splitBatch = false, {@link #createSingleBatch()} is called. If splitBatch =
* true, {@link #createBatches()} is called.
*
* @return An ArrayList of EventBatch objects. Each {@link EventBatch} in the ArrayList contains
* an immutable collection of {@link Event Events}.
*/
public ArrayList<EventBatch> createBatch() {
ArrayList<EventBatch> batches = new ArrayList<EventBatch>();
if (splitBatch == false) {
EventBatch singleEventBatch = createSingleBatch();
batches.add(singleEventBatch);
return batches;
} else {
batches = createBatches();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return createBatches(); 😄

return batches;
}
}

Queue<Event> getEvents() {
return events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public String toJson(EventBatch batch) {
return builder.toString();
}

static String mapToJson(Event event) {
public static String mapToJson(Event event) {
try {
StringWriter out = new StringWriter();
JsonWriter jsonWriter = new JsonWriter(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.newrelic.telemetry.util.Utils;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,10 +60,35 @@ public Response sendBatch(MetricBatch batch) throws ResponseException {
logger.debug(
"Sending a metric batch (number of metrics: {}) to the New Relic metric ingest endpoint)",
batch.size());

String json = marshaller.toJson(batch);
return sender.send(json, batch);
}

/**
* Send batches of metrics to New Relic.
*
* @param metricBatches An ArrayList where each element is an MetricBatch. Each MetricBatch will
* be sent to New Relic one by one. Each batch will be drained of accumulated events as a part
* of this process.
* @return The response from the ingest API.
* @throws ResponseException In cases where the batch is unable to be successfully sent, one of
* the subclasses of {@link ResponseException} will be thrown. See the documentation on that
* hierarchy for details on the recommended ways to respond to those exceptions.
*/
public Response sendBatch(ArrayList<MetricBatch> metricBatches) throws ResponseException {
Response response = new Response(200, "success", "sent data");
for (MetricBatch batch : metricBatches) {
response = sendBatch(batch);

// If there is an issue with a batch, stop sending batches
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, this line won't be reached when a response exception is thrown

if ((response.getStatusCode() != 200) && (response.getStatusCode() != 202)) {
return response;
}
}
return response;
}

/**
* Creates a new MetricBatchSender with the given supplier of HttpPoster impl and a BaseConfig
* instance, with all configuration NOT in BaseConfig being default.
Expand Down
Loading