Skip to content

Commit decbae6

Browse files
author
Igor Melnichenko
committed
GrpcStreamRetrier was made configurable
1 parent c91ad03 commit decbae6

File tree

9 files changed

+104
-19
lines changed

9 files changed

+104
-19
lines changed

topic/src/main/java/tech/ydb/topic/TopicClient.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,20 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
125125
* @param settings reader settings
126126
* @return topic {@link SyncReader}
127127
*/
128-
SyncReader createSyncReader(ReaderSettings settings);
128+
default SyncReader createSyncReader(ReaderSettings settings) {
129+
return createSyncReader(null, settings);
130+
}
131+
132+
/**
133+
* Create sync topic reader.
134+
*
135+
* @param id an ID of a reader to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
136+
* be used
137+
* @param settings reader settings
138+
*
139+
* @return topic {@link SyncReader}
140+
*/
141+
SyncReader createSyncReader(String id, ReaderSettings settings);
129142

130143
/**
131144
* Create async topic reader.
@@ -134,7 +147,21 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
134147
* @param handlersSettings settings for read event handling
135148
* @return topic {@link AsyncReader}
136149
*/
137-
AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersSettings handlersSettings);
150+
default AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
151+
return createAsyncReader(null, settings, handlersSettings);
152+
}
153+
154+
/**
155+
* Create async topic reader.
156+
*
157+
* @param id an ID of a reader to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
158+
* be used
159+
* @param settings reader settings
160+
* @param handlersSettings settings for read event handling
161+
*
162+
* @return topic {@link AsyncReader}
163+
*/
164+
AsyncReader createAsyncReader(String id, ReaderSettings settings, ReadEventHandlersSettings handlersSettings);
138165

139166
/**
140167
* Commit offset to topic
@@ -151,15 +178,41 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
151178
* @param settings {@link WriterSettings}
152179
* @return topic {@link SyncWriter}
153180
*/
154-
SyncWriter createSyncWriter(WriterSettings settings);
181+
default SyncWriter createSyncWriter(WriterSettings settings) {
182+
return createSyncWriter(null, settings);
183+
}
184+
185+
/**
186+
* Create sync topic writer.
187+
*
188+
* @param id an ID of a writer to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
189+
* be used
190+
* @param settings {@link WriterSettings}
191+
*
192+
* @return topic {@link SyncWriter}
193+
*/
194+
SyncWriter createSyncWriter(String id, WriterSettings settings);
155195

156196
/**
157197
* Create async topic writer.
158198
*
159199
* @param settings {@link WriterSettings}
160200
* @return topic {@link AsyncWriter}
161201
*/
162-
AsyncWriter createAsyncWriter(WriterSettings settings);
202+
default AsyncWriter createAsyncWriter(WriterSettings settings) {
203+
return createAsyncWriter(null, settings);
204+
}
205+
206+
/**
207+
* Create async topic writer.
208+
*
209+
* @param id an ID of a writer to be used for logging, tracing, etc. If is {@code null}, an autogenerated value will
210+
* be used
211+
* @param settings {@link WriterSettings}
212+
*
213+
* @return topic {@link AsyncWriter}
214+
*/
215+
AsyncWriter createAsyncWriter(String id, WriterSettings settings);
163216

164217
@Override
165218
void close();

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@ public abstract class GrpcStreamRetrier {
3535
private final ScheduledExecutorService scheduler;
3636
private final BiConsumer<Status, Throwable> errorsHandler;
3737

38-
protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer<Status, Throwable> errorsHandler) {
38+
protected GrpcStreamRetrier(
39+
String id,
40+
ScheduledExecutorService scheduler,
41+
BiConsumer<Status, Throwable> errorsHandler
42+
) {
3943
this.scheduler = scheduler;
40-
this.id = generateRandomId(ID_LENGTH);
44+
this.id = id==null ? generateRandomId(ID_LENGTH):id;
4145
this.errorsHandler = errorsHandler;
4246
}
4347

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,13 +362,13 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
362362
}
363363

