Skip to content

Commit 4fcf281

Browse files
authored
Merge pull request #705 from ydb-platform/fix_stream_error_handling
Fix topic stream error handling
2 parents 92b60c2 + e90fad6 commit 4fcf281

File tree

21 files changed

+290
-49
lines changed

21 files changed

+290
-49
lines changed

tests/slo/playground/configs/chaos.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ do
3131
sh -c "docker stop ${nodeForChaos} -t 10"
3232
sh -c "docker start ${nodeForChaos}"
3333

34-
sleep 60
34+
sleep 30
3535
done
3636

3737
# for i in $(seq 1 3)

tests/slo/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
requests==2.28.2
22
ratelimiter==1.2.0.post0
3+
aiolimiter==1.1.0
34
prometheus-client==0.17.0
45
quantile-estimator==0.1.2

tests/slo/slo_runner.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ docker compose -f playground/configs/compose.yaml up -d --wait
33

44
../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic
55

6-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
6+
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 120
77

8-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --debug --time 600
8+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 5
9+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 200
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import asyncio
2+
import ydb.aio
3+
import time
4+
import logging
5+
from aiolimiter import AsyncLimiter
6+
7+
from .base import BaseJobManager
8+
from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class AsyncTopicJobManager(BaseJobManager):
    """Drive the topic SLO workload with asyncio tasks.

    Writer tasks, reader tasks and an optional metrics pusher are created as
    asyncio tasks and awaited together in ``run_tests``. The manager reads
    ``self.args`` and ``self.metrics``; both are presumably stored by
    ``BaseJobManager.__init__`` (not visible here - confirm against base.py).
    """

    def __init__(self, driver, args, metrics):
        """Initialise the manager.

        Args:
            driver: Driver whose ``topic_client`` creates writers and readers.
            args: Parsed CLI options; this class reads ``path``, ``consumer``,
                ``time``, ``message_size``, ``write_rps``, ``write_threads``,
                ``read_rps``, ``read_threads``, ``prom_pgw`` and
                ``report_period``.
            metrics: Collector exposing ``start``, ``stop`` and ``push``.
        """
        super().__init__(driver, args, metrics)
        self.driver: ydb.aio.Driver = driver

    async def run_tests(self):
        """Start every workload task and wait until all of them finish."""
        tasks = [
            *await self._run_topic_write_jobs(),
            *await self._run_topic_read_jobs(),
            *self._run_metric_job(),
        ]

        await asyncio.gather(*tasks)

    async def _run_topic_write_jobs(self):
        """Create ``args.write_threads`` writer tasks sharing one rate limiter.

        Returns:
            The list of created asyncio tasks (empty when no writers are requested).
        """
        logger.info("Start async topic write jobs")

        write_limiter = AsyncLimiter(max_rate=self.args.write_rps, time_period=1)

        # The task index doubles as the partition id handed to the writer.
        return [
            asyncio.create_task(self._run_topic_writes(write_limiter, i), name=f"slo_topic_write_{i}")
            for i in range(self.args.write_threads)
        ]

    async def _run_topic_read_jobs(self):
        """Create ``args.read_threads`` reader tasks sharing one rate limiter.

        Returns:
            The list of created asyncio tasks (empty when no readers are requested).
        """
        logger.info("Start async topic read jobs")

        read_limiter = AsyncLimiter(max_rate=self.args.read_rps, time_period=1)

        return [
            asyncio.create_task(self._run_topic_reads(read_limiter), name=f"slo_topic_read_{i}")
            for i in range(self.args.read_threads)
        ]

    async def _run_topic_writes(self, limiter, partition_id=None):
        """Write rate-limited messages to the topic for ``args.time`` seconds.

        Every write is timed through ``self.metrics``; a failed write is
        recorded and logged, and the loop keeps going.

        Args:
            limiter: Async context manager throttling the write rate.
            partition_id: Partition passed to the topic writer, or ``None``.
        """
        # Monotonic clock: the run duration must not be affected by wall-clock jumps.
        started_at = time.monotonic()
        logger.info("Start async topic write workload")

        async with self.driver.topic_client.writer(
            self.args.path,
            codec=ydb.TopicCodec.GZIP,
            partition_id=partition_id,
        ) as writer:
            logger.info("Async topic writer created")

            # The task name does not change inside this coroutine, so resolve it once.
            task_name = asyncio.current_task().get_name()

            message_count = 0
            while time.monotonic() - started_at < self.args.time:
                async with limiter:
                    message_count += 1

                    # Pad short payloads with b"x" up to the configured message size.
                    content = f"message_{message_count}_{task_name}".encode("utf-8").ljust(
                        self.args.message_size, b"x"
                    )

                    message = ydb.TopicWriterMessage(data=content)

                    ts = self.metrics.start((OP_TYPE_WRITE,))
                    try:
                        await writer.write_with_ack(message)
                        logger.info("Write message: %s", content)
                        self.metrics.stop((OP_TYPE_WRITE,), ts)
                    except Exception as e:
                        # Best effort: record the failure and continue with the next message.
                        self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
                        logger.error("Write error: %s", e)

        logger.info("Stop async topic write workload")

    async def _run_topic_reads(self, limiter):
        """Read and commit rate-limited messages for ``args.time`` seconds.

        Every read (including its commit) is timed through ``self.metrics``;
        a failure is recorded and logged, and the loop keeps going.

        Args:
            limiter: Async context manager throttling the read rate.
        """
        # Monotonic clock: the run duration must not be affected by wall-clock jumps.
        started_at = time.monotonic()
        logger.info("Start async topic read workload")

        async with self.driver.topic_client.reader(
            self.args.path,
            self.args.consumer,
        ) as reader:
            logger.info("Async topic reader created")

            while time.monotonic() - started_at < self.args.time:
                async with limiter:
                    ts = self.metrics.start((OP_TYPE_READ,))
                    try:
                        # NOTE(review): receive_message() is awaited without a timeout,
                        # so the deadline above is only re-checked between messages.
                        msg = await reader.receive_message()
                        if msg is not None:
                            logger.info("Read message: %s", msg.data.decode())
                            await reader.commit_with_ack(msg)

                        self.metrics.stop((OP_TYPE_READ,), ts)
                    except Exception as e:
                        # Best effort: record the failure and continue with the next message.
                        self.metrics.stop((OP_TYPE_READ,), ts, error=e)
                        logger.error("Read error: %s", e)

        logger.info("Stop async topic read workload")

    def _run_metric_job(self):
        """Create the metrics pusher task when ``args.prom_pgw`` is set.

        Must be called while an event loop is running because it creates a task.

        Returns:
            A one-element list with the task, or an empty list when
            ``args.prom_pgw`` is falsy.
        """
        if not self.args.prom_pgw:
            return []

        task = asyncio.create_task(self._async_metric_sender(self.args.time), name="slo_metrics_sender")
        return [task]

    async def _async_metric_sender(self, runtime):
        """Push metrics repeatedly for ``runtime`` seconds.

        Args:
            runtime: How long, in seconds, to keep pushing.
        """
        started_at = time.monotonic()
        logger.info("Start push metrics (async)")

        limiter = AsyncLimiter(max_rate=10**6 // self.args.report_period, time_period=1)

        # Inside a coroutine the running loop is the one to use for the executor call.
        loop = asyncio.get_running_loop()

        while time.monotonic() - started_at < runtime:
            async with limiter:
                # metrics.push() is synchronous; run it in the default executor
                # so it does not block the event loop.
                await loop.run_in_executor(None, self.metrics.push)

        logger.info("Stop push metrics (async)")

tests/slo/src/options.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ def add_common_options(parser):
66
parser.add_argument("db", help="YDB database name")
77
parser.add_argument("-t", "--table-name", default="key_value", help="Table name")
88
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
9+
parser.add_argument("--async", action="store_true", help="Use async mode for operations")
910

1011

1112
def make_table_create_parser(subparsers):

tests/slo/src/root_runner.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import ydb
3+
import ydb.aio
24
import logging
35
from typing import Dict
46

@@ -26,6 +28,15 @@ def run_command(self, args):
2628
raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}")
2729

