Skip to content

Commit

Permalink
Support dynamic node configuration updates through NodeWarden (ydb-pl…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jun 6, 2024
1 parent 5685e21 commit c7be641
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 26 deletions.
2 changes: 2 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ struct TEvBlobStorage {
EvNodeWardenStorageConfigConfirm,
EvNodeWardenQueryBaseConfig,
EvNodeWardenBaseConfig,
EvNodeWardenDynamicConfigSubscribe,
EvNodeWardenDynamicConfigPush,

// Other
EvRunActor = EvPut + 15 * 512,
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/nodewarden/bind_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ namespace NKikimr::NStorage {
#endif

public:
bool Empty() const {
return BindQueue.empty();
}

void Disable(ui32 nodeId) {
#ifndef NDEBUG
Y_ABORT_UNLESS(Enabled.contains(nodeId));
Expand Down
64 changes: 43 additions & 21 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
namespace NKikimr::NStorage {

TDistributedConfigKeeper::TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg,
const NKikimrBlobStorage::TStorageConfig& baseConfig)
: Cfg(std::move(cfg))
const NKikimrBlobStorage::TStorageConfig& baseConfig, bool isSelfStatic)
: IsSelfStatic(isSelfStatic)
, Cfg(std::move(cfg))
, BaseConfig(baseConfig)
, InitialConfig(baseConfig)
{
Expand All @@ -17,22 +18,32 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Bootstrap() {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

// report initial node listing
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
// report initial node listing for static node
if (IsSelfStatic) {
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
Send(SelfId(), ev.release());
}
Send(SelfId(), ev.release());

// and subscribe for the node list too
Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(true));

// generate initial drive set and query stored configuration
EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
DrivesToRead.push_back(drive.GetPath());
});
std::sort(DrivesToRead.begin(), DrivesToRead.end());
if (IsSelfStatic) {
EnumerateConfigDrives(InitialConfig, SelfId().NodeId(), [&](const auto& /*node*/, const auto& drive) {
DrivesToRead.push_back(drive.GetPath());
});
std::sort(DrivesToRead.begin(), DrivesToRead.end());

auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
} else {
StorageConfigLoaded = true;
}

auto query = std::bind(&TThis::ReadConfig, TActivationContext::ActorSystem(), SelfId(), DrivesToRead, Cfg, 0);
Send(MakeIoDispatcherActorId(), new TEvInvokeQuery(std::move(query)));
Become(&TThis::StateWaitForInit);
}

Expand Down Expand Up @@ -60,7 +71,10 @@ namespace NKikimr::NStorage {
}
Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenStorageConfig(*StorageConfig,
ProposedStorageConfig ? &ProposedStorageConfig.value() : nullptr));
PersistConfig({});
if (IsSelfStatic) {
PersistConfig({});
ApplyConfigUpdateToDynamicNodes();
}
return true;
} else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() &&
StorageConfig->GetFingerprint() != config.GetFingerprint()) {
Expand Down Expand Up @@ -149,6 +163,9 @@ namespace NKikimr::NStorage {
if (!sessionId) {
okay = true; // may be just obsolete subscription request
}
if (ConnectedDynamicNodes.contains(nodeId)) {
okay = true;
}
Y_ABORT_UNLESS(okay);
}

Expand Down Expand Up @@ -202,8 +219,12 @@ namespace NKikimr::NStorage {
}

if (change && NodeListObtained && StorageConfigLoaded) {
UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr);
IssueNextBindRequest();
if (IsSelfStatic) {
UpdateBound(SelfNode.NodeId(), SelfNode, *StorageConfig, nullptr);
IssueNextBindRequest();
} else {

}
processPendingEvents();
}
}
Expand All @@ -224,6 +245,9 @@ namespace NKikimr::NStorage {
hFunc(TEvPrivate::TEvStorageConfigLoaded, Handle);
hFunc(TEvPrivate::TEvStorageConfigStored, Handle);
fFunc(TEvBlobStorage::EvNodeWardenStorageConfigConfirm, HandleConfigConfirm);
fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe, HandleDynamicConfigSubscribe);
hFunc(TEvNodeWardenDynamicConfigPush, Handle);
cFunc(TEvPrivate::EvReconnect, HandleReconnect);
hFunc(NMon::TEvHttpInfo, Handle);
fFunc(TEvents::TSystem::Gone, HandleGone);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
Expand All @@ -234,10 +258,8 @@ namespace NKikimr::NStorage {

void TNodeWarden::StartDistributedConfigKeeper() {
auto *appData = AppData();
if (!appData->DynamicNameserviceConfig || SelfId().NodeId() <= appData->DynamicNameserviceConfig->MaxStaticNodeId) {
// start distributed configuration machinery only on static nodes
DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg, StorageConfig));
}
const bool isSelfStatic = !appData->DynamicNameserviceConfig || SelfId().NodeId() <= appData->DynamicNameserviceConfig->MaxStaticNodeId;
DistributedConfigKeeperId = Register(new TDistributedConfigKeeper(Cfg, StorageConfig, isSelfStatic));
}

