Skip to content

Making GrpcStreamRetrier ID configurable #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,20 @@ ij_java_insert_override_annotation = true
ij_java_lambda_brace_style = end_of_line
ij_java_method_brace_style = end_of_line
ij_java_names_count_to_use_import_on_demand = 9000
ij_java_space_after_colon = true
ij_java_space_before_colon = true
ij_java_spaces_around_additive_operators = true
ij_java_spaces_around_annotation_eq = true
ij_java_spaces_around_assignment_operators = true
ij_java_spaces_around_bitwise_operators = true
ij_java_spaces_around_equality_operators = true
ij_java_spaces_around_lambda_arrow = true
ij_java_spaces_around_logical_operators = true
ij_java_spaces_around_method_ref_dbl_colon = false
ij_java_spaces_around_multiplicative_operators = true
ij_java_spaces_around_relational_operators = true
ij_java_spaces_around_shift_operators = true
ij_java_spaces_around_type_bounds_in_type_parameters = true
ij_java_spaces_around_unary_operator = false
ij_java_use_single_class_imports = true
ij_java_while_brace_force = always
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ public abstract class GrpcStreamRetrier {
private final ScheduledExecutorService scheduler;
private final BiConsumer<Status, Throwable> errorsHandler;

protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
protected GrpcStreamRetrier(
String id,
ScheduledExecutorService scheduler,
BiConsumer<Status, Throwable> errorsHandler
) {
this.scheduler = scheduler;
this.id = generateRandomId(ID_LENGTH);
this.id = id == null ? generateRandomId(ID_LENGTH) : id;
this.errorsHandler = errorsHandler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
private final String consumerName;

public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
super(topicRpc.getScheduler(), settings.getErrorsHandler());
super(settings.getDebugId(), topicRpc.getScheduler(), settings.getErrorsHandler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new ReadSessionImpl();
Expand Down
20 changes: 20 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class ReaderSettings {
private static final long MAX_MEMORY_USAGE_BYTES_DEFAULT = 100 * 1024 * 1024; // 100 MB

private final String debugId;
private final String consumerName;
private final String readerName;
private final List<TopicReadSettings> topics;
Expand All @@ -25,6 +26,7 @@ public class ReaderSettings {
private final BiConsumer<Status, Throwable> errorsHandler;

private ReaderSettings(Builder builder) {
this.debugId = builder.debugId;
this.consumerName = builder.consumerName;
this.readerName = builder.readerName;
this.topics = ImmutableList.copyOf(builder.topics);
Expand All @@ -33,6 +35,10 @@ private ReaderSettings(Builder builder) {
this.errorsHandler = builder.errorsHandler;
}

public String getDebugId() {
return debugId;
}

public String getConsumerName() {
return consumerName;
}
Expand Down Expand Up @@ -66,6 +72,7 @@ public static Builder newBuilder() {
* BUILDER
*/
public static class Builder {
private String debugId = null;
private String consumerName = null;
private boolean readWithoutConsumer = false;
private String readerName = null;
Expand All @@ -74,6 +81,19 @@ public static class Builder {
private Executor decompressionExecutor = null;
private BiConsumer<Status, Throwable> errorsHandler = null;

/**
* Sets an ID of a reader to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
* be used.
*
* @param debugId the ID
*
* @return this builder
*/
public Builder setDebugId(String debugId) {
this.debugId = debugId;
return this;
}

public Builder setConsumerName(String consumerName) {
this.consumerName = consumerName;
return this;
Expand Down
20 changes: 20 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class WriterSettings {
private static final long MAX_MEMORY_USAGE_BYTES_DEFAULT = 20 * 1024 * 1024; // 20 MB
private static final int MAX_IN_FLIGHT_COUNT_DEFAULT = 100000;

private final String debugId;
private final String topicPath;
private final String producerId;
private final String messageGroupId;
Expand All @@ -22,6 +23,7 @@ public class WriterSettings {
private final BiConsumer<Status, Throwable> errorsHandler;

private WriterSettings(Builder builder) {
this.debugId = builder.debugId;
this.topicPath = builder.topicPath;
this.producerId = builder.producerId;
this.messageGroupId = builder.messageGroupId;
Expand All @@ -36,6 +38,10 @@ public static Builder newBuilder() {
return new Builder();
}

public String getDebugId() {
return debugId;
}

public String getTopicPath() {
return topicPath;
}
Expand Down Expand Up @@ -72,6 +78,7 @@ public int getMaxSendBufferMessagesCount() {
* BUILDER
*/
public static class Builder {
private String debugId = null;
private String topicPath = null;
private String producerId = null;
private String messageGroupId = null;
Expand All @@ -81,6 +88,19 @@ public static class Builder {
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
private BiConsumer<Status, Throwable> errorsHandler = null;

/**
* Sets an ID of a writer to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
* be used.
*
* @param debugId the ID
*
* @return this builder
*/
public Builder setDebugId(String debugId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogId looks better IMO, more specific.
Or LogPrefix to be even more specific

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure: are you sure that this ID won't be useful for other observability purposes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. That identifier was created and is used for debug purpose only

this.debugId = debugId;
return this;
}

/**
* Set path to a topic to write to
* @param topicPath path to a topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;

public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
super(topicRpc.getScheduler(), settings.getErrorsHandler());
super(settings.getDebugId(), topicRpc.getScheduler(), settings.getErrorsHandler());
this.topicRpc = topicRpc;
this.settings = settings;
this.session = new WriteSessionImpl();
Expand Down