Skip to content

Commit f096077

Browse files
Alek5andr-Kotovydbot
authored andcommitted
Y_FAIL_S in the TDistributedTransaction constructor (#27103)
1 parent f44811c commit f096077

File tree

2 files changed

+29
-25
lines changed

2 files changed

+29
-25
lines changed

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
811811
NKikimrPQ::TTransaction tx;
812812
PQ_ENSURE(tx.ParseFromString(pair.GetValue()));
813813

814+
PQ_ENSURE(tx.GetKind() != NKikimrPQ::TTransaction::KIND_UNKNOWN)("Key", pair.GetKey());
815+
814816
PQ_LOG_TX_I("Restore Tx. " <<
815817
"TxId: " << tx.GetTxId() <<
816818
", Step: " << tx.GetStep() <<

ydb/core/persqueue/pqtablet/transaction.cpp

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <ydb/library/wilson_ids/wilson.h>
66

7+
#define TX_ENSURE(condition) AFL_ENSURE(condition)("TxId", TxId)("State", NKikimrPQ::TTransaction_EState_Name(State))
8+
79
namespace NKikimr::NPQ {
810

911
TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx) :
@@ -51,10 +53,10 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
5153
InitConfigTransaction(tx);
5254
break;
5355
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
54-
Y_FAIL_S("unknown transaction type");
56+
TX_ENSURE(false);
5557
}
5658

57-
AFL_ENSURE(tx.HasSourceActor());
59+
TX_ENSURE(tx.HasSourceActor());
5860
SourceActor = ActorIdFromProto(tx.GetSourceActor());
5961

6062
if (tx.HasWriteId()) {
@@ -133,34 +135,34 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
133135
ui64 minStep,
134136
ui64 extractTabletId)
135137
{
136-
AFL_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
137-
AFL_ENSURE(TxId == Max<ui64>());
138+
TX_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
139+
TX_ENSURE(TxId == Max<ui64>());
138140

139141
TxId = event.GetTxId();
140142

141143
MinStep = minStep;
142144

143145
switch (event.GetTxBodyCase()) {
144146
case NKikimrPQ::TEvProposeTransaction::kData:
145-
AFL_ENSURE(event.HasData());
147+
TX_ENSURE(event.HasData());
146148
MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
147149
OnProposeTransaction(event.GetData(), extractTabletId);
148150
break;
149151
case NKikimrPQ::TEvProposeTransaction::kConfig:
150-
AFL_ENSURE(event.HasConfig());
152+
TX_ENSURE(event.HasConfig());
151153
MaxStep = Max<ui64>();
152154
OnProposeTransaction(event.GetConfig(), extractTabletId);
153155
break;
154156
default:
155-
Y_FAIL_S("unknown TxBody case");
157+
TX_ENSURE(false);
156158
}
157159

158160
PartitionRepliesCount = 0;
159161
PartitionRepliesExpected = 0;
160162

161163
ReadSetCount = 0;
162164

163-
AFL_ENSURE(event.HasSourceActor());
165+
TX_ENSURE(event.HasSourceActor());
164166
SourceActor = ActorIdFromProto(event.GetSourceActor());
165167
}
166168

@@ -231,8 +233,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
231233

232234
void TDistributedTransaction::OnPlanStep(ui64 step)
233235
{
234-
AFL_ENSURE(Step == Max<ui64>());
235-
AFL_ENSURE(TxId != Max<ui64>());
236+
TX_ENSURE(Step == Max<ui64>());
237+
TX_ENSURE(TxId != Max<ui64>());
236238

237239
Step = step;
238240
}
@@ -285,10 +287,10 @@ void TDistributedTransaction::OnProposePartitionConfigResult(TEvPQ::TEvProposePa
285287
template<class E>
286288
void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
287289
{
288-
AFL_ENSURE(Step == event.Step);
289-
AFL_ENSURE(TxId == event.TxId);
290+
TX_ENSURE(Step == event.Step);
291+
TX_ENSURE(TxId == event.TxId);
290292

291-
AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
293+
TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
292294

293295
if (decision.Defined()) {
294296
SetDecision(SelfDecision, *decision);
@@ -305,12 +307,12 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
305307
{
306308
PQ_LOG_TX_D("Handle TEvReadSet " << TxId);
307309

308-
AFL_ENSURE((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
309-
AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
310+
TX_ENSURE((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
311+
TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
310312

311313
if (PredicatesReceived.contains(event.GetTabletProducer())) {
312314
NKikimrTx::TReadSetData data;
313-
AFL_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet()));
315+
TX_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet()));
314316

315317
SetDecision(ParticipantsDecision, data.GetDecision());
316318
ReadSetAcks[sender] = std::move(ack);
@@ -326,7 +328,7 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
326328
NKikimrPQ::TPartitions d;
327329
if (data.HasData()) {
328330
auto r = data.GetData().UnpackTo(&d);
329-
AFL_ENSURE(r)("description", "Unexpected data");
331+
TX_ENSURE(r)("description", "Unexpected data");
330332
}
331333

332334
for (auto& v : *d.MutablePartition()) {
@@ -341,8 +343,8 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
341343
{
342344
PQ_LOG_TX_D("Handle TEvReadSetAck txId " << TxId);
343345

344-
AFL_ENSURE(event.HasStep() && (Step == event.GetStep()));
345-
AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
346+
TX_ENSURE(event.HasStep() && (Step == event.GetStep()));
347+
TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
346348

347349
OnReadSetAck(event.GetTabletConsumer());
348350
}
@@ -359,10 +361,10 @@ void TDistributedTransaction::OnReadSetAck(ui64 tabletId)
359361

360362
void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event)
361363
{
362-
AFL_ENSURE(Step == event.Step);
363-
AFL_ENSURE(TxId == event.TxId);
364+
TX_ENSURE(Step == event.Step);
365+
TX_ENSURE(TxId == event.TxId);
364366

365-
AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
367+
TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
366368

367369
++PartitionRepliesCount;
368370
}
@@ -406,7 +408,7 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
406408
PQ_LOG_TX_D("save tx " << tx.ShortDebugString());
407409

408410
TString value;
409-
AFL_ENSURE(tx.SerializeToString(&value));
411+
TX_ENSURE(tx.SerializeToString(&value));
410412

411413
auto command = request.AddCmdWrite();
412414
command->SetKey(GetKey());
@@ -437,7 +439,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) {
437439
AddCmdWriteConfigTx(tx);
438440
break;
439441
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
440-
Y_FAIL_S("unknown transaction type");
442+
TX_ENSURE(false);
441443
}
442444

443445
tx.MutableOperations()->Add(Operations.begin(), Operations.end());
@@ -452,7 +454,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) {
452454
tx.AddPredicateRecipients(tabletId);
453455
}
454456

455-
AFL_ENSURE(SourceActor != TActorId());
457+
TX_ENSURE(SourceActor != TActorId());
456458
ActorIdToProto(SourceActor, tx.MutableSourceActor());
457459

458460
*tx.MutablePartitions() = PartitionsData;

0 commit comments

Comments
 (0)