Skip to content

Commit

Permalink
Fix broken test (ydb-platform#3792)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Apr 17, 2024
1 parent b2c3e7d commit 62e55d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
10 changes: 8 additions & 2 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute

void operator()(NYdb::NTopic::TSessionClosedEvent& ev) {
const auto& LogPrefix = Self.LogPrefix;
SRC_LOG_E("SessionId: " << Self.GetSessionId() << " Read session to topic \"" << Self.SourceParams.GetTopicPath() << "\" was closed: " << ev.DebugString());
Self.Send(Self.ComputeActorId, new TEvAsyncInputError(Self.InputIndex, ev.GetIssues(), NYql::NDqProto::StatusIds::BAD_REQUEST));
TString message = (TStringBuilder() << "Read session to topic \"" << Self.SourceParams.GetTopicPath() << "\" was closed");
SRC_LOG_E("SessionId: " << Self.GetSessionId() << " " << message << ": " << ev.DebugString());
TIssue issue(message);
for (const auto& subIssue : ev.GetIssues()) {
TIssuePtr newIssue(new TIssue(subIssue));
issue.AddSubIssue(newIssue);
}
Self.Send(Self.ComputeActorId, new TEvAsyncInputError(Self.InputIndex, TIssues({issue}), NYql::NDqProto::StatusIds::BAD_REQUEST));
}

void operator()(NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) { }
Expand Down
17 changes: 9 additions & 8 deletions ydb/tests/fq/pq_async_io/dq_pq_read_actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,21 @@ Y_UNIT_TEST_SUITE(TDqPqReadActorTest) {
}

Y_UNIT_TEST_F(ReadNonExistentTopic, TPqIoTestFixture) {
return; // TODO: fix me. test was broken after #2855
const TString topicName = "NonExistentTopic";
InitSource(topicName);

while (true) {
try {
SourceRead<TString>(UVParser);
} catch (yexception& e) {
UNIT_ASSERT_STRING_CONTAINS(e.what(), "Read session to topic \"NonExistentTopic\" was closed");
TInstant deadline = Now() + TDuration::Seconds(5);
auto future = CaSetup->AsyncInputPromises.FatalError.GetFuture();
bool failured = false;
while (Now() < deadline) {
SourceRead<TString>(UVParser);
if (future.HasValue()) {
UNIT_ASSERT_STRING_CONTAINS(future.GetValue().ToOneLineString(), "Read session to topic \"NonExistentTopic\" was closed");
failured = true;
break;
}

sleep(1);
}
UNIT_ASSERT_C(failured, "Failure timeout");
}

Y_UNIT_TEST(TestSaveLoadPqRead) {
Expand Down

0 comments on commit 62e55d6

Please sign in to comment.