Skip to content

Commit 3a0899e

Browse files
authored
Fixed double closing of the read session (#24717)
2 parents b2d3af5 + 211e58e commit 3a0899e

File tree

2 files changed

+28
-16
lines changed

2 files changed

+28
-16
lines changed

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitDone::
690690
<< ", from# " << msg->StartCookie
691691
<< ", to# " << msg->LastCookie
692692
<< ", offset# " << partition.Offset);
693-
WriteToStreamOrDie(ctx, std::move(result));
693+
if (!WriteToStreamOrDie(ctx, std::move(result))) {
694+
return;
695+
}
694696

695697
NotifyChildren(partition, ctx);
696698
}
@@ -1098,8 +1100,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
10981100

10991101
if (IsQuotaRequired()) {
11001102
Y_ABORT_UNLESS(MaybeRequestQuota(1, EWakeupTag::RlInit, ctx));
1101-
} else {
1102-
InitSession(ctx);
1103+
} else if (!InitSession(ctx)) {
1104+
return;
11031105
}
11041106
} else {
11051107
for (const auto& [name, t] : ev->Get()->TopicAndTablets) {
@@ -1123,17 +1125,17 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
11231125
}
11241126

11251127
template <bool UseMigrationProtocol>
1126-
void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) {
1128+
bool TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) {
11271129
TServerMessage result;
11281130
result.set_status(Ydb::StatusIds::SUCCESS);
11291131

11301132
result.mutable_init_response()->set_session_id(Session);
11311133
if (!WriteToStreamOrDie(ctx, std::move(result))) {
1132-
return;
1134+
return false;
11331135
}
11341136

11351137
if (!ReadFromStreamOrDie(ctx)) {
1136-
return;
1138+
return false;
11371139
}
11381140

11391141
for (auto& [_, holder] : Topics) {
@@ -1153,10 +1155,13 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c
11531155
for (const auto& [topicName, topic] : Topics) {
11541156
if (ReadWithoutConsumer) {
11551157
if (topic->Groups.size() == 0) {
1156-
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx);
1158+
CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx);
1159+
return false;
11571160
}
11581161
for (auto group : topic->Groups) {
1159-
SendLockPartitionToSelf(group-1, topicName, topic, ctx);
1162+
if (!SendLockPartitionToSelf(group-1, topicName, topic, ctx)) {
1163+
return false;
1164+
}
11601165
}
11611166
} else {
11621167
RegisterSession(topic->FullConverter->GetInternalName(), topic->PipeClient, topic->Groups, ctx);
@@ -1166,13 +1171,16 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c
11661171
}
11671172

11681173
ctx.Schedule(TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()), new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
1174+
1175+
return true;
11691176
}
11701177

11711178
template <bool UseMigrationProtocol>
1172-
void TReadSessionActor<UseMigrationProtocol>::SendLockPartitionToSelf(ui32 partitionId, TString topicName, const TTopicHolder::TPtr& topic, const TActorContext& ctx) {
1179+
bool TReadSessionActor<UseMigrationProtocol>::SendLockPartitionToSelf(ui32 partitionId, TString topicName, const TTopicHolder::TPtr& topic, const TActorContext& ctx) {
11731180
auto partitionIt = topic->Partitions.find(partitionId);
11741181
if (partitionIt == topic->Partitions.end()) {
1175-
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() << "no partition " << partitionId << " in topic " << topicName, ctx);
1182+
CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() << "no partition " << partitionId << " in topic " << topicName, ctx);
1183+
return false;
11761184
}
11771185
THolder<TEvPersQueue::TEvLockPartition> res{new TEvPersQueue::TEvLockPartition};
11781186
res->Record.SetSession(Session);
@@ -1184,6 +1192,8 @@ void TReadSessionActor<UseMigrationProtocol>::SendLockPartitionToSelf(ui32 parti
11841192
res->Record.SetClientId(ClientId);
11851193
res->Record.SetTabletId(partitionIt->second.TabletId);
11861194
ctx.Send(ctx.SelfID, res.Release());
1195+
1196+
return true;
11871197
}
11881198

11891199
template <bool UseMigrationProtocol>
@@ -1783,7 +1793,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestP
17831793
TServerMessage result;
17841794
result.set_status(Ydb::StatusIds::SUCCESS);
17851795
result.mutable_update_token_response();
1786-
WriteToStreamOrDie(ctx, std::move(result));
1796+
if (!WriteToStreamOrDie(ctx, std::move(result))) {
1797+
return;
1798+
}
17871799
}
17881800
} else {
17891801
if (ev->Get()->Retryable) {
@@ -2321,7 +2333,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
23212333

23222334
switch (tag) {
23232335
case EWakeupTag::RlInit:
2324-
return InitSession(ctx);
2336+
return (void)InitSession(ctx);
23252337

23262338
case EWakeupTag::RecheckAcl:
23272339
return RecheckACL(ctx);

ydb/services/persqueue_v1/actors/read_session_actor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ class TReadSessionActor
272272
}
273273
}
274274

275-
bool ReadFromStreamOrDie(const TActorContext& ctx);
276-
bool WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish = false);
275+
[[nodiscard]] bool ReadFromStreamOrDie(const TActorContext& ctx);
276+
[[nodiscard]] bool WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish = false);
277277
bool SendControlMessage(TPartitionId id, TServerMessage&& message, const TActorContext& ctx);
278278

279279
// grpc events
@@ -324,10 +324,10 @@ class TReadSessionActor
324324

325325
void RunAuthActor(const TActorContext& ctx);
326326
void RecheckACL(const TActorContext& ctx);
327-
void InitSession(const TActorContext& ctx);
327+
[[nodiscard]] bool InitSession(const TActorContext& ctx);
328328
void RegisterSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx);
329329
void CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx);
330-
void SendLockPartitionToSelf(ui32 partitionId, TString topicName, const TTopicHolder::TPtr& topic, const TActorContext& ctx);
330+
[[nodiscard]] bool SendLockPartitionToSelf(ui32 partitionId, TString topicName, const TTopicHolder::TPtr& topic, const TActorContext& ctx);
331331

332332
void SetupBytesReadByUserAgentCounter();
333333
void SetupCounters();

0 commit comments

Comments
 (0)