Skip to content

bug: CancelledError while closing topic writer #508

Open
@alex2211-put

Description

@alex2211-put

Bug Report

YDB Python SDK version:

3.18.2

Environment

Ubuntu 22.04.4, Python 3.7

Current behavior:

After calling writer.close(), the following logs appear:
Close writer reconnector

Client request_iterator raised exception:
Traceback (most recent call last):
File "/usr/lib/yandex/taxi-py3-2/lib/python3.7/site-packages/grpc/aio/_call.py", line 406, in _consume_request_iterator
async for request in request_iterator:
File "/usr/lib/yandex/taxi-py3-2/lib/python3.7/site-packages/ydb/_grpc/grpcwrapper/common_utils.py", line 89, in __anext__
item = await self._queue.get()
File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
await getter
concurrent.futures._base.CancelledError

Our process then terminates with `concurrent.futures._base.CancelledError`.

Expected behavior:

`CancelledError` is not raised when calling `writer.close()` or `driver.stop()`.

Related code:

import asyncio
import attr
from concurrent import futures
import datetime
import logging
import typing

import ydb
from ydb import aio
from ydb import issues
from ydb import topic as ydb_topic


# Passed as ``timeout`` to ``driver.wait()`` in YdbTopicClient.init().
_DRIVER_WAITING_TIMEOUT = 10

# Module logger, opened up to DEBUG so the script's progress messages are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Attach a DEBUG-level StreamHandler (stderr by default) to the root logger at
# import time. NOTE(review): if the root logger already has a handler (as in the
# issue's transcript), every record is printed twice.
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
logging.root.addHandler(console)


class BaseError(Exception):
    """Base exception for the YDB helper errors in this module."""


class YdbConnectionError(BaseError):
    """Raised by YdbTopicClient.init() when the driver does not answer in time."""


# Seconds allowed for writer initialisation (asyncio.wait_for) and for each
# batch of in-flight writes (asyncio.wait) in WriterKeeper.
_TIMEOUT = 20


# YDB errors treated as fatal for a writer: WriterKeeper.write catches them,
# marks the writer dead and re-raises them as YdbTopicWriterError.
_WRITER_ERRORS = (issues.ConnectionError, issues.SessionExpired, ydb_topic.TopicError)


# BaseError is intentionally not redefined here. A second ``class BaseError``
# would rebind the name and split the hierarchy: YdbConnectionError would derive
# from the first class and the topic errors below from the second, so a single
# ``except BaseError`` could never catch all of them. These classes reuse the
# BaseError defined earlier in this module.


class YdbTopicTimeoutError(BaseError):
    """Raised when a new topic writer does not finish initialising within ``_TIMEOUT``."""


class YdbTopicWriterError(BaseError):
    """Raised when a topic writer fails to start, a write fails, or a batch times out."""


@attr.dataclass
class _Writer:
    """A ``ydb.TopicWriterAsyncIO`` paired with a liveness flag.

    ``is_alive`` is cleared when a write through this writer fails (by
    WriterKeeper) and when :meth:`close` is called, so a dead or closed writer
    is never handed out again.
    """

    writer: ydb.TopicWriterAsyncIO
    # False once the writer has failed or been closed; not a constructor argument.
    is_alive: bool = attr.ib(default=True, init=False)

    def write(self, message):
        """Submit *message* to the underlying writer, stamped with the current UTC time.

        Returns whatever ``TopicWriterAsyncIO.write`` returns (WriterKeeper
        waits on it with asyncio).

        Raises:
            RuntimeError: if this writer has already been marked dead.
        """
        if not self.is_alive:
            raise RuntimeError('Writer already died')
        # Naive UTC timestamp; presumably the SDK interprets naive datetimes as
        # UTC — TODO confirm before switching to an aware datetime.
        utcnow = datetime.datetime.utcnow()
        return self.writer.write(ydb.TopicWriterMessage(data=message, created_at=utcnow))

    async def close(self):
        """Close the underlying writer and mark this wrapper as no longer usable.

        A ``CancelledError`` raised while closing is logged and suppressed
        (https://github.com/ydb-platform/ydb-python-sdk/issues/508).
        NOTE(review): this also suppresses cancellation of the task that calls
        close().
        """
        # Mark dead first so the wrapper is never reused, even if close() raises.
        self.is_alive = False
        try:
            await self.writer.close()
        except (asyncio.CancelledError, futures.CancelledError):
            # On Python 3.7 both names are the same class. On 3.8+
            # asyncio.CancelledError is a separate BaseException subclass that
            # ``except futures.CancelledError`` alone would not catch.
            logger.info(
                'Got CancelledError while closing writer. Writer close status: %s',
                # `_closed` is a private SDK attribute; don't fail if it is missing.
                getattr(self.writer, '_closed', None),
            )


