diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java b/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java index e1075e0..87079e1 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java @@ -1,7 +1,18 @@ package tech.ydb.examples; +import java.time.Duration; + import tech.ydb.auth.iam.CloudAuthHelper; +import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.scheme.SchemeClient; +import tech.ydb.scheme.description.DescribePathResult; +import tech.ydb.topic.TopicClient; +import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.Consumer; +import tech.ydb.topic.description.SupportedCodecs; +import tech.ydb.topic.settings.CreateTopicSettings; /** @@ -9,8 +20,8 @@ * @author Nikolay Perfilov */ public abstract class SimpleExample { - protected static final String TOPIC_NAME = "test-topic"; - protected static final String CONSUMER_NAME = "test-consumer"; + protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME"); + protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME"); protected void doMain(String[] args) { if (args.length > 1) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java index e93cddd..3865802 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java @@ -160,6 +160,9 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) { @Override public void onReaderClosed(ReaderClosedEvent event) { logger.info("Reader is closed."); + if (!messageReceivedFuture.isDone()) { + messageReceivedFuture.complete(null); + } } } diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java index d64cfd8..d8a3401 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java @@ -3,14 +3,12 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; -import tech.ydb.topic.description.MetadataItem; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.SyncReader; diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java index efb5ce2..8ed53cf 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java @@ -1,5 +1,6 @@ package tech.ydb.examples.topic; +import java.nio.charset.StandardCharsets; import java.text.DecimalFormat; import java.time.Duration; import java.time.Instant; @@ -109,6 +110,9 @@ protected void run(GrpcTransport transport, String pathPrefix) { } logger.info("Received a signal to stop writing"); + // Wait for all writes to receive a WriteAck before shutting down writer + writer.flush(); + try { writer.shutdown(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (TimeoutException exception) { @@ -240,7 +244,7 @@ private class Handler extends AbstractReadEventHandler { public void onMessages(DataReceivedEvent event) { for (tech.ydb.topic.read.Message message : event.getMessages()) { messagesReceived.incrementAndGet(); - if (logger.isTraceEnabled()) { + if (logger.isDebugEnabled()) { StringBuilder str = new StringBuilder("Message received"); str.append("\n"); str.append(" offset: ").append(message.getOffset()).append("\n") @@ -251,13 +255,22 @@ public void onMessages(DataReceivedEvent event) { .append(" writtenAt: ").append(message.getWrittenAt()).append("\n") .append(" partitionSession: ").append(message.getPartitionSession().getId()).append("\n") .append(" partitionId: ").append(message.getPartitionSession().getPartitionId()) + .append("\n") + .append(" metadataItems: ") .append("\n"); + message.getMetadataItems().forEach(item -> str + .append(" key: \"") + .append(item.getKey()) + .append("\", value: \"") + .append(new String(item.getValue(), StandardCharsets.UTF_8)) + .append("\"") + .append("\n")); if (!message.getWriteSessionMeta().isEmpty()) { str.append(" writeSessionMeta:\n"); message.getWriteSessionMeta().forEach((key, value) -> str.append(" ").append(key).append(": ").append(value).append("\n")); } - logger.trace(str.toString()); + logger.debug(str.toString()); } else { logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset()); } diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java index c480e13..9cf4a8c 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java @@ -1,5 +1,6 @@ package tech.ydb.examples.topic; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -23,6 +24,8 @@ */ public class WriteAsync extends SimpleExample { private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class); + private static final int MESSAGES_COUNT = 5; + private static final int WAIT_TIMEOUT_SECONDS = 60; @Override protected void run(GrpcTransport transport, String pathPrefix) { @@ -52,14 +55,17 @@ protected void run(GrpcTransport transport, String pathPrefix) { return null; }); - for (int i = 1; i <= 5; i++) { + // A latch to wait for all writes to receive a WriteAck before shutting down writer + CountDownLatch writesInProgress = new CountDownLatch(MESSAGES_COUNT); + + for (int i = 1; i <= MESSAGES_COUNT; i++) { final int index = i; try { String messageString = "message" + i; // Blocks until the message is put into sending buffer writer.send(Message.of(messageString.getBytes())).whenComplete((result, ex) -> { if (ex != null) { - logger.error("Exception while sending message {}: ", index, ex); + logger.error("Exception while sending a message {}: ", index, ex); } else { logger.info("Message {} ack received", index); @@ -76,20 +82,38 @@ protected void run(GrpcTransport transport, String pathPrefix) { break; } } + writesInProgress.countDown(); }); } catch (QueueOverflowException exception) { - logger.error("Queue overflow exception while sending message{}: ", index, exception); - // Send queue is full. Need retry with backoff or skip + logger.error("Queue overflow exception while sending a message{}: ", index, exception); + // Send queue is full. Need to retry with backoff or skip + writesInProgress.countDown(); } logger.info("Message {} is sent", index); } - long timeoutSeconds = 10; try { - writer.shutdown().get(timeoutSeconds, TimeUnit.SECONDS); + while (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS); + } + } catch (InterruptedException exception) { + logger.error("Waiting for writes to finish was interrupted: ", exception); + } + + try { + if (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS); + } + } catch (InterruptedException exception) { + logger.error("Waiting for writes to finish was interrupted: ", exception); + } + + try { + writer.shutdown().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (TimeoutException exception) { - logger.error("Timeout exception during writer termination ({} seconds): ", timeoutSeconds, exception); + logger.error("Timeout exception during writer termination ({} seconds): ", WAIT_TIMEOUT_SECONDS, + exception); } catch (ExecutionException exception) { logger.error("Execution exception during writer termination: ", exception); } catch (InterruptedException exception) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java index 9203196..2bdc213 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java @@ -67,8 +67,10 @@ protected void run(GrpcTransport transport, String pathPrefix) { } } + // Wait for all writes to receive a WriteAck before shutting down writer writer.flush(); logger.info("Flush finished"); + long shutdownTimeoutSeconds = 10; try { writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS); diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java index 806d0d4..ca76c3e 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java @@ -18,8 +18,6 @@ import tech.ydb.table.impl.PooledTableClient; import tech.ydb.table.rpc.grpc.GrpcTableRpc; import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.Transaction; -import tech.ydb.table.transaction.TxControl; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.AsyncReader; import tech.ydb.topic.read.DecompressionException; @@ -42,7 +40,7 @@ public class TransactionReadAsync extends SimpleExample { private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class); private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb - private static final int MESSAGES_COUNT = 5; + private static final int MESSAGES_COUNT = 1; private final CompletableFuture messageReceivedFuture = new CompletableFuture<>(); private TableClient tableClient; @@ -135,11 +133,13 @@ public void onMessages(DataReceivedEvent event) { // creating session and transaction Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); if (!sessionResult.isSuccess()) { - logger.error("Couldn't get session from pool: {}", sessionResult); + logger.error("Couldn't get a session from the pool: {}", sessionResult); return; // retry or shutdown } Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join() + .getValue(); // do something else in transaction transaction.executeDataQuery("SELECT 1").join(); @@ -199,6 +199,7 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) { @Override public void onReaderClosed(ReaderClosedEvent event) { logger.info("Reader is closed."); + messageReceivedFuture.complete(null); } } diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java index 9bd91b9..974a49d 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java @@ -13,8 +13,6 @@ import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.Transaction; -import tech.ydb.table.transaction.TxControl; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; @@ -49,37 +47,34 @@ protected void run(GrpcTransport transport, String pathPrefix) { reader.init(); try { - // Reading 5 messages - for (int i = 0; i < 5; i++) { - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get session from pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - - // do something else in transaction - transaction.executeDataQuery("SELECT 1").join(); - // analyzeQueryResultIfNeeded(); + // creating session and transaction + Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); + if (!sessionResult.isSuccess()) { + logger.error("Couldn't a get session from the pool: {}", sessionResult); + return; // retry or shutdown + } + Session session = sessionResult.getValue(); + TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - //Session session - Message message = reader.receive(ReceiveSettings.newBuilder() - .setTransaction(transaction) - .build()); - byte[] messageData; - try { - messageData = message.getData(); - } catch (DecompressionException e) { - logger.warn("Decompression exception while receiving a message: ", e); - messageData = e.getRawData(); - } - logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8)); + // do something else in transaction + transaction.executeDataQuery("SELECT 1").join(); + // analyzeQueryResultIfNeeded(); - transaction.commit().join(); - // analyze commit status + //Session session + Message message = reader.receive(ReceiveSettings.newBuilder() + .setTransaction(transaction) + .build()); + byte[] messageData; + try { + messageData = message.getData(); + } catch (DecompressionException e) { + logger.warn("Decompression exception while receiving a message: ", e); + messageData = e.getRawData(); } + logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8)); + + transaction.commit().join(); + // analyze commit status } catch (InterruptedException exception) { logger.error("Interrupted exception while waiting for message: ", exception); } diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java index 07887d3..f9b5c67 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java @@ -15,9 +15,9 @@ import tech.ydb.examples.SimpleExample; import tech.ydb.table.Session; import tech.ydb.table.TableClient; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.Transaction; -import tech.ydb.table.transaction.TxControl; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.SendSettings; @@ -35,7 +35,6 @@ public class TransactionWriteAsync extends SimpleExample { @Override protected void run(GrpcTransport transport, String pathPrefix) { - String topicPath = pathPrefix + "topic-java"; String producerId = "messageGroup1"; String messageGroupId = "messageGroup1"; @@ -46,7 +45,7 @@ protected void run(GrpcTransport transport, String pathPrefix) { try (TableClient tableClient = TableClient.newClient(transport).build()) { WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath(topicPath) + .setTopicPath(TOPIC_NAME) .setProducerId(producerId) .setMessageGroupId(messageGroupId) .setCodec(Codec.GZIP) @@ -63,69 +62,78 @@ protected void run(GrpcTransport transport, String pathPrefix) { return null; }); - for (int i = 1; i <= 5; i++) { - final int index = i; - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get session from pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - - // do something else in transaction - transaction.executeDataQuery("SELECT 1").join(); - // analyzeQueryResultIfNeeded(); - - try { - String messageString = "message" + i; - // Blocks until the message is put into sending buffer - writer.send(Message.newBuilder() - .setData(messageString.getBytes()) - .build(), - SendSettings.newBuilder() - .setTransaction(transaction) - .build()) - .whenComplete((result, ex) -> { - if (ex != null) { - logger.error("Exception while sending message {}: ", index, ex); - } else { - logger.info("Message {} ack received", index); - - switch (result.getState()) { - case WRITTEN: - WriteAck.Details details = result.getDetails(); - logger.info("Message was written successfully, offset: " + - details.getOffset()); - break; - case ALREADY_WRITTEN: - logger.warn("Message was already written"); - break; - default: - break; - } + // creating session and transaction + Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); + if (!sessionResult.isSuccess()) { + logger.error("Couldn't get a session from the pool: {}", sessionResult); + return; // retry or shutdown + } + Session session = sessionResult.getValue(); + TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + + // get message text within the transaction + Result dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";") + .join(); + if (!dataQueryResult.isSuccess()) { + logger.error("Couldn't execute DataQuery: {}", dataQueryResult); + return; // retry or shutdown + } + ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0); + byte[] message; + if (rsReader.next()) { + message = rsReader.getColumn(0).getBytes(); + } else { + logger.error("Empty DataQuery result"); + return; // retry or shutdown + } + + try { + // Blocks until the message is put into sending buffer + writer.send(Message.newBuilder() + .setData(message) + .build(), + SendSettings.newBuilder() + .setTransaction(transaction) + .build()) + .whenComplete((result, ex) -> { + if (ex != null) { + logger.error("Exception while sending a message: ", ex); + } else { + logger.info("Message ack received"); + + switch (result.getState()) { + case WRITTEN: + WriteAck.Details details = result.getDetails(); + logger.info("Message was written successfully, offset: " + + details.getOffset()); + break; + case ALREADY_WRITTEN: + logger.warn("Message was already written"); + break; + default: + break; } - }) - // Waiting for message to reach server before transaction commit - .join(); - } catch (QueueOverflowException exception) { - logger.error("Queue overflow exception while sending message{}: ", index, exception); - // Send queue is full. Need retry with backoff or skip - } + } + }) + // Waiting for the message to reach the server before committing the transaction + .join(); + + logger.info("Message is sent"); + transaction.commit().whenComplete((status, throwable) -> { if (throwable != null) { - logger.error("Exception while committing transaction with message{}: ", index, throwable); + logger.error("Exception while committing transaction with message: ", throwable); } else { if (status.isSuccess()) { - logger.info("Transaction with message{} committed successfully", index); + logger.info("Transaction with message committed successfully"); } else { - logger.error("Failed to commit transaction with message{}: {}", index, status); + logger.error("Failed to commit transaction with message: {}", status); } } }); - - logger.info("Message {} is sent", index); + } catch (QueueOverflowException exception) { + logger.error("Queue overflow exception while sending a message: ", exception); + // Send queue is full. Need to retry with backoff or skip } long timeoutSeconds = 10; diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java index be801a0..e951869 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java @@ -13,6 +13,8 @@ import tech.ydb.examples.SimpleExample; import tech.ydb.table.Session; import tech.ydb.table.TableClient; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.transaction.TableTransaction; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; @@ -55,44 +57,52 @@ protected void run(GrpcTransport transport, String pathPrefix) { } long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer + // creating session and transaction + Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); + if (!sessionResult.isSuccess()) { + logger.error("Couldn't get a session from the pool: {}", sessionResult); + return; // retry or shutdown + } + Session session = sessionResult.getValue(); + TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - for (int i = 1; i <= 5; i++) { - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get session from pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - - // do something else in transaction - transaction.executeDataQuery("SELECT 1").join(); - // analyzeQueryResultIfNeeded(); - try { - String messageString = "message" + i; - // Non-blocking call - writer.send( - Message.newBuilder() - .setData(messageString.getBytes()) - .build(), - SendSettings.newBuilder() - .setTransaction(transaction) - .build(), - timeoutSeconds, - TimeUnit.SECONDS - ); - logger.info("Message '{}' is sent.", messageString); - } catch (TimeoutException exception) { - logger.error("Send queue is full. Couldn't put message {} into sending queue within {} seconds", - i, timeoutSeconds); - } catch (InterruptedException | ExecutionException exception) { - logger.error("Couldn't put message {} into sending queue due to exception: ", i, exception); - } - // flush to wait until all messages reach server before commit - writer.flush(); - transaction.commit().join(); + // get message text within the transaction + Result dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";") + .join(); + if (!dataQueryResult.isSuccess()) { + logger.error("Couldn't execute DataQuery: {}", dataQueryResult); + return; // retry or shutdown + } + ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0); + byte[] message; + if (rsReader.next()) { + message = rsReader.getColumn(0).getBytes(); + } else { + logger.error("Empty DataQuery result"); + return; // retry or shutdown } + try { + // Non-blocking call + writer.send( + Message.of(message), + SendSettings.newBuilder() + .setTransaction(transaction) + .build(), + timeoutSeconds, + TimeUnit.SECONDS + ); + logger.info("Message is sent"); + } catch (TimeoutException exception) { + logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", + timeoutSeconds); + } catch (InterruptedException | ExecutionException exception) { + logger.error("Couldn't put message into sending queue due to exception: ", exception); + } + + // flush to wait until the message reach server before commit + writer.flush(); + + transaction.commit().join(); long shutdownTimeoutSeconds = 10; try {