diff --git a/jdbc/spring-jooq/pom.xml b/jdbc/spring-jooq/pom.xml index 0116693..ff4222c 100644 --- a/jdbc/spring-jooq/pom.xml +++ b/jdbc/spring-jooq/pom.xml @@ -106,7 +106,6 @@ true - cr.yandex/yc/yandex-docker-local-ydb:trunk @@ -210,4 +209,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index f81f8ca..13e1bfd 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 2.22.1 1.82 - 2.2.10 + 2.3.10 diff --git a/query-example/pom.xml b/query-example/pom.xml index 1a8a7fd..aa5146e 100644 --- a/query-example/pom.xml +++ b/query-example/pom.xml @@ -73,7 +73,6 @@ maven-surefire-plugin - cr.yandex/yc/yandex-docker-local-ydb:trunk true 1 diff --git a/query-example/src/main/java/tech/ydb/example/App.java b/query-example/src/main/java/tech/ydb/example/App.java index 58ff312..d34fba5 100644 --- a/query-example/src/main/java/tech/ydb/example/App.java +++ b/query-example/src/main/java/tech/ydb/example/App.java @@ -119,6 +119,7 @@ private void upsertTablesData() { // Upsert list of series to table retryCtx.supplyResult(session -> session.createQuery( + "DECLARE $values AS " + ListType.of(seriesType) + ";" + "UPSERT INTO series SELECT * FROM AS_TABLE($values)", TxMode.SERIALIZABLE_RW, Params.of("$values", seriesData) @@ -146,6 +147,7 @@ private void upsertTablesData() { // Upsert list of seasons to table retryCtx.supplyResult(session -> session.createQuery( + "DECLARE $values AS " + ListType.of(seasonType) + ";" + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values)", TxMode.SERIALIZABLE_RW, Params.of("$values", seasonsData) @@ -173,6 +175,7 @@ private void upsertTablesData() { // Upsert list of series to episodes retryCtx.supplyResult(session -> session.createQuery( + "DECLARE $values AS " + ListType.of(episodeType) + ";" + "UPSERT INTO episodes SELECT * FROM AS_TABLE($values)", TxMode.SERIALIZABLE_RW, Params.of("$values", episodesData) diff --git a/ydb-cookbook/pom.xml b/ydb-cookbook/pom.xml index a0aa1c1..562cb25 100644 --- a/ydb-cookbook/pom.xml +++ b/ydb-cookbook/pom.xml @@ -26,6 +26,10 @@ tech.ydb ydb-sdk-scheme + + tech.ydb + ydb-sdk-query + tech.ydb ydb-sdk-topic diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java b/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java index 87079e1..d03b0c0 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java @@ -1,18 +1,7 @@ package tech.ydb.examples; -import java.time.Duration; - import tech.ydb.auth.iam.CloudAuthHelper; -import tech.ydb.core.Result; -import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.scheme.SchemeClient; -import tech.ydb.scheme.description.DescribePathResult; -import tech.ydb.topic.TopicClient; -import tech.ydb.topic.description.Codec; -import tech.ydb.topic.description.Consumer; -import tech.ydb.topic.description.SupportedCodecs; -import tech.ydb.topic.settings.CreateTopicSettings; /** @@ -20,9 +9,6 @@ * @author Nikolay Perfilov */ public abstract class SimpleExample { - protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME"); - protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME"); - protected void doMain(String[] args) { if (args.length > 1) { System.err.println("Too many arguments"); @@ -32,11 +18,12 @@ protected void doMain(String[] args) { if (args.length == 1) { connString = args[0]; } else { - connString = "some.host.name.com:2135?database=/Root"; - System.err.println("Pass as argument to override connection settings\n"); + System.err.println("Pass as an argument. " + + "Example: some.host.name.com:2135?database=/Root\n"); + return; } - System.err.println("connection-string: " + connString + "\n"); + System.out.println("connection-string: " + connString + "\n"); try (GrpcTransport transport = GrpcTransport.forConnectionString(connString) .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron()) diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ControlPlane.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ControlPlane.java index 0c708ff..d1af298 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ControlPlane.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ControlPlane.java @@ -12,7 +12,6 @@ import org.slf4j.LoggerFactory; import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.description.Consumer; @@ -28,11 +27,11 @@ /** * @author Nikolay Perfilov */ -public class ControlPlane extends SimpleExample { +public class ControlPlane extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(ControlPlane.class); @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { logger.info("ControlPlane run"); try (TopicClient topicClient = TopicClient.newClient(transport).build()) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java index 3865802..74ee72a 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java @@ -9,7 +9,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.AsyncReader; import tech.ydb.topic.read.DecompressionException; @@ -28,16 +27,15 @@ /** * @author Nikolay Perfilov */ -public class ReadAsync extends SimpleExample { +public class ReadAsync extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(ReadAsync.class); private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb private static final int MESSAGES_COUNT = 5; private final CompletableFuture messageReceivedFuture = new CompletableFuture<>(); - private long lastSeqNo = -1; @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { try (TopicClient topicClient = TopicClient.newClient(transport) .setCompressionPoolThreadCount(8) @@ -108,13 +106,6 @@ public void onMessages(DataReceivedEvent event) { } else { logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset()); } - if (lastSeqNo > message.getSeqNo()) { - logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}", - message.getSeqNo(), lastSeqNo); - messageReceivedFuture.complete(null); - } else { - lastSeqNo = message.getSeqNo(); - } message.commit().thenRun(() -> { logger.info("Message committed"); if (messageCounter.incrementAndGet() >= MESSAGES_COUNT) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java index d8a3401..dc7d4b6 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadSync.java @@ -7,7 +7,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; @@ -18,11 +17,11 @@ /** * @author Nikolay Perfilov */ -public class ReadSync extends SimpleExample { +public class ReadSync extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(ReadSync.class); @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { try (TopicClient topicClient = TopicClient.newClient(transport).build()) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java index 8ed53cf..86429d5 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadWriteWorkload.java @@ -19,7 +19,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.read.AsyncReader; @@ -40,7 +39,7 @@ /** * @author Nikolay Perfilov */ -public class ReadWriteWorkload extends SimpleExample { +public class ReadWriteWorkload extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(ReadWriteWorkload.class); private static final int WRITE_TIMEOUT_SECONDS = 60; private static final int MESSAGE_LENGTH_BYTES = 10_000_000; // 10 Mb @@ -53,12 +52,11 @@ public class ReadWriteWorkload extends SimpleExample { private final AtomicInteger messagesReceived = new AtomicInteger(0); private final AtomicInteger messagesCommitted = new AtomicInteger(0); private final AtomicLong bytesWritten = new AtomicLong(0); - private long lastSeqNo = -1; CountDownLatch writeFinishedLatch = new CountDownLatch(1); CountDownLatch readFinishedLatch = new CountDownLatch(1); @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { ExecutorService compressionExecutor = Executors.newFixedThreadPool(10); AtomicBoolean timeToStopWriting = new AtomicBoolean(false); @@ -154,9 +152,8 @@ protected void run(GrpcTransport transport, String pathPrefix) { }; Runnable readingThread = () -> { - String consumerName = "consumer1"; ReaderSettings readerSettings = ReaderSettings.newBuilder() - .setConsumerName(consumerName) + .setConsumerName(CONSUMER_NAME) .addTopic(TopicReadSettings.newBuilder() .setPath(TOPIC_NAME) .setReadFrom(Instant.now().minus(Duration.ofHours(24))) @@ -274,12 +271,6 @@ public void onMessages(DataReceivedEvent event) { } else { logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset()); } - if (lastSeqNo > message.getSeqNo()) { - logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}", - message.getSeqNo(), lastSeqNo); - } else { - lastSeqNo = message.getSeqNo(); - } message.commit().thenRun(() -> { logger.trace("Message committed"); unreadMessagesCount.decrementAndGet(); diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/SimpleTopicExample.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/SimpleTopicExample.java new file mode 100644 index 0000000..8d49dbd --- /dev/null +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/SimpleTopicExample.java @@ -0,0 +1,40 @@ +package tech.ydb.examples.topic; + +import tech.ydb.auth.iam.CloudAuthHelper; +import tech.ydb.core.grpc.GrpcTransport; + + +/** + * @author Nikolay Perfilov + */ +public abstract class SimpleTopicExample { + protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME"); + protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME"); + + protected void doMain(String[] args) { + if (args.length > 1) { + System.err.println("Too many arguments"); + return; + } + String connString; + if (args.length == 1) { + connString = args[0]; + } else { + System.err.println("Pass as an argument. " + + "Example: some.host.name.com:2135?database=/Root\n"); + return; + } + + System.out.println("connection-string: " + connString + "\n"); + + try (GrpcTransport transport = GrpcTransport.forConnectionString(connString) + .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron()) + .build()) { + run(transport); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + protected abstract void run(GrpcTransport transport); +} diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java index 9cf4a8c..c5c60d7 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java @@ -10,7 +10,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.WriterSettings; @@ -22,13 +21,13 @@ /** * @author Nikolay Perfilov */ -public class WriteAsync extends SimpleExample { +public class WriteAsync extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class); private static final int MESSAGES_COUNT = 5; private static final int WAIT_TIMEOUT_SECONDS = 60; @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { String producerId = "messageGroup1"; String messageGroupId = "messageGroup1"; diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java index 2bdc213..d757c1d 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteSync.java @@ -7,7 +7,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.WriterSettings; @@ -17,16 +16,15 @@ /** * @author Nikolay Perfilov */ -public class WriteSync extends SimpleExample { +public class WriteSync extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(WriteSync.class); @Override - protected void run(GrpcTransport transport, String pathPrefix) { + protected void run(GrpcTransport transport) { String producerId = "messageGroup1"; String messageGroupId = "messageGroup1"; try (TopicClient topicClient = TopicClient.newClient(transport).build()) { - WriterSettings settings = WriterSettings.newBuilder() .setTopicPath(TOPIC_NAME) .setProducerId(producerId) diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java index ca76c3e..7e87b8a 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java @@ -1,8 +1,6 @@ package tech.ydb.examples.topic.transactions; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -12,15 +10,14 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; -import tech.ydb.table.Session; -import tech.ydb.table.TableClient; -import tech.ydb.table.impl.PooledTableClient; -import tech.ydb.table.rpc.grpc.GrpcTableRpc; -import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.examples.topic.SimpleTopicExample; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.values.PrimitiveValue; import tech.ydb.topic.TopicClient; import tech.ydb.topic.read.AsyncReader; -import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; @@ -37,53 +34,36 @@ /** * @author Nikolay Perfilov */ -public class TransactionReadAsync extends SimpleExample { +public class TransactionReadAsync extends SimpleTopicExample { private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class); - private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb private static final int MESSAGES_COUNT = 1; private final CompletableFuture messageReceivedFuture = new CompletableFuture<>(); - private TableClient tableClient; + private QueryClient queryClient; private AsyncReader reader; @Override - protected void run(GrpcTransport transport, String pathPrefix) { - tableClient = PooledTableClient.newClient(GrpcTableRpc.useTransport(transport)).build(); - - - try (TopicClient topicClient = TopicClient.newClient(transport) - .setCompressionPoolThreadCount(8) - .build()) { - ReaderSettings readerSettings = ReaderSettings.newBuilder() - .setConsumerName(CONSUMER_NAME) - .addTopic(TopicReadSettings.newBuilder() - .setPath(TOPIC_NAME) - .setReadFrom(Instant.now().minus(Duration.ofHours(24))) - .setMaxLag(Duration.ofMinutes(30)) - .build()) - .setMaxMemoryUsageBytes(MAX_MEMORY_USAGE_BYTES) - .build(); - - ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder() - .setEventHandler(new Handler()) - .build(); - - reader = topicClient.createAsyncReader(readerSettings, handlerSettings); - - reader.init(); - - messageReceivedFuture.join(); - - reader.shutdown().join(); - tableClient.close(); - } - } - - public static void analyzeCommitStatus(Status status) { - if (status.isSuccess()) { - logger.info("Transaction committed successfully"); - } else { - logger.error("Failed to commit transaction: {}", status); + protected void run(GrpcTransport transport) { + // WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change + try (TopicClient topicClient = TopicClient.newClient(transport).build()) { + try (QueryClient queryClient = QueryClient.newClient(transport).build()) { + this.queryClient = queryClient; + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .setConsumerName(CONSUMER_NAME) + .addTopic(TopicReadSettings.newBuilder() + .setPath(TOPIC_NAME) + .build()) + .build(); + + ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder() + .setEventHandler(new Handler()) + .build(); + + reader = topicClient.createAsyncReader(readerSettings, handlerSettings); + reader.init(); + messageReceivedFuture.join(); + reader.shutdown().join(); + } } } @@ -93,69 +73,40 @@ private class Handler extends AbstractReadEventHandler { @Override public void onMessages(DataReceivedEvent event) { for (Message message : event.getMessages()) { - StringBuilder str = new StringBuilder("Message received"); - - if (logger.isTraceEnabled()) { - byte[] messageData; - try { - messageData = message.getData(); - } catch (DecompressionException e) { - logger.warn("Decompression exception while receiving a message: ", e); - messageData = e.getRawData(); - } - str.append(": \"").append(new String(messageData, StandardCharsets.UTF_8)).append("\""); - } - str.append("\n"); - if (logger.isDebugEnabled()) { - str.append(" offset: ").append(message.getOffset()).append("\n") - .append(" seqNo: ").append(message.getSeqNo()).append("\n") - .append(" createdAt: ").append(message.getCreatedAt()).append("\n") - .append(" messageGroupId: ").append(message.getMessageGroupId()).append("\n") - .append(" producerId: ").append(message.getProducerId()).append("\n") - .append(" writtenAt: ").append(message.getWrittenAt()).append("\n") - .append(" partitionSession: ").append(message.getPartitionSession().getId()).append("\n") - .append(" partitionId: ").append(message.getPartitionSession().getPartitionId()) - .append("\n"); - if (!message.getWriteSessionMeta().isEmpty()) { - str.append(" writeSessionMeta:\n"); - message.getWriteSessionMeta().forEach((key, value) -> - str.append(" ").append(key).append(": ").append(value).append("\n")); - } - if (logger.isTraceEnabled()) { - logger.trace(str.toString()); - } else { - logger.debug(str.toString()); + SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build(); + + retryCtx.supplyStatus(querySession -> { + QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + + // Update offsets in transaction + Status updateStatus = reader.updateOffsetsInTransaction(transaction, message.getPartitionOffsets(), + new UpdateOffsetsInTransactionSettings.Builder().build()) + // Do not commit transaction without waiting for updateOffsetsInTransaction result + .join(); + if (!updateStatus.isSuccess()) { + // Return update status to SessionRetryContext function + return CompletableFuture.completedFuture(updateStatus); } - } else { - logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset()); - } - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get a session from the pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW) - .join() - .getValue(); - - // do something else in transaction - transaction.executeDataQuery("SELECT 1").join(); - // analyzeQueryResultIfNeeded(); - - Status updateStatus = reader.updateOffsetsInTransaction(transaction, - message.getPartitionOffsets(), new UpdateOffsetsInTransactionSettings.Builder().build()) - // Do not commit transaction without waiting for updateOffsetsInTransaction result - .join(); - if (!updateStatus.isSuccess()) { - logger.error("Couldn't update offsets in transaction: {}", updateStatus); - return; // retry or shutdown - } + // Execute a query in transaction + Status queryStatus = transaction.createQuery( + "DECLARE $id AS Uint64; \n" + + "DECLARE $value AS Text;\n" + + "UPSERT INTO table (id, value) VALUES ($id, $value)", + Params.of("$id", PrimitiveValue.newUint64(message.getOffset()), + "$value", PrimitiveValue.newText(new String(message.getData(), + StandardCharsets.UTF_8)))) + .execute().join().getStatus(); + + if (!queryStatus.isSuccess()) { + // Return query status to SessionRetryContext function + return CompletableFuture.completedFuture(queryStatus); + } - Status commitStatus = transaction.commit().join(); - analyzeCommitStatus(commitStatus); + // Return commit status to SessionRetryContext function + return transaction.commit().thenApply(Result::getStatus); + }).join().expectSuccess("Couldn't read from topic and write to table in transaction"); if (messageCounter.incrementAndGet() >= MESSAGES_COUNT) { logger.info("{} messages committed in transaction. Finishing reading.", MESSAGES_COUNT); diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java index 974a49d..673bd3a 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java @@ -1,20 +1,19 @@ package tech.ydb.examples.topic.transactions; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; +import java.util.concurrent.CompletableFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; -import tech.ydb.table.Session; -import tech.ydb.table.TableClient; -import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.examples.topic.SimpleTopicExample; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.values.PrimitiveValue; import tech.ydb.topic.TopicClient; -import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.SyncReader; import tech.ydb.topic.settings.ReaderSettings; @@ -24,61 +23,50 @@ /** * @author Nikolay Perfilov */ -public class TransactionReadSync extends SimpleExample { - private static final Logger logger = LoggerFactory.getLogger(TransactionReadSync.class); +public class TransactionReadSync extends SimpleTopicExample { @Override - protected void run(GrpcTransport transport, String pathPrefix) { - + protected void run(GrpcTransport transport) { + // WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change try (TopicClient topicClient = TopicClient.newClient(transport).build()) { - try (TableClient tableClient = TableClient.newClient(transport).build()) { + try (QueryClient queryClient = QueryClient.newClient(transport).build()) { ReaderSettings settings = ReaderSettings.newBuilder() .setConsumerName(CONSUMER_NAME) .addTopic(TopicReadSettings.newBuilder() .setPath(TOPIC_NAME) - .setReadFrom(Instant.now().minus(Duration.ofHours(24))) - .setMaxLag(Duration.ofMinutes(30)) .build()) .build(); SyncReader reader = topicClient.createSyncReader(settings); - - // Init in background reader.init(); - try { - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't a get session from the pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build(); - // do something else in transaction - transaction.executeDataQuery("SELECT 1").join(); - // analyzeQueryResultIfNeeded(); - - //Session session - Message message = reader.receive(ReceiveSettings.newBuilder() - .setTransaction(transaction) - .build()); - byte[] messageData; + retryCtx.supplyStatus(querySession -> { + QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW).join().getValue(); + Message message; try { - messageData = message.getData(); - } catch (DecompressionException e) { - logger.warn("Decompression exception while receiving a message: ", e); - messageData = e.getRawData(); + message = reader.receive(ReceiveSettings.newBuilder() + .setTransaction(transaction) + .build()); + } catch (InterruptedException exception) { + throw new RuntimeException("Interrupted exception while waiting for message"); } - logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8)); - transaction.commit().join(); - // analyze commit status - } catch (InterruptedException exception) { - logger.error("Interrupted exception while waiting for message: ", exception); - } + Status queryStatus = transaction.createQuery( + "DECLARE $id AS Uint64; \n" + + "DECLARE $value AS Text;\n" + + "UPSERT INTO table (id, value) VALUES ($id, $value)", + Params.of("$id", PrimitiveValue.newUint64(message.getOffset()), + "$value", PrimitiveValue.newText(new String(message.getData(), + StandardCharsets.UTF_8)))) + .execute().join().getStatus(); + if (!queryStatus.isSuccess()) { + return CompletableFuture.completedFuture(queryStatus); + } + return transaction.commit().thenApply(Result::getStatus); + }).join().expectSuccess("Couldn't read from topic and write to table in transaction"); reader.shutdown(); } } diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java index f9b5c67..aea3e1b 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteAsync.java @@ -1,154 +1,100 @@ package tech.ydb.examples.topic.transactions; -import java.time.Duration; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; -import tech.ydb.table.Session; -import tech.ydb.table.TableClient; -import tech.ydb.table.query.DataQueryResult; +import tech.ydb.examples.topic.SimpleTopicExample; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QueryStream; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; -import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.table.values.PrimitiveValue; import tech.ydb.topic.TopicClient; -import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.AsyncWriter; import tech.ydb.topic.write.Message; import tech.ydb.topic.write.QueueOverflowException; -import tech.ydb.topic.write.WriteAck; /** * @author Nikolay Perfilov */ -public class TransactionWriteAsync extends SimpleExample { - private static final Logger logger = LoggerFactory.getLogger(TransactionWriteAsync.class); +public class TransactionWriteAsync extends SimpleTopicExample { + private static final long SHUTDOWN_TIMEOUT_SECONDS = 10; @Override - protected void run(GrpcTransport transport, String pathPrefix) { - String producerId = "messageGroup1"; - String messageGroupId = "messageGroup1"; - - ExecutorService compressionExecutor = Executors.newFixedThreadPool(10); - try (TopicClient topicClient = TopicClient.newClient(transport) - .setCompressionExecutor(compressionExecutor) - .build()) { - try (TableClient tableClient = TableClient.newClient(transport).build()) { - - WriterSettings settings = WriterSettings.newBuilder() + protected void run(GrpcTransport transport) { + // WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change + try (TopicClient topicClient = TopicClient.newClient(transport).build()) { + try (QueryClient queryClient = QueryClient.newClient(transport).build()) { + long id = 2; + String randomProducerId = "randomProducerId"; // Different for writers with different transactions + WriterSettings writerSettings = WriterSettings.newBuilder() .setTopicPath(TOPIC_NAME) - .setProducerId(producerId) - .setMessageGroupId(messageGroupId) - .setCodec(Codec.GZIP) - .setMaxSendBufferMemorySize(50 * 1024 * 1024) + .setProducerId(randomProducerId) + .setMessageGroupId(randomProducerId) .build(); - AsyncWriter writer = topicClient.createAsyncWriter(settings); - - // Init in background - writer.init() - .thenRun(() -> logger.info("Init finished successfully")) - .exceptionally(ex -> { - logger.error("Init failed with ex: ", ex); - return null; - }); - - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get a session from the pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - - // get message text within the transaction - Result dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";") - .join(); - if (!dataQueryResult.isSuccess()) { - logger.error("Couldn't execute DataQuery: {}", dataQueryResult); - return; // retry or shutdown - } - ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0); - byte[] message; - if (rsReader.next()) { - message = rsReader.getColumn(0).getBytes(); - } else { - logger.error("Empty DataQuery result"); - return; // retry or shutdown - } - - try { - // Blocks until the message is put into sending buffer - writer.send(Message.newBuilder() - .setData(message) - .build(), - SendSettings.newBuilder() - .setTransaction(transaction) - .build()) - .whenComplete((result, ex) -> { - if (ex != null) { - logger.error("Exception while sending a message: ", ex); - } else { - logger.info("Message ack received"); - - switch (result.getState()) { - case WRITTEN: - WriteAck.Details details = result.getDetails(); - logger.info("Message was written successfully, offset: " + - details.getOffset()); - break; - case ALREADY_WRITTEN: - logger.warn("Message was already written"); - break; - default: - break; - } - } - }) - // Waiting for the message to reach the server before committing the transaction - .join(); - - logger.info("Message is sent"); - - transaction.commit().whenComplete((status, throwable) -> { - if (throwable != null) { - logger.error("Exception while committing transaction with message: ", throwable); - } else { - if (status.isSuccess()) { - logger.info("Transaction with message committed successfully"); - } else { - logger.error("Failed to commit transaction with message: {}", status); - } - } - }); - } catch (QueueOverflowException exception) { - logger.error("Queue overflow exception while sending a message: ", exception); - // Send queue is full. Need to retry with backoff or skip - } - - long timeoutSeconds = 10; - try { - writer.shutdown().get(timeoutSeconds, TimeUnit.SECONDS); - } catch (TimeoutException exception) { - logger.error("Timeout exception during writer termination ({} seconds): ", timeoutSeconds, exception); - } catch (ExecutionException exception) { - logger.error("Execution exception during writer termination: ", exception); - } catch (InterruptedException exception) { - logger.error("Writer termination was interrupted: ", exception); - } + SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build(); + retryCtx.supplyStatus(querySession -> { + QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + + QueryStream queryStream = transaction.createQuery( + "DECLARE $id AS Uint64;\n" + + "SELECT value FROM table WHERE id=$id", + Params.of("$id", PrimitiveValue.newUint64(id))); + QueryReader queryReader = QueryReader.readFrom(queryStream).join().getValue(); + ResultSetReader resultSet = queryReader.getResultSet(0); + if (!resultSet.next()) { + throw new RuntimeException("Value for id=" + id + " not found"); + } + String value = resultSet.getColumn("value").getText(); + + // Current implementation requires creating a writer for every transaction: + AsyncWriter writer = topicClient.createAsyncWriter(writerSettings); + writer.init(); + System.err.println("writer initialized, value: " + value); + try { + writer.send(Message.of(value.getBytes()), + SendSettings.newBuilder() + .setTransaction(transaction) + .build()) + .join(); // Waiting for WriteAck before committing transaction + } catch (QueueOverflowException exception) { + // Send queue is full. Need to retry with backoff or skip + throw new RuntimeException("Couldn't add message to SDK buffer", exception); + } + CompletableFuture commitStatus = transaction.commit().thenApply(Result::getStatus); + commitStatus.join(); + try { + System.err.println("Commit status: " + commitStatus.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + + try { + writer.shutdown().get(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException exception) { + throw new RuntimeException("Shutdown not finished within " + SHUTDOWN_TIMEOUT_SECONDS + + " seconds"); + } catch (InterruptedException | ExecutionException exception) { + throw new RuntimeException("Shutdown not finished due to exception: " + exception); + } + + return commitStatus; + }).join().expectSuccess("Couldn't read from table and write to topic in transaction"); } } - compressionExecutor.shutdown(); } public static void main(String[] args) { diff --git a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java index e951869..25dd304 100644 --- a/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java +++ b/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java @@ -1,23 +1,24 @@ package tech.ydb.examples.topic.transactions; -import java.time.Duration; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.examples.SimpleExample; -import tech.ydb.table.Session; -import tech.ydb.table.TableClient; -import tech.ydb.table.query.DataQueryResult; +import tech.ydb.examples.topic.SimpleTopicExample; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QueryStream; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; -import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.table.values.PrimitiveValue; import tech.ydb.topic.TopicClient; -import tech.ydb.topic.description.Codec; import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.Message; @@ -26,92 +27,64 @@ /** * @author Nikolay Perfilov */ -public class TransactionWriteSync extends SimpleExample { - private static final Logger logger = LoggerFactory.getLogger(TransactionWriteSync.class); +public class TransactionWriteSync extends SimpleTopicExample { + private static final long SHUTDOWN_TIMEOUT_SECONDS = 10; @Override - protected void run(GrpcTransport transport, String pathPrefix) { - String producerId = "messageGroup1"; - String messageGroupId = "messageGroup1"; - + protected void run(GrpcTransport transport) { + // WARNING: Working with transactions in Java Topic SDK is currently experimental. Interfaces may change try (TopicClient topicClient = TopicClient.newClient(transport).build()) { - try (TableClient tableClient = TableClient.newClient(transport).build()) { - WriterSettings settings = WriterSettings.newBuilder() + try (QueryClient queryClient = QueryClient.newClient(transport).build()) { + long id = 1; + String randomProducerId = "randomProducerId"; // Different for writers with different transactions + WriterSettings writerSettings = WriterSettings.newBuilder() .setTopicPath(TOPIC_NAME) - .setProducerId(producerId) - .setMessageGroupId(messageGroupId) - .setCodec(Codec.ZSTD) - .setMaxSendBufferMessagesCount(100) + .setProducerId(randomProducerId) + .setMessageGroupId(randomProducerId) .build(); - SyncWriter writer = topicClient.createSyncWriter(settings); - - logger.info("SyncWriter created "); + SessionRetryContext retryCtx = SessionRetryContext.create(queryClient).build(); + retryCtx.supplyStatus(querySession -> { + QueryTransaction transaction = querySession.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + String value; + QueryStream queryStream = transaction.createQuery( + "DECLARE $id AS Uint64;\n" + + "SELECT value FROM table WHERE id=$id", + Params.of("$id", PrimitiveValue.newUint64(1))); + QueryReader queryReader = QueryReader.readFrom(queryStream).join().getValue(); + ResultSetReader resultSet = queryReader.getResultSet(0); + if (!resultSet.next()) { + throw new RuntimeException("Value for id=" + id + " not found"); + } + value = resultSet.getColumn("value").getText(); - try { + // Current implementation requires creating a writer for every transaction: + SyncWriter writer = topicClient.createSyncWriter(writerSettings); writer.initAndWait(); - logger.info("Init finished"); - } catch (Exception exception) { - logger.error("Exception while initializing writer: ", exception); - return; - } - - long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer - // creating session and transaction - Result sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join(); - if (!sessionResult.isSuccess()) { - logger.error("Couldn't get a session from the pool: {}", sessionResult); - return; // retry or shutdown - } - Session session = sessionResult.getValue(); - TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - - // get message text within the transaction - Result dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";") - .join(); - if (!dataQueryResult.isSuccess()) { - logger.error("Couldn't execute DataQuery: {}", dataQueryResult); - return; // retry or shutdown - } - ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0); - byte[] message; - if (rsReader.next()) { - message = rsReader.getColumn(0).getBytes(); - } else { - logger.error("Empty DataQuery result"); - return; // retry or shutdown - } - try { - // Non-blocking call writer.send( - Message.of(message), + Message.of(value.getBytes()), SendSettings.newBuilder() .setTransaction(transaction) - .build(), - timeoutSeconds, - TimeUnit.SECONDS + .build() ); - logger.info("Message is sent"); - } catch (TimeoutException exception) { - logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", - timeoutSeconds); - } catch (InterruptedException | ExecutionException exception) { - logger.error("Couldn't put message into sending queue due to exception: ", exception); - } - // flush to wait until the message reach server before commit - writer.flush(); + writer.flush(); + + CompletableFuture commitStatus = transaction.commit().thenApply(Result::getStatus); + commitStatus.join(); - transaction.commit().join(); + try { + writer.shutdown(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException exception) { + throw new RuntimeException("Shutdown not finished within " + SHUTDOWN_TIMEOUT_SECONDS + + " seconds"); + } catch (InterruptedException | ExecutionException exception) { + throw new RuntimeException("Shutdown not finished due to exception: " + exception); + } - long shutdownTimeoutSeconds = 10; - try { - writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS); - } catch (TimeoutException exception) { - logger.error("Shutdown was not finished within {} seconds: ", timeoutSeconds, exception); - } catch (InterruptedException | ExecutionException exception) { - logger.error("Shutdown was not finished due to exception: ", exception); - } + return commitStatus; + }).join().expectSuccess("Couldn't read from table and write to topic in transaction"); } } } diff --git a/ydb-cookbook/src/main/resources/log4j2.xml b/ydb-cookbook/src/main/resources/log4j2.xml index 4019365..d25e131 100644 --- a/ydb-cookbook/src/main/resources/log4j2.xml +++ b/ydb-cookbook/src/main/resources/log4j2.xml @@ -28,11 +28,11 @@ - + - +