void TNodeWarden::ForwardToDistributedConfigKeeper(STATEFN_SIG) {
Expand Down
29 changes: 28 additions & 1 deletion ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace NKikimr::NStorage {
EvErrorTimeout,
EvStorageConfigLoaded,
EvStorageConfigStored,
EvReconnect,
};

struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> {
Expand Down Expand Up @@ -167,6 +168,7 @@ namespace NKikimr::NStorage {
}
};

const bool IsSelfStatic = false;
TIntrusivePtr<TNodeWardenConfig> Cfg;

// currently active storage config
Expand Down Expand Up @@ -248,7 +250,8 @@ namespace NKikimr::NStorage {
return NKikimrServices::TActivity::NODEWARDEN_DISTRIBUTED_CONFIG;
}

TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg, const NKikimrBlobStorage::TStorageConfig& baseConfig);
TDistributedConfigKeeper(TIntrusivePtr<TNodeWardenConfig> cfg, const NKikimrBlobStorage::TStorageConfig& baseConfig,
bool isSelfStatic);

void Bootstrap();
void PassAway() override;
Expand Down Expand Up @@ -359,6 +362,30 @@ namespace NKikimr::NStorage {

void Handle(TEvNodeConfigInvokeOnRoot::TPtr ev);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Dynamic node interaction

TBindQueue StaticBindQueue;
THashMap<TActorId, TActorId> DynamicConfigSubscribers; // <session id> -> <actor id>
ui32 ConnectedToStaticNode = 0;
TActorId StaticNodeSessionId;
bool ReconnectScheduled = false;
THashSet<ui32> ConnectedDynamicNodes;

// these are used on the dynamic nodes
void ApplyStaticNodeIds(const std::vector<ui32>& nodeIds);
void ConnectToStaticNode();
void HandleReconnect();
void OnStaticNodeConnected(ui32 nodeId, TActorId sessionId);
void OnStaticNodeDisconnected(ui32 nodeId, TActorId sessionId);
void Handle(TEvNodeWardenDynamicConfigPush::TPtr ev);

// these are used on the static nodes
void ApplyConfigUpdateToDynamicNodes();
void OnDynamicNodeDisconnected(ui32 nodeId, TActorId sessionId);
void HandleDynamicConfigSubscribe(STATEFN_SIG);
void PushConfigToDynamicNode(TActorId actorId, TActorId sessionId);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Event delivery

Expand Down
21 changes: 17 additions & 4 deletions ydb/core/blobstorage/nodewarden/distconf_binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");

// create a vector of peer static nodes
bool iAmStatic = false;
std::vector<ui32> nodeIds;
const ui32 selfNodeId = SelfId().NodeId();
for (const auto& item : ev->Get()->Nodes) {
if (item.NodeId == selfNodeId) {
iAmStatic = item.IsStatic;
SelfNode = TNodeIdentifier(item.ResolveHost, item.Port, selfNodeId);
Y_ABORT_UNLESS(IsSelfStatic == item.IsStatic);
}
if (item.IsStatic) {
nodeIds.push_back(item.NodeId);
Expand All @@ -21,8 +20,9 @@ namespace NKikimr::NStorage {
std::sort(nodeIds.begin(), nodeIds.end());

// do not start configuration negotiation for dynamic nodes
if (!iAmStatic) {
if (!IsSelfStatic) {
Y_ABORT_UNLESS(NodeIds.empty());
ApplyStaticNodeIds(nodeIds);
return;
}

Expand Down Expand Up @@ -115,6 +115,10 @@ namespace NKikimr::NStorage {

STLOG(PRI_DEBUG, BS_NODE, NWDC14, "TEvNodeConnected", (NodeId, nodeId));

if (!IsSelfStatic) {
return OnStaticNodeConnected(nodeId, sessionId);
}

// update subscription information
const auto [it, inserted] = SubscribedSessions.try_emplace(nodeId, sessionId);
Y_ABORT_UNLESS(!inserted);
Expand All @@ -133,12 +137,18 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
const ui32 nodeId = ev->Get()->NodeId;

STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId));

if (!IsSelfStatic) {
return OnStaticNodeDisconnected(nodeId, ev->Sender);
}

const auto it = SubscribedSessions.find(nodeId);
Y_ABORT_UNLESS(it != SubscribedSessions.end());
Y_ABORT_UNLESS(!it->second || it->second == ev->Sender);
SubscribedSessions.erase(it);

STLOG(PRI_DEBUG, BS_NODE, NWDC07, "TEvNodeDisconnected", (NodeId, nodeId));
OnDynamicNodeDisconnected(nodeId, ev->Sender);

UnbindNode(nodeId, "disconnection");

Expand All @@ -163,6 +173,9 @@ namespace NKikimr::NStorage {
if (DirectBoundNodes.contains(nodeId)) {
return;
}
if (ConnectedDynamicNodes.contains(nodeId)) {
return;
}
if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end() && it->second) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, it->second, SelfId(), nullptr, 0));
SubscribedSessions.erase(it);
Expand Down
90 changes: 90 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_dynamic.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include "distconf.h"

namespace NKikimr::NStorage {

void TDistributedConfigKeeper::ApplyStaticNodeIds(const std::vector<ui32>& nodeIds) {
StaticBindQueue.Update(nodeIds);
ConnectToStaticNode();
}

void TDistributedConfigKeeper::ConnectToStaticNode() {
if (ConnectedToStaticNode) {
return;
}

const TMonotonic now = TActivationContext::Monotonic();
TMonotonic timestamp;
if (std::optional<ui32> nodeId = StaticBindQueue.Pick(now, &timestamp)) {
ConnectedToStaticNode = *nodeId;
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe,
IEventHandle::FlagSubscribeOnSession, MakeBlobStorageNodeWardenID(ConnectedToStaticNode), SelfId(),
nullptr, 0));
} else if (timestamp != TMonotonic::Max()) {
if (!ReconnectScheduled) {
TActivationContext::Schedule(timestamp, new IEventHandle(TEvPrivate::EvReconnect, 0, SelfId(), {}, nullptr, 0));
ReconnectScheduled = true;
}
}
}

void TDistributedConfigKeeper::HandleReconnect() {
Y_ABORT_UNLESS(ReconnectScheduled);
ReconnectScheduled = false;
ConnectToStaticNode();
}

void TDistributedConfigKeeper::OnStaticNodeConnected(ui32 nodeId, TActorId sessionId) {
Y_ABORT_UNLESS(nodeId == ConnectedToStaticNode);
Y_ABORT_UNLESS(!StaticNodeSessionId);
StaticNodeSessionId = sessionId;
}

void TDistributedConfigKeeper::OnStaticNodeDisconnected(ui32 nodeId, TActorId /*sessionId*/) {
Y_ABORT_UNLESS(nodeId == ConnectedToStaticNode);
ConnectedToStaticNode = 0;
StaticNodeSessionId = {};
ConnectToStaticNode();
}

void TDistributedConfigKeeper::Handle(TEvNodeWardenDynamicConfigPush::TPtr ev) {
auto& record = ev->Get()->Record;
ApplyStorageConfig(record.GetConfig());
}

void TDistributedConfigKeeper::ApplyConfigUpdateToDynamicNodes() {
for (const auto& [sessionId, actorId] : DynamicConfigSubscribers) {
PushConfigToDynamicNode(actorId, sessionId);
}
}

void TDistributedConfigKeeper::OnDynamicNodeDisconnected(ui32 nodeId, TActorId sessionId) {
ConnectedDynamicNodes.erase(nodeId);
DynamicConfigSubscribers.erase(sessionId);
}

void TDistributedConfigKeeper::HandleDynamicConfigSubscribe(STATEFN_SIG) {
const TActorId sessionId = ev->InterconnectSession;
Y_ABORT_UNLESS(sessionId);
const ui32 peerNodeId = ev->Sender.NodeId();
if (const auto [it, inserted] = SubscribedSessions.try_emplace(peerNodeId); inserted) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Subscribe, IEventHandle::FlagTrackDelivery,
sessionId, SelfId(), nullptr, 0));
}
ConnectedDynamicNodes.insert(peerNodeId);
const auto [_, inserted] = DynamicConfigSubscribers.try_emplace(sessionId, ev->Sender);
Y_ABORT_UNLESS(inserted);

