diff --git a/pom.xml b/pom.xml index d2588e4..f4b2b18 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 2.22.1 1.82 - 2.2.0-SNAPSHOT + 2.2.0 diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java index 97caa4f..d64cfd8 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java @@ -1,16 +1,16 @@ package tech.ydb.examples.topic; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.examples.SimpleExample; -import tech.ydb.table.Session; import tech.ydb.topic.TopicClient; +import tech.ydb.topic.description.MetadataItem; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.SyncReader; @@ -31,7 +31,7 @@ protected void run(GrpcTransport transport, String pathPrefix) { ReaderSettings settings = ReaderSettings.newBuilder() .setConsumerName(CONSUMER_NAME) .addTopic(TopicReadSettings.newBuilder() - .setPath(CONSUMER_NAME) + .setPath(TOPIC_NAME) .setReadFrom(Instant.now().minus(Duration.ofHours(24))) .setMaxLag(Duration.ofMinutes(30)) .build()) @@ -42,7 +42,6 @@ protected void run(GrpcTransport transport, String pathPrefix) { // Init in background reader.init(); - try { // Reading 5 messages for (int i = 0; i < 5; i++) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java index dfbb7ec..07887d3 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java @@ -96,8 +96,8 @@ protected void run(GrpcTransport transport, String pathPrefix) { switch (result.getState()) { case WRITTEN: WriteAck.Details details = result.getDetails(); - logger.info("Message was written successfully." - + ", offset: " + details.getOffset()); + logger.info("Message was written successfully, offset: " + + details.getOffset()); break; case ALREADY_WRITTEN: logger.warn("Message was already written"); @@ -106,7 +106,9 @@ protected void run(GrpcTransport transport, String pathPrefix) { break; } } - }); + }) + // Waiting for message to reach server before transaction commit + .join(); } catch (QueueOverflowException exception) { logger.error("Queue overflow exception while sending message{}: ", index, exception); // Send queue is full. Need retry with backoff or skip diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java index 735290c..be801a0 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java @@ -7,7 +7,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.ydb.common.transaction.BaseTransaction; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; @@ -15,8 +14,6 @@ import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.Transaction; -import tech.ydb.table.transaction.TxControl; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.SendSettings; @@ -92,11 +89,11 @@ protected void run(GrpcTransport transport, String pathPrefix) { } catch (InterruptedException | ExecutionException exception) { logger.error("Couldn't put message {} into sending queue due to exception: ", i, exception); } + // flush to wait until all messages reach server before commit + writer.flush(); transaction.commit().join(); } - writer.flush(); - logger.info("Flush finished"); long shutdownTimeoutSeconds = 10; try { writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS);