Skip to content

Commit

Permalink
dedicated kv ioctx
Browse files Browse the repository at this point in the history
Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang committed Sep 18, 2024
1 parent 5ca9c66 commit bcf81c8
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 141 deletions.
46 changes: 46 additions & 0 deletions src/ray/common/asio/asio_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

#include <boost/asio.hpp>
#include <chrono>
#include <thread>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/util/util.h"

template <typename Duration>
std::shared_ptr<boost::asio::deadline_timer> execute_after(
Expand All @@ -37,3 +39,47 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(

return timer;
}

/**
 * Bundles an instrumented_io_context with the one std::thread that drives it.
 *
 * Constructing the object spawns the thread, which names itself through
 * SetThreadName() and then calls io_service_.run(). A work object is held for
 * the whole lifetime of this class to keep the io_service running.
 * Destroying the object calls Stop(), which stops the io_service and joins the
 * thread.
 */
class InstrumentedIoContextWithThread {
 public:
  /**
   * Creates the io_service and immediately starts the thread that runs it.
   * @param thread_name The name given to the spawned thread.
   */
  explicit InstrumentedIoContextWithThread(const std::string &thread_name)
      : io_service_(),
        work_(io_service_),
        // io_thread_ is declared last, so io_service_ and work_ are already
        // constructed by the time the new thread starts using them.
        io_thread_([this, thread_name] {
          SetThreadName(thread_name);
          io_service_.run();
        }) {}

  /** Stops the io_service and joins the thread (see Stop()). */
  ~InstrumentedIoContextWithThread() { Stop(); }

  // The running thread captures `this`, so the object must never change
  // address: neither copying nor moving is allowed.
  InstrumentedIoContextWithThread(const InstrumentedIoContextWithThread &) = delete;
  InstrumentedIoContextWithThread &operator=(const InstrumentedIoContextWithThread &) =
      delete;
  InstrumentedIoContextWithThread(InstrumentedIoContextWithThread &&) = delete;
  InstrumentedIoContextWithThread &operator=(InstrumentedIoContextWithThread &&) = delete;

  /** @return The io_service that the owned thread is running. */
  instrumented_io_context &GetIoService() { return io_service_; }

  /**
   * Stops the io_service and waits for the thread to finish.
   * Safe to call repeatedly: once the thread has been joined, later calls only
   * re-issue stop() and return. A stopped instance cannot be restarted.
   */
  void Stop() {
    io_service_.stop();
    if (!io_thread_.joinable()) {
      return;  // Already joined by an earlier Stop().
    }
    io_thread_.join();
  }

 private:
  // Declaration order is significant: work_ and io_thread_ are both
  // initialized from io_service_, so it must come first.
  instrumented_io_context io_service_;
  boost::asio::io_service::work work_;  // to keep io_service_ running
  std::thread io_thread_;
};
34 changes: 3 additions & 31 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <thread>
#include <utility>

#include "ray/common/asio/asio_util.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/pubsub/subscriber.h"
Expand Down Expand Up @@ -717,38 +718,9 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
node_info.labels().end());
}

/// Process-wide holder of one io_service and the thread that runs it.
/// Every ConnectToGcsStandalone call shares this io_service.
class SingletonIoContext {
 public:
  /// Returns the single instance, created on first use as a function-local
  /// static.
  static SingletonIoContext &Instance() {
    static SingletonIoContext the_instance;
    return the_instance;
  }

  /// Returns the shared io_service driven by the singleton thread.
  instrumented_io_context &GetIoService() { return io_service_; }

 private:
  /// Spawns the thread, which names itself and then runs the io_service.
  SingletonIoContext()
      : work_(io_service_),
        // io_thread_ is the last member, so io_service_ and work_ already
        // exist when the thread body starts.
        io_thread_([this] {
          SetThreadName("singleton_io_context.gcs_client");
          io_service_.run();
        }) {}

  /// Stops the io_service and joins the thread if it is still joinable.
  ~SingletonIoContext() {
    io_service_.stop();
    if (!io_thread_.joinable()) {
      return;
    }
    io_thread_.join();
  }

  instrumented_io_context io_service_;
  boost::asio::io_service::work work_;  // to keep io_service_ running
  std::thread io_thread_;
};

Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) {
instrumented_io_context &io_service = SingletonIoContext::Instance().GetIoService();
static InstrumentedIoContextWithThread io_context("gcs_client_io_service");
instrumented_io_context &io_service = io_context.GetIoService();
return gcs_client.Connect(io_service, timeout_ms);
}

