From 4db79e7b5934f754bef429dc70a0089547e866a5 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Sat, 18 Oct 2025 11:43:28 +0300 Subject: [PATCH 1/4] [+] logging --- ydb/core/persqueue/pqtablet/pq_impl.cpp | 5 +++++ ydb/core/persqueue/pqtablet/transaction.cpp | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 19d650699f0d..90cfb53554cd 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -811,6 +811,11 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& NKikimrPQ::TTransaction tx; PQ_ENSURE(tx.ParseFromString(pair.GetValue())); + if (tx.GetKind() == NKikimrPQ::TTransaction::KIND_UNKNOWN) { + PQ_LOG_TX_W("Invalid transaction state. Key " << pair.GetKey()); + continue; + } + PQ_LOG_TX_I("Restore Tx. " << "TxId: " << tx.GetTxId() << ", Step: " << tx.GetStep() << diff --git a/ydb/core/persqueue/pqtablet/transaction.cpp b/ydb/core/persqueue/pqtablet/transaction.cpp index 5da8bcc853f2..747257d2b8c3 100644 --- a/ydb/core/persqueue/pqtablet/transaction.cpp +++ b/ydb/core/persqueue/pqtablet/transaction.cpp @@ -51,7 +51,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& InitConfigTransaction(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - Y_FAIL_S("unknown transaction type"); + AFL_ENSURE(false, "unknown transaction type"); } AFL_ENSURE(tx.HasSourceActor()); @@ -152,7 +152,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr OnProposeTransaction(event.GetConfig(), extractTabletId); break; default: - Y_FAIL_S("unknown TxBody case"); + AFL_ENSURE(false, "unknown TxBody case"); } PartitionRepliesCount = 0; @@ -437,7 +437,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) { AddCmdWriteConfigTx(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - Y_FAIL_S("unknown transaction type"); + AFL_ENSURE(false, "unknown transaction type"); } tx.MutableOperations()->Add(Operations.begin(), Operations.end()); From bdf14e60e78a3961987ea4f4ef2e0cc98cff1335 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Mon, 20 Oct 2025 13:35:58 +0300 Subject: [PATCH 2/4] [-] broken build --- ydb/core/persqueue/pqtablet/transaction.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/transaction.cpp b/ydb/core/persqueue/pqtablet/transaction.cpp index 747257d2b8c3..696651a420bc 100644 --- a/ydb/core/persqueue/pqtablet/transaction.cpp +++ b/ydb/core/persqueue/pqtablet/transaction.cpp @@ -51,7 +51,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& InitConfigTransaction(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - AFL_ENSURE(false, "unknown transaction type"); + AFL_ENSURE(false); } AFL_ENSURE(tx.HasSourceActor()); @@ -152,7 +152,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr OnProposeTransaction(event.GetConfig(), extractTabletId); break; default: - AFL_ENSURE(false, "unknown TxBody case"); + AFL_ENSURE(false); } PartitionRepliesCount = 0; @@ -437,7 +437,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) { AddCmdWriteConfigTx(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - AFL_ENSURE(false, "unknown transaction type"); + AFL_ENSURE(false); } tx.MutableOperations()->Add(Operations.begin(), Operations.end()); From 06d06f1a387e174b0e6e0861ca2f2673b1650c02 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 21 Oct 2025 14:05:28 +0300 Subject: [PATCH 3/4] [+] TX_ENSURE --- ydb/core/persqueue/pqtablet/transaction.cpp | 52 +++++++++++---------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/transaction.cpp b/ydb/core/persqueue/pqtablet/transaction.cpp index 696651a420bc..538842869024 100644 --- a/ydb/core/persqueue/pqtablet/transaction.cpp +++ b/ydb/core/persqueue/pqtablet/transaction.cpp @@ -4,6 +4,8 @@ #include +#define TX_ENSURE(condition) AFL_ENSURE(condition)("TxId", TxId)("State", NKikimrPQ::TTransaction_EState_Name(State)) + namespace NKikimr::NPQ { TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx) : @@ -51,10 +53,10 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& InitConfigTransaction(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - AFL_ENSURE(false); + TX_ENSURE(false); } - AFL_ENSURE(tx.HasSourceActor()); + TX_ENSURE(tx.HasSourceActor()); SourceActor = ActorIdFromProto(tx.GetSourceActor()); if (tx.HasWriteId()) { @@ -133,8 +135,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr ui64 minStep, ui64 extractTabletId) { - AFL_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); - AFL_ENSURE(TxId == Max()); + TX_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); + TX_ENSURE(TxId == Max()); TxId = event.GetTxId(); @@ -142,17 +144,17 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr switch (event.GetTxBodyCase()) { case NKikimrPQ::TEvProposeTransaction::kData: - AFL_ENSURE(event.HasData()); + TX_ENSURE(event.HasData()); MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds(); OnProposeTransaction(event.GetData(), extractTabletId); break; case NKikimrPQ::TEvProposeTransaction::kConfig: - AFL_ENSURE(event.HasConfig()); + TX_ENSURE(event.HasConfig()); MaxStep = Max(); OnProposeTransaction(event.GetConfig(), extractTabletId); break; default: - AFL_ENSURE(false); + TX_ENSURE(false); } PartitionRepliesCount = 0; @@ -160,7 +162,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr ReadSetCount = 0; - AFL_ENSURE(event.HasSourceActor()); + TX_ENSURE(event.HasSourceActor()); SourceActor = ActorIdFromProto(event.GetSourceActor()); } @@ -231,8 +233,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans void TDistributedTransaction::OnPlanStep(ui64 step) { - AFL_ENSURE(Step == Max()); - AFL_ENSURE(TxId != Max()); + TX_ENSURE(Step == Max()); + TX_ENSURE(TxId != Max()); Step = step; } @@ -285,10 +287,10 @@ void TDistributedTransaction::OnProposePartitionConfigResult(TEvPQ::TEvProposePa template void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe decision) { - AFL_ENSURE(Step == event.Step); - AFL_ENSURE(TxId == event.TxId); + TX_ENSURE(Step == event.Step); + TX_ENSURE(TxId == event.TxId); - AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId)); + TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId)); if (decision.Defined()) { SetDecision(SelfDecision, *decision); @@ -305,12 +307,12 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event, { PQ_LOG_TX_D("Handle TEvReadSet " << TxId); - AFL_ENSURE((Step == Max()) || (event.HasStep() && (Step == event.GetStep()))); - AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId())); + TX_ENSURE((Step == Max()) || (event.HasStep() && (Step == event.GetStep()))); + TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId())); if (PredicatesReceived.contains(event.GetTabletProducer())) { NKikimrTx::TReadSetData data; - AFL_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet())); + TX_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet())); SetDecision(ParticipantsDecision, data.GetDecision()); ReadSetAcks[sender] = std::move(ack); @@ -326,7 +328,7 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event, NKikimrPQ::TPartitions d; if (data.HasData()) { auto r = data.GetData().UnpackTo(&d); - AFL_ENSURE(r)("description", "Unexpected data"); + TX_ENSURE(r)("description", "Unexpected data"); } for (auto& v : *d.MutablePartition()) { @@ -341,8 +343,8 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event { PQ_LOG_TX_D("Handle TEvReadSetAck txId " << TxId); - AFL_ENSURE(event.HasStep() && (Step == event.GetStep())); - AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId())); + TX_ENSURE(event.HasStep() && (Step == event.GetStep())); + TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId())); OnReadSetAck(event.GetTabletConsumer()); } @@ -359,10 +361,10 @@ void TDistributedTransaction::OnReadSetAck(ui64 tabletId) void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event) { - AFL_ENSURE(Step == event.Step); - AFL_ENSURE(TxId == event.TxId); + TX_ENSURE(Step == event.Step); + TX_ENSURE(TxId == event.TxId); - AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId)); + TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId)); ++PartitionRepliesCount; } @@ -406,7 +408,7 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque PQ_LOG_TX_D("save tx " << tx.ShortDebugString()); TString value; - AFL_ENSURE(tx.SerializeToString(&value)); + TX_ENSURE(tx.SerializeToString(&value)); auto command = request.AddCmdWrite(); command->SetKey(GetKey()); @@ -437,7 +439,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) { AddCmdWriteConfigTx(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - AFL_ENSURE(false); + TX_ENSURE(false); } tx.MutableOperations()->Add(Operations.begin(), Operations.end()); @@ -452,7 +454,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) { tx.AddPredicateRecipients(tabletId); } - AFL_ENSURE(SourceActor != TActorId()); + TX_ENSURE(SourceActor != TActorId()); ActorIdToProto(SourceActor, tx.MutableSourceActor()); *tx.MutablePartitions() = PartitionsData; From d444402d65e7cec2a86bd6582a6ce27665462eb1 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Tue, 21 Oct 2025 14:10:19 +0300 Subject: [PATCH 4/4] [*] restart instead of logging --- ydb/core/persqueue/pqtablet/pq_impl.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 90cfb53554cd..e90174e5751c 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -811,10 +811,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& NKikimrPQ::TTransaction tx; PQ_ENSURE(tx.ParseFromString(pair.GetValue())); - if (tx.GetKind() == NKikimrPQ::TTransaction::KIND_UNKNOWN) { - PQ_LOG_TX_W("Invalid transaction state. Key " << pair.GetKey()); - continue; - } + PQ_ENSURE(tx.GetKind() != NKikimrPQ::TTransaction::KIND_UNKNOWN)("Key", pair.GetKey()); PQ_LOG_TX_I("Restore Tx. " << "TxId: " << tx.GetTxId() <<