364364
@Override
365-
public SyncReader createSyncReader(ReaderSettings settings) {
366-
return new SyncReaderImpl(topicRpc, settings);
365+
public SyncReader createSyncReader(String id, ReaderSettings settings) {
366+
return new SyncReaderImpl(id, topicRpc, settings);
367367
}
368368

369369
@Override
370-
public AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
371-
return new AsyncReaderImpl(topicRpc, settings, handlersSettings);
370+
public AsyncReader createAsyncReader(String id, ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
371+
return new AsyncReaderImpl(id, topicRpc, settings, handlersSettings);
372372
}
373373

374374
@Override
@@ -389,13 +389,13 @@ public CompletableFuture<Status> commitOffset(String path, CommitOffsetSettings
389389
}
390390

391391
@Override
392-
public SyncWriter createSyncWriter(WriterSettings settings) {
393-
return new SyncWriterImpl(topicRpc, settings, compressionExecutor);
392+
public SyncWriter createSyncWriter(String id, WriterSettings settings) {
393+
return new SyncWriterImpl(id, topicRpc, settings, compressionExecutor);
394394
}
395395

396396
@Override
397-
public AsyncWriter createAsyncWriter(WriterSettings settings) {
398-
return new AsyncWriterImpl(topicRpc, settings, compressionExecutor);
397+
public AsyncWriter createAsyncWriter(String id, WriterSettings settings) {
398+
return new AsyncWriterImpl(id, topicRpc, settings, compressionExecutor);
399399
}
400400

401401
private static YdbTopic.MeteringMode toProto(MeteringMode meteringMode) {

topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,19 @@ public class AsyncReaderImpl extends ReaderImpl implements AsyncReader {
4747
private final ReadEventHandler eventHandler;
4848

4949
public AsyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings, ReadEventHandlersSettings handlersSettings) {
50-
super(topicRpc, settings);
50+
this(null, topicRpc, settings, handlersSettings);
51+
}
52+
53+
public AsyncReaderImpl(
54+
String id,
55+
TopicRpc topicRpc,
56+
ReaderSettings settings,
57+
ReadEventHandlersSettings handlersSettings
58+
) {
59+
super(id, topicRpc, settings);
5160
this.eventHandler = handlersSettings.getEventHandler();
5261

53-
if (handlersSettings.getExecutor() != null) {
62+
if (handlersSettings.getExecutor()!=null) {
5463
logger.debug("Using handler executor provided by user");
5564
this.defaultHandlerExecutorService = null;
5665
this.handlerExecutor = handlersSettings.getExecutor();

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ public abstract class ReaderImpl extends GrpcStreamRetrier {
5555
private final String consumerName;
5656

5757
public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
58-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
58+
this(null, topicRpc, settings);
59+
}
60+
61+
public ReaderImpl(String id, TopicRpc topicRpc, ReaderSettings settings) {
62+
super(id, topicRpc.getScheduler(), settings.getErrorsHandler());
5963
this.topicRpc = topicRpc;
6064
this.settings = settings;
6165
this.session = new ReadSessionImpl();

topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
4545
super(topicRpc, settings);
4646
}
4747

48+
public SyncReaderImpl(String id, TopicRpc topicRpc, ReaderSettings settings) {
49+
super(id, topicRpc, settings);
50+
}
51+
4852
private static class MessageBatchWrapper {
4953
private final List<Message> messages;
5054
private final CompletableFuture<Void> future;

topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
* @author Nikolay Perfilov
1818
*/
1919
public class AsyncWriterImpl extends WriterImpl implements AsyncWriter {
20-
2120
public AsyncWriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
2221
super(topicRpc, settings, compressionExecutor);
2322
}
2423

24+
public AsyncWriterImpl(String id, TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
25+
super(id, topicRpc, settings, compressionExecutor);
26+
}
27+
2528
@Override
2629
public CompletableFuture<InitResult> init() {
2730
return initImpl();

topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ public SyncWriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compr
2222
super(topicRpc, settings, compressionExecutor);
2323
}
2424

25+
public SyncWriterImpl(String id, TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
26+
super(topicRpc, settings, compressionExecutor);
27+
}
28+
2529
@Override
2630
public void init() {
2731
initImpl();

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,11 @@ public abstract class WriterImpl extends GrpcStreamRetrier {
6666
private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
6767

6868
public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
69-
super(topicRpc.getScheduler(), settings.getErrorsHandler());
69+
this(null, topicRpc, settings, compressionExecutor);
70+
}
71+
72+
public WriterImpl(String id, TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) {
73+
super(id, topicRpc.getScheduler(), settings.getErrorsHandler());
7074
this.topicRpc = topicRpc;
7175
this.settings = settings;
7276
this.session = new WriteSessionImpl();

0 commit comments

Comments
 (0)