2 changes: 1 addition & 1 deletion tests/slo/playground/configs/chaos.sh
@@ -31,7 +31,7 @@ do
sh -c "docker stop ${nodeForChaos} -t 10"
sh -c "docker start ${nodeForChaos}"

sleep 60
sleep 30
done

# for i in $(seq 1 3)
1 change: 1 addition & 0 deletions tests/slo/requirements.txt
@@ -1,4 +1,5 @@
requests==2.28.2
ratelimiter==1.2.0.post0
aiolimiter==1.1.0
prometheus-client==0.17.0
quantile-estimator==0.1.2
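
For context, aiolimiter's AsyncLimiter is the token-bucket rate limiter the new async jobs below rely on. A minimal standalone sketch (illustrative values, not from this PR):

import asyncio
from aiolimiter import AsyncLimiter

async def main():
    limiter = AsyncLimiter(max_rate=5, time_period=1)  # at most 5 acquisitions per second
    for i in range(10):
        async with limiter:  # blocks once the per-second budget is spent
            print("operation", i)

asyncio.run(main())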
5 changes: 3 additions & 2 deletions tests/slo/slo_runner.sh
@@ -3,6 +3,7 @@ docker compose -f playground/configs/compose.yaml up -d --wait

../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic

../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 120

../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --debug --time 600
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 5
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 200
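
With the new --async flag added in options.py, the same workload can presumably also be driven through the asyncio path; a hypothetical invocation (assuming topic-run inherits the common options):

# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 120 --async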
130 changes: 130 additions & 0 deletions tests/slo/src/jobs/async_topic_jobs.py
@@ -0,0 +1,130 @@
import asyncio
import ydb.aio
import time
import logging
from aiolimiter import AsyncLimiter

from .base import BaseJobManager
from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE

logger = logging.getLogger(__name__)


class AsyncTopicJobManager(BaseJobManager):
def __init__(self, driver, args, metrics):
super().__init__(driver, args, metrics)
self.driver: ydb.aio.Driver = driver

async def run_tests(self):
tasks = [
*await self._run_topic_write_jobs(),
*await self._run_topic_read_jobs(),
*self._run_metric_job(),
]

await asyncio.gather(*tasks)

async def _run_topic_write_jobs(self):
logger.info("Start async topic write jobs")

write_limiter = AsyncLimiter(max_rate=self.args.write_rps, time_period=1)

tasks = []
for i in range(self.args.write_threads):
task = asyncio.create_task(self._run_topic_writes(write_limiter, i), name=f"slo_topic_write_{i}")
tasks.append(task)

return tasks

async def _run_topic_read_jobs(self):
logger.info("Start async topic read jobs")

read_limiter = AsyncLimiter(max_rate=self.args.read_rps, time_period=1)

tasks = []
for i in range(self.args.read_threads):
task = asyncio.create_task(self._run_topic_reads(read_limiter), name=f"slo_topic_read_{i}")
tasks.append(task)

return tasks

async def _run_topic_writes(self, limiter, partition_id=None):
start_time = time.time()
logger.info("Start async topic write workload")

async with self.driver.topic_client.writer(
self.args.path,
codec=ydb.TopicCodec.GZIP,
partition_id=partition_id,
) as writer:
logger.info("Async topic writer created")

message_count = 0
while time.time() - start_time < self.args.time:
async with limiter:
message_count += 1

content = f"message_{message_count}_{asyncio.current_task().get_name()}".encode("utf-8")

if len(content) < self.args.message_size:
content += b"x" * (self.args.message_size - len(content))

message = ydb.TopicWriterMessage(data=content)

ts = self.metrics.start((OP_TYPE_WRITE,))
try:
await writer.write_with_ack(message)
logger.info("Write message: %s", content)
self.metrics.stop((OP_TYPE_WRITE,), ts)
except Exception as e:
self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
logger.error("Write error: %s", e)

logger.info("Stop async topic write workload")

async def _run_topic_reads(self, limiter):
start_time = time.time()
logger.info("Start async topic read workload")

async with self.driver.topic_client.reader(
self.args.path,
self.args.consumer,
) as reader:
logger.info("Async topic reader created")

while time.time() - start_time < self.args.time:
async with limiter:
ts = self.metrics.start((OP_TYPE_READ,))
try:
msg = await reader.receive_message()
if msg is not None:
logger.info("Read message: %s", msg.data.decode())
await reader.commit_with_ack(msg)

self.metrics.stop((OP_TYPE_READ,), ts)
except Exception as e:
self.metrics.stop((OP_TYPE_READ,), ts, error=e)
logger.error("Read error: %s", e)

logger.info("Stop async topic read workload")

def _run_metric_job(self):
if not self.args.prom_pgw:
return []

# Create async task for metrics
task = asyncio.create_task(self._async_metric_sender(self.args.time), name="slo_metrics_sender")
return [task]

async def _async_metric_sender(self, runtime):
start_time = time.time()
logger.info("Start push metrics (async)")

limiter = AsyncLimiter(max_rate=10**6 // self.args.report_period, time_period=1)

while time.time() - start_time < runtime:
async with limiter:
# Call sync metrics.push() in executor to avoid blocking
await asyncio.get_event_loop().run_in_executor(None, self.metrics.push)

logger.info("Stop push metrics (async)")
1 change: 1 addition & 0 deletions tests/slo/src/options.py
@@ -6,6 +6,7 @@ def add_common_options(parser):
parser.add_argument("db", help="YDB database name")
parser.add_argument("-t", "--table-name", default="key_value", help="Table name")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument("--async", action="store_true", help="Use async mode for operations")


def make_table_create_parser(subparsers):
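
One wrinkle worth noting: argparse stores --async under the attribute name async, which is a Python keyword, so args.async is a syntax error; this is why root_runner.py below reads the flag with getattr. A quick demonstration:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--async", action="store_true")
args = parser.parse_args(["--async"])
print(getattr(args, "async"))  # True; `args.async` would not even parse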
33 changes: 32 additions & 1 deletion tests/slo/src/root_runner.py
@@ -1,4 +1,6 @@
import asyncio
import ydb
import ydb.aio
import logging
from typing import Dict

@@ -26,6 +28,15 @@ def run_command(self, args):
raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}")

runner_instance = self.runners[prefix]()

# Check if async mode is requested and command is 'run'
if getattr(args, "async", False) and command == "run":
asyncio.run(self._run_async_command(args, runner_instance, command))
else:
self._run_sync_command(args, runner_instance, command)

def _run_sync_command(self, args, runner_instance, command):
"""Run command in synchronous mode"""
driver_config = ydb.DriverConfig(
args.endpoint,
database=args.db,
@@ -43,13 +54,33 @@ def run_command(self, args):
elif command == "cleanup":
runner_instance.cleanup(args)
else:
raise RuntimeError(f"Unknown command {command} for prefix {prefix}")
raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}")
except BaseException:
logger.exception("Something went wrong")
raise
finally:
driver.stop(timeout=getattr(args, "shutdown_time", 10))

async def _run_async_command(self, args, runner_instance, command):
"""Run command in asynchronous mode"""
driver_config = ydb.DriverConfig(
args.endpoint,
database=args.db,
grpc_keep_alive_timeout=5000,
)

async with ydb.aio.Driver(driver_config) as driver:
await driver.wait(timeout=300)
try:
runner_instance.set_driver(driver)
if command == "run":
await runner_instance.run_async(args)
else:
raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'")
except BaseException:
logger.exception("Something went wrong in async mode")
raise


def create_runner() -> SLORunner:
runner = SLORunner()
3 changes: 3 additions & 0 deletions tests/slo/src/runners/base.py
@@ -24,6 +24,9 @@ def create(self, args):
def run(self, args):
pass

async def run_async(self, args):
raise NotImplementedError(f"Async mode not supported for {self.prefix}")

@abstractmethod
def cleanup(self, args):
pass
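
The default run_async raises, so synchronous-only runners fail fast when --async is passed; opting in is just a matter of overriding it. A sketch of a hypothetical subclass:

class ExampleRunner(BaseRunner):
    def create(self, args): ...

    def run(self, args): ...

    def cleanup(self, args): ...

    async def run_async(self, args):
        # override the NotImplementedError default to support --async
        ...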
17 changes: 17 additions & 0 deletions tests/slo/src/runners/topic_runner.py
@@ -1,8 +1,10 @@
import time
import ydb
import ydb.aio

from .base import BaseRunner
from jobs.topic_jobs import TopicJobManager
from jobs.async_topic_jobs import AsyncTopicJobManager
from core.metrics import create_metrics


@@ -76,6 +78,21 @@ def run(self, args):
if hasattr(metrics, "reset"):
metrics.reset()

async def run_async(self, args):
"""Async version of topic SLO tests using ydb.aio.Driver"""
metrics = create_metrics(args.prom_pgw)

self.logger.info("Starting async topic SLO tests")

# Use async driver for topic operations
job_manager = AsyncTopicJobManager(self.driver, args, metrics)
await job_manager.run_tests()

self.logger.info("Async topic SLO tests completed")

if hasattr(metrics, "reset"):
metrics.reset()

def cleanup(self, args):
self.logger.info("Cleaning up topic: %s", args.path)

32 changes: 6 additions & 26 deletions ydb/_errors.py
@@ -1,7 +1,5 @@
from dataclasses import dataclass
from typing import Optional, Union

import grpc
from typing import Optional

from . import issues

@@ -15,13 +13,18 @@
issues.Overloaded,
issues.SessionPoolEmpty,
issues.ConnectionError,
issues.ConnectionLost,
]
_errors_retriable_slow_backoff_idempotent_types = [
issues.Undetermined,
]


def check_retriable_error(err, retry_settings, attempt):
if isinstance(err, issues.Cancelled):
if retry_settings.retry_cancelled:
return ErrorRetryInfo(True, retry_settings.fast_backoff.calc_timeout(attempt))

if isinstance(err, issues.NotFound):
if retry_settings.retry_not_found:
return ErrorRetryInfo(True, retry_settings.fast_backoff.calc_timeout(attempt))
@@ -54,26 +57,3 @@ def check_retriable_error(err, retry_settings, attempt):
class ErrorRetryInfo:
is_retriable: bool
sleep_timeout_seconds: Optional[float]


def stream_error_converter(exc: BaseException) -> Union[issues.Error, BaseException]:
"""Converts gRPC stream errors to appropriate YDB exception types.

This function takes a base exception and converts specific gRPC aio stream errors
to their corresponding YDB exception types for better error handling and semantic
clarity.

Args:
exc (BaseException): The original exception to potentially convert.

Returns:
BaseException: Either a converted YDB exception or the original exception
if no specific conversion rule applies.
"""
if isinstance(exc, (grpc.RpcError, grpc.aio.AioRpcError)):
if exc.code() == grpc.StatusCode.UNAVAILABLE:
return issues.Unavailable(exc.details() or "")
if exc.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
return issues.DeadlineExceed("Deadline exceeded on request")
return issues.Error("Stream has been terminated. Original exception: {}".format(str(exc.details())))
return exc
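
The practical effect of the new Cancelled branch, as a small sketch (note that ydb._errors is a private module; RetrySettings is assumed to be importable from the package root and to expose the retry_cancelled flag used above):

import ydb
from ydb import issues
from ydb._errors import check_retriable_error

info = check_retriable_error(
    issues.Cancelled("operation cancelled"),
    ydb.RetrySettings(retry_cancelled=True),
    attempt=0,
)
print(info.is_retriable)           # True: Cancelled is now retried
print(info.sleep_timeout_seconds)  # taken from fast_backoff.calc_timeout(0)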
3 changes: 3 additions & 0 deletions ydb/_grpc/grpcwrapper/common_utils.py
@@ -6,6 +6,7 @@
import contextvars
import datetime
import functools
import logging
import typing
from typing import (
Optional,
@@ -37,6 +38,8 @@
from ...settings import BaseRequestSettings
from ..._constants import DEFAULT_LONG_STREAM_TIMEOUT

logger = logging.getLogger(__name__)


class IFromProto(abc.ABC):
@staticmethod
2 changes: 1 addition & 1 deletion ydb/_topic_reader/topic_reader.py
@@ -85,7 +85,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
)

def _retry_settings(self) -> RetrySettings:
return RetrySettings(idempotent=True)
return RetrySettings(idempotent=True, retry_cancelled=True)


class RetryPolicy:
3 changes: 3 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio.py
@@ -248,12 +248,15 @@ async def _connection_loop(self):
self._state_changed.set()
await self._stream_reader.wait_error()
except BaseException as err:
logger.debug("reader %s, attempt %s connection loop error %s", self._id, attempt, err)
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
if not retry_info.is_retriable:
logger.debug("reader %s stop connection loop due to %s", self._id, err)
self._set_first_error(err)
return

logger.debug("sleep before retry for %s seconds", retry_info.sleep_timeout_seconds)

await asyncio.sleep(retry_info.sleep_timeout_seconds)

attempt += 1
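
To actually see these new debug lines during an SLO run, debug logging has to be enabled for the reader module; a minimal sketch (assuming the module logger follows the usual logging.getLogger(__name__) convention):

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("ydb._topic_reader.topic_reader_asyncio").setLevel(logging.DEBUG)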
5 changes: 3 additions & 2 deletions ydb/_topic_writer/topic_writer_asyncio.py
@@ -434,7 +434,7 @@ def _check_stop(self):
raise self._stop_reason.exception()

async def _connection_loop(self):
retry_settings = RetrySettings() # todo
retry_settings = RetrySettings(retry_cancelled=True) # todo

while True:
attempt = 0 # todo calc and reset
@@ -485,15 +485,16 @@ async def _connection_loop(self):
except issues.Error as err:
err_info = check_retriable_error(err, retry_settings, attempt)
if not err_info.is_retriable or self._tx is not None: # no retries in tx writer
logger.debug("writer reconnector %s stop connection loop due to %s", self._id, err)
self._stop(err)
return

await asyncio.sleep(err_info.sleep_timeout_seconds)
logger.debug(
"writer reconnector %s retry in %s seconds",
self._id,
err_info.sleep_timeout_seconds,
)
await asyncio.sleep(err_info.sleep_timeout_seconds)

Copilot AI · Sep 19, 2025:

Moving the logging statement before the sleep operation could help with debugging timing issues, but the current placement after the sleep makes more sense for clarity: the log message indicates what action will be taken next. However, this creates a gap between the log message about retrying "in X seconds" and the actual sleep, which could be confusing. Consider updating the log message to reflect that the retry is happening now.

Suggested change:
logger.debug(
    "writer reconnector %s retrying now",
    self._id,
)

except (asyncio.CancelledError, Exception) as err:
self._stop(err)
Expand Down