From c7e36aea4d5e4b5c6b5e79ba22106d3ad6cdaaeb Mon Sep 17 00:00:00 2001 From: Vladislav Gogov Date: Fri, 15 Nov 2024 13:46:20 +0300 Subject: [PATCH] Fix #11186 (#11631) --- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 25 ++++++++++++++++++ .../tx/columnshard/columnshard__write.cpp | 26 ++++++++++--------- ydb/core/tx/columnshard/data_reader/actor.cpp | 4 ++- ydb/core/tx/columnshard/data_reader/actor.h | 6 +++-- .../operations/batch_builder/builder.h | 5 ++-- ydb/core/tx/columnshard/operations/write.cpp | 24 ++++++++--------- ydb/core/tx/columnshard/operations/write.h | 2 +- 7 files changed, 62 insertions(+), 30 deletions(-) diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index c1e2a5d43f9a..002053da8e4c 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -2907,6 +2907,31 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } } + + Y_UNIT_TEST(ScanFailedSnapshotTooOld) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableColumnShardConfig()->SetMaxReadStaleness_ms(5000); + auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false); + TTestHelper testHelper(settings); + + TTestHelper::TColumnTable cnt; + TVector schema = { + TTestHelper::TColumnSchema().SetName("key").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("c").SetType(NScheme::NTypeIds::Int32).SetNullable(true) + }; + cnt.SetName("/Root/cnt").SetPrimaryKey({ "key" }).SetSchema(schema); + testHelper.CreateTable(cnt); + Sleep(TDuration::Seconds(10)); + auto client = testHelper.GetKikimr().GetQueryClient(); + auto result = + client + .ExecuteQuery( + TStringBuilder() << "$v = SELECT CAST(COUNT(*) AS INT32) FROM `/Root/cnt`; INSERT INTO `/Root/cnt` (key, c) values(1, $v);", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()) + .GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 93dda2268ae5..bab3ca229fdd 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -19,8 +19,8 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, - std::unique_ptr&& event, const TActorContext& ctx) { +void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, + const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx) { Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); switch (overloadReason) { case EOverloadStatus::Disk: @@ -262,8 +262,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ") << Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID()); writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); - std::shared_ptr task = std::make_shared( - TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters); + std::shared_ptr task = std::make_shared(TabletID(), SelfId(), BufferizationWriteActorId, + std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } @@ -285,7 +285,8 @@ class TCommitOperation { } TCommitOperation(const ui64 tabletId) - : TabletId(tabletId) { + : TabletId(tabletId) + { } TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) { @@ -357,7 +358,8 @@ class TProposeWriteTransaction: public NTabletFlatExecutor::TTransactionBaseGetGeneration() != commitOperation->GetGeneration()) { - sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) + - " != " + ::ToString(commitOperation->GetGeneration()), - NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN); + sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) + " != " + + ::ToString(commitOperation->GetGeneration()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN); } else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) { sendError( "tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) + @@ -567,7 +569,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto writeOperation = OperationsManager->RegisterOperation(lockId, cookie, granuleShardingVersionId, *mType); Y_ABORT_UNLESS(writeOperation); writeOperation->SetBehaviour(behaviour); - writeOperation->Start(*this, tableId, arrowData, source, schema, ctx); + writeOperation->Start(*this, tableId, arrowData, source, schema, ctx, NOlap::TSnapshot::Max()); } -} // namespace NKikimr::NColumnShard +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/data_reader/actor.cpp b/ydb/core/tx/columnshard/data_reader/actor.cpp index 4fd69af8a7ab..a1fb223f78a6 100644 --- a/ydb/core/tx/columnshard/data_reader/actor.cpp +++ b/ydb/core/tx/columnshard/data_reader/actor.cpp @@ -23,6 +23,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) { } else { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage()); } + PassAway(); } } @@ -35,10 +36,11 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) { } void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) { - SwitchStage(EStage::WaitData, EStage::Finished); + SwitchStage(std::nullopt, EStage::Finished); AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")( "reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues())); RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues())); + PassAway(); } void TActor::Bootstrap(const TActorContext& /*ctx*/) { diff --git a/ydb/core/tx/columnshard/data_reader/actor.h b/ydb/core/tx/columnshard/data_reader/actor.h index 2eca911a87e2..048ee314922f 100644 --- a/ydb/core/tx/columnshard/data_reader/actor.h +++ b/ydb/core/tx/columnshard/data_reader/actor.h @@ -59,8 +59,10 @@ class TActor: public NActors::TActorBootstrapped { EStage Stage = EStage::Initialization; static inline const ui64 FreeSpace = ((ui64)8) << 20; - void SwitchStage(const EStage from, const EStage to) { - AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", (ui32)to); + void SwitchStage(const std::optional from, const EStage to) { + if (from) { + AFL_VERIFY(Stage == *from)("from", (ui32)*from)("real", (ui32)Stage)("to", (ui32)to); + } Stage = to; } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.h b/ydb/core/tx/columnshard/operations/batch_builder/builder.h index 31ca0ac7ac43..654dd4ba8035 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.h +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.h @@ -36,7 +36,8 @@ class TBuildBatchesTask: public NConveyor::ITask { , BufferActorId(bufferActorId) , ActualSchema(actualSchema) , ActualSnapshot(actualSnapshot) - , WritingCounters(writingCounters) { + , WritingCounters(writingCounters) + { } }; -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 1068e7167413..b69c25b8de8a 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -20,11 +20,12 @@ TWriteOperation::TWriteOperation(const TOperationWriteId writeId, const ui64 loc , LockId(lockId) , Cookie(cookie) , GranuleShardingVersionId(granuleShardingVersionId) - , ModificationType(mType) { + , ModificationType(mType) +{ } void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, - const std::shared_ptr& schema, const TActorContext& ctx) { + const std::shared_ptr& schema, const TActorContext& ctx, const NOlap::TSnapshot& applyToSnapshot) { Y_ABORT_UNLESS(Status == EOperationStatus::Draft); NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source, GranuleShardingVersionId); @@ -34,13 +35,14 @@ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWr std::make_shared(owner.TabletID(), ctx.SelfID, owner.BufferizationWriteActorId, NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(), owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR)), - schema, owner.GetLastTxSnapshot(), owner.Counters.GetCSCounters().WritingCounters); - NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); + schema, applyToSnapshot, owner.Counters.GetCSCounters().WritingCounters); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); Status = EOperationStatus::Started; } -void TWriteOperation::CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const { +void TWriteOperation::CommitOnExecute( + TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const { Y_ABORT_UNLESS(Status == EOperationStatus::Prepared); TBlobGroupSelector dsGroupSelector(owner.Info()); @@ -78,12 +80,10 @@ void TWriteOperation::OnWriteFinish( TString metadata; Y_ABORT_UNLESS(proto.SerializeToString(&metadata)); - db.Table() - .Key((ui64)WriteId) - .Update(NIceDb::TUpdate((ui32)Status), NIceDb::TUpdate(CreatedAt.Seconds()), - NIceDb::TUpdate(metadata), NIceDb::TUpdate(LockId), - NIceDb::TUpdate(Cookie), - NIceDb::TUpdate(GranuleShardingVersionId.value_or(0))); + db.Table().Key((ui64)WriteId).Update(NIceDb::TUpdate((ui32)Status), + NIceDb::TUpdate(CreatedAt.Seconds()), NIceDb::TUpdate(metadata), + NIceDb::TUpdate(LockId), NIceDb::TUpdate(Cookie), + NIceDb::TUpdate(GranuleShardingVersionId.value_or(0))); } void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const { @@ -119,4 +119,4 @@ void TWriteOperation::AbortOnComplete(TColumnShard& /*owner*/) const { Y_ABORT_UNLESS(Status == EOperationStatus::Prepared); } -} // namespace NKikimr::NColumnShard +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h index ad22caa651d4..5251402347c0 100644 --- a/ydb/core/tx/columnshard/operations/write.h +++ b/ydb/core/tx/columnshard/operations/write.h @@ -62,7 +62,7 @@ class TWriteOperation { const std::optional granuleShardingVersionId, const NEvWrite::EModificationType mType); void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, - const std::shared_ptr& schema, const TActorContext& ctx); + const std::shared_ptr& schema, const TActorContext& ctx, const NOlap::TSnapshot& applyToSnapshot); void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const std::vector& insertWriteIds, const bool ephemeralFlag); void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const; void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const;