if (StorageConfig) {
PushConfigToDynamicNode(ev->Sender, sessionId);
}
}

void TDistributedConfigKeeper::PushConfigToDynamicNode(TActorId actorId, TActorId sessionId) {
auto ev = std::make_unique<TEvNodeWardenDynamicConfigPush>();
ev->Record.MutableConfig()->CopyFrom(*StorageConfig);
auto handle = std::make_unique<IEventHandle>(actorId, SelfId(), ev.release());
handle->Rewrite(TEvInterconnect::EvForward, sessionId);
TActivationContext::Send(handle.release());
}

} // NKikimr::NStorage
12 changes: 12 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ namespace NKikimr::NStorage {
}
}

DIV_CLASS("panel panel-info") {
DIV_CLASS("panel-heading") {
out << "Static <-> dynamic node interaction";
}
DIV_CLASS("panel-body") {
out << "IsSelfStatic: " << (IsSelfStatic ? "true" : "false") << "<br/>";
out << "ConnectedToStaticNode: " << ConnectedToStaticNode << "<br/>";
out << "StaticNodeSessionId: " << StaticNodeSessionId << "<br/>";
out << "ConnectedDynamicNodes: " << FormatList(ConnectedDynamicNodes) << "<br/>";
}
}

DIV_CLASS("panel panel-info") {
DIV_CLASS("panel-heading") {
out << "Incoming bindings";
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ namespace NKikimr::NStorage {
NKikimrBlobStorage::TBaseConfig BaseConfig;
};

struct TEvNodeWardenDynamicConfigPush
: TEventPB<TEvNodeWardenDynamicConfigPush, NKikimrBlobStorage::TEvNodeWardenDynamicConfigPush, TEvBlobStorage::EvNodeWardenDynamicConfigPush>
{};

} // NKikimr::NStorage
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ namespace NKikimr::NStorage {
fFunc(TEvBlobStorage::EvNodeConfigScatter, ForwardToDistributedConfigKeeper);
fFunc(TEvBlobStorage::EvNodeConfigGather, ForwardToDistributedConfigKeeper);
fFunc(TEvBlobStorage::EvNodeConfigInvokeOnRoot, ForwardToDistributedConfigKeeper);
fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigSubscribe, ForwardToDistributedConfigKeeper);
fFunc(TEvBlobStorage::EvNodeWardenDynamicConfigPush, ForwardToDistributedConfigKeeper);

hFunc(TEvNodeWardenQueryBaseConfig, Handle);
hFunc(TEvNodeConfigInvokeOnRootResult, Handle);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/nodewarden/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
distconf.cpp
distconf.h
distconf_binding.cpp
distconf_dynamic.cpp
distconf_generate.cpp
distconf_fsm.cpp
distconf_invoke.cpp
Expand Down
Loading

0 comments on commit c7be641

Please sign in to comment.