Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Nodes communication via DB #1976

Open
wants to merge 18 commits into
base: develop
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory(util)
add_subdirectory(data)
add_subdirectory(cluster)
add_subdirectory(etl)
add_subdirectory(etlng)
add_subdirectory(feed)
Expand Down
11 changes: 10 additions & 1 deletion src/app/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp)

target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)
target_link_libraries(
clio_app
PUBLIC clio_cluster
clio_etl
clio_etlng
clio_feed
clio_web
clio_rpc
clio_migration
)
4 changes: 4 additions & 0 deletions src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "app/Stopper.hpp"
#include "app/WebHandlers.hpp"
#include "cluster/ClusterCommunicationService.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
#include "data/LedgerCache.hpp"
Expand Down Expand Up @@ -108,6 +109,9 @@ ClioApplication::run(bool const useNgWebServer)
// Interface to the database
auto backend = data::makeBackend(config_, cache);

cluster::ClusterCommunicationService clusterCommunicationService{backend};
clusterCommunicationService.run();

auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);

{
Expand Down
5 changes: 5 additions & 0 deletions src/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
add_library(clio_cluster)

target_sources(clio_cluster PRIVATE ClioNode.cpp ClusterCommunicationService.cpp)

target_link_libraries(clio_cluster PRIVATE clio_util clio_data)
65 changes: 65 additions & 0 deletions src/cluster/ClioNode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "cluster/ClioNode.hpp"

#include "util/TimeUtils.hpp"

#include <boost/json/conversion.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>

namespace cluster {

namespace {

struct Fields {
static constexpr std::string_view const kUPDATE_TIME = "update_time";
};

} // namespace

void
tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node)
{
jv = {
{Fields::kUPDATE_TIME, util::systemTpToUtcStr(node.updateTime, ClioNode::kTIME_FORMAT)},
};
}

ClioNode
tag_invoke(boost::json::value_to_tag<ClioNode>, boost::json::value const& jv)
{
auto const& updateTimeStr = jv.as_object().at(Fields::kUPDATE_TIME).as_string();
auto const updateTime = util::systemTpFromUtcStr(std::string(updateTimeStr), ClioNode::kTIME_FORMAT);
if (!updateTime.has_value()) {
throw std::runtime_error("Failed to parse update time");
}

return ClioNode{.uuid = std::make_shared<boost::uuids::uuid>(), .updateTime = updateTime.value()};
}

} // namespace cluster
58 changes: 58 additions & 0 deletions src/cluster/ClioNode.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include <boost/json/conversion.hpp>
#include <boost/json/value.hpp>
#include <boost/uuid/uuid.hpp>

#include <chrono>
#include <memory>

namespace cluster {

/**
* @brief Represents a node in the cluster.
*/
struct ClioNode {
/**
* @brief The format of the time to store in the database.
*/
static constexpr char const* kTIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ";

// enum class WriterRole {
// ReadOnly,
// NotWriter,
// Writer
// };

std::shared_ptr<boost::uuids::uuid> uuid; ///< The UUID of the node.
std::chrono::system_clock::time_point updateTime; ///< The time the data about the node was last updated.

// WriterRole writerRole;
};

void
tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node);

ClioNode
tag_invoke(boost::json::value_to_tag<ClioNode>, boost::json::value const& jv);

} // namespace cluster
175 changes: 175 additions & 0 deletions src/cluster/ClusterCommunicationService.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "cluster/ClusterCommunicationService.hpp"

#include "cluster/ClioNode.hpp"
#include "data/BackendInterface.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/json/value_to.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>

#include <chrono>
#include <ctime>
#include <memory>
#include <utility>
#include <vector>

namespace cluster {

ClusterCommunicationService::ClusterCommunicationService(
std::shared_ptr<data::BackendInterface> backend,
std::chrono::steady_clock::duration readInterval,
std::chrono::steady_clock::duration writeInterval
)
: backend_(std::move(backend))
, readInterval_(readInterval)
, writeInterval_(writeInterval)
, selfData_{ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()),
.updateTime = std::chrono::system_clock::time_point{}
}}
{
nodesInClusterMetric_.set(1); // The node always sees itself
isHealthy_ = true;
}

