From c7be6417873609238e6c4794fb034188bf3c7f42 Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Thu, 6 Jun 2024 13:00:18 +0300 Subject: [PATCH] Support dynamic node configuration updates through NodeWarden (#5227) --- ydb/core/base/blobstorage.h | 2 + ydb/core/blobstorage/nodewarden/bind_queue.h | 4 + ydb/core/blobstorage/nodewarden/distconf.cpp | 64 ++++++++----- ydb/core/blobstorage/nodewarden/distconf.h | 29 +++++- .../nodewarden/distconf_binding.cpp | 21 ++++- .../nodewarden/distconf_dynamic.cpp | 90 +++++++++++++++++++ .../blobstorage/nodewarden/distconf_mon.cpp | 12 +++ .../nodewarden/node_warden_events.h | 4 + .../blobstorage/nodewarden/node_warden_impl.h | 2 + ydb/core/blobstorage/nodewarden/ya.make | 1 + .../blobstorage_distributed_config.proto | 4 + 11 files changed, 207 insertions(+), 26 deletions(-) create mode 100644 ydb/core/blobstorage/nodewarden/distconf_dynamic.cpp diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index f639422bc4e7..58b45decea0b 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -845,6 +845,8 @@ struct TEvBlobStorage { EvNodeWardenStorageConfigConfirm, EvNodeWardenQueryBaseConfig, EvNodeWardenBaseConfig, + EvNodeWardenDynamicConfigSubscribe, + EvNodeWardenDynamicConfigPush, // Other EvRunActor = EvPut + 15 * 512, diff --git a/ydb/core/blobstorage/nodewarden/bind_queue.h b/ydb/core/blobstorage/nodewarden/bind_queue.h index 3718d655b5c6..bea6fd9c8778 100644 --- a/ydb/core/blobstorage/nodewarden/bind_queue.h +++ b/ydb/core/blobstorage/nodewarden/bind_queue.h @@ -26,6 +26,10 @@ namespace NKikimr::NStorage { #endif public: + bool Empty() const { + return BindQueue.empty(); + } + void Disable(ui32 nodeId) { #ifndef NDEBUG Y_ABORT_UNLESS(Enabled.contains(nodeId)); diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp index 82d760514b5d..706f155c4cd2 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf.cpp @@ -5,8 +5,9 @@ namespace NKikimr::NStorage { TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr cfg, - const NKikimrBlobStorage::TStorageConfig& baseConfig) - : Cfg(std::move(cfg)) + const NKikimrBlobStorage::TStorageConfig& baseConfig, bool isSelfStatic) + : IsSelfStatic(isSelfStatic) + , Cfg(std::move(cfg)) , BaseConfig(baseConfig) , InitialConfig(baseConfig) { @@ -17,22 +18,32 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::Bootstrap() { STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap"); - // report initial node listing - auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig); - auto ev = std::make_unique(); - for (const auto& [nodeId, item] : ns->StaticNodeTable) { - ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location); + // report initial node listing for static node + if (IsSelfStatic) { + auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig); + auto ev = std::make_unique(); + for (const auto& [nodeId, item] : ns->StaticNodeTable) { + ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location); + } + Send(SelfId(), ev.release()); } - Send(SelfId(), ev.release()); + + // and subscribe for the node list too + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true)); // generate initial drive set and query stored configuration - EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) { - DrivesToRead.push_back(drive.GetPath()); - }); - std::sort(DrivesToRead.begin(), DrivesToRead.end()); + if (IsSelfStatic) { + EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) { + DrivesToRead.push_back(drive.GetPath()); + }); + std::sort(DrivesToRead.begin(), DrivesToRead.end()); + + auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0); + Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); + } else { + StorageConfigLoaded = true; + } - auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0); - Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query))); Become(&TThis::StateWaitForInit); } @@ -60,7 +71,10 @@ namespace NKikimr::NStorage { } Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenStorageConfig(*StorageConfig, ProposedStorageConfig ? &ProposedStorageConfig.value() : nullptr)); - PersistConfig({}); + if (IsSelfStatic) { + PersistConfig({}); + ApplyConfigUpdateToDynamicNodes(); + } return true; } else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() && StorageConfig->GetFingerprint() != config.GetFingerprint()) { @@ -149,6 +163,9 @@ namespace NKikimr::NStorage { if (!sessionId) { okay = true; // may be just obsolete subscription request } + if (ConnectedDynamicNodes.contains(nodeId)) { + okay = true; + } Y_ABORT_UNLESS(okay); } @@ -202,8 +219,12 @@ namespace NKikimr::NStorage { } if (change && NodeListObtained && StorageConfigLoaded) { - UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr); - IssueNextBindRequest(); + if (IsSelfStatic) { + UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr); + IssueNextBindRequest(); + } else { + + } processPendingEvents(); } } @@ -224,6 +245,9 @@ namespace NKikimr::NStorage { hFunc(TEvPrivate::TEvStorageConfigLoaded, Handle); hFunc(TEvPrivate::TEvStorageConfigStored, Handle); fFunc(TEvBlobStorage::EvNodeWardenStorageConfigConfirm, HandleConfigConfirm); + fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe, HandleDynamicConfigSubscribe); + hFunc(TEvNodeWardenDynamicConfigPush, Handle); + cFunc(TEvPrivate::EvReconnect, HandleReconnect); hFunc(NMon::TEvHttpInfo, Handle); fFunc(TEvents::TSystem::Gone, HandleGone); cFunc(TEvents::TSystem::Wakeup, HandleWakeup); @@ -234,10 +258,8 @@ namespace NKikimr::NStorage { void TNodeWarden::StartDistributedConfigKeeper() { auto *appData = AppData(); - if (!appData->DynamicNameserviceConfig || SelfId().NodeId() <= appData->DynamicNameserviceConfig->MaxStaticNodeId) { - // start distributed configuration machinery only on static nodes - DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg, StorageConfig)); - } + const bool isSelfStatic = !appData->DynamicNameserviceConfig || SelfId().NodeId() <= appData->DynamicNameserviceConfig->MaxStaticNodeId; + DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg, StorageConfig, isSelfStatic)); } void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) { diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h index 51c03b9205d7..e8af479c1b1f 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.h +++ b/ydb/core/blobstorage/nodewarden/distconf.h @@ -73,6 +73,7 @@ namespace NKikimr::NStorage { EvErrorTimeout, EvStorageConfigLoaded, EvStorageConfigStored, + EvReconnect, }; struct TEvStorageConfigLoaded : TEventLocal { @@ -167,6 +168,7 @@ namespace NKikimr::NStorage { } }; + const bool IsSelfStatic = false; TIntrusivePtr Cfg; // currently active storage config @@ -248,7 +250,8 @@ namespace NKikimr::NStorage { return NKikimrServices::TActivity::NODEWARDEN_DISTRIBUTED_CONFIG; } - TDistributedConfigKeeper(TIntrusivePtr cfg, const NKikimrBlobStorage::TStorageConfig& baseConfig); + TDistributedConfigKeeper(TIntrusivePtr cfg, const NKikimrBlobStorage::TStorageConfig& baseConfig, + bool isSelfStatic); void Bootstrap(); void PassAway() override; @@ -359,6 +362,30 @@ namespace NKikimr::NStorage { void Handle(TEvNodeConfigInvokeOnRoot::TPtr ev); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Dynamic node interaction + + TBindQueue StaticBindQueue; + THashMap DynamicConfigSubscribers; // -> + ui32 ConnectedToStaticNode = 0; + TActorId StaticNodeSessionId; + bool ReconnectScheduled = false; + THashSet ConnectedDynamicNodes; + + // these are used on the dynamic nodes + void ApplyStaticNodeIds(const std::vector& nodeIds); + void ConnectToStaticNode(); + void HandleReconnect(); + void OnStaticNodeConnected(ui32 nodeId, TActorId sessionId); + void OnStaticNodeDisconnected(ui32 nodeId, TActorId sessionId); + void Handle(TEvNodeWardenDynamicConfigPush::TPtr ev); + + // these are used on the static nodes + void ApplyConfigUpdateToDynamicNodes(); + void OnDynamicNodeDisconnected(ui32 nodeId, TActorId sessionId); + void HandleDynamicConfigSubscribe(STATEFN_SIG); + void PushConfigToDynamicNode(TActorId actorId, TActorId sessionId); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Event delivery diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp index 26b328db167b..e69eaf2bad4c 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp @@ -6,13 +6,12 @@ namespace NKikimr::NStorage { STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo"); // create a vector of peer static nodes - bool iAmStatic = false; std::vector nodeIds; const ui32 selfNodeId = SelfId().NodeId(); for (const auto& item : ev->Get()->Nodes) { if (item.NodeId == selfNodeId) { - iAmStatic = item.IsStatic; SelfNode = TNodeIdentifier(item.ResolveHost, item.Port, selfNodeId); + Y_ABORT_UNLESS(IsSelfStatic == item.IsStatic); } if (item.IsStatic) { nodeIds.push_back(item.NodeId); @@ -21,8 +20,9 @@ namespace NKikimr::NStorage { std::sort(nodeIds.begin(), nodeIds.end()); // do not start configuration negotiation for dynamic nodes - if (!iAmStatic) { + if (!IsSelfStatic) { Y_ABORT_UNLESS(NodeIds.empty()); + ApplyStaticNodeIds(nodeIds); return; } @@ -115,6 +115,10 @@ namespace NKikimr::NStorage { STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId)); + if (!IsSelfStatic) { + return OnStaticNodeConnected(nodeId, sessionId); + } + // update subscription information const auto [it, inserted] = SubscribedSessions.try_emplace(nodeId, sessionId); Y_ABORT_UNLESS(!inserted); @@ -133,12 +137,18 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { const ui32 nodeId = ev->Get()->NodeId; + STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId)); + + if (!IsSelfStatic) { + return OnStaticNodeDisconnected(nodeId, ev->Sender); + } + const auto it = SubscribedSessions.find(nodeId); Y_ABORT_UNLESS(it != SubscribedSessions.end()); Y_ABORT_UNLESS(!it->second || it->second == ev->Sender); SubscribedSessions.erase(it); - STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId)); + OnDynamicNodeDisconnected(nodeId, ev->Sender); UnbindNode(nodeId, "disconnection"); @@ -163,6 +173,9 @@ namespace NKikimr::NStorage { if (DirectBoundNodes.contains(nodeId)) { return; } + if (ConnectedDynamicNodes.contains(nodeId)) { + return; + } if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end() && it->second) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId(), nullptr, 0)); SubscribedSessions.erase(it); diff --git a/ydb/core/blobstorage/nodewarden/distconf_dynamic.cpp b/ydb/core/blobstorage/nodewarden/distconf_dynamic.cpp new file mode 100644 index 000000000000..ba0a3d154791 --- /dev/null +++ b/ydb/core/blobstorage/nodewarden/distconf_dynamic.cpp @@ -0,0 +1,90 @@ +#include "distconf.h" + +namespace NKikimr::NStorage { + + void TDistributedConfigKeeper::ApplyStaticNodeIds(const std::vector& nodeIds) { + StaticBindQueue.Update(nodeIds); + ConnectToStaticNode(); + } + + void TDistributedConfigKeeper::ConnectToStaticNode() { + if (ConnectedToStaticNode) { + return; + } + + const TMonotonic now = TActivationContext::Monotonic(); + TMonotonic timestamp; + if (std::optional nodeId = StaticBindQueue.Pick(now, ×tamp)) { + ConnectedToStaticNode = *nodeId; + TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe, + IEventHandle::FlagSubscribeOnSession, MakeBlobStorageNodeWardenID(ConnectedToStaticNode), SelfId(), + nullptr, 0)); + } else if (timestamp != TMonotonic::Max()) { + if (!ReconnectScheduled) { + TActivationContext::Schedule(timestamp, new IEventHandle(TEvPrivate::EvReconnect, 0, SelfId(), {}, nullptr, 0)); + ReconnectScheduled = true; + } + } + } + + void TDistributedConfigKeeper::HandleReconnect() { + Y_ABORT_UNLESS(ReconnectScheduled); + ReconnectScheduled = false; + ConnectToStaticNode(); + } + + void TDistributedConfigKeeper::OnStaticNodeConnected(ui32 nodeId, TActorId sessionId) { + Y_ABORT_UNLESS(nodeId == ConnectedToStaticNode); + Y_ABORT_UNLESS(!StaticNodeSessionId); + StaticNodeSessionId = sessionId; + } + + void TDistributedConfigKeeper::OnStaticNodeDisconnected(ui32 nodeId, TActorId /*sessionId*/) { + Y_ABORT_UNLESS(nodeId == ConnectedToStaticNode); + ConnectedToStaticNode = 0; + StaticNodeSessionId = {}; + ConnectToStaticNode(); + } + + void TDistributedConfigKeeper::Handle(TEvNodeWardenDynamicConfigPush::TPtr ev) { + auto& record = ev->Get()->Record; + ApplyStorageConfig(record.GetConfig()); + } + + void TDistributedConfigKeeper::ApplyConfigUpdateToDynamicNodes() { + for (const auto& [sessionId, actorId] : DynamicConfigSubscribers) { + PushConfigToDynamicNode(actorId, sessionId); + } + } + + void TDistributedConfigKeeper::OnDynamicNodeDisconnected(ui32 nodeId, TActorId sessionId) { + ConnectedDynamicNodes.erase(nodeId); + DynamicConfigSubscribers.erase(sessionId); + } + + void TDistributedConfigKeeper::HandleDynamicConfigSubscribe(STATEFN_SIG) { + const TActorId sessionId = ev->InterconnectSession; + Y_ABORT_UNLESS(sessionId); + const ui32 peerNodeId = ev->Sender.NodeId(); + if (const auto [it, inserted] = SubscribedSessions.try_emplace(peerNodeId); inserted) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, IEventHandle::FlagTrackDelivery, + sessionId, SelfId(), nullptr, 0)); + } + ConnectedDynamicNodes.insert(peerNodeId); + const auto [_, inserted] = DynamicConfigSubscribers.try_emplace(sessionId, ev->Sender); + Y_ABORT_UNLESS(inserted); + + if (StorageConfig) { + PushConfigToDynamicNode(ev->Sender, sessionId); + } + } + + void TDistributedConfigKeeper::PushConfigToDynamicNode(TActorId actorId, TActorId sessionId) { + auto ev = std::make_unique(); + ev->Record.MutableConfig()->CopyFrom(*StorageConfig); + auto handle = std::make_unique(actorId, SelfId(), ev.release()); + handle->Rewrite(TEvInterconnect::EvForward, sessionId); + TActivationContext::Send(handle.release()); + } + +} // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_mon.cpp b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp index 12314a27f1d4..b630bb8411bd 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_mon.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_mon.cpp @@ -198,6 +198,18 @@ namespace NKikimr::NStorage { } } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + out << "Static <-> dynamic node interaction"; + } + DIV_CLASS("panel-body") { + out << "IsSelfStatic: " << (IsSelfStatic ? "true" : "false") << "
"; + out << "ConnectedToStaticNode: " << ConnectedToStaticNode << "
"; + out << "StaticNodeSessionId: " << StaticNodeSessionId << "
"; + out << "ConnectedDynamicNodes: " << FormatList(ConnectedDynamicNodes) << "
"; + } + } + DIV_CLASS("panel panel-info") { DIV_CLASS("panel-heading") { out << "Incoming bindings"; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index b79f92a3051c..162107493aa5 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -67,4 +67,8 @@ namespace NKikimr::NStorage { NKikimrBlobStorage::TBaseConfig BaseConfig; }; + struct TEvNodeWardenDynamicConfigPush + : TEventPB + {}; + } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 30c3e5c464b8..844f3cc0b5d9 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -640,6 +640,8 @@ namespace NKikimr::NStorage { fFunc(TEvBlobStorage::EvNodeConfigScatter, ForwardToDistributedConfigKeeper); fFunc(TEvBlobStorage::EvNodeConfigGather, ForwardToDistributedConfigKeeper); fFunc(TEvBlobStorage::EvNodeConfigInvokeOnRoot, ForwardToDistributedConfigKeeper); + fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe, ForwardToDistributedConfigKeeper); + fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigPush, ForwardToDistributedConfigKeeper); hFunc(TEvNodeWardenQueryBaseConfig, Handle); hFunc(TEvNodeConfigInvokeOnRootResult, Handle); diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make index cbea4624aba8..0f0790d4bca2 100644 --- a/ydb/core/blobstorage/nodewarden/ya.make +++ b/ydb/core/blobstorage/nodewarden/ya.make @@ -7,6 +7,7 @@ SRCS( distconf.cpp distconf.h distconf_binding.cpp + distconf_dynamic.cpp distconf_generate.cpp distconf_fsm.cpp distconf_invoke.cpp diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index 161584647616..851638b2ab75 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -236,3 +236,7 @@ message TEvNodeConfigInvokeOnRootResult { TReassignStateStorageNode ReassignStateStorageNode = 9; } } + +message TEvNodeWardenDynamicConfigPush { + TStorageConfig Config = 1; +}