class WriterKeeper:
    """Cache one topic writer per (topic, source_id, partition_group) and write batches through it."""

    def __init__(self, ydb_driver: aio.Driver, max_inflight: int):
        self._ydb_driver = ydb_driver
        # Maximum number of writes submitted before waiting for all of them.
        self._max_inflight = max_inflight
        self._writers: typing.Dict[typing.Tuple[str, bytes, typing.Optional[int]], _Writer] = {}

    async def write(
        self,
        data_chunk,
        topic: str,
        source_id: bytes,
        partition_group: typing.Optional[int] = None,
    ) -> None:
        """Write every element of *data_chunk* to *topic*.

        Messages are submitted in batches of at most ``max_inflight``. Each batch
        must complete within ``_TIMEOUT`` seconds before the next one is submitted.

        Raises:
            ValueError: if ``max_inflight`` is smaller than 1.
            YdbTopicTimeoutError: if a new writer fails to initialise in time.
            YdbTopicWriterError: if a batch times out, or a message write fails or
                is cancelled. In the failure and cancellation cases the writer is
                marked dead and replaced on the next call.
        """
        max_inflight = self._max_inflight
        if max_inflight < 1:
            raise ValueError(f'max_inflight must be at least 1, got {max_inflight}')

        writer: _Writer = await self._get_writer(
            topic,
            source_id,
            partition_group,
        )

        total_messages_count = len(data_chunk)
        logger.debug(
            'Trying to write %d message(s) to YDB topic',
            total_messages_count,
        )
        for batch_start in range(0, total_messages_count, max_inflight):
            batch_end = min(batch_start + max_inflight, total_messages_count)
            # ensure_future: writer.write() may return a bare coroutine, which
            # asyncio.wait() no longer accepts since Python 3.11.
            messages_inflight = [
                asyncio.ensure_future(writer.write(data_chunk[index]))
                for index in range(batch_start, batch_end)
            ]
            try:
                done, pending = await asyncio.wait(
                    messages_inflight,
                    timeout=_TIMEOUT,
                    return_when=asyncio.ALL_COMPLETED,
                )
            except _WRITER_ERRORS as exc:
                writer.is_alive = False
                raise YdbTopicWriterError(str(exc)) from exc

            if pending:
                raise YdbTopicWriterError(
                    'Timeout occurred during messages writing',
                )
            for task in done:
                # task.exception() itself raises CancelledError on a cancelled task.
                if task.cancelled():
                    writer.is_alive = False
                    raise YdbTopicWriterError('Message write was cancelled')
                exc = task.exception()
                if exc is not None:
                    writer.is_alive = False
                    raise YdbTopicWriterError(
                        f'Exception occurred during message write: {exc}',
                    ) from exc

    async def close(self):
        """Close every cached writer and forget all of them.

        Every writer is closed even if some of them fail. The first failure is
        re-raised afterwards.
        """
        writers = list(self._writers.values())
        # Forget the writers up front so a concurrent write() creates fresh ones
        # instead of picking up a writer that is being closed.
        self._writers.clear()
        first_error = None
        for writer in writers:
            try:
                await writer.close()
            except Exception as exc:
                logger.exception('Failed to close YDB topic writer')
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    async def _get_writer(self, topic: str, source_id: bytes, partition_group: typing.Optional[int] = None):
        """Return the cached live writer for this key, creating one if needed.

        A cached writer that was marked dead is closed (best effort) before it
        is replaced, so its resources are not leaked.
        """
        writer_id = (topic, source_id, partition_group)
        writer = self._writers.get(writer_id)
        if writer is not None and writer.is_alive:
            return writer
        if writer is not None:
            del self._writers[writer_id]
            await self._close_quietly(writer)
        new_writer = await self._create_writer(
            topic,
            source_id,
            partition_group,
        )
        self._writers[writer_id] = new_writer
        return new_writer

    async def _create_writer(self, topic: str, source_id: bytes, partition_id: typing.Optional[int] = None):
        """Open a topic writer and wait (up to ``_TIMEOUT``) for it to initialise.

        The writer is closed again if initialisation times out or reports an
        unexpected result.

        Raises:
            YdbTopicTimeoutError: if initialisation does not finish in time.
            YdbTopicWriterError: if ``wait_init`` returns something other than
                ``TopicWriterInitInfo``.
        """
        writer = self._ydb_driver.topic_client.writer(
            topic=topic,
            producer_id=source_id,
            partition_id=partition_id,
            auto_created_at=False,
        )
        try:
            start_result = await asyncio.wait_for(
                writer.wait_init(),
                timeout=_TIMEOUT,
            )
        except (asyncio.TimeoutError, futures.TimeoutError) as exc:
            # asyncio.wait_for raises asyncio.TimeoutError. On Python 3.8-3.10
            # that class is distinct from futures.TimeoutError, so catch both.
            await self._close_quietly(_Writer(writer))
            raise YdbTopicTimeoutError(f'Timeout at init YDB topic writer: {exc}') from exc

        if not isinstance(start_result, ydb.topic.TopicWriterInitInfo):
            await self._close_quietly(_Writer(writer))
            raise YdbTopicWriterError(
                f'Error occurred on start of writer: {start_result}',
            )
        return _Writer(writer)

    @staticmethod
    async def _close_quietly(writer: _Writer) -> None:
        """Close a writer that is being discarded: time-bounded, errors only logged."""
        try:
            await asyncio.wait_for(writer.close(), timeout=_TIMEOUT)
        except Exception:  # cleanup must not mask the caller's own outcome
            logger.exception('Failed to close discarded YDB topic writer')


