Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ TConclusionStatus TReadMetadata::Init(const NColumnShard::TColumnShard* owner, c
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i);
// we do not need to check our own uncommitted writes
if (op->GetLockId() != *LockId) {
AddWriteIdToCheck(i, op->GetLockId());
AddMaybeConflictingWrite(i, op->GetLockId());
}
}
}
Expand Down Expand Up @@ -88,12 +88,13 @@ void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const {
return;
}
const ui64 lock = *GetLockId();
if (GetBrokenWithCommitted()) {
if (GetBreakLockOnReadFinished()) {
owner.GetOperationsManager().GetLockVerified(lock).SetBroken();
} else {
NOlap::NTxInteractions::TTxConflicts conflicts;
for (auto&& i : GetConflictableLockIds()) {
conflicts.Add(i, lock);
for (auto&& lockIdToCommit : GetConflictingLockIds()) {
// if lockIdToCommit commits, lock must be broken
conflicts.Add(lockIdToCommit, lock);
}
if (!conflicts.IsEmpty()) {
auto writer =
Expand All @@ -108,7 +109,7 @@ void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) co
return;
}
auto evWriter = std::make_shared<NOlap::NTxInteractions::TEvReadStartWriter>(TableMetadataAccessor->GetPathIdVerified(),
GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds());
GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetMaybeConflictingLockIds());
owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TReadMetadata: public TReadMetadataBase {
using TBase = TReadMetadataBase;

private:
std::shared_ptr<TAtomicCounter> BrokenWithCommitted = std::make_shared<TAtomicCounter>();
mutable TAtomicCounter BreakLockOnReadFinished = TAtomicCounter();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutable? Из-за избавления от shared_ptr?

Copy link
Collaborator Author

@kirillvasilenko kirillvasilenko Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так точно. Там метод который это поле меняет const, и на этот const там полагаются вызывающие методы и т.д. В общем, я не стал все это править, просто локально упростил. shared_ptr убрал т.к. это внутренняя переменная, никуда мы ее не шарим, а сам класс не копируемый.

std::shared_ptr<NColumnShard::TLockSharingInfo> LockSharingInfo;

class TWriteIdInfo {
Expand All @@ -99,17 +99,17 @@ class TReadMetadata: public TReadMetadataBase {
return LockId;
}

void MarkAsConflictable() const {
void MarkAsConflicting() const {
Conflicts->Inc();
}

bool IsConflictable() const {
bool IsConflicting() const {
return Conflicts->Val();
}
};

THashMap<ui64, std::shared_ptr<TAtomicCounter>> LockConflictCounters;
THashMap<TInsertWriteId, TWriteIdInfo> ConflictedWriteIds;
THashMap<TInsertWriteId, TWriteIdInfo> ConflictingWrites;

virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override;
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override;
Expand All @@ -132,51 +132,48 @@ class TReadMetadata: public TReadMetadataBase {
return std::move(SourcesConstructor);
}

bool GetBrokenWithCommitted() const {
return BrokenWithCommitted->Val();
bool GetBreakLockOnReadFinished() const {
return BreakLockOnReadFinished.Val();
}
THashSet<ui64> GetConflictableLockIds() const {

void SetBreakLockOnReadFinished() const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А почему не Add или Inc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну оно по смыслу не Add. Мы именно просим «сломай свой лок когда закончится чтение».

Copy link
Collaborator Author

@kirillvasilenko kirillvasilenko Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

То что оно там add внутри - это детали реализации, не вижу смысла «показывать» их наружу.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Странно выглядит, что на самом деле это не Set

BreakLockOnReadFinished.Inc();
}

THashSet<ui64> GetConflictingLockIds() const {
THashSet<ui64> result;
for (auto&& i : ConflictedWriteIds) {
if (i.second.IsConflictable()) {
result.emplace(i.second.GetLockId());
for (auto& [_, writeIdInfo] : ConflictingWrites) {
if (writeIdInfo.IsConflicting()) {
result.emplace(writeIdInfo.GetLockId());
}
}
return result;
}

bool IsLockConflictable(const ui64 lockId) const {
auto it = LockConflictCounters.find(lockId);
AFL_VERIFY(it != LockConflictCounters.end());
return it->second->Val();
}

bool IsWriteConflictable(const TInsertWriteId writeId) const {
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end());
return it->second.IsConflictable();
THashSet<ui64> GetMaybeConflictingLockIds() const {
THashSet<ui64> result;
for (auto& [_, writeInfo] : ConflictingWrites) {
result.emplace(writeInfo.GetLockId());
}
return result;
}

bool MayWriteBeConflicting(const TInsertWriteId writeId) const {
return ConflictedWriteIds.contains(writeId);
return ConflictingWrites.contains(writeId);
}

void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) {
void AddMaybeConflictingWrite(const TInsertWriteId writeId, const ui64 lockId) {
auto it = LockConflictCounters.find(lockId);
if (it == LockConflictCounters.end()) {
it = LockConflictCounters.emplace(lockId, std::make_shared<TAtomicCounter>()).first;
}
AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second);
AFL_VERIFY(ConflictingWrites.emplace(writeId, TWriteIdInfo(lockId, it->second)).second);
}

void SetConflictedWriteId(const TInsertWriteId writeId) const {
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end());
it->second.MarkAsConflictable();
}

void SetBrokenWithCommitted() const {
BrokenWithCommitted->Inc();
void SetWriteConflicting(const TInsertWriteId writeId) const {
auto it = ConflictingWrites.find(writeId);
AFL_VERIFY(it != ConflictingWrites.end());
it->second.MarkAsConflicting();
}

NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TSimpleRow& key) const;
Expand All @@ -192,6 +189,9 @@ class TReadMetadata: public TReadMetadataBase {

TReadMetadata(const std::shared_ptr<const TVersionedIndex>& schemaIndex, const TReadDescription& read);

TReadMetadata(const TReadMetadata&) = delete;
TReadMetadata& operator=(const TReadMetadata&) = delete;

virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<NCommon::
bool TPortionDataSource::DoAddTxConflict() {
auto state = GetContext()->GetPortionStateAtScanStart(this->GetPortionInfo());
if (state.Committed) {
GetContext()->GetReadMetadata()->SetBrokenWithCommitted();
GetContext()->GetReadMetadata()->SetBreakLockOnReadFinished();
return true;
} else if (!state.IsMyUncommitted()) {
const auto* wPortion = static_cast<const TWrittenPortionInfo*>(Portion.get());
GetContext()->GetReadMetadata()->SetConflictedWriteId(wPortion->GetInsertWriteId());
GetContext()->GetReadMetadata()->SetWriteConflicting(wPortion->GetInsertWriteId());
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,11 @@ TConclusion<bool> TPortionDataSource::DoStartReserveMemory(const NArrow::NSSA::T
bool TPortionDataSource::DoAddTxConflict() {
auto state = GetContext()->GetPortionStateAtScanStart(this->GetPortionInfo());
if (state.Committed) {
GetContext()->GetReadMetadata()->SetBrokenWithCommitted();
GetContext()->GetReadMetadata()->SetBreakLockOnReadFinished();
return true;
} else if (!state.IsMyUncommitted()) {
const auto* wPortion = static_cast<const TWrittenPortionInfo*>(Portion.get());
GetContext()->GetReadMetadata()->SetConflictedWriteId(wPortion->GetInsertWriteId());
GetContext()->GetReadMetadata()->SetWriteConflicting(wPortion->GetInsertWriteId());
return true;
}
return false;
Expand Down
27 changes: 14 additions & 13 deletions ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
}

void TOperationsManager::BreakConflictingTxs(const TLockFeatures& lock) {
for (auto&& i : lock.GetBrokeOnCommit()) {
if (auto lockNotify = GetLockOptional(i)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("broken_lock_id", i);
lockNotify->SetBroken();
for (auto&& lockIdToBreak : lock.GetBreakOnCommit()) {
if (auto lockToBreak = GetLockOptional(lockIdToBreak)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("broken_lock_id", lockIdToBreak);
lockToBreak->SetBroken();
}
}
for (auto&& i : lock.GetNotifyOnCommit()) {
if (auto lockNotify = GetLockOptional(i)) {
lockNotify->AddNotifyCommit(lock.GetLockId());
for (auto&& lockIdToNotify : lock.GetNotifyOnCommit()) {
if (auto lockToNotify = GetLockOptional(lockIdToNotify)) {
lockToNotify->NotifyAboutCommit(lock.GetLockId());
}
}
}
Expand Down Expand Up @@ -328,16 +328,17 @@ void TOperationsManager::AddEventForLock(
auto& txLock = GetLockVerified(lockId);
writer->CheckInteraction(lockId, InteractionsContext, txConflicts, txNotifications);
for (auto& [commitLockId, breakLockIds] : txConflicts) {
// if commitLockId not found, it means the conflicting tx is already committed or aborted
if (GetLockOptional(commitLockId)) {
GetLockVerified(commitLockId).AddBrokeOnCommit(breakLockIds);
// if txId not found, it means the conflicting tx is already committed or aborted
} else if (txLock.IsCommitted(commitLockId)) {
// if the conflicting tx is already committed, we cannot commit the given tx, so break its lock
GetLockVerified(commitLockId).AddBreakOnCommit(breakLockIds);
}
// if the conflicting tx is already committed, we cannot commit the given tx, so break its lock
if (txLock.IsCommitted(commitLockId)) {
txLock.SetBroken();
}
}
for (auto&& i : txNotifications) {
GetLockVerified(i.first).AddNotificationsOnCommit(i.second);
for (auto& [commitLockId, lockIdsToNotify] : txNotifications) {
GetLockVerified(commitLockId).AddNotifyOnCommit(lockIdsToNotify);
}
if (auto txEvent = writer->BuildEvent()) {
NOlap::NTxInteractions::TTxEventContainer container(lockId, txEvent);
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class TLockFeatures: TMoveOnly {
YDB_READONLY_DEF(std::vector<NOlap::NTxInteractions::TTxEventContainer>, Events);
std::shared_ptr<TLockSharingInfo> SharingInfo;

YDB_READONLY_DEF(THashSet<ui64>, BrokeOnCommit);
YDB_READONLY_DEF(THashSet<ui64>, BreakOnCommit);
YDB_READONLY_DEF(THashSet<ui64>, NotifyOnCommit);
YDB_READONLY_DEF(THashSet<ui64>, Committed);
YDB_READONLY_DEF(TPositiveControlInteger, OperationsInProgress);
Expand Down Expand Up @@ -146,16 +146,18 @@ class TLockFeatures: TMoveOnly {
return Committed.contains(lockId);
}

void AddNotifyCommit(const ui64 lockId) {
AFL_VERIFY(NotifyOnCommit.erase(lockId));
/*
Let the given `TLockFeatures` know that the transaction with `lockId` has committed
*/
void NotifyAboutCommit(const ui64 lockId) {
Committed.emplace(lockId);
}

void AddBrokeOnCommit(const THashSet<ui64>& lockIds) {
BrokeOnCommit.insert(lockIds.begin(), lockIds.end());
void AddBreakOnCommit(const THashSet<ui64>& lockIds) {
BreakOnCommit.insert(lockIds.begin(), lockIds.end());
}

void AddNotificationsOnCommit(const THashSet<ui64>& lockIds) {
void AddNotifyOnCommit(const THashSet<ui64>& lockIds) {
NotifyOnCommit.insert(lockIds.begin(), lockIds.end());
}

Expand Down
34 changes: 17 additions & 17 deletions ydb/core/tx/columnshard/transactions/locks/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class ITxEvent {
using TProto = NKikimrColumnShardTxProto::TEvent;

protected:
virtual void DoAddToInteraction(const ui64 txId, TInteractionsContext& context) const = 0;
virtual void DoRemoveFromInteraction(const ui64 txId, TInteractionsContext& context) const = 0;
virtual void DoAddToInteraction(const ui64 lockId, TInteractionsContext& context) const = 0;
virtual void DoRemoveFromInteraction(const ui64 lockId, TInteractionsContext& context) const = 0;
virtual bool DoDeserializeFromProto(const NKikimrColumnShardTxProto::TEvent& proto) = 0;
virtual void DoSerializeToProto(NKikimrColumnShardTxProto::TEvent& proto) const = 0;

Expand All @@ -43,57 +43,57 @@ class ITxEvent {
DoSerializeToProto(proto);
}

void AddToInteraction(const ui64 txId, TInteractionsContext& context) const {
return DoAddToInteraction(txId, context);
void AddToInteraction(const ui64 lockId, TInteractionsContext& context) const {
return DoAddToInteraction(lockId, context);
}

void RemoveFromInteraction(const ui64 txId, TInteractionsContext& context) const {
return DoRemoveFromInteraction(txId, context);
void RemoveFromInteraction(const ui64 lockId, TInteractionsContext& context) const {
return DoRemoveFromInteraction(lockId, context);
}
};

class TTxEventContainer: public NBackgroundTasks::TInterfaceProtoContainer<ITxEvent> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<ITxEvent>;
YDB_READONLY(ui64, TxId, 0);
YDB_READONLY(ui64, LockId, 0);

public:
void AddToInteraction(TInteractionsContext& context) const {
return GetObjectVerified().AddToInteraction(TxId, context);
return GetObjectVerified().AddToInteraction(LockId, context);
}

void RemoveFromInteraction(TInteractionsContext& context) const {
return GetObjectVerified().RemoveFromInteraction(TxId, context);
return GetObjectVerified().RemoveFromInteraction(LockId, context);
}

TTxEventContainer(const ui64 txId, const std::shared_ptr<ITxEvent>& txEvent)
TTxEventContainer(const ui64 lockId, const std::shared_ptr<ITxEvent>& txEvent)
: TBase(txEvent)
, TxId(txId) {
, LockId(lockId) {
}

TTxEventContainer(const ui64 txId)
: TxId(txId) {
TTxEventContainer(const ui64 lockId)
: LockId(lockId) {
}

bool operator<(const TTxEventContainer& item) const {
return TxId < item.TxId;
return LockId < item.LockId;
}
};

class ITxEventWriter {
protected:
virtual bool DoCheckInteraction(
const ui64 selfTxId, TInteractionsContext& context, TTxConflicts& conflicts, TTxConflicts& notifications) const = 0;
const ui64 selfLockId, TInteractionsContext& context, TTxConflicts& conflicts, TTxConflicts& notifications) const = 0;
virtual std::shared_ptr<ITxEvent> DoBuildEvent() = 0;

public:
ITxEventWriter() = default;
virtual ~ITxEventWriter() = default;

bool CheckInteraction(const ui64 selfTxId, TInteractionsContext& context, TTxConflicts& conflicts, TTxConflicts& notifications) const {
bool CheckInteraction(const ui64 selfLockId, TInteractionsContext& context, TTxConflicts& conflicts, TTxConflicts& notifications) const {
TTxConflicts conflictsResult;
TTxConflicts notificationsResult;
const bool result = DoCheckInteraction(selfTxId, context, conflictsResult, notificationsResult);
const bool result = DoCheckInteraction(selfLockId, context, conflictsResult, notificationsResult);
std::swap(conflictsResult, conflicts);
std::swap(notificationsResult, notifications);
return result;
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/transactions/locks/interaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,37 +414,37 @@ class TInteractionsContext {
return result;
}

THashSet<ui64> GetAffectedTxIds(const TInternalPathId pathId, const std::shared_ptr<arrow::RecordBatch>& batch) const {
THashSet<ui64> GetAffectedLockIds(const TInternalPathId pathId, const std::shared_ptr<arrow::RecordBatch>& batch) const {
auto it = ReadIntervalsByPathId.find(pathId);
if (it == ReadIntervalsByPathId.end()) {
return {};
}
return it->second.GetAffectedTxIds(batch);
}

void AddInterval(const ui64 txId, const TInternalPathId pathId, const TIntervalPoint& from, const TIntervalPoint& to) {
void AddInterval(const ui64 lockId, const TInternalPathId pathId, const TIntervalPoint& from, const TIntervalPoint& to) {
auto& intervals = ReadIntervalsByPathId[pathId];
auto itFrom = intervals.InsertPoint(from);
auto itTo = intervals.InsertPoint(to);
itFrom->second.AddStart(txId, from.IsIncluded());
itFrom->second.AddStart(lockId, from.IsIncluded());
for (auto it = itFrom; it != itTo; ++it) {
it->second.AddIntervalTx(txId);
it->second.AddIntervalTx(lockId);
}
itTo->second.AddFinish(txId, to.IsIncluded());
itTo->second.AddFinish(lockId, to.IsIncluded());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "add_interval")("interactions_info", DebugJson().GetStringRobust());
}

void RemoveInterval(const ui64 txId, const TInternalPathId pathId, const TIntervalPoint& from, const TIntervalPoint& to) {
void RemoveInterval(const ui64 lockId, const TInternalPathId pathId, const TIntervalPoint& from, const TIntervalPoint& to) {
auto itIntervals = ReadIntervalsByPathId.find(pathId);
AFL_VERIFY(itIntervals != ReadIntervalsByPathId.end())("path_id", pathId);
auto& intervals = itIntervals->second;
auto itFrom = intervals.GetPointIterator(from);
auto itTo = intervals.GetPointIterator(to);
itFrom->second.RemoveStart(txId, from.IsIncluded());
itFrom->second.RemoveStart(lockId, from.IsIncluded());
for (auto it = itFrom; it != itTo; ++it) {
it->second.RemoveIntervalTx(txId);
it->second.RemoveIntervalTx(lockId);
}
itTo->second.RemoveFinish(txId, to.IsIncluded());
itTo->second.RemoveFinish(lockId, to.IsIncluded());
for (auto&& it = itFrom; it != itTo;) {
if (it->second.IsEmpty()) {
it = intervals.Erase(it);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class TEvReadFinishedWriter: public ITxEventWriter {
TTxConflicts Conflicts;

virtual bool DoCheckInteraction(
const ui64 /*selfTxId*/, TInteractionsContext& /*context*/, TTxConflicts& conflicts, TTxConflicts& /*notifications*/) const override {
const ui64 /*selfLockId*/, TInteractionsContext& /*context*/, TTxConflicts& conflicts, TTxConflicts& /*notifications*/) const override {
conflicts = Conflicts;
return true;
}
Expand Down
Loading
Loading