diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index ad82a894ac7..2730433783e 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -1107,4 +1107,7 @@ message TStorageServiceConfig // When enabled, tag "use-intermediate-write-buffer" will be added // after scrubbing finds a mismatch optional bool AutomaticallyEnableBufferCopyingAfterChecksumMismatch = 403; + + // Enabling direct sending AcquireDevices to disk agent. + optional bool NonReplicatedVolumeDirectAcquireEnabled = 404; } diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index 2df913d52bf..698249d729a 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -494,6 +494,7 @@ TDuration MSeconds(ui32 value) xxx(VolumeProxyCacheRetryDuration, TDuration, Seconds(15) )\ \ xxx(UseDirectCopyRange, bool, false )\ + xxx(NonReplicatedVolumeDirectAcquireEnabled, bool, false )\ xxx(MaxShadowDiskFillBandwidth, ui32, 512 )\ xxx(MaxShadowDiskFillIoDepth, ui32, 1 )\ xxx(BackgroundOperationsTotalBandwidth, ui32, 1024 )\ diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 440b68ef0ae..bef384c063c 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -633,6 +633,7 @@ class TStorageConfig bool GetYdbViewerServiceEnabled() const; bool GetAutomaticallyEnableBufferCopyingAfterChecksumMismatch() const; + [[nodiscard]] bool GetNonReplicatedVolumeDirectAcquireEnabled() const; }; ui64 GetAllocationUnit( diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 4f1c472d4f7..51a23fb1e7d 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -444,4 +444,15 @@ ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request) return request.Record.GetVolumeRequestId(); } +TString LogDevices(const TVector& devices) +{ + TStringBuilder sb; + sb << "( "; + for (const auto& d: devices) { + sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; + } + sb << ")"; + return sb; +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h index 0fbecf7cbc8..f8ecfb90327 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.h +++ b/cloud/blockstore/libs/storage/core/proto_helpers.h @@ -223,4 +223,5 @@ TBlockRange64 BuildRequestBlockRange( ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); +TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 5f251d85cb0..72ac69b5d23 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -879,19 +879,6 @@ bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize) //////////////////////////////////////////////////////////////////////////////// -TString LogDevices(const TVector& devices) -{ - TStringBuilder sb; - sb << "( "; - for (const auto& d: devices) { - sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; - } - sb << ")"; - return sb; -} - -//////////////////////////////////////////////////////////////////////////////// - void TDiskRegistryActor::OnDiskAcquired( TVector sentAcquireRequests) { @@ -943,6 +930,9 @@ void TDiskRegistryActor::SendCachedAcquireRequestsToAgent( const TActorContext& ctx, const NProto::TAgentConfig& config) { + if (Config->GetNonReplicatedVolumeDirectAcquireEnabled()) { + return; + } auto& acquireCacheByAgentId = State->GetAcquireCacheByAgentId(); auto cacheIt = acquireCacheByAgentId.find(config.GetAgentId()); if (cacheIt == acquireCacheByAgentId.end()) { diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index 951be732909..cf04576b522 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -515,6 +515,4 @@ class TDiskRegistryActor final TDiskRegistryStateSnapshot MakeNewLoadState( NProto::TDiskRegistryStateBackup&& backup); bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize); -TString LogDevices(const TVector& devices); - } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp index f7d78f8ef35..6280e97df91 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp @@ -1,6 +1,7 @@ #include "disk_registry_actor.h" #include +#include #include diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp index 2cff5a6d1a6..2ed2c92a084 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp @@ -1,4 +1,5 @@ #include "disk_registry_actor.h" +#include #include diff --git a/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h b/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h index 35c4fd724cf..8b9ee4d1edd 100644 --- a/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h +++ b/cloud/blockstore/libs/storage/testlib/disk_agent_mock.h @@ -100,6 +100,8 @@ class TDiskAgentMock final HFunc(TEvDiskAgent::TEvZeroDeviceBlocksRequest, HandleZeroDeviceBlocks); HFunc(TEvDiskAgent::TEvChecksumDeviceBlocksRequest, HandleChecksumDeviceBlocks); HFunc(TEvDiskAgent::TEvDirectCopyBlocksRequest, HandleDirectCopyBlocks); + HFunc(TEvDiskAgent::TEvAcquireDevicesRequest, HandleAcquireDevicesRequest); + HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, HandleReleaseDevicesRequest); default: Y_ABORT("Unexpected event %x", ev->GetTypeRewrite()); @@ -315,6 +317,26 @@ class TDiskAgentMock final { State->CreateDirectCopyActorFunc(ev, ctx, SelfId()); } + + void HandleAcquireDevicesRequest( + TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + auto response = + std::make_unique(); + + Reply(ctx, *ev, std::move(response)); + } + + void HandleReleaseDevicesRequest( + TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + auto response = + std::make_unique(); + + Reply(ctx, *ev, std::move(response)); + } }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/ut/ya.make b/cloud/blockstore/libs/storage/volume/ut/ya.make index ca5b81b744c..17aba5b89b7 100644 --- a/cloud/blockstore/libs/storage/volume/ut/ya.make +++ b/cloud/blockstore/libs/storage/volume/ut/ya.make @@ -7,6 +7,7 @@ SRCS( volume_state_ut.cpp volume_ut.cpp volume_ut_checkpoint.cpp + volume_ut_session.cpp volume_ut_stats.cpp ) diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.cpp b/cloud/blockstore/libs/storage/volume/volume_actor.cpp index 2f5901fbd29..c533063cbfa 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor.cpp @@ -1052,6 +1052,9 @@ STFUNC(TVolumeActor::StateWork) HFunc( TEvDiskRegistry::TEvAcquireDiskResponse, HandleAcquireDiskResponse); + HFunc( + TEvVolumePrivate::TEvDevicesAcquireFinished, + HandleDevicesAcquireFinished); HFunc( TEvVolumePrivate::TEvAcquireDiskIfNeeded, HandleAcquireDiskIfNeeded); @@ -1060,6 +1063,9 @@ STFUNC(TVolumeActor::StateWork) HFunc( TEvDiskRegistry::TEvReleaseDiskResponse, HandleReleaseDiskResponse); + HFunc( + TEvVolumePrivate::TEvDevicesReleaseFinished, + HandleDevicesReleasedFinished); HFunc( TEvDiskRegistry::TEvAllocateDiskResponse, HandleAllocateDiskResponse); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index c44e9873e88..4cd8bea8a3c 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -716,12 +716,26 @@ class TVolumeActor final const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleDevicesAcquireFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx); + void AcquireDisk( const NActors::TActorContext& ctx, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber); + void SendAcquireDevicesToAgents( + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + const NActors::TActorContext& ctx); + + void HandleDevicesAcquireFinished( + const TEvVolumePrivate::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx); + void AcquireDiskIfNeeded(const NActors::TActorContext& ctx); void ScheduleAcquireDiskIfNeeded(const NActors::TActorContext& ctx); @@ -742,8 +756,20 @@ class TVolumeActor final const TEvDiskRegistry::TEvReleaseDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleDevicesReleasedFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx); + void ReleaseDisk(const NActors::TActorContext& ctx, const TString& clientId); + void SendReleaseDevicesToAgents( + const TString& clientId, + const NActors::TActorContext& ctx); + + void HandleDevicesReleasedFinished( + const TEvVolumePrivate::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleAllocateDiskResponse( const TEvDiskRegistry::TEvAllocateDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_acquire.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_acquire.cpp new file mode 100644 index 00000000000..5dba7995a46 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_actor_acquire.cpp @@ -0,0 +1,383 @@ +#include "volume_actor.h" + +#include + +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TAcquireDevicesActor final + : public TActorBootstrapped +{ +private: + const TActorId Owner; + TVector Devices; + const TString DiskId; + const TString ClientId; + const NProto::EVolumeAccessMode AccessMode; + const ui64 MountSeqNumber; + const ui32 VolumeGeneration; + const TDuration RequestTimeout; + const bool MuteIOErrors; + + int PendingRequests = 0; + +public: + TAcquireDevicesActor( + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors); + + void Bootstrap(const TActorContext& ctx); + +private: + void PrepareRequest(NProto::TAcquireDevicesRequest& request) const; + void PrepareRequest(NProto::TReleaseDevicesRequest& request) const; + + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void OnAcquireResponse( + const TActorContext& ctx, + ui32 nodeId, + NProto::TError error); + + template + struct TSentRequest + { + TString AgentId; + ui32 NodeId = 0; + decltype(TRequest::Record) Record; + }; + + template + TVector> CreateRequests() const; + + template + void SendRequests( + const TActorContext& ctx, + const TVector>& requests); + +private: + STFUNC(StateAcquire); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleAcquireDevicesResponse( + const TEvDiskAgent::TEvAcquireDevicesResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleAcquireDevicesUndelivery( + const TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, + const TActorContext& ctx); + + void HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + TString LogTargets() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TAcquireDevicesActor::TAcquireDevicesActor( + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors) + : Owner(owner) + , Devices(std::move(devices)) + , DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , AccessMode(accessMode) + , MountSeqNumber(mountSeqNumber) + , VolumeGeneration(volumeGeneration) + , RequestTimeout(requestTimeout) + , MuteIOErrors(muteIOErrors) +{ + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); +} + +void TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateAcquire); + + if (Devices.empty()) { + ReplyAndDie(ctx, {}); + return; + } + + ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::VOLUME, + "[%s] Sending acquire devices requests for disk %s, targets %s", + ClientId.c_str(), + DiskId.c_str(), + LogTargets().c_str()); + + SendRequests(ctx, CreateRequests()); +} + +void TAcquireDevicesActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) +{ + using TType = TEvVolumePrivate::TEvDevicesAcquireFinished; + + if (HasError(error)) { + LOG_ERROR( + ctx, + TBlockStoreComponents::VOLUME, + "[%s] AcquireDevices %s targets %s error: %s", + ClientId.c_str(), + DiskId.c_str(), + LogTargets().c_str(), + FormatError(error).c_str()); + } + + NCloud::Send(ctx, Owner, std::make_unique(std::move(error))); + + Die(ctx); +} + +void TAcquireDevicesActor::PrepareRequest( + NProto::TAcquireDevicesRequest& request) const +{ + request.MutableHeaders()->SetClientId(ClientId); + request.SetAccessMode(AccessMode); + request.SetMountSeqNumber(MountSeqNumber); + request.SetDiskId(DiskId); + request.SetVolumeGeneration(VolumeGeneration); +} + +void TAcquireDevicesActor::PrepareRequest( + NProto::TReleaseDevicesRequest& request) const +{ + request.MutableHeaders()->SetClientId(ClientId); +} + +template +auto TAcquireDevicesActor::CreateRequests() const + -> TVector> +{ + auto it = Devices.begin(); + TVector> requests; + while (it != Devices.end()) { + const ui32 nodeId = it->GetNodeId(); + + auto& request = requests.emplace_back(); + request.AgentId = it->GetAgentId(); + request.NodeId = nodeId; + PrepareRequest(request.Record); + + for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { + *request.Record.AddDeviceUUIDs() = it->GetDeviceUUID(); + } + } + return requests; +} + +template +void TAcquireDevicesActor::SendRequests( + const TActorContext& ctx, + const TVector>& requests) +{ + PendingRequests = 0; + + for (const auto& r: requests) { + auto request = std::make_unique(TCallContextPtr{}, r.Record); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::VOLUME, + "[%s] Send an acquire request to node #%d. Devices: %s", + ClientId.c_str(), + r.NodeId, + JoinSeq(", ", request->Record.GetDeviceUUIDs()).c_str()); + + auto event = std::make_unique( + MakeDiskAgentServiceId(r.NodeId), + ctx.SelfID, + request.release(), + IEventHandle::FlagForwardOnNondelivery, + r.NodeId, + &ctx.SelfID // forwardOnNondelivery + ); + + ctx.Send(event.release()); + + ++PendingRequests; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +void TAcquireDevicesActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ReplyAndDie(ctx, MakeError(E_REJECTED, "Tablet is dead")); +} + +void TAcquireDevicesActor::OnAcquireResponse( + const TActorContext& ctx, + ui32 nodeId, + NProto::TError error) +{ + Y_ABORT_UNLESS(PendingRequests > 0); + + if (HasError(error) && !MuteIOErrors) { + LOG_ERROR( + ctx, + TBlockStoreComponents::VOLUME, + "[%s] AcquireDevices on the node #%d %s error: %s", + ClientId.c_str(), + nodeId, + LogTargets().c_str(), + FormatError(error).c_str()); + + if (GetErrorKind(error) != EErrorKind::ErrorRetriable) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::VOLUME, + "[%s] Canceling acquire operation for disk %s, targets %s", + ClientId.c_str(), + DiskId.c_str(), + LogTargets().c_str()); + + SendRequests( + ctx, + CreateRequests()); + } + + ReplyAndDie(ctx, std::move(error)); + + return; + } + + if (--PendingRequests == 0) { + ReplyAndDie(ctx, {}); + } +} + +void TAcquireDevicesActor::HandleAcquireDevicesResponse( + const TEvDiskAgent::TEvAcquireDevicesResponse::TPtr& ev, + const TActorContext& ctx) +{ + OnAcquireResponse( + ctx, + SafeIntegerCast(ev->Cookie), + ev->Get()->GetError()); +} + +void TAcquireDevicesActor::HandleAcquireDevicesUndelivery( + const TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + OnAcquireResponse( + ctx, + SafeIntegerCast(ev->Cookie), + MakeError(E_REJECTED, "not delivered")); +} + +void TAcquireDevicesActor::HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + OnAcquireResponse( + ctx, + SafeIntegerCast(ev->Cookie), + MakeError(E_REJECTED, "timeout")); +} + +//////////////////////////////////////////////////////////////////////////////// + +TString TAcquireDevicesActor::LogTargets() const +{ + return LogDevices(Devices); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TAcquireDevicesActor::StateAcquire) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc( + TEvDiskAgent::TEvAcquireDevicesResponse, + HandleAcquireDevicesResponse); + HFunc( + TEvDiskAgent::TEvAcquireDevicesRequest, + HandleAcquireDevicesUndelivery); + + HFunc(TEvents::TEvWakeup, HandleWakeup); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::VOLUME); + break; + } +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::SendAcquireDevicesToAgents( + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + const TActorContext& ctx) +{ + auto devices = State->GetAllDevicesForAcquireRelease(); + + auto actor = NCloud::Register( + ctx, + ctx.SelfID, + std::move(devices), + State->GetDiskId(), + std::move(clientId), + accessMode, + mountSeqNumber, + Executor()->Generation(), + Config->GetAgentRequestTimeout(), + State->GetMeta().GetMuteIOErrors()); + Actors.insert(actor); +} + +void TVolumeActor::HandleDevicesAcquireFinished( + const TEvVolumePrivate::TEvDevicesAcquireFinished::TPtr& ev, + const TActorContext& ctx) +{ + HandleDevicesAcquireFinishedImpl(ev->Get()->GetError(), ctx); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp index dff49881dea..99d44e17495 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_addclient.cpp @@ -40,6 +40,15 @@ void TVolumeActor::AcquireDisk( "Acquiring disk " << State->GetDiskId() ); + if (Config->GetNonReplicatedVolumeDirectAcquireEnabled()) { + SendAcquireDevicesToAgents( + std::move(clientId), + accessMode, + mountSeqNumber, + ctx); + return; + } + auto request = std::make_unique(); request->Record.SetDiskId(State->GetDiskId()); @@ -178,6 +187,13 @@ void TVolumeActor::HandleAcquireDiskResponse( // agents auto& record = msg->Record; + HandleDevicesAcquireFinishedImpl(record.GetError(), ctx); +} + +void TVolumeActor::HandleDevicesAcquireFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx) +{ ScheduleAcquireDiskIfNeeded(ctx); if (AcquireReleaseDiskRequests.empty()) { @@ -193,7 +209,7 @@ void TVolumeActor::HandleAcquireDiskResponse( auto& request = AcquireReleaseDiskRequests.front(); auto& cr = request.ClientRequest; - if (HasError(record.GetError())) { + if (HasError(error)) { LOG_DEBUG_S( ctx, TBlockStoreComponents::VOLUME, @@ -201,8 +217,8 @@ void TVolumeActor::HandleAcquireDiskResponse( ); if (cr) { - auto response = std::make_unique( - record.GetError()); + auto response = + std::make_unique(error); response->Record.MutableVolume()->SetDiskId(cr->DiskId); response->Record.SetClientId(cr->GetClientId()); response->Record.SetTabletId(TabletID()); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_release.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_release.cpp new file mode 100644 index 00000000000..6e3debf3422 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_actor_release.cpp @@ -0,0 +1,284 @@ +#include "volume_actor.h" +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TReleaseDevicesActor final + : public TActorBootstrapped +{ +private: + const TActorId Owner; + const TString DiskId; + const TString ClientId; + const ui32 VolumeGeneration; + const TDuration RequestTimeout; + const bool MuteIOErrors; + + TVector Devices; + int PendingRequests = 0; + +public: + TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors); + + void Bootstrap(const TActorContext& ctx); + +private: + void PrepareRequest(NProto::TReleaseDevicesRequest& request); + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error); + +private: + STFUNC(StateWork); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx); + + void HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + TString LogTargets() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TReleaseDevicesActor::TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors) + : Owner(owner) + , DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , VolumeGeneration(volumeGeneration) + , RequestTimeout(requestTimeout) + , MuteIOErrors(muteIOErrors) + , Devices(std::move(devices)) +{} + +void TReleaseDevicesActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) +{ + request.MutableHeaders()->SetClientId(ClientId); + request.SetDiskId(DiskId); + request.SetVolumeGeneration(VolumeGeneration); +} + +void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); + + auto it = Devices.begin(); + while (it != Devices.end()) { + auto request = + std::make_unique(); + PrepareRequest(request->Record); + + const ui32 nodeId = it->GetNodeId(); + + for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { + *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); + } + + ++PendingRequests; + NCloud::Send( + ctx, + MakeDiskAgentServiceId(nodeId), + std::move(request), + nodeId); + } + + ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); +} + +void TReleaseDevicesActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) +{ + NCloud::Send( + ctx, + Owner, + std::make_unique( + std::move(error))); + + Die(ctx); +} + +void TReleaseDevicesActor::OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error) +{ + Y_ABORT_UNLESS(PendingRequests > 0); + + if (HasError(error)) { + LOG_LOG( + ctx, + MuteIOErrors ? NLog::PRI_WARN : NLog::PRI_ERROR, + TBlockStoreComponents::VOLUME, + "ReleaseDevices %s error: %s, %llu", + LogTargets().c_str(), + FormatError(error).c_str(), + cookie); + } + + if (--PendingRequests == 0) { + ReplyAndDie(ctx, {}); + } +} + +void TReleaseDevicesActor::HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); +} + +void TReleaseDevicesActor::HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); +} + +void TReleaseDevicesActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ReplyAndDie(ctx, MakeError(E_REJECTED, "Tablet is dead")); +} + +void TReleaseDevicesActor::HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + const auto err = TStringBuilder() + << "TReleaseDiskActor timeout." << " DiskId: " << DiskId + << " ClientId: " << ClientId + << " Targets: " << LogTargets() + << " VolumeGeneration: " << VolumeGeneration + << " PendingRequests: " << PendingRequests; + + LOG_WARN(ctx, TBlockStoreComponents::VOLUME, err); + + ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); +} + +STFUNC(TReleaseDevicesActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleTimeout); + + HFunc( + TEvDiskAgent::TEvReleaseDevicesResponse, + HandleReleaseDevicesResponse); + HFunc( + TEvDiskAgent::TEvReleaseDevicesRequest, + HandleReleaseDevicesUndelivery); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::VOLUME); + break; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TString TReleaseDevicesActor::LogTargets() const +{ + return LogDevices(Devices); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::SendReleaseDevicesToAgents( + const TString& clientId, + const TActorContext& ctx) +{ + auto replyWithError = [&](auto error) + { + NCloud::Send( + ctx, + SelfId(), + std::make_unique( + std::move(error))); + }; + + TString diskId = State->GetDiskId(); + ui32 volumeGeneration = Executor()->Generation(); + + if (!clientId) { + replyWithError(MakeError(E_ARGUMENT, "empty client id")); + return; + } + + if (!diskId) { + replyWithError(MakeError(E_ARGUMENT, "empty disk id")); + return; + } + + auto devices = State->GetAllDevicesForAcquireRelease(); + + auto actor = NCloud::Register( + ctx, + ctx.SelfID, + std::move(diskId), + clientId, + volumeGeneration, + Config->GetAgentRequestTimeout(), + std::move(devices), + State->GetMeta().GetMuteIOErrors()); + + Actors.insert(actor); +} + +void TVolumeActor::HandleDevicesReleasedFinished( + const TEvVolumePrivate::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + HandleDevicesReleasedFinishedImpl(ev->Get()->GetError(), ctx); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp index 27c26445aea..a1b72d50aaa 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_removeclient.cpp @@ -28,7 +28,10 @@ void TVolumeActor::ReleaseDisk(const TActorContext& ctx, const TString& clientId request->Record.SetDiskId(State->GetDiskId()); request->Record.MutableHeaders()->SetClientId(clientId); request->Record.SetVolumeGeneration(Executor()->Generation()); - + if (Config->GetNonReplicatedVolumeDirectAcquireEnabled()) { + SendReleaseDevicesToAgents(clientId, ctx); + return; + } NCloud::Send( ctx, MakeDiskRegistryProxyServiceId(), @@ -42,6 +45,13 @@ void TVolumeActor::HandleReleaseDiskResponse( auto* msg = ev->Get(); auto& record = msg->Record; + HandleDevicesReleasedFinishedImpl(record.GetError(), ctx); +} + +void TVolumeActor::HandleDevicesReleasedFinishedImpl( + const NProto::TError& error, + const NActors::TActorContext& ctx) +{ if (AcquireReleaseDiskRequests.empty()) { LOG_DEBUG_S( ctx, @@ -54,7 +64,6 @@ void TVolumeActor::HandleReleaseDiskResponse( auto& request = AcquireReleaseDiskRequests.front(); auto& cr = request.ClientRequest; - const auto& error = record.GetError(); if (HasError(error) && (error.GetCode() != E_NOT_FOUND)) { LOG_DEBUG_S( diff --git a/cloud/blockstore/libs/storage/volume/volume_events_private.h b/cloud/blockstore/libs/storage/volume/volume_events_private.h index 923fdb8ba96..c28a6b2eeba 100644 --- a/cloud/blockstore/libs/storage/volume/volume_events_private.h +++ b/cloud/blockstore/libs/storage/volume/volume_events_private.h @@ -308,6 +308,22 @@ struct TEvVolumePrivate TExternalDrainDone() = default; }; + // + // DevicesAcquireFinished + // + + struct TDevicesAcquireFinished + { + }; + + // + // DevicesReleaseFinished + // + + struct TDevicesReleaseFinished + { + }; + // // Events declaration // @@ -330,6 +346,8 @@ struct TEvVolumePrivate EvRemoveExpiredVolumeParams, EvShadowDiskAcquired, EvExternalDrainDone, + EvDevicesAcquireFinished, + EvDevicesReleaseFinished, EvEnd }; @@ -392,6 +410,12 @@ struct TEvVolumePrivate TExternalDrainDone, EvExternalDrainDone >; + + using TEvDevicesAcquireFinished = + TResponseEvent; + + using TEvDevicesReleaseFinished = + TResponseEvent; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_state.cpp b/cloud/blockstore/libs/storage/volume/volume_state.cpp index 8d9cbd8ea9c..91722013d65 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state.cpp @@ -811,6 +811,31 @@ const THashMultiMap& TVolumeState::GetPipeServerId2ClientId() return ClientIdsByPipeServerId; } +TVector +TVolumeState::GetAllDevicesForAcquireRelease() const +{ + const size_t allDevicesCount = + ((Meta.ReplicasSize() + 1) * Meta.DevicesSize()) + + GetMeta().MigrationsSize(); + + TVector resultDevices; + resultDevices.reserve(allDevicesCount); + + for (const auto& device: Meta.GetDevices()) { + resultDevices.emplace_back(device); + } + for (const auto& replica: Meta.GetReplicas()) { + for (const auto& device: replica.GetDevices()) { + resultDevices.emplace_back(device); + } + } + for (const auto& migration: Meta.GetMigrations()) { + resultDevices.emplace_back(migration.GetTargetDevice()); + } + + return resultDevices; +} + bool TVolumeState::CanPreemptClient( const TString& oldClientId, TInstant referenceTimestamp, diff --git a/cloud/blockstore/libs/storage/volume/volume_state.h b/cloud/blockstore/libs/storage/volume/volume_state.h index 7b92e555392..d27dedae766 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.h +++ b/cloud/blockstore/libs/storage/volume/volume_state.h @@ -240,7 +240,6 @@ class TVolumeState // The number of blocks that need to be migrated to complete the migration. std::optional BlockCountToMigrate; - public: TVolumeState( TStorageConfigPtr storageConfig, @@ -725,6 +724,8 @@ class TVolumeState return Meta.GetResyncNeeded(); } + TVector GetAllDevicesForAcquireRelease() const; + private: bool CanPreemptClient( const TString& oldClientId, diff --git a/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp index 1cd86f2ce03..e5ade7417a9 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state_ut.cpp @@ -1939,6 +1939,40 @@ Y_UNIT_TEST_SUITE(TVolumeStateTest) UNIT_ASSERT(!state.GetTrackUsedBlocks()); } } + + Y_UNIT_TEST(AcquireDisk) + { + auto volumeState = CreateVolumeState(); + auto meta = volumeState.GetMeta(); + const TInstant oldDate = TInstant::ParseIso8601("2023-08-30"); + meta.MutableVolumeConfig()->SetCreationTs(oldDate.MicroSeconds()); + meta.AddDevices()->SetDeviceUUID("d1"); + meta.AddDevices()->SetDeviceUUID("d2"); + auto& r1 = *meta.AddReplicas(); + r1.AddDevices()->SetDeviceUUID("d3"); + r1.AddDevices()->SetDeviceUUID("d4"); + auto& r2 = *meta.AddReplicas(); + r2.AddDevices()->SetDeviceUUID("d5"); + r2.AddDevices()->SetDeviceUUID("d6"); + + NProto::TDeviceMigration deviceMigration; + deviceMigration.SetSourceDeviceId("d1"); + deviceMigration.MutableTargetDevice()->SetDeviceUUID("d7"); + + meta.MutableMigrations()->Add(std::move(deviceMigration)); + volumeState.ResetMeta(meta); + + const THashSet + deviceUUIDSExpected{"d1", "d2", "d3", "d4", "d5", "d6", "d7"}; + + auto devices = volumeState.GetAllDevicesForAcquireRelease(); + THashSet devicesUUIDSActual; + for (const auto& d: volumeState.GetAllDevicesForAcquireRelease()) { + devicesUUIDSActual.insert(d.GetDeviceUUID()); + } + + UNIT_ASSERT_EQUAL(deviceUUIDSExpected, devicesUUIDSActual); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_ut_session.cpp b/cloud/blockstore/libs/storage/volume/volume_ut_session.cpp new file mode 100644 index 00000000000..224a61e3882 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_ut_session.cpp @@ -0,0 +1,461 @@ +#include "volume_ut.h" + +#include +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace std::chrono_literals; + +using namespace NActors; + +using namespace NKikimr; + +using namespace NCloud::NBlockStore::NStorage::NPartition; + +using namespace NCloud::NStorage; + +using namespace NTestVolume; + +using namespace NTestVolumeHelpers; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TFixture: public NUnitTest::TBaseFixture +{ + std::unique_ptr Runtime; + TIntrusivePtr State; + + void SetupTest(TDuration agentRequestTimeout = 1s) + { + NProto::TStorageServiceConfig config; + config.SetAcquireNonReplicatedDevices(true); + config.SetNonReplicatedVolumeDirectAcquireEnabled(true); + config.SetAgentRequestTimeout(agentRequestTimeout.MilliSeconds()); + config.SetClientRemountPeriod(2000); + State = MakeIntrusive(); + Runtime = PrepareTestActorRuntime(config, State); + auto volume = GetVolumeClient(); + + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + 1024); + + volume.WaitReady(); + } + + TVolumeClient GetVolumeClient() const + { + return {*Runtime}; + } +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TVolumeSessionTest) +{ + Y_UNIT_TEST_F(ShouldPassAllParamsInAcquireDevicesRequest, TFixture) + { + SetupTest(); + + auto volume = GetVolumeClient(); + + auto response = volume.GetVolumeInfo(); + auto diskInfo = response->Record.GetVolume(); + THashSet devices; + for (const auto& d: diskInfo.GetDevices()) { + devices.emplace(d.GetDeviceUUID()); + } + for (const auto& m: diskInfo.GetMigrations()) { + devices.emplace(m.GetTargetDevice().GetDeviceUUID()); + } + + auto statVolumeResponse = volume.StatVolume(); + + bool requestSended = false; + + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskAgent::EvAcquireDevicesRequest) + { + requestSended = true; + auto* acquireReq = + event->Get(); + Y_UNUSED(acquireReq); + + auto& record = acquireReq->Record; + + UNIT_ASSERT_EQUAL( + record.GetAccessMode(), + NProto::VOLUME_ACCESS_READ_WRITE); + + UNIT_ASSERT_EQUAL(record.GetDiskId(), diskInfo.GetDiskId()); + + const auto& deviceUUIDS = record.GetDeviceUUIDs(); + UNIT_ASSERT_EQUAL( + static_cast(deviceUUIDS.size()), + devices.size()); + for (const auto& deviceUUID: deviceUUIDS) { + UNIT_ASSERT(devices.contains(deviceUUID)); + } + + UNIT_ASSERT_EQUAL( + statVolumeResponse->Record.GetVolumeGeneration(), + record.GetVolumeGeneration()); + + UNIT_ASSERT_EQUAL( + statVolumeResponse->Record.GetMountSeqNumber(), + record.GetMountSeqNumber()); + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(writer); + + UNIT_ASSERT(requestSended); + } + + Y_UNIT_TEST_F(ShouldPassAllParamsInReleaseDevicesRequest, TFixture) + { + SetupTest(); + + auto volume = GetVolumeClient(); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(writer); + + auto response = volume.GetVolumeInfo(); + auto diskInfo = response->Record.GetVolume(); + THashSet devices; + for (const auto& d: diskInfo.GetDevices()) { + devices.emplace(d.GetDeviceUUID()); + } + for (const auto& m: diskInfo.GetMigrations()) { + devices.emplace(m.GetTargetDevice().GetDeviceUUID()); + } + + auto statVolumeResponse = volume.StatVolume(); + + bool requestSended = false; + + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskAgent::EvReleaseDevicesRequest) + { + requestSended = true; + auto* acquireReq = + event->Get(); + Y_UNUSED(acquireReq); + + auto& record = acquireReq->Record; + + UNIT_ASSERT_EQUAL(record.GetDiskId(), diskInfo.GetDiskId()); + + const auto& deviceUUIDS = record.GetDeviceUUIDs(); + UNIT_ASSERT_EQUAL( + static_cast(deviceUUIDS.size()), + devices.size()); + for (const auto& deviceUUID: deviceUUIDS) { + UNIT_ASSERT(devices.contains(deviceUUID)); + } + + UNIT_ASSERT_EQUAL( + statVolumeResponse->Record.GetVolumeGeneration(), + record.GetVolumeGeneration()); + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + volume.RemoveClient(writer.GetClientId()); + + UNIT_ASSERT(requestSended); + } + + Y_UNIT_TEST_F(ShouldSendAcquireReleaseRequestsDirectlyToDiskAgent, TFixture) + { + SetupTest(); + + TVolumeClient writerClient = GetVolumeClient(); + auto readerClient1 = GetVolumeClient(); + auto readerClient2 = GetVolumeClient(); + + ui32 acquireRequestsToDiskRegistry = 0; + ui32 releaseRequestsToDiskRegistry = 0; + ui32 readerAcquireRequests = 0; + ui32 writerAcquireRequests = 0; + ui32 releaseRequests = 0; + + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvDiskRegistry::EvAcquireDiskRequest: + ++acquireRequestsToDiskRegistry; + break; + case TEvDiskRegistry::EvReleaseDiskRequest: + ++releaseRequestsToDiskRegistry; + break; + case TEvDiskAgent::EvAcquireDevicesRequest: { + auto* msg = + event + ->Get(); + if (msg->Record.GetAccessMode() == + NProto::VOLUME_ACCESS_READ_ONLY) + { + ++readerAcquireRequests; + } else { + ++writerAcquireRequests; + } + break; + } + case TEvDiskAgent::EvReleaseDevicesRequest: + ++releaseRequests; + break; + default: + break; + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + Runtime->AdvanceCurrentTime(2s); + Runtime->DispatchEvents({}, 1ms); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 0); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 0); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + writerClient.AddClient(writer); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 1); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 0); + + auto reader1 = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_ONLY, + NProto::VOLUME_MOUNT_REMOTE, + 0); + readerClient1.AddClient(reader1); + + auto reader2 = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_ONLY, + NProto::VOLUME_MOUNT_REMOTE, + 0); + readerClient2.AddClient(reader2); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 1); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 2); + + Runtime->AdvanceCurrentTime(2s); + Runtime->DispatchEvents({}, 1ms); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 2); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 4); + + UNIT_ASSERT_VALUES_EQUAL(releaseRequests, 0); + UNIT_ASSERT_VALUES_EQUAL(releaseRequestsToDiskRegistry, 0); + + readerClient1.RemoveClient(reader1.GetClientId()); + + Runtime->AdvanceCurrentTime(2s); + Runtime->DispatchEvents({}, 1ms); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 3); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 5); + + UNIT_ASSERT_VALUES_EQUAL(releaseRequests, 1); + UNIT_ASSERT_VALUES_EQUAL(releaseRequestsToDiskRegistry, 0); + + writerClient.RemoveClient(writer.GetClientId()); + readerClient2.RemoveClient(reader2.GetClientId()); + + Runtime->AdvanceCurrentTime(2s); + Runtime->DispatchEvents({}, 1ms); + + UNIT_ASSERT_VALUES_EQUAL(acquireRequestsToDiskRegistry, 0); + UNIT_ASSERT_VALUES_EQUAL(writerAcquireRequests, 3); + UNIT_ASSERT_VALUES_EQUAL(readerAcquireRequests, 5); + + UNIT_ASSERT_VALUES_EQUAL(releaseRequests, 3); + UNIT_ASSERT_VALUES_EQUAL(releaseRequestsToDiskRegistry, 0); + } + + Y_UNIT_TEST_F(ShouldRejectTimedoutAcquireRequests, TFixture) + { + SetupTest(100ms); + + auto writerClient = GetVolumeClient(); + + std::unique_ptr stollenResponse; + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskAgent::EvAcquireDevicesResponse) + { + stollenResponse.reset(event.Release()); + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + + writerClient.SendAddClientRequest(writer); + auto response = writerClient.RecvAddClientResponse(); + UNIT_ASSERT_EQUAL(response->GetError().GetCode(), E_REJECTED); + UNIT_ASSERT_VALUES_EQUAL(response->GetError().GetMessage(), "timeout"); + } + + Y_UNIT_TEST_F(ShouldPassErrorsFromDiskAgent, TFixture) + { + SetupTest(); + + auto writerClient = GetVolumeClient(); + + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskAgent::EvAcquireDevicesResponse) + { + auto response = std::make_unique< + TEvDiskAgent::TEvAcquireDevicesResponse>( + MakeError(E_TRY_AGAIN)); + + Runtime->Send(new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie)); + + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + + writerClient.SendAddClientRequest(writer); + auto response = writerClient.RecvAddClientResponse(); + + UNIT_ASSERT_EQUAL(response->GetError().GetCode(), E_TRY_AGAIN); + } + + Y_UNIT_TEST_F(ShouldMuteErrorsWithMuteIoErrors, TFixture) + { + SetupTest(); + + auto writerClient = GetVolumeClient(); + + Runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskAgent::EvAcquireDevicesResponse) + { + auto response = std::make_unique< + TEvDiskAgent::TEvAcquireDevicesResponse>( + MakeError(E_TRY_AGAIN)); + + Runtime->Send(new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie)); + + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + + auto& disk = State->Disks.at("vol0"); + disk.IOMode = NProto::VOLUME_IO_ERROR_READ_ONLY; + disk.IOModeTs = Runtime->GetCurrentTime(); + disk.MuteIOErrors = true; + + auto volume = GetVolumeClient(); + volume.ReallocateDisk(); + // reallocate disk will trigger pipes reset, so reestablish connection + volume.ReconnectPipe(); + + writerClient.AddClient(writer); + } + + Y_UNIT_TEST_F(ShouldHandleRequestsUndelivery, TFixture) + { + SetupTest(); + + auto writerClient = GetVolumeClient(); + + auto writer = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + + auto agentNodeId = MakeDiskAgentServiceId( + State->Disks.at("vol0").Devices[0].GetNodeId()); + + Runtime->Send(new IEventHandle( + agentNodeId, + TActorId(), + new TEvents::TEvPoisonPill)); + + writerClient.SendAddClientRequest(writer); + + auto response = writerClient.RecvAddClientResponse(); + + UNIT_ASSERT_EQUAL(response->GetError().GetCode(), E_REJECTED); + UNIT_ASSERT_EQUAL(response->GetError().GetMessage(), "not delivered"); + } +} +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/ya.make b/cloud/blockstore/libs/storage/volume/ya.make index 77f72624e71..3a04be7cdc4 100644 --- a/cloud/blockstore/libs/storage/volume/ya.make +++ b/cloud/blockstore/libs/storage/volume/ya.make @@ -7,6 +7,7 @@ SRCS( volume.cpp volume_actor_addclient.cpp + volume_actor_acquire.cpp volume_actor_allocatedisk.cpp volume_actor_change_storage_config.cpp volume_actor_checkpoint.cpp @@ -22,6 +23,7 @@ SRCS( volume_actor_read_history.cpp volume_actor_read_meta_history.cpp volume_actor_reallocatedisk.cpp + volume_actor_release.cpp volume_actor_removeclient.cpp volume_actor_reset_seqnumber.cpp volume_actor_resync.cpp