class YdbTopicClient:
    """Write message batches to YDB topics over a lazily created driver.

    ``init()`` (also called by ``write()``) creates the driver and the writer
    keeper. ``close()`` tears both down and returns the client to its
    uninitialised state.
    """

    def __init__(
        self,
        *,
        endpoint: str,
        auth_token: str,
        database: typing.Optional[str],
        max_inflight: int = 100,
    ):
        # Only endpoint and database are logged; the auth token is deliberately omitted.
        logger.info('endpoint=%s, database=%s', endpoint, database)
        driver_config = ydb.DriverConfig(
            endpoint=endpoint,
            database=database,
            credentials=ydb.AuthTokenCredentials(auth_token),
        )

        self._driver_config: ydb.DriverConfig = driver_config
        self._max_inflight = max_inflight
        self._driver: typing.Optional[aio.Driver] = None
        self._writer_keeper: typing.Optional[WriterKeeper] = None

    async def close(self):
        """Close all topic writers, then stop the driver.

        The driver is stopped even if closing a writer raises. The client is reset
        first, so close() is idempotent and a later write() reconnects.
        """
        writer_keeper, self._writer_keeper = self._writer_keeper, None
        driver, self._driver = self._driver, None
        try:
            if writer_keeper is not None:
                await writer_keeper.close()
        finally:
            if driver is not None:
                await driver.stop()

    @property
    def ydb_driver(self) -> aio.Driver:
        if self._driver is None:
            raise RuntimeError('Not initialized')
        return self._driver

    @property
    def writer_keeper(self) -> WriterKeeper:
        if self._writer_keeper is None:
            raise RuntimeError('Not initialized')
        return self._writer_keeper

    async def init(self):  # for migration from lb driver
        """Create the driver and writer keeper on first use; a no-op afterwards.

        A driver that fails to become ready is stopped before the error propagates.

        Raises:
            YdbConnectionError: if the driver times out within _DRIVER_WAITING_TIMEOUT.
        """
        if self._driver is not None:
            return
        driver = aio.Driver(self._driver_config)
        try:
            await driver.wait(timeout=_DRIVER_WAITING_TIMEOUT)
        except (TimeoutError, asyncio.TimeoutError) as exc:
            # Builtin and asyncio TimeoutError are distinct classes before Python
            # 3.11; which one driver.wait() raises is not visible here, so catch both.
            msg = 'YDB connection got timeout during waiting for driver answer'
            logger.error(msg)
            await self._stop_quietly(driver)
            raise YdbConnectionError(msg) from exc
        except Exception:
            await self._stop_quietly(driver)
            raise
        self._driver = driver
        self._writer_keeper = WriterKeeper(
            self.ydb_driver,
            max_inflight=self._max_inflight,
        )

    async def write(
        self,
        data_chunk,
        topic: str,
        source_id: bytes,
        partition_group: typing.Optional[int] = None,
    ) -> None:
        """Write *data_chunk* to *topic*, connecting first if needed.

        Raises:
            ValueError: if *data_chunk* is empty (checked before connecting).
        """
        if not data_chunk:
            # Wrap in a tuple: `'%s' % ()` would itself raise TypeError.
            raise ValueError('Empty input bytes chunk: %s' % (data_chunk,))
        await self.init()
        await self.writer_keeper.write(data_chunk, topic, source_id, partition_group)

    @staticmethod
    async def _stop_quietly(driver: aio.Driver) -> None:
        """Stop *driver*, logging (not raising) any failure."""
        try:
            await driver.stop()
        except Exception:
            logger.exception('Failed to stop YDB driver')