void
ClusterCommunicationService::run()
{
boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor());
while (true) {
timer.expires_after(readInterval_);
timer.async_wait(yield);
doRead(yield);
}
});

boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor());
while (true) {
doWrite();
timer.expires_after(writeInterval_);
timer.async_wait(yield);
}
});
}

ClusterCommunicationService::~ClusterCommunicationService()
{
stop();
}

void
ClusterCommunicationService::stop()
{
if (stopped_)
return;

Check warning on line 92 in src/cluster/ClusterCommunicationService.cpp

View check run for this annotation

Codecov / codecov/patch

src/cluster/ClusterCommunicationService.cpp#L92

Added line #L92 was not covered by tests

ctx_.stop();
ctx_.join();
stopped_ = true;
}

std::shared_ptr<boost::uuids::uuid>
ClusterCommunicationService::selfUuid() const
{
// Uuid never changes so it is safe to copy it without using strand_
return selfData_.uuid;
}

ClioNode
ClusterCommunicationService::selfData() const

Check warning on line 107 in src/cluster/ClusterCommunicationService.cpp

View check run for this annotation

Codecov / codecov/patch

src/cluster/ClusterCommunicationService.cpp#L107

Added line #L107 was not covered by tests
{
ClioNode result{};

Check warning on line 109 in src/cluster/ClusterCommunicationService.cpp

View check run for this annotation

Codecov / codecov/patch

src/cluster/ClusterCommunicationService.cpp#L109

Added line #L109 was not covered by tests
boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) { result = selfData_; });
return result;
}

Check warning on line 112 in src/cluster/ClusterCommunicationService.cpp

View check run for this annotation

Codecov / codecov/patch

src/cluster/ClusterCommunicationService.cpp#L111-L112

Added lines #L111 - L112 were not covered by tests

std::vector<ClioNode>
ClusterCommunicationService::clusterData() const
{
std::vector<ClioNode> result;
boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) {
result = otherNodesData_;
result.push_back(selfData_);
});
return result;
}

Check warning on line 123 in src/cluster/ClusterCommunicationService.cpp

View check run for this annotation

Codecov / codecov/patch

src/cluster/ClusterCommunicationService.cpp#L123

Added line #L123 was not covered by tests

void
ClusterCommunicationService::doRead(boost::asio::yield_context yield)
{
otherNodesData_.clear();

auto expectedResult = backend_->fetchClioNodesData(yield);
if (!expectedResult.has_value()) {
LOG(log_.error()) << "Failed to fetch nodes data";
isHealthy_ = false;
return;
}

// Create a new vector here to not have partially parsed data in otherNodesData_
std::vector<ClioNode> otherNodesData;
for (auto const& [uuid, nodeDataStr] : expectedResult.value()) {
if (uuid == *selfData_.uuid) {
continue;
}

boost::system::error_code errorCode;
auto const json = boost::json::parse(nodeDataStr, errorCode);
if (errorCode.failed()) {
LOG(log_.error()) << "Error parsing json from DB: " << nodeDataStr;
isHealthy_ = false;
return;
}

auto expectedNodeData = boost::json::try_value_to<ClioNode>(json);
if (expectedNodeData.has_error()) {
LOG(log_.error()) << "Error converting json to ClioNode: " << json;
isHealthy_ = false;
return;
}
*expectedNodeData->uuid = uuid;
otherNodesData.push_back(std::move(expectedNodeData).value());
}
otherNodesData_ = std::move(otherNodesData);
nodesInClusterMetric_.set(otherNodesData_.size() + 1);
isHealthy_ = true;
}

void
ClusterCommunicationService::doWrite()
{
selfData_.updateTime = std::chrono::system_clock::now();
boost::json::value jsonValue{};
boost::json::value_from(selfData_, jsonValue);
backend_->writeNodeMessage(*selfData_.uuid, boost::json::serialize(jsonValue.as_object()));
}

} // namespace cluster
Loading
Loading