Skip to content

Commit

Permalink
Topic tests adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
pnv1 committed Mar 29, 2024
1 parent 9d761ba commit 53998f0
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<log4j.version>2.22.1</log4j.version>
<jcommander.version>1.82</jcommander.version>

<ydb.sdk.version>2.2.0-SNAPSHOT</ydb.sdk.version>
<ydb.sdk.version>2.2.0</ydb.sdk.version>
</properties>

<modules>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package tech.ydb.examples.topic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.table.Session;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.SyncReader;
Expand All @@ -31,7 +31,7 @@ protected void run(GrpcTransport transport, String pathPrefix) {
ReaderSettings settings = ReaderSettings.newBuilder()
.setConsumerName(CONSUMER_NAME)
.addTopic(TopicReadSettings.newBuilder()
.setPath(CONSUMER_NAME)
.setPath(TOPIC_NAME)
.setReadFrom(Instant.now().minus(Duration.ofHours(24)))
.setMaxLag(Duration.ofMinutes(30))
.build())
Expand All @@ -42,7 +42,6 @@ protected void run(GrpcTransport transport, String pathPrefix) {
// Init in background
reader.init();


try {
// Reading 5 messages
for (int i = 0; i < 5; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ protected void run(GrpcTransport transport, String pathPrefix) {
switch (result.getState()) {
case WRITTEN:
WriteAck.Details details = result.getDetails();
logger.info("Message was written successfully."
+ ", offset: " + details.getOffset());
logger.info("Message was written successfully, offset: " +
details.getOffset());
break;
case ALREADY_WRITTEN:
logger.warn("Message was already written");
Expand All @@ -106,7 +106,9 @@ protected void run(GrpcTransport transport, String pathPrefix) {
break;
}
}
});
})
// Waiting for message to reach server before transaction commit
.join();
} catch (QueueOverflowException exception) {
logger.error("Queue overflow exception while sending message{}: ", index, exception);
// Send queue is full. Need retry with backoff or skip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.BaseTransaction;
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.settings.SendSettings;
Expand Down Expand Up @@ -92,11 +89,11 @@ protected void run(GrpcTransport transport, String pathPrefix) {
} catch (InterruptedException | ExecutionException exception) {
logger.error("Couldn't put message {} into sending queue due to exception: ", i, exception);
}
// flush to wait until all messages reach server before commit
writer.flush();
transaction.commit().join();
}

writer.flush();
logger.info("Flush finished");
long shutdownTimeoutSeconds = 10;
try {
writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS);
Expand Down

0 comments on commit 53998f0

Please sign in to comment.