async def main():
    """Manual repro: connect, write one message, close the client, then idle.

    The sleeps leave time for background SDK tasks to log errors (such as the
    CancelledError reported in issue #508) before the process exits.
    """
    logger.debug('Starting process')
    client = YdbTopicClient(
        endpoint='logbroker.yandex.net:2135',
        database='/Root/logbroker-federation/logbroker-playground',
        auth_token='some_token',
    )
    try:
        await client.init()
        await asyncio.sleep(10)
        await client.write(
            data_chunk=["{1: 1}"],
            topic='/Root/logbroker-federation/logbroker-playground/alik-put/replication',
            source_id="9541a29aa7734ab7b87143a3384d8429".encode(),
            partition_group=0,
        )
        await asyncio.sleep(10)
    finally:
        # Release writers and the driver even if init() or write() failed.
        await client.close()
    await asyncio.sleep(10)
    logger.debug('Finish process')


# Guarded so that importing this module no longer starts the network repro.
if __name__ == '__main__':
    asyncio.run(main())

To catch the error locally, I edited `ydb/_grpc/grpcwrapper/common_utils.py` as follows:

class QueueToIteratorAsyncIO:
    """Expose an ``asyncio.Queue`` as an async iterator of gRPC requests.

    Iteration ends when ``_stop_grpc_connection_marker`` is taken from the queue.
    """

    __slots__ = ("_queue",)

    def __init__(self, q: asyncio.Queue):
        self._queue = q

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Keep the try body to the await alone. A broad handler around the marker
        # check would swallow StopAsyncIteration and leak the marker as a request,
        # and `item` would be unbound after a failed get().
        try:
            item = await self._queue.get()
        except asyncio.CancelledError as exc:
            # Diagnostic hook for issue #508. Log the cancellation, then re-raise
            # so the cancellation is not swallowed. asyncio.CancelledError (rather
            # than Exception) also matches on Python 3.8+, where it is a
            # BaseException subclass.
            logger.info('got exception: %s', type(exc))
            raise
        if item is _stop_grpc_connection_marker:
            raise StopAsyncIteration()
        return item

