From e92640e552a0cec574c96332d383498b17339c92 Mon Sep 17 00:00:00 2001 From: "Fred [C] Park" Date: Tue, 4 Nov 2025 17:45:59 -0800 Subject: [PATCH] feat: new WriteOption config for capturing backpressure data --- CHANGELOG.md | 4 + README.md | 2 + client/README.md | 40 +- .../com/influxdb/client/WriteOptions.java | 70 +++- .../client/internal/AbstractWriteClient.java | 11 +- .../BackpressureBatchesBufferStrategy.java | 72 +++- .../write/events/BackpressureEvent.java | 29 +- .../client/ITCaptureBackpressureDataTest.java | 129 +++++++ .../com/influxdb/client/WriteOptionsTest.java | 270 +++++++++++++ ...BackpressureBatchesBufferStrategyTest.java | 2 +- .../flowable/BackpressureDataCaptureTest.java | 354 ++++++++++++++++++ .../BackpressureEventDataCaptureTest.java | 153 ++++++++ 12 files changed, 1115 insertions(+), 21 deletions(-) create mode 100644 client/src/test/java/com/influxdb/client/ITCaptureBackpressureDataTest.java create mode 100644 client/src/test/java/com/influxdb/client/internal/flowable/BackpressureDataCaptureTest.java create mode 100644 client/src/test/java/com/influxdb/client/write/events/BackpressureEventDataCaptureTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index dcdf474e21a..67b069979b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 7.4.0 [unreleased] +### Features + +- [#848](https://github.com/influxdata/influxdb-client-java/pull/848): new WriteOption config for capturing backpressure data + ## 7.3.0 [2025-05-22] ### Features diff --git a/README.md b/README.md index f9c3a7589d9..e10042a7832 100644 --- a/README.md +++ b/README.md @@ -418,6 +418,8 @@ If you have Docker running, but it is not available over localhost (e.g. you are - `INFLUXDB_PORT_API` - `INFLUXDB_2_IP` - `INFLUXDB_2_PORT_API` +- `INFLUXDB_2_ONBOARDING_IP` +- `INFLUXDB_2_ONBOARDING_PORT` ```bash $ export INFLUXDB_IP=192.168.99.100 diff --git a/client/README.md b/client/README.md index 93b0bc19b33..248a0834a6f 100644 --- a/client/README.md +++ b/client/README.md @@ -611,6 +611,10 @@ The writes are processed in batches which are configurable by `WriteOptions`: | **exponentialBase** | the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` | **bufferLimit** | the maximum number of unwritten stored points | 10000 | | **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST | +| **captureBackpressureData** | whether to capture affected data points in backpressure events | false | +| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 | + +There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api). #### Backpressure The backpressure presents the problem of what to do with a growing backlog of unconsumed data points. @@ -640,7 +644,41 @@ writeApi.listenEvents(BackpressureEvent.class, value -> { }); ``` -There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api). +##### Backpressure Event Data Snapshots + +When backpressure occurs, enable `captureBackpressureData` to capture a snapshot of the affected data points from the `BackpressureEvent`. The content of this snapshot depends on the configured backpressure strategy: + +- **`DROP_OLDEST`**: The snapshot contains only the data points that will be dropped (the oldest points in the buffer). This allows you to log, persist, or handle the specific data that is being lost due to backpressure. + +- **`DROP_LATEST`**: The snapshot contains only the newest data points that are being added to the buffer. This represents the most recent data that triggered the backpressure condition. + +Logging dropped data points: +```java +WriteOptions writeOptions = WriteOptions.builder() + .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST) + .bufferLimit(1000) + .captureBackpressureData(true) + .build(); + +WriteApi writeApi = influxDBClient.getWriteApi(writeOptions); + +writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> { + List affectedPoints = backpressureEvent.getDroppedLineProtocol(); + + if (backpressureEvent.getReason() == BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES) { + logger.warn("Backpressure occurred. Affected {} data points:", affectedPoints.size()); + + // For DROP_OLDEST: these are the points that were dropped from the buffer + // For DROP_LATEST: these are the newest points that triggered the condition + affectedPoints.forEach(point -> logger.debug("Affected point: {}", point)); + + // Do something with affected points ie. requeue for retry + requeue(affectedPoints); + } +}); +``` + +Note: Disabling `captureBackpressureData` can improve performance when backpressure data capture is not needed. #### Writing data diff --git a/client/src/main/java/com/influxdb/client/WriteOptions.java b/client/src/main/java/com/influxdb/client/WriteOptions.java index 7710c90db55..bb0a428bc35 100644 --- a/client/src/main/java/com/influxdb/client/WriteOptions.java +++ b/client/src/main/java/com/influxdb/client/WriteOptions.java @@ -43,6 +43,8 @@ *
  • retryInterval = 5000 ms
  • *
  • jitterInterval = 0
  • *
  • bufferLimit = 10_000
  • + *
  • concatMapPrefetch = 2
  • + *
  • captureBackpressureData = false
  • * *

    * The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}. @@ -62,6 +64,8 @@ public final class WriteOptions implements WriteApi.RetryOptions { public static final int DEFAULT_MAX_RETRY_TIME = 180_000; public static final int DEFAULT_EXPONENTIAL_BASE = 2; public static final int DEFAULT_BUFFER_LIMIT = 10000; + public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2; + public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false; /** * Default configuration with values that are consistent with Telegraf. @@ -77,8 +81,10 @@ public final class WriteOptions implements WriteApi.RetryOptions { private final int maxRetryTime; private final int exponentialBase; private final int bufferLimit; + private final int concatMapPrefetch; private final Scheduler writeScheduler; private final BackpressureOverflowStrategy backpressureStrategy; + private final boolean captureBackpressureData; /** * @return the number of data point to collect in batch @@ -171,6 +177,17 @@ public int getBufferLimit() { return bufferLimit; } + /** + * The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous + * MaybeSource terminates. + * + * @return the prefetch value for concatMapMaybe operator + * @see WriteOptions.Builder#concatMapPrefetch(int) + */ + public int getConcatMapPrefetch() { + return concatMapPrefetch; + } + /** * @return The scheduler which is used for write data points. * @see WriteOptions.Builder#writeScheduler(Scheduler) @@ -189,6 +206,14 @@ public BackpressureOverflowStrategy getBackpressureStrategy() { return backpressureStrategy; } + /** + * @return whether to capture affected data points in backpressure events + * @see WriteOptions.Builder#captureBackpressureData(boolean) + */ + public boolean getCaptureBackpressureData() { + return captureBackpressureData; + } + private WriteOptions(@Nonnull final Builder builder) { Arguments.checkNotNull(builder, "WriteOptions.Builder"); @@ -202,8 +227,10 @@ private WriteOptions(@Nonnull final Builder builder) { maxRetryTime = builder.maxRetryTime; exponentialBase = builder.exponentialBase; bufferLimit = builder.bufferLimit; + concatMapPrefetch = builder.concatMapPrefetch; writeScheduler = builder.writeScheduler; backpressureStrategy = builder.backpressureStrategy; + captureBackpressureData = builder.captureBackpressureData; } /** @@ -231,8 +258,10 @@ public static class Builder { private int maxRetryTime = DEFAULT_MAX_RETRY_TIME; private int exponentialBase = DEFAULT_EXPONENTIAL_BASE; private int bufferLimit = DEFAULT_BUFFER_LIMIT; + private int concatMapPrefetch = DEFAULT_CONCAT_MAP_PREFETCH; private Scheduler writeScheduler = Schedulers.newThread(); private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST; + private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA; /** * Set the number of data point to collect in batch. @@ -339,7 +368,9 @@ public Builder maxRetryTime(final int maxRetryTime) { */ @Nonnull public Builder exponentialBase(final int exponentialBase) { - Arguments.checkPositiveNumber(exponentialBase, "exponentialBase"); + if (exponentialBase < 2) { + throw new IllegalArgumentException("Expecting a number >= 2 for exponentialBase"); + } this.exponentialBase = exponentialBase; return this; } @@ -354,11 +385,27 @@ public Builder exponentialBase(final int exponentialBase) { */ @Nonnull public Builder bufferLimit(final int bufferLimit) { - Arguments.checkNotNegativeNumber(bufferLimit, "bufferLimit"); + Arguments.checkPositiveNumber(bufferLimit, "bufferLimit"); this.bufferLimit = bufferLimit; return this; } + /** + * Set the prefetch value for the concatMapMaybe operator that processes write batches. + * + * The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous + * MaybeSource terminates. + * + * @param concatMapPrefetch the prefetch value for concatMapMaybe operator (must be positive) + * @return {@code this} + */ + @Nonnull + public Builder concatMapPrefetch(final int concatMapPrefetch) { + Arguments.checkPositiveNumber(concatMapPrefetch, "concatMapPrefetch"); + this.concatMapPrefetch = concatMapPrefetch; + return this; + } + /** * Set the scheduler which is used for write data points. It is useful for disabling batch writes or * for tuning the performance. Default value is {@link Schedulers#newThread()}. @@ -389,6 +436,25 @@ public Builder backpressureStrategy(@Nonnull final BackpressureOverflowStrategy return this; } + /** + * Set whether to capture affected data points in backpressure events. + * + * When enabled, BackpressureEvent will include the specific line protocol points + * that are affected by the backpressure condition: + * - For DROP_OLDEST strategy: points that will be dropped + * - For DROP_LATEST strategy: newest points being added + * + * Disabling this can improve performance when backpressure data capture is not needed. + * + * @param captureBackpressureData whether to capture affected data points. Default is false. + * @return {@code this} + */ + @Nonnull + public Builder captureBackpressureData(final boolean captureBackpressureData) { + this.captureBackpressureData = captureBackpressureData; + return this; + } + /** * Build an instance of WriteOptions. * diff --git a/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java b/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java index 6b715bb1840..004f997c4e1 100644 --- a/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java +++ b/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java @@ -161,12 +161,15 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions, // .lift(new BackpressureBatchesBufferStrategy( writeOptions.getBufferLimit(), - () -> publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES)), - writeOptions.getBackpressureStrategy())) + droppedPoints -> publish(new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, droppedPoints)), + writeOptions.getBackpressureStrategy(), + writeOptions.getCaptureBackpressureData())) // - // Use concat to process batches one by one + // Use concat to process batches with configurable prefetch // - .concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions)) + .concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions), + writeOptions.getConcatMapPrefetch()) .doFinally(() -> finished.set(true)) .subscribe(responseNotification -> { diff --git a/client/src/main/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategy.java b/client/src/main/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategy.java index 89a9ca257f7..d85bd7ce447 100644 --- a/client/src/main/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategy.java +++ b/client/src/main/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategy.java @@ -1,9 +1,14 @@ package com.influxdb.client.internal.flowable; +import java.util.Collections; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Deque; +import java.util.List; +import java.util.stream.Collectors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import com.influxdb.client.internal.AbstractWriteClient; @@ -13,7 +18,6 @@ import io.reactivex.rxjava3.core.FlowableSubscriber; import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.exceptions.MissingBackpressureException; -import io.reactivex.rxjava3.functions.Action; import io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.BackpressureHelper; @@ -34,21 +38,32 @@ public final class BackpressureBatchesBufferStrategy implements final long bufferSize; - final Action onOverflow; + final Consumer> onOverflow; final BackpressureOverflowStrategy strategy; - public BackpressureBatchesBufferStrategy(long bufferSize, Action onOverflow, - BackpressureOverflowStrategy strategy) { + final boolean captureBackpressureData; + + public BackpressureBatchesBufferStrategy(long bufferSize, + Consumer> onOverflow, + BackpressureOverflowStrategy strategy) { + this(bufferSize, onOverflow, strategy, false); + } + + public BackpressureBatchesBufferStrategy(long bufferSize, + Consumer> onOverflow, + BackpressureOverflowStrategy strategy, + boolean captureBackpressureData) { this.bufferSize = bufferSize; this.onOverflow = onOverflow; this.strategy = strategy; + this.captureBackpressureData = captureBackpressureData; } @Override public @NonNull Subscriber apply( @NonNull final Subscriber subscriber) throws Throwable { - return new OnBackpressureBufferStrategySubscriber(subscriber, onOverflow, strategy, bufferSize); + return new OnBackpressureBufferStrategySubscriber(subscriber, onOverflow, strategy, bufferSize, captureBackpressureData); } static final class OnBackpressureBufferStrategySubscriber @@ -59,8 +74,6 @@ static final class OnBackpressureBufferStrategySubscriber final Subscriber downstream; - final Action onOverflow; - final BackpressureOverflowStrategy strategy; final long bufferSize; @@ -76,14 +89,20 @@ static final class OnBackpressureBufferStrategySubscriber volatile boolean done; Throwable error; + final Consumer> onOverflow; + + final boolean captureBackpressureData; + OnBackpressureBufferStrategySubscriber(Subscriber actual, - Action onOverflow, + Consumer> onOverflow, BackpressureOverflowStrategy strategy, - long bufferSize) { + long bufferSize, + boolean captureBackpressureData) { this.downstream = actual; this.onOverflow = onOverflow; this.strategy = strategy; this.bufferSize = bufferSize; + this.captureBackpressureData = captureBackpressureData; this.requested = new AtomicLong(); this.deque = new ArrayDeque<>(); } @@ -107,18 +126,25 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) { boolean callOnOverflow = false; boolean callError = false; Deque dq = deque; + List overflowSnapshot = null; synchronized (dq) { AtomicLong size = new AtomicLong(t.length()); dq.forEach(batchWriteItem -> size.addAndGet(batchWriteItem.length())); if (size.get() > bufferSize) { switch (strategy) { case DROP_LATEST: + if (captureBackpressureData) { + overflowSnapshot = captureBatch(t); + } dq.pollLast(); dq.offer(t); callOnOverflow = true; break; case DROP_OLDEST: - dq.poll(); + AbstractWriteClient.BatchWriteItem droppedBatch = dq.poll(); + if (captureBackpressureData) { + overflowSnapshot = captureBatch(droppedBatch); + } dq.offer(t); callOnOverflow = true; break; @@ -135,7 +161,13 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) { if (callOnOverflow) { if (onOverflow != null) { try { - onOverflow.run(); + List droppedPoints; + if (captureBackpressureData) { + droppedPoints = overflowSnapshot; + } else { + droppedPoints = Collections.emptyList(); + } + onOverflow.accept(droppedPoints); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.cancel(); @@ -150,6 +182,24 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) { } } + /** + * Captures snapshot of a single batch item for overflow handling. + * + * @param item the batch item to capture + * @return list of line protocol points from the item + */ + List captureBatch(AbstractWriteClient.BatchWriteItem item) { + String lp = item.toLineProtocol(); + if (lp == null || lp.isEmpty()) { + return Collections.emptyList(); + } + + return Arrays.stream(lp.split("\n")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + } + @Override public void onError(Throwable t) { if (done) { diff --git a/client/src/main/java/com/influxdb/client/write/events/BackpressureEvent.java b/client/src/main/java/com/influxdb/client/write/events/BackpressureEvent.java index d6fd03b48ea..619d2933c4b 100644 --- a/client/src/main/java/com/influxdb/client/write/events/BackpressureEvent.java +++ b/client/src/main/java/com/influxdb/client/write/events/BackpressureEvent.java @@ -21,9 +21,12 @@ */ package com.influxdb.client.write.events; +import java.util.Collections; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import com.influxdb.utils.Arguments; @@ -35,6 +38,7 @@ public final class BackpressureEvent extends AbstractWriteEvent { private final BackpressureReason reason; + private final List droppedLineProtocol; public enum BackpressureReason { /** @@ -51,17 +55,29 @@ public enum BackpressureReason { private static final Logger LOG = Logger.getLogger(BackpressureEvent.class.getName()); public BackpressureEvent(@Nonnull final BackpressureReason reason) { + this(reason, null); + } + + public BackpressureEvent(@Nonnull final BackpressureReason reason, + @Nullable final List droppedLineProtocol) { Arguments.checkNotNull(reason, "reason"); this.reason = reason; + this.droppedLineProtocol = droppedLineProtocol != null + ? Collections.unmodifiableList(droppedLineProtocol) + : Collections.emptyList(); } @Override public void logEvent() { + String message = String.format("Backpressure[%s] applied, try increase WriteOptions.bufferLimit.", reason); - LOG.log(Level.WARNING, - String.format("Backpressure[%s] applied, try increase WriteOptions.bufferLimit", reason)); + if (!droppedLineProtocol.isEmpty()) { + message += String.format(" Buffer contains %d line protocol points.", droppedLineProtocol.size()); + } + + LOG.log(Level.WARNING, message); } /** @@ -71,4 +87,13 @@ public void logEvent() { public BackpressureReason getReason() { return reason; } + + /** + * @return unmodifiable list of line protocol points in the buffer at the time + * of the backpressure event + */ + @Nonnull + public List getDroppedLineProtocol() { + return droppedLineProtocol; + } } \ No newline at end of file diff --git a/client/src/test/java/com/influxdb/client/ITCaptureBackpressureDataTest.java b/client/src/test/java/com/influxdb/client/ITCaptureBackpressureDataTest.java new file mode 100644 index 00000000000..04cdfb0fd08 --- /dev/null +++ b/client/src/test/java/com/influxdb/client/ITCaptureBackpressureDataTest.java @@ -0,0 +1,129 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client; + +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.internal.AbstractInfluxDBClientTest; +import com.influxdb.client.write.Point; +import com.influxdb.client.write.events.BackpressureEvent; +import okhttp3.mockwebserver.MockResponse; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +/** + * @author Fred Park (fredjoonpark@github) + */ +class ITCaptureBackpressureDataTest extends AbstractInfluxDBClientTest { + + private WriteApi writeApi; + + @AfterEach + void tearDown() { + if (writeApi != null) { + writeApi.close(); + } + } + + @Test + void backpressureEventContainsDroppedLineProtocol() throws InterruptedException { + List backpressureEvents = new CopyOnWriteArrayList<>(); + + WriteOptions writeOptions = WriteOptions.builder() + .batchSize(2) // Small batches + .bufferLimit(4) + .flushInterval(100_000) // Very long flush - batches only created when full + .maxRetries(1) + .retryInterval(100) + .maxRetryDelay(100) + .concatMapPrefetch(1) + .captureBackpressureData(true) + .build(); + + writeApi = influxDBClient.makeWriteApi(writeOptions); + + // Listen for backpressure events + writeApi.listenEvents(BackpressureEvent.class, event -> { + System.out.println(">>> Backpressure event received: " + event.getReason() + + " (buffered items: " + event.getDroppedLineProtocol().size() + ")"); + backpressureEvents.add(event); + }); + + // Initial request + prefetched batch = 4 points + // 2 batches loaded into buffer = 4 points + // Write 2 more points (10 points total) to trigger flush and backpressure + for (int i = 0; i < 10; i++) { + Point point = Point.measurement("temperature") + .addTag("location", "room1") + .addField("value", 20.0 + i) + .time(System.nanoTime() + i, WritePrecision.NS); + writeApi.writePoint("my-bucket", "my-org", point); + } + + Assertions.assertThat(backpressureEvents) + .as("Should have backpressure events") + .isNotEmpty(); + + boolean hasBufferedData = backpressureEvents.stream() + .anyMatch(event -> !event.getDroppedLineProtocol().isEmpty()); + Assertions.assertThat(hasBufferedData) + .as("Backpressure event should contain buffered line protocol") + .isTrue(); + + long eventsWithData = backpressureEvents.stream() + .filter(event -> !event.getDroppedLineProtocol().isEmpty()) + .count(); + + List batchBackpressure = backpressureEvents.stream() + .filter(e -> !e.getDroppedLineProtocol().isEmpty()) + .collect(Collectors.toList()); + Assertions.assertThat(batchBackpressure) + .as("Should have TOO_MUCH_BATCHES backpressure with buffered data") + .isNotEmpty(); + + // Verify the buffered line protocol is valid + for (BackpressureEvent event : batchBackpressure) { + System.out.println(" Reason: " + event.getReason()); + System.out.println(" Buffered points count: " + event.getDroppedLineProtocol().size()); + + Assertions.assertThat(event.getDroppedLineProtocol().size()) + .as("Should have buffered points") + .isGreaterThan(0); + + // Verify each line protocol string is valid + for (String lineProtocol : event.getDroppedLineProtocol()) { + Assertions.assertThat(lineProtocol) + .as("Line protocol should not be empty") + .isNotEmpty(); + + Assertions.assertThat(lineProtocol) + .as("Line protocol should contain measurement name") + .contains("temperature"); + } + } + } +} diff --git a/client/src/test/java/com/influxdb/client/WriteOptionsTest.java b/client/src/test/java/com/influxdb/client/WriteOptionsTest.java index 5afe2c8735e..22f35a30fb9 100644 --- a/client/src/test/java/com/influxdb/client/WriteOptionsTest.java +++ b/client/src/test/java/com/influxdb/client/WriteOptionsTest.java @@ -45,8 +45,10 @@ void defaults() { Assertions.assertThat(writeOptions.getMaxRetryTime()).isEqualTo(180_000); Assertions.assertThat(writeOptions.getMaxRetryDelay()).isEqualTo(125_000); Assertions.assertThat(writeOptions.getExponentialBase()).isEqualTo(2); + Assertions.assertThat(writeOptions.getConcatMapPrefetch()).isEqualTo(2); Assertions.assertThat(writeOptions.getWriteScheduler()).isEqualTo(Schedulers.newThread()); Assertions.assertThat(writeOptions.getBackpressureStrategy()).isEqualTo(BackpressureOverflowStrategy.DROP_OLDEST); + Assertions.assertThat(writeOptions.getCaptureBackpressureData()).isFalse(); } @Test @@ -61,8 +63,10 @@ void configure() { .maxRetries(5) .maxRetryDelay(250_123) .exponentialBase(2) + .concatMapPrefetch(5) .writeScheduler(Schedulers.computation()) .backpressureStrategy(BackpressureOverflowStrategy.ERROR) + .captureBackpressureData(true) .build(); Assertions.assertThat(writeOptions.getBatchSize()).isEqualTo(10_000); @@ -73,7 +77,273 @@ void configure() { Assertions.assertThat(writeOptions.getMaxRetries()).isEqualTo(5); Assertions.assertThat(writeOptions.getMaxRetryDelay()).isEqualTo(250_123); Assertions.assertThat(writeOptions.getExponentialBase()).isEqualTo(2); + Assertions.assertThat(writeOptions.getConcatMapPrefetch()).isEqualTo(5); Assertions.assertThat(writeOptions.getWriteScheduler()).isEqualTo(Schedulers.computation()); Assertions.assertThat(writeOptions.getBackpressureStrategy()).isEqualTo(BackpressureOverflowStrategy.ERROR); + Assertions.assertThat(writeOptions.getCaptureBackpressureData()).isTrue(); + } + + @Test + void batchSizeEdgeCases() { + // Minimum valid batch size (1) + WriteOptions minBatch = WriteOptions.builder().batchSize(1).build(); + Assertions.assertThat(minBatch.getBatchSize()).isEqualTo(1); + + // Zero batch size should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().batchSize(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("batchSize"); + + // Negative batch size should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().batchSize(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("batchSize"); + + // Very large batch size + WriteOptions largeBatch = WriteOptions.builder().batchSize(Integer.MAX_VALUE).build(); + Assertions.assertThat(largeBatch.getBatchSize()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void bufferLimitEdgeCases() { + // Minimum valid buffer limit (1) + WriteOptions minBuffer = WriteOptions.builder().bufferLimit(1).build(); + Assertions.assertThat(minBuffer.getBufferLimit()).isEqualTo(1); + + // Zero buffer limit should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().bufferLimit(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferLimit"); + + // Negative buffer limit should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().bufferLimit(-100).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferLimit"); + + // Very large buffer limit + WriteOptions largeBuffer = WriteOptions.builder().bufferLimit(Integer.MAX_VALUE).build(); + Assertions.assertThat(largeBuffer.getBufferLimit()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void flushIntervalEdgeCases() { + // Minimum valid flush interval (1ms) + WriteOptions minFlush = WriteOptions.builder().flushInterval(1).build(); + Assertions.assertThat(minFlush.getFlushInterval()).isEqualTo(1); + + // Zero flush interval should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().flushInterval(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("flushInterval"); + + // Negative flush interval should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().flushInterval(-500).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("flushInterval"); + + // Very large flush interval + WriteOptions largeFlush = WriteOptions.builder().flushInterval(Integer.MAX_VALUE).build(); + Assertions.assertThat(largeFlush.getFlushInterval()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void jitterIntervalEdgeCases() { + // Zero jitter interval is valid (no jitter) + WriteOptions noJitter = WriteOptions.builder().jitterInterval(0).build(); + Assertions.assertThat(noJitter.getJitterInterval()).isEqualTo(0); + + // Negative jitter interval should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().jitterInterval(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("jitterInterval"); + + // Very large jitter interval + WriteOptions largeJitter = WriteOptions.builder().jitterInterval(Integer.MAX_VALUE).build(); + Assertions.assertThat(largeJitter.getJitterInterval()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void retryIntervalEdgeCases() { + // Minimum valid retry interval (1ms) + WriteOptions minRetry = WriteOptions.builder().retryInterval(1).build(); + Assertions.assertThat(minRetry.getRetryInterval()).isEqualTo(1); + + // Zero retry interval should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().retryInterval(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryInterval"); + + // Negative retry interval should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().retryInterval(-1000).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("retryInterval"); + } + + @Test + void maxRetriesEdgeCases() { + // Zero retries (no retry) + WriteOptions noRetry = WriteOptions.builder().maxRetries(1).build(); + Assertions.assertThat(noRetry.getMaxRetries()).isEqualTo(1); + + // Negative retries should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().maxRetries(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxRetries"); + + // Very large number of retries + WriteOptions manyRetries = WriteOptions.builder().maxRetries(1000).build(); + Assertions.assertThat(manyRetries.getMaxRetries()).isEqualTo(1000); + } + + @Test + void maxRetryDelayEdgeCases() { + // Minimum valid retry delay (1ms) + WriteOptions minDelay = WriteOptions.builder().maxRetryDelay(1).build(); + Assertions.assertThat(minDelay.getMaxRetryDelay()).isEqualTo(1); + + // Zero retry delay should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().maxRetryDelay(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxRetryDelay"); + + // Negative retry delay should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().maxRetryDelay(-5000).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxRetryDelay"); + } + + @Test + void exponentialBaseEdgeCases() { + // Minimum valid exponential base (2) + WriteOptions minBase = WriteOptions.builder().exponentialBase(2).build(); + Assertions.assertThat(minBase.getExponentialBase()).isEqualTo(2); + + // Base of 1 should throw exception (no exponential growth) + Assertions.assertThatThrownBy(() -> WriteOptions.builder().exponentialBase(1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exponentialBase"); + + // Base less than 2 should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().exponentialBase(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exponentialBase"); + + // Negative base should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().exponentialBase(-2).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exponentialBase"); + + // Large exponential base + WriteOptions largeBase = WriteOptions.builder().exponentialBase(10).build(); + Assertions.assertThat(largeBase.getExponentialBase()).isEqualTo(10); + } + + @Test + void concatMapPrefetchEdgeCases() { + // Minimum valid prefetch (1) + WriteOptions minPrefetch = WriteOptions.builder().concatMapPrefetch(1).build(); + Assertions.assertThat(minPrefetch.getConcatMapPrefetch()).isEqualTo(1); + + // Negative prefetch should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().concatMapPrefetch(-5).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("concatMapPrefetch"); + + // Very large prefetch + WriteOptions largePrefetch = WriteOptions.builder().concatMapPrefetch(1000).build(); + Assertions.assertThat(largePrefetch.getConcatMapPrefetch()).isEqualTo(1000); + } + + @Test + void allBackpressureStrategies() { + // Test all available backpressure strategies + for (BackpressureOverflowStrategy strategy : BackpressureOverflowStrategy.values()) { + WriteOptions options = WriteOptions.builder() + .backpressureStrategy(strategy) + .build(); + Assertions.assertThat(options.getBackpressureStrategy()).isEqualTo(strategy); + } + } + + @Test + void bufferLimitSmallerThanBatchSize() { + // Buffer limit can be smaller than batch size (valid configuration) + WriteOptions options = WriteOptions.builder() + .batchSize(1000) + .bufferLimit(500) + .build(); + + Assertions.assertThat(options.getBatchSize()).isEqualTo(1000); + Assertions.assertThat(options.getBufferLimit()).isEqualTo(500); + } + + @Test + void maxRetryTimeBoundaries() { + // Test maxRetryTime edge cases + WriteOptions minTime = WriteOptions.builder().maxRetryTime(1).build(); + Assertions.assertThat(minTime.getMaxRetryTime()).isEqualTo(1); + + // Zero max retry time should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().maxRetryTime(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxRetryTime"); + + // Negative max retry time should throw exception + Assertions.assertThatThrownBy(() -> WriteOptions.builder().maxRetryTime(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxRetryTime"); + + // Very large max retry time + WriteOptions largeTime = WriteOptions.builder().maxRetryTime(Integer.MAX_VALUE).build(); + Assertions.assertThat(largeTime.getMaxRetryTime()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void captureBackpressureDataBothStates() { + // Test both true and false states + WriteOptions captureTrue = WriteOptions.builder().captureBackpressureData(true).build(); + Assertions.assertThat(captureTrue.getCaptureBackpressureData()).isTrue(); + + WriteOptions captureFalse = WriteOptions.builder().captureBackpressureData(false).build(); + Assertions.assertThat(captureFalse.getCaptureBackpressureData()).isFalse(); + } + + @Test + void multipleBuilderCalls() { + // Test that builder can be reused and values overridden + WriteOptions.Builder builder = WriteOptions.builder(); + + builder.batchSize(100); + builder.batchSize(200); // Override + + WriteOptions options = builder.build(); + Assertions.assertThat(options.getBatchSize()).isEqualTo(200); + } + + @Test + void concatMapPrefetchValidation() { + // Test that concatMapPrefetch must be positive + Assertions.assertThatThrownBy(() -> WriteOptions.builder().concatMapPrefetch(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("concatMapPrefetch"); + + WriteOptions options10 = WriteOptions.builder().concatMapPrefetch(10).build(); + Assertions.assertThat(options10.getConcatMapPrefetch()).isEqualTo(10); + } + + @Test + void backpressureConfiguration() { + // Test that backpressure options can be configured together + WriteOptions options = WriteOptions.builder() + .batchSize(100) + .bufferLimit(500) + .backpressureStrategy(BackpressureOverflowStrategy.DROP_LATEST) + .captureBackpressureData(true) + .build(); + + Assertions.assertThat(options.getBatchSize()).isEqualTo(100); + Assertions.assertThat(options.getBufferLimit()).isEqualTo(500); + Assertions.assertThat(options.getBackpressureStrategy()).isEqualTo(BackpressureOverflowStrategy.DROP_LATEST); + Assertions.assertThat(options.getCaptureBackpressureData()).isTrue(); } } \ No newline at end of file diff --git a/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategyTest.java b/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategyTest.java index 5cf5613c315..0cd57e5d7ca 100644 --- a/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategyTest.java +++ b/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategyTest.java @@ -51,7 +51,7 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) { itemsToWrite(dataPerBatches) .lift(new BackpressureBatchesBufferStrategy( bufferSize, - backpressure::incrementAndGet, + droppedPoints -> backpressure.incrementAndGet(), DROP_OLDEST))) .subscribe(testSubscriber); diff --git a/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureDataCaptureTest.java b/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureDataCaptureTest.java new file mode 100644 index 00000000000..599dc1a657f --- /dev/null +++ b/client/src/test/java/com/influxdb/client/internal/flowable/BackpressureDataCaptureTest.java @@ -0,0 +1,354 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.internal.flowable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.internal.AbstractWriteClient; +import com.influxdb.client.write.WriteParameters; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.subscribers.DefaultSubscriber; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * @author Fred Park (fredjoonpark@github) + */ +class BackpressureDataCaptureTest { + + @Test + public void testBackwardCompatibility() { + AtomicInteger backpressureCount = new AtomicInteger(0); + TestSubscriber testSubscriber = createTestSubscriber(); + + // Test old constructor without capture parameter (should default to false) + Flowable.fromPublisher( + createSequentialBatches(5) + .lift(new BackpressureBatchesBufferStrategy( + 3, + bufferedPoints -> backpressureCount.incrementAndGet(), + BackpressureOverflowStrategy.DROP_OLDEST))) + .subscribe(testSubscriber); + + testSubscriber.request(1); + testSubscriber.awaitDone(5, TimeUnit.SECONDS).assertNoErrors(); + + Assertions.assertThat(backpressureCount.get()).isGreaterThan(0); + } + + @Test + public void testCaptureDisabled_NoDataCaptured() { + int bufferSize = 2; + + // Test both DROP_OLDEST and DROP_LATEST with capture disabled + for (BackpressureOverflowStrategy strategy : List.of( + BackpressureOverflowStrategy.DROP_OLDEST, + BackpressureOverflowStrategy.DROP_LATEST)) { + + List capturedData = new ArrayList<>(); + TestSubscriber testSubscriber = createTestSubscriber(); + + Flowable.fromPublisher( + createSequentialBatches(4) + .lift(new BackpressureBatchesBufferStrategy( + bufferSize, + capturedData::addAll, + strategy, + false))) + .subscribe(testSubscriber); + + testSubscriber.request(10); + testSubscriber.awaitDone(5, TimeUnit.SECONDS); + testSubscriber.assertNoErrors(); + testSubscriber.assertComplete(); + + Assertions.assertThat(capturedData) + .as("No data should be captured when captureBackpressureData=false with " + strategy) + .isEmpty(); + } + } + + @Test + public void testCaptureEnabled_CompareStrategies() { + int bufferSize = 2; // Small buffer: can hold batches 1,2 initially + int batchCount = 6; // Send batches 1,2,3,4,5,6 + + // Test DROP_OLDEST + // Expected: When batch 3 arrives, batch 1 gets dropped (oldest) + // When batch 4 arrives, batch 2 gets dropped, etc. + List oldestDropped = new ArrayList<>(); + TestSubscriber oldestSubscriber = createTestSubscriber(); + + Flowable.fromPublisher( + createSequentialBatches(batchCount) + .lift(new BackpressureBatchesBufferStrategy( + bufferSize, + oldestDropped::addAll, + BackpressureOverflowStrategy.DROP_OLDEST, + true))) + .subscribe(oldestSubscriber); + + oldestSubscriber.request(1); + oldestSubscriber.awaitDone(5, TimeUnit.SECONDS).assertNoErrors(); + + // Test DROP_LATEST + // Expected: When batch 3 arrives, batch 3 gets dropped (latest) + // When batch 4 arrives, batch 4 gets dropped, etc. + List latestDropped = new ArrayList<>(); + TestSubscriber latestSubscriber = createTestSubscriber(); + + Flowable.fromPublisher( + createSequentialBatches(batchCount) + .lift(new BackpressureBatchesBufferStrategy( + bufferSize, + latestDropped::addAll, + BackpressureOverflowStrategy.DROP_LATEST, + true))) + .subscribe(latestSubscriber); + + latestSubscriber.request(1); + latestSubscriber.awaitDone(5, TimeUnit.SECONDS).assertNoErrors(); + + // Extract and verify the exact batch numbers captured + List oldestBatchNums = extractBatchNumbers(oldestDropped); + List latestBatchNums = extractBatchNumbers(latestDropped); + + // Both strategies should capture some data + Assertions.assertThat(oldestBatchNums) + .as("DROP_OLDEST should capture some batch numbers") + .isNotEmpty(); + Assertions.assertThat(latestBatchNums) + .as("DROP_LATEST should capture some batch numbers") + .isNotEmpty(); + + // Verify the specific behavior: + // DROP_OLDEST should capture early batch numbers (1, 2, 3, etc.) + Assertions.assertThat(oldestBatchNums) + .as("DROP_OLDEST should capture early batch numbers") + .allMatch(batchNum -> batchNum <= 4); + + // DROP_LATEST should capture later batch numbers (3, 4, 5, 6) + Assertions.assertThat(latestBatchNums) + .as("DROP_LATEST should capture later batch numbers") + .allMatch(batchNum -> batchNum >= 3); + + // The minimum batch number in DROP_OLDEST should be lower than in DROP_LATEST + int minOldest = oldestBatchNums.stream().min(Integer::compareTo).orElse(999); + int minLatest = latestBatchNums.stream().min(Integer::compareTo).orElse(0); + + Assertions.assertThat(minOldest) + .as("DROP_OLDEST should capture lower batch numbers than DROP_LATEST") + .isLessThan(minLatest); + + // Verify the captured data contains valid line protocol with expected batch + // numbers + for (String point : oldestDropped) { + Assertions.assertThat(point) + .as("Captured point should be valid line protocol") + .matches("sequential,batch=\\d+ value=\\d+ \\d+"); + } + + for (String point : latestDropped) { + Assertions.assertThat(point) + .as("Captured point should be valid line protocol") + .matches("sequential,batch=\\d+ value=\\d+ \\d+"); + } + } + + @Test + public void testCaptureBatch() { + try { + WriteParameters writeParameters = new WriteParameters("test-bucket", "test-org", WritePrecision.NS); + + BackpressureBatchesBufferStrategy strategy = new BackpressureBatchesBufferStrategy( + 1, + capturedData -> { + }, + BackpressureOverflowStrategy.DROP_OLDEST, + true); + + TestSubscriber testSubscriber = createTestSubscriber(); + var subscriber = (BackpressureBatchesBufferStrategy.OnBackpressureBufferStrategySubscriber) strategy + .apply(testSubscriber); + + // Test empty case + AbstractWriteClient.BatchWriteDataGrouped emptyData = new AbstractWriteClient.BatchWriteDataGrouped( + writeParameters); + AbstractWriteClient.BatchWriteItem emptyItem = new AbstractWriteClient.BatchWriteItem(writeParameters, + emptyData); + List emptyResult = subscriber.captureBatch(emptyItem); + Assertions.assertThat(emptyResult).isEmpty(); + + // Test multiline with empty lines + AbstractWriteClient.BatchWriteDataGrouped multilineData = new AbstractWriteClient.BatchWriteDataGrouped( + writeParameters); + multilineData.append("measurement1,tag=value1 field=1 1000000"); + multilineData.append(""); + multilineData.append(" "); + multilineData.append("measurement2,tag=value2 field=2 2000000"); + + AbstractWriteClient.BatchWriteItem multilineItem = new AbstractWriteClient.BatchWriteItem(writeParameters, + multilineData); + + List multilineResult = subscriber.captureBatch(multilineItem); + Assertions.assertThat(multilineResult).isNotEmpty(); + Assertions.assertThat(multilineResult).hasSize(2); // Only non-empty lines + Assertions.assertThat(multilineResult).contains("measurement1,tag=value1 field=1 1000000"); + Assertions.assertThat(multilineResult).contains("measurement2,tag=value2 field=2 2000000"); + Assertions.assertThat(multilineResult).allMatch(line -> !line.trim().isEmpty()); + + // Test single line + AbstractWriteClient.BatchWriteDataGrouped singleLineData = new AbstractWriteClient.BatchWriteDataGrouped( + writeParameters); + singleLineData.append("single,measurement=test value=123 1000000"); + + AbstractWriteClient.BatchWriteItem singleLineItem = new AbstractWriteClient.BatchWriteItem(writeParameters, + singleLineData); + + List singleResult = subscriber.captureBatch(singleLineItem); + Assertions.assertThat(singleResult).hasSize(1); + Assertions.assertThat(singleResult).contains("single,measurement=test value=123 1000000"); + + // Test null and empty line protocol through Flowable stream + List capturedData = new ArrayList<>(); + TestSubscriber streamTestSubscriber = createTestSubscriber(); + + Flowable.just( + new AbstractWriteClient.BatchWriteItem(writeParameters, + new AbstractWriteClient.BatchWriteDataRecord(null)), + new AbstractWriteClient.BatchWriteItem(writeParameters, + new AbstractWriteClient.BatchWriteDataRecord(""))) + .lift(new BackpressureBatchesBufferStrategy(1, capturedData::addAll, + BackpressureOverflowStrategy.DROP_OLDEST, true)) + .subscribe(streamTestSubscriber); + + streamTestSubscriber.request(10); + streamTestSubscriber.awaitDone(5, TimeUnit.SECONDS); + streamTestSubscriber.assertNoErrors(); + streamTestSubscriber.assertComplete(); + + } catch (Throwable e) { + Assertions.fail("captureBatch test failed: " + e.getMessage()); + } + } + + @Test + public void testOnOverflowExceptionHandling() { + int bufferSize = 1; + TestSubscriber testSubscriber = createTestSubscriber(); + + Consumer> throwingConsumer = capturedData -> { + throw new RuntimeException("Test exception in onOverflow"); + }; + Flowable.fromPublisher( + createSequentialBatches(3) + .lift(new BackpressureBatchesBufferStrategy( + bufferSize, + throwingConsumer, + BackpressureOverflowStrategy.DROP_OLDEST, + true))) + .subscribe(testSubscriber); + + testSubscriber.request(1); + testSubscriber.awaitDone(5, TimeUnit.SECONDS); + + testSubscriber.assertError(RuntimeException.class); + } + + /** + * Extract batch numbers from line protocol strings for verification + */ + private List extractBatchNumbers(List lineProtocolPoints) { + return lineProtocolPoints.stream() + .map(line -> { + try { + int batchStart = line.indexOf("batch=") + 6; + if (batchStart == 5) + return -1; + + int batchEnd = line.indexOf(" ", batchStart); + if (batchEnd == -1) + batchEnd = line.length(); + + String batchNumStr = line.substring(batchStart, batchEnd); + return Integer.parseInt(batchNumStr); + } catch (Exception e) { + return -1; // Invalid format + } + }) + .filter(batchNum -> batchNum != -1) + .distinct() + .sorted() + .collect(java.util.stream.Collectors.toList()); + } + + private TestSubscriber createTestSubscriber() { + return new TestSubscriber<>(new DefaultSubscriber() { + @Override + protected void onStart() {} + + @Override + public void onComplete() {} + + @Override + public void onError(Throwable e) {} + + @Override + public void onNext(AbstractWriteClient.BatchWriteItem t) {} + }, 0L); + } + + private Flowable createSequentialBatches(int batchCount) { + return Flowable.unsafeCreate(s -> { + BooleanSubscription bs = new BooleanSubscription(); + s.onSubscribe(bs); + + for (int batchNum = 1; batchNum <= batchCount && !bs.isCancelled(); batchNum++) { + WriteParameters writeParameters = new WriteParameters("test-bucket", "test-org", WritePrecision.NS); + AbstractWriteClient.BatchWriteDataGrouped data = new AbstractWriteClient.BatchWriteDataGrouped(writeParameters); + + // Single point per batch with clear batch identification + data.append(String.format("sequential,batch=%d value=%d %d", batchNum, batchNum * 100, batchNum * 1000000L)); + s.onNext(new AbstractWriteClient.BatchWriteItem(writeParameters, data)); + // Small delay to ensure predictable ordering + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + if (!bs.isCancelled()) { + s.onComplete(); + } + }); + } +} \ No newline at end of file diff --git a/client/src/test/java/com/influxdb/client/write/events/BackpressureEventDataCaptureTest.java b/client/src/test/java/com/influxdb/client/write/events/BackpressureEventDataCaptureTest.java new file mode 100644 index 00000000000..dcabff63a01 --- /dev/null +++ b/client/src/test/java/com/influxdb/client/write/events/BackpressureEventDataCaptureTest.java @@ -0,0 +1,153 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.write.events; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * @author Fred Park (fredjoonpark@github) + */ +class BackpressureEventDataCaptureTest { + + @Test + public void testBackpressureEventWithoutData() { + BackpressureEvent event = new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES); + + Assertions.assertThat(event.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES); + Assertions.assertThat(event.getDroppedLineProtocol()).isEmpty(); + } + + @Test + public void testBackpressureEventWithData() { + List testData = Arrays.asList( + "measurement,tag=test1 value=1 1000000000", + "measurement,tag=test2 value=2 2000000000" + ); + + BackpressureEvent event = new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, + testData + ); + + Assertions.assertThat(event.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES); + Assertions.assertThat(event.getDroppedLineProtocol()).hasSize(2); + Assertions.assertThat(event.getDroppedLineProtocol()).containsExactly( + "measurement,tag=test1 value=1 1000000000", + "measurement,tag=test2 value=2 2000000000" + ); + } + + @Test + public void testBackpressureEventWithNullData() { + BackpressureEvent event = new BackpressureEvent( + BackpressureEvent.BackpressureReason.FAST_EMITTING, + null + ); + + Assertions.assertThat(event.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.FAST_EMITTING); + Assertions.assertThat(event.getDroppedLineProtocol()).isEmpty(); + } + + @Test + public void testBackpressureEventWithEmptyData() { + BackpressureEvent event = new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, + Collections.emptyList() + ); + + Assertions.assertThat(event.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES); + Assertions.assertThat(event.getDroppedLineProtocol()).isEmpty(); + } + + @Test + public void testDroppedLineProtocolIsUnmodifiable() { + List testData = Arrays.asList( + "measurement,tag=test1 value=1 1000000000", + "measurement,tag=test2 value=2 2000000000" + ); + + BackpressureEvent event = new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, + testData + ); + + List bufferedData = event.getDroppedLineProtocol(); + + // Should throw UnsupportedOperationException when trying to modify + Assertions.assertThatThrownBy(() -> bufferedData.add("new point")) + .isInstanceOf(UnsupportedOperationException.class); + + Assertions.assertThatThrownBy(() -> bufferedData.clear()) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testLogEventWithBufferedData() { + List testData = Arrays.asList( + "measurement,tag=test1 value=1 1000000000", + "measurement,tag=test2 value=2 2000000000", + "measurement,tag=test3 value=3 3000000000" + ); + + BackpressureEvent event = new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, + testData + ); + + // Should not throw any exceptions + Assertions.assertThatCode(event::logEvent).doesNotThrowAnyException(); + + // Verify the event contains the expected data + Assertions.assertThat(event.getDroppedLineProtocol()).hasSize(3); + } + + @Test + public void testLogEventWithoutBufferedData() { + BackpressureEvent event = new BackpressureEvent(BackpressureEvent.BackpressureReason.FAST_EMITTING); + + // Should not throw any exceptions + Assertions.assertThatCode(event::logEvent).doesNotThrowAnyException(); + + // Verify the event has no buffered data + Assertions.assertThat(event.getDroppedLineProtocol()).isEmpty(); + } + + @Test + public void testBackpressureReasons() { + // Test FAST_EMITTING reason + BackpressureEvent fastEmittingEvent = new BackpressureEvent( + BackpressureEvent.BackpressureReason.FAST_EMITTING + ); + Assertions.assertThat(fastEmittingEvent.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.FAST_EMITTING); + + // Test TOO_MUCH_BATCHES reason + BackpressureEvent tooMuchBatchesEvent = new BackpressureEvent( + BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES + ); + Assertions.assertThat(tooMuchBatchesEvent.getReason()).isEqualTo(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES); + } +} \ No newline at end of file