Skip to content

Commit d580933

Browse files
authored
Merge pull request #251 fix commit in sync reader
2 parents 653d4ea + d8b4f3e commit d580933

File tree

7 files changed

+71
-11
lines changed

7 files changed

+71
-11
lines changed

tests/topics/test_topic_reader.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ async def test_read_batch(
1616

1717
await reader.close()
1818

19+
async def test_link_to_client(self, driver, topic_path, topic_consumer):
20+
reader = driver.topic_client.reader(topic_path, topic_consumer)
21+
assert reader._parent is driver.topic_client
22+
1923
async def test_read_message(
2024
self, driver, topic_path, topic_with_messages, topic_consumer
2125
):
@@ -27,7 +31,18 @@ async def test_read_message(
2731

2832
await reader.close()
2933

30-
async def test_read_and_commit_message(
34+
async def test_read_and_commit_with_close_reader(
35+
self, driver, topic_path, topic_with_messages, topic_consumer
36+
):
37+
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
38+
message = await reader.receive_message()
39+
reader.commit(message)
40+
41+
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
42+
message2 = await reader.receive_message()
43+
assert message != message2
44+
45+
async def test_read_and_commit_with_ack(
3146
self, driver, topic_path, topic_with_messages, topic_consumer
3247
):
3348

@@ -84,6 +99,10 @@ def test_read_batch(
8499

85100
reader.close()
86101

102+
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
103+
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
104+
assert reader._parent is driver_sync.topic_client
105+
87106
def test_read_message(
88107
self, driver_sync, topic_path, topic_with_messages, topic_consumer
89108
):
@@ -95,7 +114,18 @@ def test_read_message(
95114

96115
reader.close()
97116

98-
def test_read_and_commit_message(
117+
def test_read_and_commit_with_close_reader(
118+
self, driver_sync, topic_path, topic_with_messages, topic_consumer
119+
):
120+
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
121+
message = reader.receive_message()
122+
reader.commit(message)
123+
124+
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
125+
message2 = reader.receive_message()
126+
assert message != message2
127+
128+
def test_read_and_commit_with_ack(
99129
self, driver_sync, topic_path, topic_with_messages, topic_consumer
100130
):
101131
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)

tests/topics/test_topic_writer.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
2727
init_info = await writer2.wait_init()
2828
assert init_info.last_seqno == 5
2929

30+
async def test_link_to_client(self, driver, topic_path, topic_consumer):
31+
writer = driver.topic_client.writer(topic_path)
32+
assert writer._parent is driver.topic_client
33+
3034
async def test_random_producer_id(
3135
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
3236
):
@@ -138,6 +142,10 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
138142
init_info = writer.wait_init()
139143
assert init_info.last_seqno == last_seqno
140144

145+
def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
146+
writer = driver_sync.topic_client.writer(topic_path)
147+
assert writer._parent is driver_sync.topic_client
148+
141149
def test_random_producer_id(
142150
self,
143151
driver_sync: ydb.aio.Driver,

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,19 @@ class PublicAsyncIOReader:
6161
_loop: asyncio.AbstractEventLoop
6262
_closed: bool
6363
_reconnector: ReaderReconnector
64+
_parent: typing.Any # need for prevent close parent client by GC
6465

65-
def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings):
66+
def __init__(
67+
self,
68+
driver: Driver,
69+
settings: topic_reader.PublicReaderSettings,
70+
*,
71+
_parent=None,
72+
):
6673
self._loop = asyncio.get_running_loop()
6774
self._closed = False
6875
self._reconnector = ReaderReconnector(driver, settings)
76+
self._parent = _parent
6977

7078
async def __aenter__(self):
7179
return self

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ class TopicReaderSync:
2525
_caller: CallFromSyncToAsync
2626
_async_reader: PublicAsyncIOReader
2727
_closed: bool
28+
_parent: typing.Any # need for prevent stop the client by GC
2829

2930
def __init__(
3031
self,
3132
driver: SupportedDriverType,
3233
settings: PublicReaderSettings,
3334
*,
3435
eventloop: Optional[asyncio.AbstractEventLoop] = None,
36+
_parent=None, # need for prevent stop the client by GC
3537
):
3638
self._closed = False
3739

@@ -49,6 +51,8 @@ async def create_reader():
4951
create_reader(), loop
5052
).result()
5153

54+
self._parent = _parent
55+
5256
def __del__(self):
5357
self.close(flush=False)
5458

@@ -122,7 +126,7 @@ def commit(
122126
"""
123127
self._check_closed()
124128

125-
self._caller.call_sync(self._async_reader.commit(mess))
129+
self._caller.call_sync(lambda: self._async_reader.commit(mess))
126130

127131
def commit_with_ack(
128132
self,

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,20 @@ class WriterAsyncIO:
4747
_loop: asyncio.AbstractEventLoop
4848
_reconnector: "WriterAsyncIOReconnector"
4949
_closed: bool
50+
_parent: typing.Any # need for prevent close parent client by GC
5051

51-
def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings):
52+
def __init__(
53+
self,
54+
driver: SupportedDriverType,
55+
settings: PublicWriterSettings,
56+
_client=None,
57+
):
5258
self._loop = asyncio.get_running_loop()
5359
self._closed = False
5460
self._reconnector = WriterAsyncIOReconnector(
5561
driver=driver, settings=WriterSettings(settings)
5662
)
63+
self._parent = _client
5764

5865
async def __aenter__(self) -> "WriterAsyncIO":
5966
return self

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import typing
45
from concurrent.futures import Future
56
from typing import Union, List, Optional
67

@@ -25,13 +26,15 @@ class WriterSync:
2526
_caller: CallFromSyncToAsync
2627
_async_writer: WriterAsyncIO
2728
_closed: bool
29+
_parent: typing.Any # need for prevent close parent client by GC
2830

2931
def __init__(
3032
self,
3133
driver: SupportedDriverType,
3234
settings: PublicWriterSettings,
3335
*,
3436
eventloop: Optional[asyncio.AbstractEventLoop] = None,
37+
_parent=None,
3538
):
3639

3740
self._closed = False
@@ -49,6 +52,7 @@ async def create_async_writer():
4952
self._async_writer = self._caller.safe_call_with_result(
5053
create_async_writer(), None
5154
)
55+
self._parent = _parent
5256

5357
def __enter__(self):
5458
return self

ydb/topic.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,14 @@
4444
RetryPolicy as TopicWriterRetryPolicy,
4545
)
4646

47+
from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
4748
from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter
4849

4950
from ._topic_common.common import (
5051
wrap_operation as _wrap_operation,
5152
create_result_wrapper as _create_result_wrapper,
5253
)
5354

54-
from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
55-
5655
from ._grpc.grpcwrapper import ydb_topic as _ydb_topic
5756
from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types
5857
from ._grpc.grpcwrapper.ydb_topic_public_types import ( # noqa: F401
@@ -174,7 +173,7 @@ def reader(
174173

175174
settings = TopicReaderSettings(**args)
176175

177-
return TopicReaderAsyncIO(self._driver, settings)
176+
return TopicReaderAsyncIO(self._driver, settings, _parent=self)
178177

179178
def writer(
180179
self,
@@ -201,7 +200,7 @@ def writer(
201200
if not settings.encoder_executor:
202201
settings.encoder_executor = self._executor
203202

204-
return TopicWriterAsyncIO(self._driver, settings)
203+
return TopicWriterAsyncIO(self._driver, settings, _client=self)
205204

206205
def close(self):
207206
if self._closed:
@@ -331,7 +330,7 @@ def reader(
331330

332331
settings = TopicReaderSettings(**args)
333332

334-
return TopicReader(self._driver, settings)
333+
return TopicReader(self._driver, settings, _parent=self)
335334

336335
def writer(
337336
self,
@@ -359,7 +358,7 @@ def writer(
359358
if not settings.encoder_executor:
360359
settings.encoder_executor = self._executor
361360

362-
return TopicWriter(self._driver, settings)
361+
return TopicWriter(self._driver, settings, _parent=self)
363362

364363
def close(self):
365364
if self._closed:

0 commit comments

Comments
 (0)