Skip to content

Commit 03dba46

Browse files
committed
another approach to safe error handling
1 parent da3e7d7 commit 03dba46

File tree

13 files changed

+86
-71
lines changed

13 files changed

+86
-71
lines changed

tests/slo/playground/configs/chaos.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ do
3131
sh -c "docker stop ${nodeForChaos} -t 10"
3232
sh -c "docker start ${nodeForChaos}"
3333

34-
sleep 60
34+
sleep 30
3535
done
3636

3737
# for i in $(seq 1 3)

tests/slo/slo_runner.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ docker compose -f playground/configs/compose.yaml up -d --wait
33

44
../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic
55

6-
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 600 --async
6+
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 120
77

8-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
9-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 600
8+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 5
9+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 200

ydb/_errors.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from dataclasses import dataclass
2-
from typing import Optional, Union
3-
4-
import grpc
2+
from typing import Optional
53

64
from . import issues
75

86
_errors_retriable_fast_backoff_types = [
97
issues.Unavailable,
108
issues.ClientInternalError,
9+
issues.ConnectionLost,
1110
]
1211
_errors_retriable_slow_backoff_types = [
1312
issues.Aborted,
@@ -54,26 +53,3 @@ def check_retriable_error(err, retry_settings, attempt):
5453
class ErrorRetryInfo:
5554
is_retriable: bool
5655
sleep_timeout_seconds: Optional[float]
57-
58-
59-
def stream_error_converter(exc: BaseException) -> Union[issues.Error, BaseException]:
60-
"""Converts gRPC stream errors to appropriate YDB exception types.
61-
62-
This function takes a base exception and converts specific gRPC aio stream errors
63-
to their corresponding YDB exception types for better error handling and semantic
64-
clarity.
65-
66-
Args:
67-
exc (BaseException): The original exception to potentially convert.
68-
69-
Returns:
70-
BaseException: Either a converted YDB exception or the original exception
71-
if no specific conversion rule applies.
72-
"""
73-
if isinstance(exc, (grpc.RpcError, grpc.aio.AioRpcError)):
74-
if exc.code() == grpc.StatusCode.UNAVAILABLE:
75-
return issues.Unavailable(exc.details() or "")
76-
if exc.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
77-
return issues.DeadlineExceed("Deadline exceeded on request")
78-
return issues.Error("Stream has been terminated. Original exception: {}".format(str(exc.details())))
79-
return exc

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ def __init__(self, convert_server_grpc_to_wrapper):
165165
self._connection_state = "new"
166166
self._stream_call = None
167167
self._wait_executor = None
168-
self._on_disconnected_callback = None
169168

170169
self._stream_settings: BaseRequestSettings = (
171170
BaseRequestSettings()
@@ -197,32 +196,28 @@ def _clean_executor(self, wait: bool):
197196

198197
async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
199198
requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)
200-
stream_call, on_disconnected_callback = await driver(
199+
stream_call = await driver(
201200
requests_iterator,
202201
stub,
203202
method,
204203
settings=self._stream_settings,
205-
include_disconnected_callback_to_result=True,
206204
)
207205
self._stream_call = stream_call
208-
self._on_disconnected_callback = on_disconnected_callback
209206
self.from_server_grpc = stream_call.__aiter__()
210207

211208
async def _start_sync_driver(self, driver: Driver, stub, method):
212209
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
213210
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
214211

215-
stream_call, on_disconnected_callback = await to_thread(
212+
stream_call = await to_thread(
216213
driver,
217214
requests_iterator,
218215
stub,
219216
method,
220217
executor=self._wait_executor,
221218
settings=self._stream_settings,
222-
include_disconnected_callback_to_result=True,
223219
)
224220
self._stream_call = stream_call
225-
self._on_disconnected_callback = on_disconnected_callback
226221
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
227222

228223
async def receive(self, timeout: Optional[int] = None) -> Any:
@@ -238,11 +233,6 @@ async def get_response():
238233
grpc_message = await asyncio.wait_for(get_response(), timeout)
239234

240235
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
241-
if self._on_disconnected_callback:
242-
coro = self._on_disconnected_callback()
243-
if asyncio.iscoroutine(coro):
244-
await coro
245-
246236
raise connection._rpc_error_handler(self._connection_state, e)
247237

248238
issues._process_response(grpc_message)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,9 +501,6 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
501501
) # type: StreamReadMessage.FromServer
502502
except asyncio.TimeoutError:
503503
raise TopicReaderError("Timeout waiting for init response")
504-
except Exception as e:
505-
logger.debug("reader stream %s init request error %s", self._id, e)
506-
raise e
507504

508505
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
509506
self._session_id = init_response.server_message.session_id

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,12 +488,12 @@ async def _connection_loop(self):
488488
self._stop(err)
489489
return
490490

491-
await asyncio.sleep(err_info.sleep_timeout_seconds)
492491
logger.debug(
493492
"writer reconnector %s retry in %s seconds",
494493
self._id,
495494
err_info.sleep_timeout_seconds,
496495
)
496+
await asyncio.sleep(err_info.sleep_timeout_seconds)
497497

498498
except (asyncio.CancelledError, Exception) as err:
499499
self._stop(err)

ydb/aio/_utilities.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22

33

44
class AsyncResponseIterator(object):
5-
def __init__(self, it, wrapper, error_converter=None):
5+
def __init__(self, it, wrapper):
66
self.it = it.__aiter__()
77
self.wrapper = wrapper
8-
self.error_converter = error_converter
98

109
def cancel(self):
1110
self.it.cancel()
@@ -18,12 +17,7 @@ def __aiter__(self):
1817
return self
1918

2019
async def _next(self):
21-
try:
22-
res = self.wrapper(await self.it.__anext__())
23-
except BaseException as e:
24-
if self.error_converter:
25-
raise self.error_converter(e) from e
26-
raise e
20+
res = self.wrapper(await self.it.__anext__())
2721

2822
if res is not None:
2923
return res

ydb/aio/connection.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
_log_request,
1212
_log_response,
1313
_rpc_error_handler,
14+
_is_disconnect_needed,
1415
_get_request_timeout,
1516
_set_server_timeouts,
1617
_RpcState as RpcState,
@@ -103,6 +104,30 @@ def future(self, *args, **kwargs):
103104
raise NotImplementedError
104105

105106

107+
class _SafeAsyncIterator:
108+
def __init__(self, resp, rpc_state, on_disconnected_callback):
109+
self.resp = resp
110+
self.it = resp.__aiter__()
111+
self.rpc_state = rpc_state
112+
self.on_disconnected_callback = on_disconnected_callback
113+
114+
def cancel(self):
115+
self.resp.cancel()
116+
return self
117+
118+
def __aiter__(self):
119+
return self
120+
121+
async def __anext__(self):
122+
try:
123+
return await self.it.__anext__()
124+
except grpc.RpcError as rpc_error:
125+
ydb_error = _rpc_error_handler(self.rpc_state, rpc_error, use_unavailable=True)
126+
if _is_disconnect_needed(ydb_error):
127+
await self.on_disconnected_callback()
128+
raise ydb_error
129+
130+
106131
class Connection:
107132
__slots__ = (
108133
"endpoint",
@@ -192,6 +217,10 @@ async def __call__(
192217

193218
response = await feature
194219
_log_response(rpc_state, response)
220+
221+
if hasattr(response, "__aiter__"):
222+
response = _SafeAsyncIterator(response, rpc_state, on_disconnected)
223+
195224
return response if wrap_result is None else wrap_result(rpc_state, response, *wrap_args)
196225
except grpc.RpcError as rpc_error:
197226
if on_disconnected:

ydb/aio/pool.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ async def __call__(
259259
wrap_args=(),
260260
preferred_endpoint=None,
261261
fast_fail=False,
262-
include_disconnected_callback_to_result=False,
263262
):
264263
if self._stopped:
265264
raise issues.Error("Driver was stopped")
@@ -271,18 +270,14 @@ async def __call__(
271270
self._discovery.notify_disconnected()
272271
raise
273272

274-
on_disconnected = self._on_disconnected(connection)
275-
276273
res = await connection(
277274
request,
278275
stub,
279276
rpc_name,
280277
wrap_result,
281278
settings,
282279
wrap_args,
283-
on_disconnected,
280+
self._on_disconnected(connection),
284281
)
285282

286-
if include_disconnected_callback_to_result:
287-
return res, on_disconnected
288283
return res

ydb/aio/query/session.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
)
2424

2525
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
26-
from ..._errors import stream_error_converter
2726

2827

2928
class QuerySession(BaseQuerySession):
@@ -164,7 +163,6 @@ async def execute(
164163
session=self,
165164
settings=self._settings,
166165
),
167-
error_converter=stream_error_converter,
168166
)
169167

170168
async def explain(

0 commit comments

Comments
 (0)