Skip to content

Commit 8bcb4de

Browse files
committed
another approach
1 parent cbc0352 commit 8bcb4de

File tree

5 files changed

+189
-113
lines changed

5 files changed

+189
-113
lines changed

tests/topics/test_topic_reader.py

+79-63
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import ydb
66

7+
from ydb._topic_reader.events import OnPartitionGetStartOffsetResponse
8+
79

810
@pytest.mark.asyncio
911
class TestTopicReaderAsyncIO:
@@ -253,6 +255,11 @@ async def wait(fut):
253255
await reader1.close()
254256

255257

258+
@pytest.fixture()
259+
def topic_selector(topic_with_messages):
260+
return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0])
261+
262+
256263
@pytest.mark.asyncio
257264
class TestTopicNoConsumerReaderAsyncIO:
258265
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
@@ -262,57 +269,63 @@ async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_mess
262269
consumer=None,
263270
)
264271

265-
async def test_reader_with_default_lambda(self, driver, topic_with_messages):
266-
reader = driver.topic_client.reader(
267-
topic_with_messages,
268-
consumer=None,
269-
partition_ids=[0],
270-
)
272+
async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
273+
topic_selector.partitions = None
274+
275+
with pytest.raises(ydb.Error):
276+
driver.topic_client.reader(
277+
topic_selector,
278+
consumer=None,
279+
)
280+
281+
async def test_reader_with_default_lambda(self, driver, topic_selector):
282+
reader = driver.topic_client.reader(topic_selector, consumer=None)
271283
msg = await reader.receive_message()
272284

273285
assert msg.seqno == 1
274286

275287
await reader.close()
276288

277-
async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
278-
def sync_lambda(partition_id: int):
279-
assert partition_id == 0
280-
return 1
289+
async def test_reader_with_sync_lambda(self, driver, topic_selector):
290+
class CustomEventHandler(ydb.TopicReaderEventHandler):
291+
def on_partition_get_start_offset(self, event):
292+
assert event.partition_id == 0
293+
return OnPartitionGetStartOffsetResponse(1)
281294

282295
reader = driver.topic_client.reader(
283-
topic_with_messages,
296+
topic_selector,
284297
consumer=None,
285-
partition_ids=[0],
286-
get_start_offset_lambda=sync_lambda,
298+
event_handler=CustomEventHandler(),
287299
)
300+
288301
msg = await reader.receive_message()
289302

290303
assert msg.seqno == 2
291304

292305
await reader.close()
293306

294-
async def test_reader_with_async_lambda(self, driver, topic_with_messages):
295-
async def async_lambda(partition_id: int) -> int:
296-
assert partition_id == 0
297-
return 1
307+
async def test_reader_with_async_lambda(self, driver, topic_selector):
308+
class CustomEventHandler(ydb.TopicReaderEventHandler):
309+
async def on_partition_get_start_offset(self, event):
310+
assert event.partition_id == 0
311+
return OnPartitionGetStartOffsetResponse(1)
298312

299313
reader = driver.topic_client.reader(
300-
topic_with_messages,
314+
topic_selector,
301315
consumer=None,
302-
partition_ids=[0],
303-
get_start_offset_lambda=async_lambda,
316+
event_handler=CustomEventHandler(),
304317
)
318+
305319
msg = await reader.receive_message()
306320

307321
assert msg.seqno == 2
308322

309323
await reader.close()
310324

311-
async def test_commit_not_allowed(self, driver, topic_with_messages):
325+
async def test_commit_not_allowed(self, driver, topic_selector):
312326
reader = driver.topic_client.reader(
313-
topic_with_messages,
327+
topic_selector,
314328
consumer=None,
315-
partition_ids=[0],
316329
)
317330
batch = await reader.receive_batch()
318331

@@ -324,18 +337,18 @@ async def test_commit_not_allowed(self, driver, topic_with_messages):
324337

325338
await reader.close()
326339

327-
async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
340+
async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
328341
current_offset = 0
329342

330-
def get_start_offset_lambda(partition_id: int) -> int:
331-
nonlocal current_offset
332-
return current_offset
343+
class CustomEventHandler(ydb.TopicReaderEventHandler):
344+
def on_partition_get_start_offset(self, event):
345+
nonlocal current_offset
346+
return OnPartitionGetStartOffsetResponse(current_offset)
333347