2830
runner_instance = self.runners[prefix]()
31+
32+
# Check if async mode is requested and command is 'run'
33+
if getattr(args, "async", False) and command == "run":
34+
asyncio.run(self._run_async_command(args, runner_instance, command))
35+
else:
36+
self._run_sync_command(args, runner_instance, command)
37+
38+
def _run_sync_command(self, args, runner_instance, command):
39+
"""Run command in synchronous mode"""
2940
driver_config = ydb.DriverConfig(
3041
args.endpoint,
3142
database=args.db,
@@ -43,13 +54,33 @@ def run_command(self, args):
4354
elif command == "cleanup":
4455
runner_instance.cleanup(args)
4556
else:
46-
raise RuntimeError(f"Unknown command {command} for prefix {prefix}")
57+
raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}")
4758
except BaseException:
4859
logger.exception("Something went wrong")
4960
raise
5061
finally:
5162
driver.stop(timeout=getattr(args, "shutdown_time", 10))
5263

64+
async def _run_async_command(self, args, runner_instance, command):
    """Execute *command* for *runner_instance* through an asyncio YDB driver.

    The driver is built from ``args.endpoint`` and ``args.db``, waited on for
    up to 300 seconds, and handed to the runner via ``set_driver``. Only the
    ``run`` command is supported; anything else raises ``RuntimeError``.
    Every exception is logged and re-raised.
    """
    config = ydb.DriverConfig(
        args.endpoint,
        database=args.db,
        grpc_keep_alive_timeout=5000,
    )

    async with ydb.aio.Driver(config) as async_driver:
        await async_driver.wait(timeout=300)
        try:
            runner_instance.set_driver(async_driver)
            # Guard clause: reject unsupported commands after the driver is attached.
            if command != "run":
                raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'")
            await runner_instance.run_async(args)
        except BaseException:
            logger.exception("Something went wrong in async mode")
            raise
83+
5384

5485
def create_runner() -> SLORunner:
5586
runner = SLORunner()

tests/slo/src/runners/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ def create(self, args):
2424
def run(self, args):
2525
pass
2626

27+
async def run_async(self, args):
    """Asynchronous entry point; runners without async support raise here.

    Raises:
        NotImplementedError: Always, with a message naming ``self.prefix``.
    """
    message = f"Async mode not supported for {self.prefix}"
    raise NotImplementedError(message)
29+
2730
@abstractmethod
2831
def cleanup(self, args):
2932
pass

tests/slo/src/runners/topic_runner.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import time
22
import ydb
3+
import ydb.aio
34

45
from .base import BaseRunner
56
from jobs.topic_jobs import TopicJobManager
7+
from jobs.async_topic_jobs import AsyncTopicJobManager
68
from core.metrics import create_metrics
79

810

@@ -76,6 +78,21 @@ def run(self, args):
7678
if hasattr(metrics, "reset"):
7779
metrics.reset()
7880

81+
async def run_async(self, args):
    """Run the topic SLO workload with the asyncio job manager.

    Creates a metrics collector from ``args.prom_pgw``, runs
    ``AsyncTopicJobManager`` on ``self.driver`` and, once it completes,
    resets the metrics when the collector offers a ``reset`` method.
    """
    collector = create_metrics(args.prom_pgw)

    self.logger.info("Starting async topic SLO tests")

    # self.driver is expected to be the async driver attached by the caller.
    manager = AsyncTopicJobManager(self.driver, args, collector)
    await manager.run_tests()

    self.logger.info("Async topic SLO tests completed")

    supports_reset = hasattr(collector, "reset")
    if supports_reset:
        collector.reset()
95+
7996
def cleanup(self, args):
8097
self.logger.info("Cleaning up topic: %s", args.path)
8198

ydb/_errors.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import Optional, Union
3-
4-
import grpc
2+
from typing import Optional
53

64
from . import issues
75

@@ -15,13 +13,18 @@
1513
issues.Overloaded,
1614
issues.SessionPoolEmpty,
1715
issues.ConnectionError,
16+
issues.ConnectionLost,
1817
]
1918
_errors_retriable_slow_backoff_idempotent_types = [
2019
issues.Undetermined,
2120
]
2221

2322

2423
def check_retriable_error(err, retry_settings, attempt):
24+
if isinstance(err, issues.Cancelled):
25+
if retry_settings.retry_cancelled:
26+
return ErrorRetryInfo(True, retry_settings.fast_backoff.calc_timeout(attempt))
27+
2528
if isinstance(err, issues.NotFound):
2629
if retry_settings.retry_not_found:
2730
return ErrorRetryInfo(True, retry_settings.fast_backoff.calc_timeout(attempt))
@@ -54,26 +57,3 @@ def check_retriable_error(err, retry_settings, attempt):
5457
class ErrorRetryInfo:
5558
is_retriable: bool
5659
sleep_timeout_seconds: Optional[float]
57-
58-
59-
def stream_error_converter(exc: BaseException) -> Union[issues.Error, BaseException]:
60-
"""Converts gRPC stream errors to appropriate YDB exception types.
61-
62-
This function takes a base exception and converts specific gRPC aio stream errors
63-
to their corresponding YDB exception types for better error handling and semantic
64-
clarity.
65-
66-
Args:
67-
exc (BaseException): The original exception to potentially convert.
68-
69-
Returns:
70-
BaseException: Either a converted YDB exception or the original exception
71-
if no specific conversion rule applies.
72-
"""
73-
if isinstance(exc, (grpc.RpcError, grpc.aio.AioRpcError)):
74-
if exc.code() == grpc.StatusCode.UNAVAILABLE:
75-
return issues.Unavailable(exc.details() or "")
76-
if exc.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
77-
return issues.DeadlineExceed("Deadline exceeded on request")
78-
return issues.Error("Stream has been terminated. Original exception: {}".format(str(exc.details())))
79-
return exc

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import contextvars
77
import datetime
88
import functools
9+
import logging
910
import typing
1011
from typing import (
1112
Optional,
@@ -37,6 +38,8 @@
3738
from ...settings import BaseRequestSettings
3839
from ..._constants import DEFAULT_LONG_STREAM_TIMEOUT
3940

41+
logger = logging.getLogger(__name__)
42+
4043

4144
class IFromProto(abc.ABC):
4245
@staticmethod

0 commit comments

Comments
 (0)