From a30248eb1054bc969182d279c96064f3c6ac4436 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Sep 2024 16:33:52 -0700 Subject: [PATCH] BOC can track message consumption --- .../io/airbyte/cdk/output/BufferingOutputConsumer.kt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt index 26a93fe18047..bdf58b077811 100644 --- a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt @@ -34,6 +34,7 @@ class BufferingOutputConsumer( private val catalogs = mutableListOf() private val traces = mutableListOf() private val messages = mutableListOf() + private var messagesIndex: Int = 0 var callback: (AirbyteMessage) -> Unit = {} set(value) { @@ -79,4 +80,11 @@ class BufferingOutputConsumer( fun traces(): List = synchronized(this) { listOf(*traces.toTypedArray()) } fun messages(): List = synchronized(this) { listOf(*messages.toTypedArray()) } + + fun newMessages(): List = + synchronized(this) { + val newMessages = messages.subList(messagesIndex, messages.size) + messagesIndex = messages.size + newMessages + } }