Resulting trace:

>>> from replication.common.ydb_tools import main
Starting process
INFO 2024-10-24 17:34:21,821 endpoint=logbroker.yandex.net:2135, database=/Root/logbroker-federation/logbroker-playground
endpoint=logbroker.yandex.net:2135, database=/Root/logbroker-federation/logbroker-playground
INFO 2024-10-24 17:34:21,882 Closing channel for endpoint logbroker.yandex.net:2135
Closing channel for endpoint logbroker.yandex.net:2135
Trying to write 1 message(s) to YDB topic
INFO 2024-10-24 17:34:41,979 Close writer reconnector
Close writer reconnector
INFO 2024-10-24 17:34:41,979 Stop topic writer: topic writer was stopped by call close
Stop topic writer: topic writer was stopped by call close
INFO 2024-10-24 17:34:41,980 got exception: <class 'concurrent.futures._base.CancelledError'>
got exception: <class 'concurrent.futures._base.CancelledError'>
INFO 2024-10-24 17:34:41,981 Closing channel for endpoint logbroker.yandex.net:2135
Closing channel for endpoint logbroker.yandex.net:2135
INFO 2024-10-24 17:34:41,982 Closing channel for endpoint vla0-4100.lbk-vla.logbroker.yandex.net:31051
Closing channel for endpoint vla0-4100.lbk-vla.logbroker.yandex.net:31051
INFO 2024-10-24 17:34:41,982 Closing channel for endpoint vla0-4033.lbk-vla.logbroker.yandex.net:31051
Closing channel for endpoint vla0-4033.lbk-vla.logbroker.yandex.net:31051
INFO 2024-10-24 17:34:41,982 Closing channel for endpoint vla3-2217.lbk-vla.logbroker.yandex.net:31041
Closing channel for endpoint vla3-2217.lbk-vla.logbroker.yandex.net:31041
INFO 2024-10-24 17:34:41,982 Closing channel for endpoint vla4-0115.lbk-vla.logbroker.yandex.net:31031
Closing channel for endpoint vla4-0115.lbk-vla.logbroker.yandex.net:31031
INFO 2024-10-24 17:34:41,982 Closing channel for endpoint vla3-8996.lbk-vla.logbroker.yandex.net:31051
Closing channel for endpoint vla3-8996.lbk-vla.logbroker.yandex.net:31051
INFO 2024-10-24 17:34:41,983 Closing channel for endpoint vla3-2215.lbk-vla.logbroker.yandex.net:31041
Closing channel for endpoint vla3-2215.lbk-vla.logbroker.yandex.net:31041
INFO 2024-10-24 17:34:41,983 Closing channel for endpoint vla0-4039.lbk-vla.logbroker.yandex.net:31041
Closing channel for endpoint vla0-4039.lbk-vla.logbroker.yandex.net:31041
INFO 2024-10-24 17:34:41,983 Closing channel for endpoint vla3-2164.lbk-vla.logbroker.yandex.net:31041
Closing channel for endpoint vla3-2164.lbk-vla.logbroker.yandex.net:31041
INFO 2024-10-24 17:34:41,983 Closing channel for endpoint vla0-3999.lbk-vla.logbroker.yandex.net:31051
Closing channel for endpoint vla0-3999.lbk-vla.logbroker.yandex.net:31051
INFO 2024-10-24 17:34:41,990 Successfully terminated discovery process
Successfully terminated discovery process
Finish process

Other information:

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions