Skip to content

Commit da3e7d7

Browse files
committed
async slo jobs for local testing
1 parent 5c25731 commit da3e7d7

File tree

12 files changed

+202
-18
lines changed

12 files changed

+202
-18
lines changed

tests/slo/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
requests==2.28.2
22
ratelimiter==1.2.0.post0
3+
aiolimiter==1.1.0
34
prometheus-client==0.17.0
45
quantile-estimator==0.1.2

tests/slo/slo_runner.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ docker compose -f playground/configs/compose.yaml up -d --wait
33

44
../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic
55

6-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
6+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 600 --async
77

8-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --debug --time 600
8+
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
9+
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 600
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import asyncio
2+
import ydb.aio
3+
import time
4+
import logging
5+
from aiolimiter import AsyncLimiter
6+
7+
from .base import BaseJobManager
8+
from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class AsyncTopicJobManager(BaseJobManager):
14+
def __init__(self, driver, args, metrics):
15+
super().__init__(driver, args, metrics)
16+
self.driver: ydb.aio.Driver = driver
17+
18+
async def run_tests(self):
19+
tasks = [
20+
*await self._run_topic_write_jobs(),
21+
*await self._run_topic_read_jobs(),
22+
*self._run_metric_job(),
23+
]
24+
25+
await asyncio.gather(*tasks)
26+
27+
async def _run_topic_write_jobs(self):
28+
logger.info("Start async topic write jobs")
29+
30+
write_limiter = AsyncLimiter(max_rate=self.args.write_rps, time_period=1)
31+
32+
tasks = []
33+
for i in range(self.args.write_threads):
34+
task = asyncio.create_task(self._run_topic_writes(write_limiter, i), name=f"slo_topic_write_{i}")
35+
tasks.append(task)
36+
37+
return tasks
38+
39+
async def _run_topic_read_jobs(self):
40+
logger.info("Start async topic read jobs")
41+
42+
read_limiter = AsyncLimiter(max_rate=self.args.read_rps, time_period=1)
43+
44+
tasks = []
45+
for i in range(self.args.read_threads):
46+
task = asyncio.create_task(self._run_topic_reads(read_limiter), name=f"slo_topic_read_{i}")
47+
tasks.append(task)
48+
49+
return tasks
50+
51+
async def _run_topic_writes(self, limiter, partition_id=None):
52+
start_time = time.time()
53+
logger.info("Start async topic write workload")
54+
55+
async with self.driver.topic_client.writer(
56+
self.args.path,
57+
codec=ydb.TopicCodec.GZIP,
58+
partition_id=partition_id,
59+
) as writer:
60+
logger.info("Async topic writer created")
61+
62+
message_count = 0
63+
while time.time() - start_time < self.args.time:
64+
async with limiter:
65+
message_count += 1
66+
67+
content = f"message_{message_count}_{asyncio.current_task().get_name()}".encode("utf-8")
68+
69+
if len(content) < self.args.message_size:
70+
content += b"x" * (self.args.message_size - len(content))
71+
72+
message = ydb.TopicWriterMessage(data=content)
73+
74+
ts = self.metrics.start((OP_TYPE_WRITE,))
75+
try:
76+
await writer.write_with_ack(message)
77+
logger.info("Write message: %s", content)
78+
self.metrics.stop((OP_TYPE_WRITE,), ts)
79+
except Exception as e:
80+
self.metrics.stop((OP_TYPE_WRITE,), ts, error=e)
81+
logger.error("Write error: %s", e)
82+
83+
logger.info("Stop async topic write workload")
84+
85+
async def _run_topic_reads(self, limiter):
86+
start_time = time.time()
87+
logger.info("Start async topic read workload")
88+
89+
async with self.driver.topic_client.reader(
90+
self.args.path,
91+
self.args.consumer,
92+
) as reader:
93+
logger.info("Async topic reader created")
94+
95+
while time.time() - start_time < self.args.time:
96+
async with limiter:
97+
ts = self.metrics.start((OP_TYPE_READ,))
98+
try:
99+
msg = await reader.receive_message()
100+
if msg is not None:
101+
logger.info("Read message: %s", msg.data.decode())
102+
await reader.commit_with_ack(msg)
103+
104+
self.metrics.stop((OP_TYPE_READ,), ts)
105+
except Exception as e:
106+
self.metrics.stop((OP_TYPE_READ,), ts, error=e)
107+
logger.error("Read error: %s", e)
108+
109+
logger.info("Stop async topic read workload")
110+
111+
def _run_metric_job(self):
112+
if not self.args.prom_pgw:
113+
return []
114+
115+
# Create async task for metrics
116+
task = asyncio.create_task(self._async_metric_sender(self.args.time), name="slo_metrics_sender")
117+
return [task]
118+
119+
async def _async_metric_sender(self, runtime):
120+
start_time = time.time()
121+
logger.info("Start push metrics (async)")
122+
123+
limiter = AsyncLimiter(max_rate=10**6 // self.args.report_period, time_period=1)
124+
125+
while time.time() - start_time < runtime:
126+
async with limiter:
127+
# Call sync metrics.push() in executor to avoid blocking
128+
await asyncio.get_event_loop().run_in_executor(None, self.metrics.push)
129+
130+
logger.info("Stop push metrics (async)")

tests/slo/src/options.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ def add_common_options(parser):
66
parser.add_argument("db", help="YDB database name")
77
parser.add_argument("-t", "--table-name", default="key_value", help="Table name")
88
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
9+
parser.add_argument("--async", action="store_true", help="Use async mode for operations")
910

1011

1112
def make_table_create_parser(subparsers):

tests/slo/src/root_runner.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import ydb
3+
import ydb.aio
24
import logging
35
from typing import Dict
46

@@ -26,6 +28,15 @@ def run_command(self, args):
2628
raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}")
2729

2830
runner_instance = self.runners[prefix]()
31+
32+
# Check if async mode is requested and command is 'run'
33+
if getattr(args, "async", False) and command == "run":
34+
asyncio.run(self._run_async_command(args, runner_instance, command))
35+
else:
36+
self._run_sync_command(args, runner_instance, command)
37+
38+
def _run_sync_command(self, args, runner_instance, command):
39+
"""Run command in synchronous mode"""
2940
driver_config = ydb.DriverConfig(
3041
args.endpoint,
3142
database=args.db,
@@ -43,13 +54,33 @@ def run_command(self, args):
4354
elif command == "cleanup":
4455
runner_instance.cleanup(args)
4556
else:
46-
raise RuntimeError(f"Unknown command {command} for prefix {prefix}")
57+
raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}")
4758
except BaseException:
4859
logger.exception("Something went wrong")
4960
raise
5061
finally:
5162
driver.stop(timeout=getattr(args, "shutdown_time", 10))
5263

64+
async def _run_async_command(self, args, runner_instance, command):
65+
"""Run command in asynchronous mode"""
66+
driver_config = ydb.DriverConfig(
67+
args.endpoint,
68+
database=args.db,
69+
grpc_keep_alive_timeout=5000,
70+
)
71+
72+
async with ydb.aio.Driver(driver_config) as driver:
73+
await driver.wait(timeout=300)
74+
try:
75+
runner_instance.set_driver(driver)
76+
if command == "run":
77+
await runner_instance.run_async(args)
78+
else:
79+
raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'")
80+
except BaseException:
81+
logger.exception("Something went wrong in async mode")
82+
raise
83+
5384

5485
def create_runner() -> SLORunner:
5586
runner = SLORunner()

tests/slo/src/runners/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ def create(self, args):
2424
def run(self, args):
2525
pass
2626

27+
async def run_async(self, args):
28+
raise NotImplementedError(f"Async mode not supported for {self.prefix}")
29+
2730
@abstractmethod
2831
def cleanup(self, args):
2932
pass

tests/slo/src/runners/topic_runner.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import time
22
import ydb
3+
import ydb.aio
34

45
from .base import BaseRunner
56
from jobs.topic_jobs import TopicJobManager
7+
from jobs.async_topic_jobs import AsyncTopicJobManager
68
from core.metrics import create_metrics
79

810

@@ -76,6 +78,21 @@ def run(self, args):
7678
if hasattr(metrics, "reset"):
7779
metrics.reset()
7880

81+
async def run_async(self, args):
82+
"""Async version of topic SLO tests using ydb.aio.Driver"""
83+
metrics = create_metrics(args.prom_pgw)
84+
85+
self.logger.info("Starting async topic SLO tests")
86+
87+
# Use async driver for topic operations
88+
job_manager = AsyncTopicJobManager(self.driver, args, metrics)
89+
await job_manager.run_tests()
90+
91+
self.logger.info("Async topic SLO tests completed")
92+
93+
if hasattr(metrics, "reset"):
94+
metrics.reset()
95+
7996
def cleanup(self, args):
8097
self.logger.info("Cleaning up topic: %s", args.path)
8198

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def __init__(self, convert_server_grpc_to_wrapper):
165165
self._connection_state = "new"
166166
self._stream_call = None
167167
self._wait_executor = None
168-
self._on_disconnected_lambda = None
168+
self._on_disconnected_callback = None
169169

170170
self._stream_settings: BaseRequestSettings = (
171171
BaseRequestSettings()
@@ -197,32 +197,32 @@ def _clean_executor(self, wait: bool):
197197

198198
async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
199199
requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)
200-
stream_call, on_disconnected_lambda = await driver(
200+
stream_call, on_disconnected_callback = await driver(
201201
requests_iterator,
202202
stub,
203203
method,
204204
settings=self._stream_settings,
205-
include_disconnected_lambda_to_result=True,
205+
include_disconnected_callback_to_result=True,
206206
)
207207
self._stream_call = stream_call
208-
self._on_disconnected_lambda = on_disconnected_lambda
208+
self._on_disconnected_callback = on_disconnected_callback
209209
self.from_server_grpc = stream_call.__aiter__()
210210

211211
async def _start_sync_driver(self, driver: Driver, stub, method):
212212
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
213213
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
214214

215-
stream_call, on_disconnected_lambda = await to_thread(
215+
stream_call, on_disconnected_callback = await to_thread(
216216
driver,
217217
requests_iterator,
218218
stub,
219219
method,
220220
executor=self._wait_executor,
221221
settings=self._stream_settings,
222-
include_disconnected_lambda_to_result=True,
222+
include_disconnected_callback_to_result=True,
223223
)
224224
self._stream_call = stream_call
225-
self._on_disconnected_lambda = on_disconnected_lambda
225+
self._on_disconnected_callback = on_disconnected_callback
226226
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
227227

228228
async def receive(self, timeout: Optional[int] = None) -> Any:
@@ -238,8 +238,8 @@ async def get_response():
238238
grpc_message = await asyncio.wait_for(get_response(), timeout)
239239

240240
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
241-
if self._on_disconnected_lambda:
242-
coro = self._on_disconnected_lambda()
241+
if self._on_disconnected_callback:
242+
coro = self._on_disconnected_callback()
243243
if asyncio.iscoroutine(coro):
244244
await coro
245245

ydb/aio/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ async def close(self, grace: float = 30):
235235
been terminated are cancelled. If grace is None, this method will wait until all tasks are finished.
236236
:return: None
237237
"""
238-
logger.info("Closing channel for endpoint %s", self.endpoint)
238+
logger.debug("Closing channel for endpoint %s", self.endpoint)
239239

240240
self.closing = True
241241

ydb/aio/pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ async def __call__(
259259
wrap_args=(),
260260
preferred_endpoint=None,
261261
fast_fail=False,
262-
include_disconnected_lambda_to_result=False,
262+
include_disconnected_callback_to_result=False,
263263
):
264264
if self._stopped:
265265
raise issues.Error("Driver was stopped")
@@ -283,6 +283,6 @@ async def __call__(
283283
on_disconnected,
284284
)
285285

286-
if include_disconnected_lambda_to_result:
286+
if include_disconnected_callback_to_result:
287287
return res, on_disconnected
288288
return res

0 commit comments

Comments
 (0)