Skip to content

Update transaction topic examples #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions jdbc/spring-jooq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
<configuration>
<environmentVariables>
<TESTCONTAINERS_REUSE_ENABLE>true</TESTCONTAINERS_REUSE_ENABLE>
<YDB_DOCKER_IMAGE>cr.yandex/yc/yandex-docker-local-ydb:trunk</YDB_DOCKER_IMAGE>
</environmentVariables>
</configuration>
</plugin>
Expand Down Expand Up @@ -210,4 +209,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<log4j.version>2.22.1</log4j.version>
<jcommander.version>1.82</jcommander.version>

<ydb.sdk.version>2.2.10</ydb.sdk.version>
<ydb.sdk.version>2.3.10</ydb.sdk.version>
</properties>

<modules>
Expand Down
1 change: 0 additions & 1 deletion query-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<environmentVariables>
<YDB_DOCKER_IMAGE>cr.yandex/yc/yandex-docker-local-ydb:trunk</YDB_DOCKER_IMAGE>
<TESTCONTAINERS_REUSE_ENABLE>true</TESTCONTAINERS_REUSE_ENABLE>
<YDB_ANONYMOUS_CREDENTIALS>1</YDB_ANONYMOUS_CREDENTIALS>
</environmentVariables>
Expand Down
3 changes: 3 additions & 0 deletions query-example/src/main/java/tech/ydb/example/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private void upsertTablesData() {

// Upsert list of series to table
retryCtx.supplyResult(session -> session.createQuery(
"DECLARE $values AS " + ListType.of(seriesType) + ";" +
"UPSERT INTO series SELECT * FROM AS_TABLE($values)",
TxMode.SERIALIZABLE_RW,
Params.of("$values", seriesData)
Expand Down Expand Up @@ -146,6 +147,7 @@ private void upsertTablesData() {

// Upsert list of seasons to table
retryCtx.supplyResult(session -> session.createQuery(
"DECLARE $values AS " + ListType.of(seasonType) + ";" +
"UPSERT INTO seasons SELECT * FROM AS_TABLE($values)",
TxMode.SERIALIZABLE_RW,
Params.of("$values", seasonsData)
Expand Down Expand Up @@ -173,6 +175,7 @@ private void upsertTablesData() {

// Upsert list of series to episodes
retryCtx.supplyResult(session -> session.createQuery(
"DECLARE $values AS " + ListType.of(episodeType) + ";" +
"UPSERT INTO episodes SELECT * FROM AS_TABLE($values)",
TxMode.SERIALIZABLE_RW,
Params.of("$values", episodesData)
Expand Down
4 changes: 4 additions & 0 deletions ydb-cookbook/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-scheme</artifactId>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-query</artifactId>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-topic</artifactId>
Expand Down
21 changes: 4 additions & 17 deletions ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,14 @@
package tech.ydb.examples;

import java.time.Duration;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.scheme.SchemeClient;
import tech.ydb.scheme.description.DescribePathResult;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.SupportedCodecs;
import tech.ydb.topic.settings.CreateTopicSettings;


/**
* @author Sergey Polovko
* @author Nikolay Perfilov
*/
public abstract class SimpleExample {
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");

protected void doMain(String[] args) {
if (args.length > 1) {
System.err.println("Too many arguments");
Expand All @@ -32,11 +18,12 @@ protected void doMain(String[] args) {
if (args.length == 1) {
connString = args[0];
} else {
connString = "some.host.name.com:2135?database=/Root";
System.err.println("Pass <connection-string> as argument to override connection settings\n");
System.err.println("Pass <connection-string> as an argument. " +
"Example: some.host.name.com:2135?database=/Root\n");
return;
}

System.err.println("connection-string: " + connString + "\n");
System.out.println("connection-string: " + connString + "\n");

try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.slf4j.LoggerFactory;
import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.Consumer;
Expand All @@ -28,11 +27,11 @@
/**
* @author Nikolay Perfilov
*/
public class ControlPlane extends SimpleExample {
public class ControlPlane extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(ControlPlane.class);

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {
logger.info("ControlPlane run");
try (TopicClient topicClient = TopicClient.newClient(transport).build()) {

Expand Down
13 changes: 2 additions & 11 deletions ydb-cookbook/src/main/java/tech/ydb/examples/topic/ReadAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.DecompressionException;
Expand All @@ -28,16 +27,15 @@
/**
* @author Nikolay Perfilov
*/
public class ReadAsync extends SimpleExample {
public class ReadAsync extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(ReadAsync.class);
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
private static final int MESSAGES_COUNT = 5;

private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
private long lastSeqNo = -1;

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {

try (TopicClient topicClient = TopicClient.newClient(transport)
.setCompressionPoolThreadCount(8)
Expand Down Expand Up @@ -108,13 +106,6 @@ public void onMessages(DataReceivedEvent event) {
} else {
logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
}
if (lastSeqNo > message.getSeqNo()) {
logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}",
message.getSeqNo(), lastSeqNo);
messageReceivedFuture.complete(null);
} else {
lastSeqNo = message.getSeqNo();
}
message.commit().thenRun(() -> {
logger.info("Message committed");
if (messageCounter.incrementAndGet() >= MESSAGES_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
Expand All @@ -18,11 +17,11 @@
/**
* @author Nikolay Perfilov
*/
public class ReadSync extends SimpleExample {
public class ReadSync extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(ReadSync.class);

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {

try (TopicClient topicClient = TopicClient.newClient(transport).build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.read.AsyncReader;
Expand All @@ -40,7 +39,7 @@
/**
* @author Nikolay Perfilov
*/
public class ReadWriteWorkload extends SimpleExample {
public class ReadWriteWorkload extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(ReadWriteWorkload.class);
private static final int WRITE_TIMEOUT_SECONDS = 60;
private static final int MESSAGE_LENGTH_BYTES = 10_000_000; // 10 Mb
Expand All @@ -53,12 +52,11 @@ public class ReadWriteWorkload extends SimpleExample {
private final AtomicInteger messagesReceived = new AtomicInteger(0);
private final AtomicInteger messagesCommitted = new AtomicInteger(0);
private final AtomicLong bytesWritten = new AtomicLong(0);
private long lastSeqNo = -1;
CountDownLatch writeFinishedLatch = new CountDownLatch(1);
CountDownLatch readFinishedLatch = new CountDownLatch(1);

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {

ExecutorService compressionExecutor = Executors.newFixedThreadPool(10);
AtomicBoolean timeToStopWriting = new AtomicBoolean(false);
Expand Down Expand Up @@ -154,9 +152,8 @@ protected void run(GrpcTransport transport, String pathPrefix) {
};

Runnable readingThread = () -> {
String consumerName = "consumer1";
ReaderSettings readerSettings = ReaderSettings.newBuilder()
.setConsumerName(consumerName)
.setConsumerName(CONSUMER_NAME)
.addTopic(TopicReadSettings.newBuilder()
.setPath(TOPIC_NAME)
.setReadFrom(Instant.now().minus(Duration.ofHours(24)))
Expand Down Expand Up @@ -274,12 +271,6 @@ public void onMessages(DataReceivedEvent event) {
} else {
logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
}
if (lastSeqNo > message.getSeqNo()) {
logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}",
message.getSeqNo(), lastSeqNo);
} else {
lastSeqNo = message.getSeqNo();
}
message.commit().thenRun(() -> {
logger.trace("Message committed");
unreadMessagesCount.decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package tech.ydb.examples.topic;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.core.grpc.GrpcTransport;


/**
* @author Nikolay Perfilov
*/
public abstract class SimpleTopicExample {
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");

protected void doMain(String[] args) {
if (args.length > 1) {
System.err.println("Too many arguments");
return;
}
String connString;
if (args.length == 1) {
connString = args[0];
} else {
System.err.println("Pass <connection-string> as an argument. " +
"Example: some.host.name.com:2135?database=/Root\n");
return;
}

System.out.println("connection-string: " + connString + "\n");

try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
.build()) {
run(transport);
} catch (Throwable t) {
t.printStackTrace();
}
}

protected abstract void run(GrpcTransport transport);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.settings.WriterSettings;
Expand All @@ -22,13 +21,13 @@
/**
* @author Nikolay Perfilov
*/
public class WriteAsync extends SimpleExample {
public class WriteAsync extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class);
private static final int MESSAGES_COUNT = 5;
private static final int WAIT_TIMEOUT_SECONDS = 60;

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {
String producerId = "messageGroup1";
String messageGroupId = "messageGroup1";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.settings.WriterSettings;
Expand All @@ -17,16 +16,15 @@
/**
* @author Nikolay Perfilov
*/
public class WriteSync extends SimpleExample {
public class WriteSync extends SimpleTopicExample {
private static final Logger logger = LoggerFactory.getLogger(WriteSync.class);

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
protected void run(GrpcTransport transport) {
String producerId = "messageGroup1";
String messageGroupId = "messageGroup1";

try (TopicClient topicClient = TopicClient.newClient(transport).build()) {

WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(TOPIC_NAME)
.setProducerId(producerId)
Expand Down
Loading
Loading