334348
reader = driver.topic_client.reader(
335-
topic_with_messages,
349+
topic_selector,
336350
consumer=None,
337-
partition_ids=[0],
338-
get_start_offset_lambda=get_start_offset_lambda,
351+
event_handler=CustomEventHandler(),
339352
)
340353
msg = await reader.receive_message()
341354

@@ -361,57 +374,63 @@ def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messa
361374
consumer=None,
362375
)
363376

364-
def test_reader_with_default_lambda(self, driver_sync, topic_with_messages):
365-
reader = driver_sync.topic_client.reader(
366-
topic_with_messages,
367-
consumer=None,
368-
partition_ids=[0],
369-
)
377+
def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
378+
topic_selector.partitions = None
379+
380+
with pytest.raises(ydb.Error):
381+
driver_sync.topic_client.reader(
382+
topic_selector,
383+
consumer=None,
384+
)
385+
386+
def test_reader_with_default_lambda(self, driver_sync, topic_selector):
387+
reader = driver_sync.topic_client.reader(topic_selector, consumer=None)
370388
msg = reader.receive_message()
371389

372390
assert msg.seqno == 1
373391

374392
reader.close()
375393

376-
def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
377-
def sync_lambda(partition_id: int):
378-
assert partition_id == 0
379-
return 1
394+
def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
395+
class CustomEventHandler(ydb.TopicReaderEventHandler):
396+
def on_partition_get_start_offset(self, event):
397+
assert event.partition_id == 0
398+
return OnPartitionGetStartOffsetResponse(1)
380399

381400
reader = driver_sync.topic_client.reader(
382-
topic_with_messages,
401+
topic_selector,
383402
consumer=None,
384-
partition_ids=[0],
385-
get_start_offset_lambda=sync_lambda,
403+
event_handler=CustomEventHandler(),
386404
)
405+
387406
msg = reader.receive_message()
388407

389408
assert msg.seqno == 2
390409

391410
reader.close()
392411

393-
def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
394-
async def async_lambda(partition_id: int) -> int:
395-
assert partition_id == 0
396-
return 1
412+
def test_reader_with_async_lambda(self, driver_sync, topic_selector):
413+
class CustomEventHandler(ydb.TopicReaderEventHandler):
414+
async def on_partition_get_start_offset(self, event):
415+
assert event.partition_id == 0
416+
return OnPartitionGetStartOffsetResponse(1)
397417

398418
reader = driver_sync.topic_client.reader(
399-
topic_with_messages,
419+
topic_selector,
400420
consumer=None,
401-
partition_ids=[0],
402-
get_start_offset_lambda=async_lambda,
421+
event_handler=CustomEventHandler(),
403422
)
423+
404424
msg = reader.receive_message()
405425

406426
assert msg.seqno == 2
407427

408428
reader.close()
409429

410-
def test_commit_not_allowed(self, driver_sync, topic_with_messages):
430+
def test_commit_not_allowed(self, driver_sync, topic_selector):
411431
reader = driver_sync.topic_client.reader(
412-
topic_with_messages,
432+
topic_selector,
413433
consumer=None,
414-
partition_ids=[0],
415434
)
416435
batch = reader.receive_batch()
417436

@@ -421,23 +440,20 @@ def test_commit_not_allowed(self, driver_sync, topic_with_messages):
421440
with pytest.raises(ydb.Error):
422441
reader.commit_with_ack(batch)
423442

424-
with pytest.raises(ydb.Error):
425-
reader.async_commit_with_ack(batch)
426-
427443
reader.close()
428444

429-
def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
445+
def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
430446
current_offset = 0
431447

432-
def get_start_offset_lambda(partition_id: int) -> int:
433-
nonlocal current_offset
434-
return current_offset
448+
class CustomEventHandler(ydb.TopicReaderEventHandler):
449+
def on_partition_get_start_offset(self, event):
450+
nonlocal current_offset
451+
return OnPartitionGetStartOffsetResponse(current_offset)
435452

