Skip to content

No Consumer Reader #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,221 @@ async def wait(fut):

await reader0.close()
await reader1.close()


@pytest.fixture()
def topic_selector(topic_with_messages):
return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0])


@pytest.mark.asyncio
class TestTopicNoConsumerReaderAsyncIO:
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_with_messages,
consumer=None,
)

async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
topic_selector.partitions = None

with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_selector,
consumer=None,
)

async def test_reader_with_default_lambda(self, driver, topic_selector):
reader = driver.topic_client.reader(topic_selector, consumer=None)
msg = await reader.receive_message()

assert msg.seqno == 1

await reader.close()

async def test_reader_with_sync_lambda(self, driver, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_reader_with_async_lambda(self, driver, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
async def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_commit_not_allowed(self, driver, topic_selector):
reader = driver.topic_client.reader(
topic_selector,
consumer=None,
)
batch = await reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
await reader.commit_with_ack(batch)

await reader.close()

async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
current_offset = 0

class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
nonlocal current_offset
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)

reader = driver.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)
msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

await asyncio.sleep(0)

msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

await reader.close()


class TestTopicReaderWithoutConsumer:
def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages):
with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
)

def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
topic_selector.partitions = None

with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_selector,
consumer=None,
)

def test_reader_with_default_lambda(self, driver_sync, topic_selector):
reader = driver_sync.topic_client.reader(topic_selector, consumer=None)
msg = reader.receive_message()

assert msg.seqno == 1

reader.close()

def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_reader_with_async_lambda(self, driver_sync, topic_selector):
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
async def on_partition_get_start_offset(self, event):
assert topic_selector.path.endswith(event.topic)
assert event.partition_id == 0
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)

msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_commit_not_allowed(self, driver_sync, topic_selector):
reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
)
batch = reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
reader.commit_with_ack(batch)

reader.close()

def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
current_offset = 0

class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
def on_partition_get_start_offset(self, event):
nonlocal current_offset
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)

reader = driver_sync.topic_client.reader(
topic_selector,
consumer=None,
event_handler=CustomEventHandler(),
)
msg = reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

msg = reader.receive_message()

assert msg.seqno == current_offset + 1

reader.close()
5 changes: 3 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,13 @@ def from_proto(
@dataclass
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
consumer: Optional[str]
auto_partitioning_support: bool

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
if self.consumer is not None:
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
Expand Down
81 changes: 81 additions & 0 deletions ydb/_topic_reader/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Union

from ..issues import ClientInternalError

__all__ = [
"OnCommit",
"OnPartitionGetStartOffsetRequest",
"OnPartitionGetStartOffsetResponse",
"OnInitPartition",
"OnShutdownPartition",
"EventHandler",
]


class BaseReaderEvent:
pass


@dataclass
class OnCommit(BaseReaderEvent):
topic: str
offset: int


@dataclass
class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
topic: str
partition_id: int


@dataclass
class OnPartitionGetStartOffsetResponse:
start_offset: int


class OnInitPartition(BaseReaderEvent):
pass


class OnShutdownPartition:
pass


TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]


class EventHandler:
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
pass

def on_partition_get_start_offset(
self,
event: OnPartitionGetStartOffsetRequest,
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
pass

def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
pass

def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
pass

async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
f = None
if isinstance(event, OnCommit):
f = self.on_commit
elif isinstance(event, OnPartitionGetStartOffsetRequest):
f = self.on_partition_get_start_offset
elif isinstance(event, OnInitPartition):
f = self.on_init_partition
elif isinstance(event, OnShutdownPartition):
f = self.on_shutdown_partition
else:
raise ClientInternalError("Unsupported topic reader event")

if asyncio.iscoroutinefunction(f):
return await f(event)

return f(event)
26 changes: 5 additions & 21 deletions ydb/_topic_reader/topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
)

from .events import EventHandler
from ..retries import RetrySettings
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange

Expand All @@ -20,6 +21,7 @@ class PublicTopicSelector:
partitions: Optional[Union[int, List[int]]] = None
read_from: Optional[datetime.datetime] = None
max_lag: Optional[datetime.timedelta] = None
read_offset: Optional[int] = None

def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
partitions = self.partitions
Expand All @@ -42,7 +44,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett

@dataclass
class PublicReaderSettings:
consumer: str
consumer: Optional[str]
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
auto_partitioning_support: bool = True
Expand All @@ -53,13 +55,14 @@ class PublicReaderSettings:
# decoder_executor, must be set for handle non raw messages
decoder_executor: Optional[concurrent.futures.Executor] = None
update_token_interval: Union[int, float] = 3600
event_handler: Optional[EventHandler] = None

def __post_init__(self):
# check possible create init message
_ = self._init_message()

def _init_message(self) -> StreamReadMessage.InitRequest:
if not isinstance(self.consumer, str):
if self.consumer is not None and not isinstance(self.consumer, str):
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))

if isinstance(self.topic, list):
Expand All @@ -85,25 +88,6 @@ def _retry_settings(self) -> RetrySettings:
return RetrySettings(idempotent=True)


class Events:
class OnCommit:
topic: str
offset: int

class OnPartitionGetStartOffsetRequest:
topic: str
partition_id: int

class OnPartitionGetStartOffsetResponse:
start_offset: int

class OnInitPartition:
pass

class OnShutdownPatition:
pass


class RetryPolicy:
connection_timeout_sec: float
overload_timeout_sec: float
Expand Down
Loading
Loading