Skip to content

Commit d8b4f3e

Browse files
committed
group messages to batches
1 parent 041c2b4 commit d8b4f3e

File tree

3 files changed

+47
-51
lines changed

3 files changed

+47
-51
lines changed

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
Codec,
2727
)
2828
from .._errors import check_retriable_error
29-
from .. import topic
3029

3130

3231
class TopicReaderError(YdbError):
@@ -65,11 +64,11 @@ class PublicAsyncIOReader:
6564
_parent: typing.Any # need for prevent close parent client by GC
6665

6766
def __init__(
68-
self,
69-
driver: Driver,
70-
settings: topic_reader.PublicReaderSettings,
71-
*,
72-
_parent=None,
67+
self,
68+
driver: Driver,
69+
settings: topic_reader.PublicReaderSettings,
70+
*,
71+
_parent=None,
7372
):
7473
self._loop = asyncio.get_running_loop()
7574
self._closed = False
@@ -87,7 +86,7 @@ def __del__(self):
8786
self._loop.create_task(self.close(flush=False), name="close reader")
8887

8988
async def receive_batch(
90-
self,
89+
self,
9190
) -> typing.Union[datatypes.PublicBatch, None]:
9291
"""
9392
Get one messages batch from reader.
@@ -108,7 +107,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
108107
return self._reconnector.receive_message_nowait()
109108

110109
def commit(
111-
self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
110+
self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
112111
):
113112
"""
114113
Write commit message to a buffer.
@@ -119,7 +118,7 @@ def commit(
119118
self._reconnector.commit(batch)
120119

121120
async def commit_with_ack(
122-
self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
121+
self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
123122
):
124123
"""
125124
write commit message to a buffer and wait ack from the server.
@@ -204,7 +203,7 @@ def receive_message_nowait(self):
204203
return self._stream_reader.receive_message_nowait()
205204

206205
def commit(
207-
self, batch: datatypes.ICommittable
206+
self, batch: datatypes.ICommittable
208207
) -> datatypes.PartitionSession.CommitAckWaiter:
209208
return self._stream_reader.commit(batch)
210209

@@ -263,10 +262,10 @@ class ReaderStream:
263262
_get_token_function: Callable[[], str]
264263

265264
def __init__(
266-
self,
267-
reader_reconnector_id: int,
268-
settings: topic_reader.PublicReaderSettings,
269-
get_token_function: Optional[Callable[[], str]] = None,
265+
self,
266+
reader_reconnector_id: int,
267+
settings: topic_reader.PublicReaderSettings,
268+
get_token_function: Optional[Callable[[], str]] = None,
270269
):
271270
self._loop = asyncio.get_running_loop()
272271
self._id = ReaderStream._static_id_counter.inc_and_get()
@@ -295,9 +294,9 @@ def __init__(
295294

296295
@staticmethod
297296
async def create(
298-
reader_reconnector_id: int,
299-
driver: SupportedDriverType,
300-
settings: topic_reader.PublicReaderSettings,
297+
reader_reconnector_id: int,
298+
driver: SupportedDriverType,
299+
settings: topic_reader.PublicReaderSettings,
301300
) -> "ReaderStream":
302301
stream = GrpcWrapperAsyncIO(StreamReadMessage.FromServer.from_proto)
303302

@@ -315,7 +314,7 @@ async def create(
315314
return reader
316315

317316
async def _start(
318-
self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest
317+
self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest
319318
):
320319
if self._started:
321320
raise TopicReaderError("Double start ReaderStream")
@@ -381,7 +380,7 @@ def receive_message_nowait(self):
381380
return message
382381

383382
def commit(
384-
self, batch: datatypes.ICommittable
383+
self, batch: datatypes.ICommittable
385384
) -> datatypes.PartitionSession.CommitAckWaiter:
386385
partition_session = batch._commit_get_partition_session()
387386

@@ -435,19 +434,19 @@ async def _read_messages_loop(self):
435434
self._on_read_response(message.server_message)
436435

437436
elif isinstance(
438-
message.server_message, StreamReadMessage.CommitOffsetResponse
437+
message.server_message, StreamReadMessage.CommitOffsetResponse
439438
):
440439
self._on_commit_response(message.server_message)
441440

442441
elif isinstance(
443-
message.server_message,
444-
StreamReadMessage.StartPartitionSessionRequest,
442+
message.server_message,
443+
StreamReadMessage.StartPartitionSessionRequest,
445444
):
446445
self._on_start_partition_session(message.server_message)
447446

448447
elif isinstance(
449-
message.server_message,
450-
StreamReadMessage.StopPartitionSessionRequest,
448+
message.server_message,
449+
StreamReadMessage.StopPartitionSessionRequest,
451450
):
452451
self._on_partition_session_stop(message.server_message)
453452

@@ -479,12 +478,12 @@ async def _update_token(self, token: str):
479478
self._update_token_event.clear()
480479

481480
def _on_start_partition_session(
482-
self, message: StreamReadMessage.StartPartitionSessionRequest
481+
self, message: StreamReadMessage.StartPartitionSessionRequest
483482
):
484483
try:
485484
if (
486-
message.partition_session.partition_session_id
487-
in self._partition_sessions
485+
message.partition_session.partition_session_id
486+
in self._partition_sessions
488487
):
489488
raise TopicReaderError(
490489
"Double start partition session: %s"
@@ -515,7 +514,7 @@ def _on_start_partition_session(
515514
self._set_first_error(err)
516515

517516
def _on_partition_session_stop(
518-
self, message: StreamReadMessage.StopPartitionSessionRequest
517+
self, message: StreamReadMessage.StopPartitionSessionRequest
519518
):
520519
if message.partition_session_id not in self._partition_sessions:
521520
# may if receive stop partition with graceful=false after response on stop partition
@@ -563,7 +562,7 @@ def _buffer_release_bytes(self, bytes_size):
563562
)
564563

565564
def _read_response_to_batches(
566-
self, message: StreamReadMessage.ReadResponse
565+
self, message: StreamReadMessage.ReadResponse
567566
) -> typing.List[datatypes.PublicBatch]:
568567
batches = []
569568

@@ -573,7 +572,7 @@ def _read_response_to_batches(
573572

574573
bytes_per_batch = message.bytes_size // batch_count
575574
additional_bytes_to_last_batch = (
576-
message.bytes_size - bytes_per_batch * batch_count
575+
message.bytes_size - bytes_per_batch * batch_count
577576
)
578577

579578
for partition_data in message.partition_data:

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
GrpcWrapperAsyncIO,
4141
)
4242

43-
from .. import topic
44-
4543
logger = logging.getLogger(__name__)
4644

4745

@@ -52,10 +50,10 @@ class WriterAsyncIO:
5250
_parent: typing.Any # need for prevent close parent client by GC
5351

5452
def __init__(
55-
self,
56-
driver: SupportedDriverType,
57-
settings: PublicWriterSettings,
58-
_client=None,
53+
self,
54+
driver: SupportedDriverType,
55+
settings: PublicWriterSettings,
56+
_client=None,
5957
):
6058
self._loop = asyncio.get_running_loop()
6159
self._closed = False

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from concurrent.futures import Future
66
from typing import Union, List, Optional
77

8-
from .. import topic
98
from .._grpc.grpcwrapper.common_utils import SupportedDriverType
109
from .topic_writer import (
1110
PublicWriterSettings,
@@ -30,12 +29,12 @@ class WriterSync:
3029
_parent: typing.Any # need for prevent close parent client by GC
3130

3231
def __init__(
33-
self,
34-
driver: SupportedDriverType,
35-
settings: PublicWriterSettings,
36-
*,
37-
eventloop: Optional[asyncio.AbstractEventLoop] = None,
38-
_parent=None
32+
self,
33+
driver: SupportedDriverType,
34+
settings: PublicWriterSettings,
35+
*,
36+
eventloop: Optional[asyncio.AbstractEventLoop] = None,
37+
_parent=None,
3938
):
4039

4140
self._closed = False
@@ -101,17 +100,17 @@ def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo:
101100
)
102101

103102
def write(
104-
self,
105-
messages: Union[Message, List[Message]],
106-
timeout: TimeoutType = None,
103+
self,
104+
messages: Union[Message, List[Message]],
105+
timeout: TimeoutType = None,
107106
):
108107
self._check_closed()
109108

110109
self._caller.safe_call_with_result(self._async_writer.write(messages), timeout)
111110

112111
def async_write_with_ack(
113-
self,
114-
messages: Union[Message, List[Message]],
112+
self,
113+
messages: Union[Message, List[Message]],
115114
) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]:
116115
self._check_closed()
117116

@@ -120,9 +119,9 @@ def async_write_with_ack(
120119
)
121120

122121
def write_with_ack(
123-
self,
124-
messages: Union[Message, List[Message]],
125-
timeout: Union[float, None] = None,
122+
self,
123+
messages: Union[Message, List[Message]],
124+
timeout: Union[float, None] = None,
126125
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
127126
self._check_closed()
128127

0 commit comments

Comments
 (0)