436453
reader = driver_sync.topic_client.reader(
437-
topic_with_messages,
454+
topic_selector,
438455
consumer=None,
439-
partition_ids=[0],
440-
get_start_offset_lambda=get_start_offset_lambda,
456+
event_handler=CustomEventHandler(),
441457
)
442458
msg = reader.receive_message()
443459

ydb/_topic_reader/events.py

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Awaitable, Union
4+
5+
from ..issues import ClientInternalError
6+
7+
8+
class ReaderEvent:
9+
pass
10+
11+
12+
@dataclass
13+
class OnCommit(ReaderEvent):
14+
topic: str
15+
offset: int
16+
17+
18+
@dataclass
19+
class OnPartitionGetStartOffsetRequest(ReaderEvent):
20+
topic: str
21+
partition_id: int
22+
23+
24+
@dataclass
25+
class OnPartitionGetStartOffsetResponse:
26+
start_offset: int
27+
28+
29+
class OnInitPartition(ReaderEvent):
30+
pass
31+
32+
33+
class OnShutdownPartition:
34+
pass
35+
36+
37+
TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]
38+
39+
40+
class ReaderEventHandler:
41+
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
42+
pass
43+
44+
def on_partition_get_start_offset(
45+
self,
46+
event: OnPartitionGetStartOffsetRequest,
47+
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
48+
pass
49+
50+
def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
51+
pass
52+
53+
def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
54+
pass
55+
56+
async def _dispatch(self, event: ReaderEvent) -> Awaitable[TopicEventDispatchType]:
57+
f = None
58+
if isinstance(event, OnCommit):
59+
f = self.on_commit
60+
elif isinstance(event, OnPartitionGetStartOffsetRequest):
61+
f = self.on_partition_get_start_offset
62+
elif isinstance(event, OnInitPartition):
63+
f = self.on_init_partition
64+
elif isinstance(event, OnShutdownPartition):
65+
f = self.on_shutdown_partition
66+
else:
67+
raise ClientInternalError("Unsupported topic reader event")
68+
69+
if asyncio.iscoroutinefunction(f):
70+
return await f(event)
71+
72+
return f(event)

ydb/_topic_reader/topic_reader.py

+4-24
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
import datetime
44
from dataclasses import dataclass
55
from typing import (
6-
Awaitable,
76
Union,
87
Optional,
98
List,
109
Mapping,
1110
Callable,
1211
)
1312

13+
from .events import ReaderEventHandler
1414
from ..retries import RetrySettings
1515
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1616

@@ -21,6 +21,7 @@ class PublicTopicSelector:
2121
partitions: Optional[Union[int, List[int]]] = None
2222
read_from: Optional[datetime.datetime] = None
2323
max_lag: Optional[datetime.timedelta] = None
24+
read_offset: Optional[int] = None
2425

2526
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
2627
partitions = self.partitions
@@ -54,9 +55,7 @@ class PublicReaderSettings:
5455
# decoder_executor, must be set for handle non raw messages
5556
decoder_executor: Optional[concurrent.futures.Executor] = None
5657
update_token_interval: Union[int, float] = 3600
57-
58-
partition_ids: Optional[List[int]] = None
59-
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None
58+
event_handler: Optional[ReaderEventHandler] = None
6059

6160
def __post_init__(self):
6261
# check possible create init message
@@ -73,7 +72,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
7372

7473
for index, selector in enumerate(selectors):
7574
if isinstance(selector, str):
76-
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
75+
selectors[index] = PublicTopicSelector(path=selector)
7776
elif isinstance(selector, PublicTopicSelector):
7877
pass
7978
else:
@@ -89,25 +88,6 @@ def _retry_settings(self) -> RetrySettings:
8988
return RetrySettings(idempotent=True)
9089

9190

92-
class Events:
93-
class OnCommit:
94-
topic: str
95-
offset: int
96-
97-
class OnPartitionGetStartOffsetRequest:
98-
topic: str
99-
partition_id: int
100-
101-
class OnPartitionGetStartOffsetResponse:
102-
start_offset: int
103-
104-
class OnInitPartition:
105-
pass
106-
107-
class OnShutdownPatition:
108-
pass
109-
110-
11191
class RetryPolicy:
11292
connection_timeout_sec: float
11393
overload_timeout_sec: float

0 commit comments

Comments
 (0)