Skip to content

Commit 75c0b18

Browse files
committed
another approach to safe error handling
1 parent da3e7d7 commit 75c0b18

File tree

9 files changed

+62
-33
lines changed

9 files changed

+62
-33
lines changed

tests/slo/playground/configs/chaos.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ do
3131
sh -c "docker stop ${nodeForChaos} -t 10"
3232
sh -c "docker start ${nodeForChaos}"
3333

34-
sleep 60
34+
sleep 30
3535
done
3636

3737
# for i in $(seq 1 3)

tests/slo/slo_runner.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ docker compose -f playground/configs/compose.yaml up -d --wait
33

44
../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic
55

6-
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 600 --async
6+
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --write-rps 1 --time 120
77

8-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10
9-
../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 600
8+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 5
9+
# ../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --time 200

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ def __init__(self, convert_server_grpc_to_wrapper):
165165
self._connection_state = "new"
166166
self._stream_call = None
167167
self._wait_executor = None
168-
self._on_disconnected_callback = None
169168

170169
self._stream_settings: BaseRequestSettings = (
171170
BaseRequestSettings()
@@ -197,32 +196,28 @@ def _clean_executor(self, wait: bool):
197196

198197
async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
199198
requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)
200-
stream_call, on_disconnected_callback = await driver(
199+
stream_call = await driver(
201200
requests_iterator,
202201
stub,
203202
method,
204203
settings=self._stream_settings,
205-
include_disconnected_callback_to_result=True,
206204
)
207205
self._stream_call = stream_call
208-
self._on_disconnected_callback = on_disconnected_callback
209206
self.from_server_grpc = stream_call.__aiter__()
210207

211208
async def _start_sync_driver(self, driver: Driver, stub, method):
212209
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
213210
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
214211

215-
stream_call, on_disconnected_callback = await to_thread(
212+
stream_call = await to_thread(
216213
driver,
217214
requests_iterator,
218215
stub,
219216
method,
220217
executor=self._wait_executor,
221218
settings=self._stream_settings,
222-
include_disconnected_callback_to_result=True,
223219
)
224220
self._stream_call = stream_call
225-
self._on_disconnected_callback = on_disconnected_callback
226221
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
227222

228223
async def receive(self, timeout: Optional[int] = None) -> Any:
@@ -238,11 +233,6 @@ async def get_response():
238233
grpc_message = await asyncio.wait_for(get_response(), timeout)
239234

240235
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
241-
if self._on_disconnected_callback:
242-
coro = self._on_disconnected_callback()
243-
if asyncio.iscoroutine(coro):
244-
await coro
245-
246236
raise connection._rpc_error_handler(self._connection_state, e)
247237

248238
issues._process_response(grpc_message)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,9 +501,6 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
501501
) # type: StreamReadMessage.FromServer
502502
except asyncio.TimeoutError:
503503
raise TopicReaderError("Timeout waiting for init response")
504-
except Exception as e:
505-
logger.debug("reader stream %s init request error %s", self._id, e)
506-
raise e
507504

508505
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
509506
self._session_id = init_response.server_message.session_id

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,12 +488,12 @@ async def _connection_loop(self):
488488
self._stop(err)
489489
return
490490

491-
await asyncio.sleep(err_info.sleep_timeout_seconds)
492491
logger.debug(
493492
"writer reconnector %s retry in %s seconds",
494493
self._id,
495494
err_info.sleep_timeout_seconds,
496495
)
496+
await asyncio.sleep(err_info.sleep_timeout_seconds)
497497

498498
except (asyncio.CancelledError, Exception) as err:
499499
self._stop(err)

ydb/aio/connection.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,28 @@ def future(self, *args, **kwargs):
103103
raise NotImplementedError
104104

105105

106+
class _SafeAsyncIterator:
107+
def __init__(self, resp, rpc_state, on_disconnected_callback):
108+
self.resp = resp
109+
self.it = resp.__aiter__()
110+
self.rpc_state = rpc_state
111+
self.on_disconnected_callback = on_disconnected_callback
112+
113+
def cancel(self):
114+
self.resp.cancel()
115+
return self
116+
117+
def __aiter__(self):
118+
return self
119+
120+
async def __anext__(self):
121+
try:
122+
return await self.it.__anext__()
123+
except grpc.RpcError as rpc_error:
124+
await self.on_disconnected_callback()
125+
raise _rpc_error_handler(self.rpc_state, rpc_error)
126+
127+
106128
class Connection:
107129
__slots__ = (
108130
"endpoint",
@@ -192,6 +214,10 @@ async def __call__(
192214

193215
response = await feature
194216
_log_response(rpc_state, response)
217+
218+
if hasattr(response, "__aiter__"):
219+
response = _SafeAsyncIterator(response, rpc_state, on_disconnected)
220+
195221
return response if wrap_result is None else wrap_result(rpc_state, response, *wrap_args)
196222
except grpc.RpcError as rpc_error:
197223
if on_disconnected:

ydb/aio/pool.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ async def __call__(
259259
wrap_args=(),
260260
preferred_endpoint=None,
261261
fast_fail=False,
262-
include_disconnected_callback_to_result=False,
263262
):
264263
if self._stopped:
265264
raise issues.Error("Driver was stopped")
@@ -271,18 +270,14 @@ async def __call__(
271270
self._discovery.notify_disconnected()
272271
raise
273272

274-
on_disconnected = self._on_disconnected(connection)
275-
276273
res = await connection(
277274
request,
278275
stub,
279276
rpc_name,
280277
wrap_result,
281278
settings,
282279
wrap_args,
283-
on_disconnected,
280+
self._on_disconnected(connection),
284281
)
285282

286-
if include_disconnected_callback_to_result:
287-
return res, on_disconnected
288283
return res

ydb/connection.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def _rpc_error_handler(
7474
:param rpc_error: an underlying rpc error to handle
7575
:param on_disconnected: a handler to call on disconnected connection
7676
"""
77-
logger.info("%s: received error, %s", rpc_state, rpc_error)
77+
logger.debug("%s: received error, %s", rpc_state, rpc_error)
7878
if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
7979
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
8080
return issues.Unauthenticated(rpc_error.details())
@@ -325,6 +325,28 @@ def __init__(self, endpoint, node_id):
325325
self.node_id = node_id
326326

327327

328+
class _SafeSyncIterator:
329+
def __init__(self, resp, rpc_state, on_disconnected_callback):
330+
self.resp = resp
331+
self.it = resp.__iter__()
332+
self.rpc_state = rpc_state
333+
self.on_disconnected_callback = on_disconnected_callback
334+
335+
def cancel(self):
336+
self.resp.cancel()
337+
return self
338+
339+
def __iter__(self):
340+
return self
341+
342+
def __next__(self):
343+
try:
344+
return self.it.__next__()
345+
except grpc.RpcError as rpc_error:
346+
self.on_disconnected_callback()
347+
raise _rpc_error_handler(self.rpc_state, rpc_error)
348+
349+
328350
class Connection(object):
329351
__slots__ = (
330352
"endpoint",
@@ -466,6 +488,10 @@ def __call__(
466488
compression=getattr(settings, "compression", None),
467489
)
468490
_log_response(rpc_state, response)
491+
492+
if hasattr(response, "__iter__"):
493+
response = _SafeSyncIterator(response, rpc_state, on_disconnected)
494+
469495
return response if wrap_result is None else wrap_result(rpc_state, response, *wrap_args)
470496
except grpc.RpcError as rpc_error:
471497
raise _rpc_error_handler(rpc_state, rpc_error, on_disconnected)

ydb/pool.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,6 @@ def __call__(
437437
settings=None,
438438
wrap_args=(),
439439
preferred_endpoint=None,
440-
include_disconnected_callback_to_result=False,
441440
):
442441
"""
443442
Synchronously sends request constructed by client library
@@ -462,22 +461,18 @@ def __call__(
462461
self._discovery_thread.notify_disconnected()
463462
raise
464463

465-
on_disconnected = lambda: self._on_disconnected(connection) # noqa: E731
466-
467464
res = connection(
468465
request,
469466
stub,
470467
rpc_name,
471468
wrap_result,
472469
settings,
473470
wrap_args,
474-
on_disconnected,
471+
lambda: self._on_disconnected(connection),
475472
)
476473

477474
tracing.trace(self.tracer, {"response": res}, trace_level=tracing.TraceLevel.DEBUG)
478475

479-
if include_disconnected_callback_to_result:
480-
return res, on_disconnected
481476
return res
482477

483478
@_utilities.wrap_async_call_exceptions

0 commit comments

Comments
 (0)