Expand Down
75 changes: 42 additions & 33 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
: config_(config),
storage_type_(GetStorageType()),
main_service_(main_service),
pubsub_io_context_("pubsub_io_context"),
kv_io_context_("kv_io_context"),
task_io_context_("task_io_context"),
ray_syncer_io_context_("ray_syncer_io_context"),
rpc_server_(config.grpc_server_name,
config.grpc_server_port,
config.node_ip_address == "127.0.0.1",
Expand All @@ -65,18 +69,20 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
RayConfig::instance().gcs_server_rpc_client_thread_num()),
raylet_client_pool_(
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
pubsub_periodical_runner_(pubsub_io_service_),
pubsub_periodical_runner_(pubsub_io_context_.GetIoService()),
periodical_runner_(main_service),
is_started_(false),
is_stopped_(false) {
// Init GCS table storage.
RAY_LOG(INFO) << "GCS storage type is " << storage_type_;
switch (storage_type_) {
case StorageType::IN_MEMORY:
gcs_table_storage_ = std::make_shared<InMemoryGcsTableStorage>(main_service_);
gcs_table_storage_ =
std::make_shared<InMemoryGcsTableStorage>(kv_io_context_.GetIoService());
break;
case StorageType::REDIS_PERSIST:
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(GetOrConnectRedis());
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(
GetOrConnectRedis(kv_io_context_.GetIoService()));
break;
default:
RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_;
Expand Down Expand Up @@ -264,14 +270,11 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
void GcsServer::Stop() {
if (!is_stopped_) {
RAY_LOG(INFO) << "Stopping GCS server.";
ray_syncer_io_context_.stop();
ray_syncer_thread_->join();
ray_syncer_.reset();

gcs_task_manager_->Stop();

pubsub_handler_->Stop();
pubsub_handler_.reset();
ray_syncer_io_context_.Stop();
task_io_context_.Stop();
kv_io_context_.Stop();
pubsub_io_context_.Stop();

// Shutdown the rpc server
rpc_server_.Shutdown();
Expand Down Expand Up @@ -531,16 +534,12 @@ GcsServer::StorageType GcsServer::GetStorageType() const {
}

void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) {
ray_syncer_ =
std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_, kGCSNodeID.Binary());
ray_syncer_ = std::make_unique<syncer::RaySyncer>(ray_syncer_io_context_.GetIoService(),
kGCSNodeID.Binary());
ray_syncer_->Register(
syncer::MessageType::RESOURCE_VIEW, nullptr, gcs_resource_manager_.get());
ray_syncer_->Register(
syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get());
ray_syncer_thread_ = std::make_unique<std::thread>([this]() {
boost::asio::io_service::work work(ray_syncer_io_context_);
ray_syncer_io_context_.run();
});
ray_syncer_service_ = std::make_unique<syncer::RaySyncerService>(*ray_syncer_);
rpc_server_.RegisterService(*ray_syncer_service_);
}
Expand All @@ -563,13 +562,13 @@ void GcsServer::InitKVManager() {
std::unique_ptr<InternalKVInterface> instance;
switch (storage_type_) {
case (StorageType::REDIS_PERSIST):
instance = std::make_unique<StoreClientInternalKV>(
std::make_unique<RedisStoreClient>(GetOrConnectRedis()));
instance = std::make_unique<StoreClientInternalKV>(std::make_unique<RedisStoreClient>(
GetOrConnectRedis(kv_io_context_.GetIoService())));
break;
case (StorageType::IN_MEMORY):
instance =
std::make_unique<StoreClientInternalKV>(std::make_unique<ObservableStoreClient>(
std::make_unique<InMemoryStoreClient>(main_service_)));
std::make_unique<InMemoryStoreClient>(kv_io_context_.GetIoService())));
break;
default:
RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_;
Expand All @@ -580,16 +579,17 @@ void GcsServer::InitKVManager() {

void GcsServer::InitKVService() {
RAY_CHECK(kv_manager_);
kv_service_ = std::make_unique<rpc::InternalKVGrpcService>(main_service_, *kv_manager_);
kv_service_ = std::make_unique<rpc::InternalKVGrpcService>(
kv_io_context_.GetIoService(), *kv_manager_);
// Register service.
rpc_server_.RegisterService(*kv_service_, false /* token_auth */);
}

void GcsServer::InitPubSubHandler() {
pubsub_handler_ =
std::make_unique<InternalPubSubHandler>(pubsub_io_service_, gcs_publisher_);
pubsub_service_ = std::make_unique<rpc::InternalPubSubGrpcService>(pubsub_io_service_,
*pubsub_handler_);
pubsub_handler_ = std::make_unique<InternalPubSubHandler>(
pubsub_io_context_.GetIoService(), gcs_publisher_);
pubsub_service_ = std::make_unique<rpc::InternalPubSubGrpcService>(
pubsub_io_context_.GetIoService(), *pubsub_handler_);
// Register service.
rpc_server_.RegisterService(*pubsub_service_);
}
Expand Down Expand Up @@ -683,10 +683,10 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data)
}

void GcsServer::InitGcsTaskManager() {
gcs_task_manager_ = std::make_unique<GcsTaskManager>();
gcs_task_manager_ = std::make_unique<GcsTaskManager>(task_io_context_.GetIoService());
// Register service.
task_info_service_.reset(new rpc::TaskInfoGrpcService(gcs_task_manager_->GetIoContext(),
*gcs_task_manager_));
task_info_service_.reset(
new rpc::TaskInfoGrpcService(task_io_context_.GetIoService(), *gcs_task_manager_));
rpc_server_.RegisterService(*task_info_service_);
}

Expand Down Expand Up @@ -819,15 +819,16 @@ std::string GcsServer::GetDebugState() const {
return stream.str();
}

std::shared_ptr<RedisClient> GcsServer::GetOrConnectRedis() {
std::shared_ptr<RedisClient> GcsServer::GetOrConnectRedis(
instrumented_io_context &io_service) {
if (redis_client_ == nullptr) {
redis_client_ = std::make_shared<RedisClient>(GetRedisClientOptions());
auto status = redis_client_->Connect(main_service_);
auto status = redis_client_->Connect(io_service);
RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status;

// Init redis failure detector.
gcs_redis_failure_detector_ =
std::make_shared<GcsRedisFailureDetector>(main_service_, redis_client_, []() {
std::make_shared<GcsRedisFailureDetector>(io_service, redis_client_, []() {
RAY_LOG(FATAL) << "Redis connection failed. Shutdown GCS.";
});
gcs_redis_failure_detector_->Start();
Expand All @@ -840,9 +841,17 @@ void GcsServer::PrintAsioStats() {
const auto event_stats_print_interval_ms =
RayConfig::instance().event_stats_print_interval_ms();
if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) {
RAY_LOG(INFO) << "Event stats:\n\n" << main_service_.stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "GcsTaskManager Event stats:\n\n"
<< gcs_task_manager_->GetIoContext().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "main_service_ Event stats:\n\n"
<< main_service_.stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "pubsub_io_context_ Event stats:\n\n"
<< pubsub_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "kv_io_context_ Event stats:\n\n"
<< kv_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "task_io_context_ Event stats:\n\n"
<< task_io_context_.GetIoService().stats().StatsString() << "\n\n";
RAY_LOG(INFO) << "ray_syncer_io_context_ Event stats:\n\n"
<< ray_syncer_io_context_.GetIoService().stats().StatsString()
<< "\n\n";
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "ray/common/asio/asio_util.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/runtime_env_manager.h"
Expand Down Expand Up @@ -201,7 +202,7 @@ class GcsServer {
void PrintAsioStats();

/// Get or connect to a redis server
std::shared_ptr<RedisClient> GetOrConnectRedis();
std::shared_ptr<RedisClient> GetOrConnectRedis(instrumented_io_context &io_service);

void TryGlobalGC();

Expand All @@ -212,7 +213,13 @@ class GcsServer {
/// The main io service to drive event posted from grpc threads.
instrumented_io_context &main_service_;
/// The io service used by Pubsub, for isolation from other workload.
instrumented_io_context pubsub_io_service_;
InstrumentedIoContextWithThread pubsub_io_context_;
/// The io context (with its own thread) used by the internal KV service, table storage and the StoreClient.
InstrumentedIoContextWithThread kv_io_context_;
/// The io context (with its own thread) used by the task manager.
InstrumentedIoContextWithThread task_io_context_;
/// The io context (with its own thread) used by the ray syncer.
InstrumentedIoContextWithThread ray_syncer_io_context_;
/// The grpc server
rpc::GrpcServer rpc_server_;
/// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
Expand Down Expand Up @@ -254,8 +261,6 @@ class GcsServer {
/// Ray Syncer related fields.
std::unique_ptr<syncer::RaySyncer> ray_syncer_;
std::unique_ptr<syncer::RaySyncerService> ray_syncer_service_;
std::unique_ptr<std::thread> ray_syncer_thread_;
instrumented_io_context ray_syncer_io_context_;

/// The node id of GCS.
NodeID gcs_node_id_;
Expand Down
7 changes: 0 additions & 7 deletions src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@
namespace ray {
namespace gcs {

// Stops the task manager's io_service, then joins its thread if it is still
// joinable.
void GcsTaskManager::Stop() {
  io_service_.stop();
  if (!io_service_thread_->joinable()) {
    return;
  }
  io_service_thread_->join();
}

std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents()
const {
std::vector<rpc::TaskEvents> ret;
Expand Down
Loading

0 comments on commit bcf81c8

Please sign in to comment.