Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pqtablet/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
NKikimrPQ::TTransaction tx;
PQ_ENSURE(tx.ParseFromString(pair.GetValue()));

PQ_ENSURE(tx.GetKind() != NKikimrPQ::TTransaction::KIND_UNKNOWN)("Key", pair.GetKey());

PQ_LOG_TX_I("Restore Tx. " <<
"TxId: " << tx.GetTxId() <<
", Step: " << tx.GetStep() <<
Expand Down
52 changes: 27 additions & 25 deletions ydb/core/persqueue/pqtablet/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <ydb/library/wilson_ids/wilson.h>

#define TX_ENSURE(condition) AFL_ENSURE(condition)("TxId", TxId)("State", NKikimrPQ::TTransaction_EState_Name(State))

namespace NKikimr::NPQ {

TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx) :
Expand Down Expand Up @@ -51,10 +53,10 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
InitConfigTransaction(tx);
break;
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_FAIL_S("unknown transaction type");
TX_ENSURE(false);
}

AFL_ENSURE(tx.HasSourceActor());
TX_ENSURE(tx.HasSourceActor());
SourceActor = ActorIdFromProto(tx.GetSourceActor());

if (tx.HasWriteId()) {
Expand Down Expand Up @@ -133,34 +135,34 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
ui64 minStep,
ui64 extractTabletId)
{
AFL_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
AFL_ENSURE(TxId == Max<ui64>());
TX_ENSURE(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
TX_ENSURE(TxId == Max<ui64>());

TxId = event.GetTxId();

MinStep = minStep;

switch (event.GetTxBodyCase()) {
case NKikimrPQ::TEvProposeTransaction::kData:
AFL_ENSURE(event.HasData());
TX_ENSURE(event.HasData());
MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
OnProposeTransaction(event.GetData(), extractTabletId);
break;
case NKikimrPQ::TEvProposeTransaction::kConfig:
AFL_ENSURE(event.HasConfig());
TX_ENSURE(event.HasConfig());
MaxStep = Max<ui64>();
OnProposeTransaction(event.GetConfig(), extractTabletId);
break;
default:
Y_FAIL_S("unknown TxBody case");
TX_ENSURE(false);
}

PartitionRepliesCount = 0;
PartitionRepliesExpected = 0;

ReadSetCount = 0;

AFL_ENSURE(event.HasSourceActor());
TX_ENSURE(event.HasSourceActor());
SourceActor = ActorIdFromProto(event.GetSourceActor());
}

Expand Down Expand Up @@ -231,8 +233,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans

void TDistributedTransaction::OnPlanStep(ui64 step)
{
AFL_ENSURE(Step == Max<ui64>());
AFL_ENSURE(TxId != Max<ui64>());
TX_ENSURE(Step == Max<ui64>());
TX_ENSURE(TxId != Max<ui64>());

Step = step;
}
Expand Down Expand Up @@ -285,10 +287,10 @@ void TDistributedTransaction::OnProposePartitionConfigResult(TEvPQ::TEvProposePa
template<class E>
void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
{
AFL_ENSURE(Step == event.Step);
AFL_ENSURE(TxId == event.TxId);
TX_ENSURE(Step == event.Step);
TX_ENSURE(TxId == event.TxId);

AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));

if (decision.Defined()) {
SetDecision(SelfDecision, *decision);
Expand All @@ -305,12 +307,12 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
{
PQ_LOG_TX_D("Handle TEvReadSet " << TxId);

AFL_ENSURE((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
TX_ENSURE((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));

if (PredicatesReceived.contains(event.GetTabletProducer())) {
NKikimrTx::TReadSetData data;
AFL_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet()));
TX_ENSURE(event.HasReadSet() && data.ParseFromString(event.GetReadSet()));

SetDecision(ParticipantsDecision, data.GetDecision());
ReadSetAcks[sender] = std::move(ack);
Expand All @@ -326,7 +328,7 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
NKikimrPQ::TPartitions d;
if (data.HasData()) {
auto r = data.GetData().UnpackTo(&d);
AFL_ENSURE(r)("description", "Unexpected data");
TX_ENSURE(r)("description", "Unexpected data");
}

for (auto& v : *d.MutablePartition()) {
Expand All @@ -341,8 +343,8 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
{
PQ_LOG_TX_D("Handle TEvReadSetAck txId " << TxId);

AFL_ENSURE(event.HasStep() && (Step == event.GetStep()));
AFL_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));
TX_ENSURE(event.HasStep() && (Step == event.GetStep()));
TX_ENSURE(event.HasTxId() && (TxId == event.GetTxId()));

OnReadSetAck(event.GetTabletConsumer());
}
Expand All @@ -359,10 +361,10 @@ void TDistributedTransaction::OnReadSetAck(ui64 tabletId)

void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event)
{
AFL_ENSURE(Step == event.Step);
AFL_ENSURE(TxId == event.TxId);
TX_ENSURE(Step == event.Step);
TX_ENSURE(TxId == event.TxId);

AFL_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));
TX_ENSURE(Partitions.contains(event.Partition.OriginalPartitionId));

++PartitionRepliesCount;
}
Expand Down Expand Up @@ -406,7 +408,7 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
PQ_LOG_TX_D("save tx " << tx.ShortDebugString());

TString value;
AFL_ENSURE(tx.SerializeToString(&value));
TX_ENSURE(tx.SerializeToString(&value));

auto command = request.AddCmdWrite();
command->SetKey(GetKey());
Expand Down Expand Up @@ -437,7 +439,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) {
AddCmdWriteConfigTx(tx);
break;
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_FAIL_S("unknown transaction type");
TX_ENSURE(false);
}

tx.MutableOperations()->Add(Operations.begin(), Operations.end());
Expand All @@ -452,7 +454,7 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) {
tx.AddPredicateRecipients(tabletId);
}

AFL_ENSURE(SourceActor != TActorId());
TX_ENSURE(SourceActor != TActorId());
ActorIdToProto(SourceActor, tx.MutableSourceActor());

*tx.MutablePartitions() = PartitionsData;
Expand Down
Loading