From f76e6ef796a76385e3a22e1d7ff0e32f6d5e85cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D0=BE=D0=B6=D0=B8=D0=BB=D0=BE=D0=B2?= Date: Fri, 26 Apr 2024 10:57:18 +0300 Subject: [PATCH] RPC Reader now retrying on DQ (#4035) --- .../providers/dq/task_runner_actor/task_runner_actor.cpp | 7 ++++++- .../yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 65fce0604648..df705389bf52 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -255,11 +255,12 @@ class TTaskRunnerActor bool fallback = false; bool retry = false; + bool rpcReaderFalledBack = false; for (TStringBuf line: StringSplitter(input).SplitByString("\n")) { if (line.Contains("mlockall failed")) { // skip } else { - if (!fallback) { + if (!fallback || rpcReaderFalledBack) { if (line.Contains("FindColumnInfo(): requirement memberType->GetKind() == TType::EKind::Data")) { // YQL-14757: temporary workaround for part6/produce-reduce_lambda_list_table-default.txt fallback = true; @@ -272,6 +273,10 @@ class TTaskRunnerActor } else if (line.Contains("YT RPC Reader exception:")) { // RPC reader fallback to YT fallback = true; + rpcReaderFalledBack = true; + } else if (line.Contains("Attachments stream write timed out") || line.Contains("No alive peers found")) { + // RPC reader DQ retry + retry = true; } else if (line.Contains("Transaction") && line.Contains("aborted")) { // YQL-15542 fallback = true; diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp index 81b82ef64960..e3a815c7aa33 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp @@ -86,7 +86,7 @@ void TParallelFileInputState::Finish() { void TParallelFileInputState::CheckError() const { if (!InnerState_->Error.IsOK()) { - Cerr << "YT RPC Reader exception:\n"; + Cerr << "YT RPC Reader exception:\n" << InnerState_->Error.GetMessage(); InnerState_->Error.ThrowOnError(); } }