Skip to content

Commit 337b12f

Browse files
author
Igor Melnichenko
committed
GrpcStreamRetrier's ID was made configurable
1 parent c91ad03 commit 337b12f

File tree

6 files changed

+63
-4
lines changed

6 files changed

+63
-4
lines changed

.editorconfig

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,20 @@ ij_java_insert_override_annotation = true
1717
ij_java_lambda_brace_style = end_of_line
1818
ij_java_method_brace_style = end_of_line
1919
ij_java_names_count_to_use_import_on_demand = 9000
20+
ij_java_space_after_colon = true
21+
ij_java_space_before_colon = true
22+
ij_java_spaces_around_additive_operators = true
23+
ij_java_spaces_around_annotation_eq = true
24+
ij_java_spaces_around_assignment_operators = true
25+
ij_java_spaces_around_bitwise_operators = true
26+
ij_java_spaces_around_equality_operators = true
27+
ij_java_spaces_around_lambda_arrow = true
28+
ij_java_spaces_around_logical_operators = true
29+
ij_java_spaces_around_method_ref_dbl_colon = false
30+
ij_java_spaces_around_multiplicative_operators = true
31+
ij_java_spaces_around_relational_operators = true
32+
ij_java_spaces_around_shift_operators = true
33+
ij_java_spaces_around_type_bounds_in_type_parameters = true
34+
ij_java_spaces_around_unary_operator = false
2035
ij_java_use_single_class_imports = true
2136
ij_java_while_brace_force = always

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@ public abstract class GrpcStreamRetrier {
3535
private final ScheduledExecutorService scheduler;
3636
private final BiConsumer<Status, Throwable> errorsHandler;
3737

38-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
38+
protected GrpcStreamRetrier(
39+
String id,
40+
ScheduledExecutorService scheduler,
41+
BiConsumer<Status, Throwable> errorsHandler
42+
) {
3943
this.scheduler = scheduler;
40-
this.id = generateRandomId(ID_LENGTH);
44+
this.id = id == null ? generateRandomId(ID_LENGTH) : id;
4145
this.errorsHandler = errorsHandler;
4246
}
4347

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
58+
super(settings.getDebugId(), topicRpc.getScheduler(), settings.getErrorsHandler());
5959
this.topicRpc = topicRpc;
6060
this.settings = settings;
6161
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
public class ReaderSettings {
1818
private static final long MAX_MEMORY_USAGE_BYTES_DEFAULT = 100 * 1024 * 1024; // 100 MB
1919

20+
private final String debugId;
2021
private final String consumerName;
2122
private final String readerName;
2223
private final List<TopicReadSettings> topics;
@@ -25,6 +26,7 @@ public class ReaderSettings {
2526
private final BiConsumer<Status, Throwable> errorsHandler;
2627

2728
private ReaderSettings(Builder builder) {
29+
this.debugId = builder.debugId;
2830
this.consumerName = builder.consumerName;
2931
this.readerName = builder.readerName;
3032
this.topics = ImmutableList.copyOf(builder.topics);
@@ -33,6 +35,10 @@ private ReaderSettings(Builder builder) {
3335
this.errorsHandler = builder.errorsHandler;
3436
}
3537

38+
public String getDebugId() {
39+
return debugId;
40+
}
41+
3642
public String getConsumerName() {
3743
return consumerName;
3844
}
@@ -66,6 +72,7 @@ public static Builder newBuilder() {
6672
* BUILDER
6773
*/
6874
public static class Builder {
75+
private String debugId = null;
6976
private String consumerName = null;
7077
private boolean readWithoutConsumer = false;
7178
private String readerName = null;
@@ -74,6 +81,19 @@ public static class Builder {
7481
private Executor decompressionExecutor = null;
7582
private BiConsumer<Status, Throwable> errorsHandler = null;
7683

84+
/**
85+
* Sets an ID of a reader to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
86+
* be used.
87+
*
88+
* @param debugId the ID
89+
*
90+
* @return this builder
91+
*/
92+
public Builder setDebugId(String debugId) {
93+
this.debugId = debugId;
94+
return this;
95+
}
96+
7797
public Builder setConsumerName(String consumerName) {
7898
this.consumerName = consumerName;
7999
return this;

topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public class WriterSettings {
1212
private static final long MAX_MEMORY_USAGE_BYTES_DEFAULT = 20 * 1024 * 1024; // 20 MB
1313
private static final int MAX_IN_FLIGHT_COUNT_DEFAULT = 100000;
1414

15+
private final String debugId;
1516
private final String topicPath;
1617
private final String producerId;
1718
private final String messageGroupId;
@@ -22,6 +23,7 @@ public class WriterSettings {
2223
private final BiConsumer<Status, Throwable> errorsHandler;
2324

2425
private WriterSettings(Builder builder) {
26+
this.debugId = builder.debugId;
2527
this.topicPath = builder.topicPath;
2628
this.producerId = builder.producerId;
2729
this.messageGroupId = builder.messageGroupId;
@@ -36,6 +38,10 @@ public static Builder newBuilder() {
3638
return new Builder();
3739
}
3840

41+
public String getDebugId() {
42+
return debugId;
43+
}
44+
3945
public String getTopicPath() {
4046
return topicPath;
4147
}
@@ -72,6 +78,7 @@ public int getMaxSendBufferMessagesCount() {
7278
* BUILDER
7379
*/
7480
public static class Builder {
81+
private String debugId = null;
7582
private String topicPath = null;
7683
private String producerId = null;
7784
private String messageGroupId = null;
@@ -81,6 +88,19 @@ public static class Builder {
8188
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
8289
private BiConsumer<Status, Throwable> errorsHandler = null;
8390

91+
/**
92+
* Sets an ID of a writer to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
93+
* be used.
94+
*
95+
* @param debugId the ID
96+
*
97+
* @return this builder
98+
*/
99+
public Builder setDebugId(String debugId) {
100+
this.debugId = debugId;
101+
return this;
102+
}
103+
84104
/**
85105
* Set path to a topic to write to
86106
* @param topicPath path to a topic

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
69+
super(settings.getDebugId(), topicRpc.getScheduler(), settings.getErrorsHandler());
7070
this.topicRpc = topicRpc;
7171
this.settings = settings;
7272
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)