From bcf81c85000c9739123338bc46607163ea9e7847 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Wed, 18 Sep 2024 13:53:21 -0700
Subject: [PATCH 01/14] dedicated kv ioctx

Signed-off-by: Ruiyang Wang
---
 src/ray/common/asio/asio_util.h               | 46 ++++++++++++
 src/ray/gcs/gcs_client/gcs_client.cc          | 34 +--------
 src/ray/gcs/gcs_server/gcs_server.cc          | 75 +++++++++++--------
 src/ray/gcs/gcs_server/gcs_server.h           | 13 +++-
 src/ray/gcs/gcs_server/gcs_task_manager.cc    |  7 --
 src/ray/gcs/gcs_server/gcs_task_manager.h     | 31 ++------
 src/ray/gcs/gcs_server/pubsub_handler.cc      | 13 ----
 src/ray/gcs/gcs_server/pubsub_handler.h       |  4 -
 src/ray/gcs/redis_context.cc                  |  6 +-
 .../store_client/observable_store_client.cc   | 40 +++++-----
 .../gcs/store_client/redis_store_client.cc    |  2 +-
 11 files changed, 130 insertions(+), 141 deletions(-)

diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h
index 232e397e5ce7..6dc8bbe8cd8b 100644
--- a/src/ray/common/asio/asio_util.h
+++ b/src/ray/common/asio/asio_util.h
@@ -16,8 +16,10 @@
 
 #include <boost/asio.hpp>
 #include <chrono>
+#include <thread>
 
 #include "ray/common/asio/instrumented_io_context.h"
+#include "ray/util/util.h"
 
 template <typename R, typename P>
 std::shared_ptr<boost::asio::deadline_timer> execute_after(
@@ -37,3 +39,47 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(
 
   return timer;
 }
+
+/**
+ * A class that manages an instrumented_io_context and a std::thread.
+ * The constructor takes a thread name and starts the thread.
+ * The destructor stops the io_service and joins the thread.
+ */
+class InstrumentedIoContextWithThread {
+ public:
+  /**
+   * Constructor.
+   * @param thread_name The name of the thread.
+   */
+  explicit InstrumentedIoContextWithThread(const std::string &thread_name)
+      : io_service_(), work_(io_service_) {
+    io_thread_ = std::thread([this, thread_name] {
+      SetThreadName(thread_name);
+      io_service_.run();
+    });
+  }
+
+  ~InstrumentedIoContextWithThread() { Stop(); }
+
+  // Non-movable and non-copyable.
+  InstrumentedIoContextWithThread(const InstrumentedIoContextWithThread &) = delete;
+  InstrumentedIoContextWithThread &operator=(const InstrumentedIoContextWithThread &) =
+      delete;
+  InstrumentedIoContextWithThread(InstrumentedIoContextWithThread &&) = delete;
+  InstrumentedIoContextWithThread &operator=(InstrumentedIoContextWithThread &&) = delete;
+
+  instrumented_io_context &GetIoService() { return io_service_; }
+
+  // Idempotent. Once it's stopped you can't restart it.
+  void Stop() {
+    io_service_.stop();
+    if (io_thread_.joinable()) {
+      io_thread_.join();
+    }
+  }
+
+ private:
+  instrumented_io_context io_service_;
+  boost::asio::io_service::work work_;  // to keep io_service_ running
+  std::thread io_thread_;
+};
diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc
index cb734bc1dca1..8825dd0ab73f 100644
--- a/src/ray/gcs/gcs_client/gcs_client.cc
+++ b/src/ray/gcs/gcs_client/gcs_client.cc
@@ -18,6 +18,7 @@
 #include <chrono>
 #include <thread>
 
+#include "ray/common/asio/asio_util.h"
 #include "ray/common/ray_config.h"
 #include "ray/gcs/gcs_client/accessor.h"
 #include "ray/pubsub/subscriber.h"
@@ -717,38 +718,9 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
                                       node_info.labels().end());
 }
 
-/// Creates a singleton thread that runs an io_service.
-/// All ConnectToGcsStandalone calls will share this io_service.
-class SingletonIoContext { - public: - static SingletonIoContext &Instance() { - static SingletonIoContext instance; - return instance; - } - - instrumented_io_context &GetIoService() { return io_service_; } - - private: - SingletonIoContext() : work_(io_service_) { - io_thread_ = std::thread([this] { - SetThreadName("singleton_io_context.gcs_client"); - io_service_.run(); - }); - } - ~SingletonIoContext() { - io_service_.stop(); - if (io_thread_.joinable()) { - io_thread_.join(); - } - } - - instrumented_io_context io_service_; - boost::asio::io_service::work work_; // to keep io_service_ running - std::thread io_thread_; -}; - Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) { - instrumented_io_context &io_service = SingletonIoContext::Instance().GetIoService(); + static InstrumentedIoContextWithThread io_context("gcs_client_io_service"); + instrumented_io_context &io_service = io_context.GetIoService(); return gcs_client.Connect(io_service, timeout_ms); } diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 88708b005e6a..f9601fcae7f8 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -54,6 +54,10 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, : config_(config), storage_type_(GetStorageType()), main_service_(main_service), + pubsub_io_context_("pubsub_io_context"), + kv_io_context_("kv_io_context"), + task_io_context_("task_io_context"), + ray_syncer_io_context_("ray_syncer_io_context"), rpc_server_(config.grpc_server_name, config.grpc_server_port, config.node_ip_address == "127.0.0.1", @@ -65,7 +69,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, RayConfig::instance().gcs_server_rpc_client_thread_num()), raylet_client_pool_( std::make_shared(client_call_manager_)), - pubsub_periodical_runner_(pubsub_io_service_), + pubsub_periodical_runner_(pubsub_io_context_.GetIoService()), periodical_runner_(main_service), is_started_(false), is_stopped_(false) { @@ -73,10 +77,12 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, RAY_LOG(INFO) << "GCS storage type is " << storage_type_; switch (storage_type_) { case StorageType::IN_MEMORY: - gcs_table_storage_ = std::make_shared(main_service_); + gcs_table_storage_ = + std::make_shared(kv_io_context_.GetIoService()); break; case StorageType::REDIS_PERSIST: - gcs_table_storage_ = std::make_shared(GetOrConnectRedis()); + gcs_table_storage_ = std::make_shared( + GetOrConnectRedis(kv_io_context_.GetIoService())); break; default: RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_; @@ -264,14 +270,11 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { void GcsServer::Stop() { if (!is_stopped_) { RAY_LOG(INFO) << "Stopping GCS server."; - ray_syncer_io_context_.stop(); - ray_syncer_thread_->join(); - ray_syncer_.reset(); - gcs_task_manager_->Stop(); - - pubsub_handler_->Stop(); - pubsub_handler_.reset(); + ray_syncer_io_context_.Stop(); + task_io_context_.Stop(); + kv_io_context_.Stop(); + pubsub_io_context_.Stop(); // Shutdown the rpc server rpc_server_.Shutdown(); @@ -531,16 +534,12 @@ GcsServer::StorageType GcsServer::GetStorageType() const { } void GcsServer::InitRaySyncer(const GcsInitData &gcs_init_data) { - ray_syncer_ = - std::make_unique(ray_syncer_io_context_, kGCSNodeID.Binary()); + ray_syncer_ = std::make_unique(ray_syncer_io_context_.GetIoService(), + kGCSNodeID.Binary()); ray_syncer_->Register( syncer::MessageType::RESOURCE_VIEW, nullptr, 
gcs_resource_manager_.get()); ray_syncer_->Register( syncer::MessageType::COMMANDS, nullptr, gcs_resource_manager_.get()); - ray_syncer_thread_ = std::make_unique([this]() { - boost::asio::io_service::work work(ray_syncer_io_context_); - ray_syncer_io_context_.run(); - }); ray_syncer_service_ = std::make_unique(*ray_syncer_); rpc_server_.RegisterService(*ray_syncer_service_); } @@ -563,13 +562,13 @@ void GcsServer::InitKVManager() { std::unique_ptr instance; switch (storage_type_) { case (StorageType::REDIS_PERSIST): - instance = std::make_unique( - std::make_unique(GetOrConnectRedis())); + instance = std::make_unique(std::make_unique( + GetOrConnectRedis(kv_io_context_.GetIoService()))); break; case (StorageType::IN_MEMORY): instance = std::make_unique(std::make_unique( - std::make_unique(main_service_))); + std::make_unique(kv_io_context_.GetIoService()))); break; default: RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_; @@ -580,16 +579,17 @@ void GcsServer::InitKVManager() { void GcsServer::InitKVService() { RAY_CHECK(kv_manager_); - kv_service_ = std::make_unique(main_service_, *kv_manager_); + kv_service_ = std::make_unique( + kv_io_context_.GetIoService(), *kv_manager_); // Register service. rpc_server_.RegisterService(*kv_service_, false /* token_auth */); } void GcsServer::InitPubSubHandler() { - pubsub_handler_ = - std::make_unique(pubsub_io_service_, gcs_publisher_); - pubsub_service_ = std::make_unique(pubsub_io_service_, - *pubsub_handler_); + pubsub_handler_ = std::make_unique( + pubsub_io_context_.GetIoService(), gcs_publisher_); + pubsub_service_ = std::make_unique( + pubsub_io_context_.GetIoService(), *pubsub_handler_); // Register service. rpc_server_.RegisterService(*pubsub_service_); } @@ -683,10 +683,10 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data) } void GcsServer::InitGcsTaskManager() { - gcs_task_manager_ = std::make_unique(); + gcs_task_manager_ = std::make_unique(task_io_context_.GetIoService()); // Register service. - task_info_service_.reset(new rpc::TaskInfoGrpcService(gcs_task_manager_->GetIoContext(), - *gcs_task_manager_)); + task_info_service_.reset( + new rpc::TaskInfoGrpcService(task_io_context_.GetIoService(), *gcs_task_manager_)); rpc_server_.RegisterService(*task_info_service_); } @@ -819,15 +819,16 @@ std::string GcsServer::GetDebugState() const { return stream.str(); } -std::shared_ptr GcsServer::GetOrConnectRedis() { +std::shared_ptr GcsServer::GetOrConnectRedis( + instrumented_io_context &io_service) { if (redis_client_ == nullptr) { redis_client_ = std::make_shared(GetRedisClientOptions()); - auto status = redis_client_->Connect(main_service_); + auto status = redis_client_->Connect(io_service); RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status; // Init redis failure detector. gcs_redis_failure_detector_ = - std::make_shared(main_service_, redis_client_, []() { + std::make_shared(io_service, redis_client_, []() { RAY_LOG(FATAL) << "Redis connection failed. 
Shutdown GCS."; }); gcs_redis_failure_detector_->Start(); @@ -840,9 +841,17 @@ void GcsServer::PrintAsioStats() { const auto event_stats_print_interval_ms = RayConfig::instance().event_stats_print_interval_ms(); if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) { - RAY_LOG(INFO) << "Event stats:\n\n" << main_service_.stats().StatsString() << "\n\n"; - RAY_LOG(INFO) << "GcsTaskManager Event stats:\n\n" - << gcs_task_manager_->GetIoContext().stats().StatsString() << "\n\n"; + RAY_LOG(INFO) << "main_service_ Event stats:\n\n" + << main_service_.stats().StatsString() << "\n\n"; + RAY_LOG(INFO) << "pubsub_io_context_ Event stats:\n\n" + << pubsub_io_context_.GetIoService().stats().StatsString() << "\n\n"; + RAY_LOG(INFO) << "kv_io_context_ Event stats:\n\n" + << kv_io_context_.GetIoService().stats().StatsString() << "\n\n"; + RAY_LOG(INFO) << "task_io_context_ Event stats:\n\n" + << task_io_context_.GetIoService().stats().StatsString() << "\n\n"; + RAY_LOG(INFO) << "ray_syncer_io_context_ Event stats:\n\n" + << ray_syncer_io_context_.GetIoService().stats().StatsString() + << "\n\n"; } } diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 99296ee6d84b..2cf994cddb2f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -14,6 +14,7 @@ #pragma once +#include "ray/common/asio/asio_util.h" #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/ray_syncer/ray_syncer.h" #include "ray/common/runtime_env_manager.h" @@ -201,7 +202,7 @@ class GcsServer { void PrintAsioStats(); /// Get or connect to a redis server - std::shared_ptr GetOrConnectRedis(); + std::shared_ptr GetOrConnectRedis(instrumented_io_context &io_service); void TryGlobalGC(); @@ -212,7 +213,13 @@ class GcsServer { /// The main io service to drive event posted from grpc threads. instrumented_io_context &main_service_; /// The io service used by Pubsub, for isolation from other workload. - instrumented_io_context pubsub_io_service_; + InstrumentedIoContextWithThread pubsub_io_context_; + // The io service used by internal KV service, table storage and the StoreClient. + InstrumentedIoContextWithThread kv_io_context_; + // The io service used by task manager. + InstrumentedIoContextWithThread task_io_context_; + // The io service used by ray syncer. + InstrumentedIoContextWithThread ray_syncer_io_context_; /// The grpc server rpc::GrpcServer rpc_server_; /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s. @@ -254,8 +261,6 @@ class GcsServer { /// Ray Syncer related fields. std::unique_ptr ray_syncer_; std::unique_ptr ray_syncer_service_; - std::unique_ptr ray_syncer_thread_; - instrumented_io_context ray_syncer_io_context_; /// The node id of GCS. 
NodeID gcs_node_id_; diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.cc b/src/ray/gcs/gcs_server/gcs_task_manager.cc index 33a8bb6ded86..38b631a78545 100644 --- a/src/ray/gcs/gcs_server/gcs_task_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_task_manager.cc @@ -21,13 +21,6 @@ namespace ray { namespace gcs { -void GcsTaskManager::Stop() { - io_service_.stop(); - if (io_service_thread_->joinable()) { - io_service_thread_->join(); - } -} - std::vector GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents() const { std::vector ret; diff --git a/src/ray/gcs/gcs_server/gcs_task_manager.h b/src/ray/gcs/gcs_server/gcs_task_manager.h index 9c8e8c215d8d..1e87baec43b3 100644 --- a/src/ray/gcs/gcs_server/gcs_task_manager.h +++ b/src/ray/gcs/gcs_server/gcs_task_manager.h @@ -86,18 +86,13 @@ class FinishedTaskActorTaskGcPolicy : public TaskEventsGcPolicyInterface { class GcsTaskManager : public rpc::TaskInfoHandler { public: /// Create a GcsTaskManager. - GcsTaskManager() - : stats_counter_(), + explicit GcsTaskManager(instrumented_io_context &io_service) + : io_service_(io_service), + stats_counter_(), task_event_storage_(std::make_unique( RayConfig::instance().task_events_max_num_task_in_gcs(), stats_counter_, std::make_unique())), - io_service_thread_(std::make_unique([this] { - SetThreadName("task_events"); - // Keep io_service_ alive. - boost::asio::io_service::work io_service_work_(io_service_); - io_service_.run(); - })), periodical_runner_(io_service_) { periodical_runner_.RunFnPeriodically([this] { task_event_storage_->GcJobSummary(); }, 5 * 1000, @@ -122,12 +117,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler { rpc::GetTaskEventsReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Stops the event loop and the thread of the task event handler. - /// - /// After this is called, no more requests will be handled. - /// This function returns when the io thread is joined. - void Stop(); - /// Handler to be called when a job finishes. This marks all non-terminated tasks /// of the job as failed. /// @@ -143,11 +132,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler { void OnWorkerDead(const WorkerID &worker_id, const std::shared_ptr &worker_failure_data); - /// Returns the io_service. - /// - /// \return Reference to its io_service. - instrumented_io_context &GetIoContext() { return io_service_; } - /// Return string of debug state. /// /// \return Debug string @@ -514,6 +498,9 @@ class GcsTaskManager : public rpc::TaskInfoHandler { /// Test only size_t GetNumTaskEventsStored() { return stats_counter_.Get(kNumTaskEventsStored); } + /// Dedicated IO service separated from the main service. + instrumented_io_context &io_service_; + // Mutex guarding the usage stats client absl::Mutex mutex_; @@ -526,12 +513,6 @@ class GcsTaskManager : public rpc::TaskInfoHandler { // the io_service_thread_. Access to it is *not* thread safe. std::unique_ptr task_event_storage_; - /// Its own separate IO service separated from the main service. - instrumented_io_context io_service_; - - /// Its own IO thread from the main thread. - std::unique_ptr io_service_thread_; - /// The runner to run function periodically. 
PeriodicalRunner periodical_runner_; diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc index a110ce49b956..d926b051102c 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.cc +++ b/src/ray/gcs/gcs_server/pubsub_handler.cc @@ -22,12 +22,6 @@ InternalPubSubHandler::InternalPubSubHandler( const std::shared_ptr &gcs_publisher) : io_service_(io_service), gcs_publisher_(gcs_publisher) { RAY_CHECK(gcs_publisher_); - io_service_thread_ = std::make_unique([this] { - SetThreadName("pubsub"); - // Keep io_service_ alive. - boost::asio::io_service::work io_service_work_(io_service_); - io_service_.run(); - }); } void InternalPubSubHandler::HandleGcsPublish(rpc::GcsPublishRequest request, @@ -129,12 +123,5 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) { sender_to_subscribers_.erase(iter); } -void InternalPubSubHandler::Stop() { - io_service_.stop(); - if (io_service_thread_->joinable()) { - io_service_thread_->join(); - } -} - } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/pubsub_handler.h b/src/ray/gcs/gcs_server/pubsub_handler.h index 71dded16967a..a92209a6954c 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.h +++ b/src/ray/gcs/gcs_server/pubsub_handler.h @@ -47,9 +47,6 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler { rpc::GcsUnregisterSubscriberReply *reply, rpc::SendReplyCallback send_reply_callback) final; - // Stops the event loop and the thread of the pubsub handler. - void Stop(); - std::string DebugString() const; void RemoveSubscriberFrom(const std::string &sender_id); @@ -57,7 +54,6 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler { private: /// Not owning the io service, to allow sharing it with pubsub::Publisher. 
instrumented_io_context &io_service_; - std::unique_ptr io_service_thread_; std::shared_ptr gcs_publisher_; absl::flat_hash_map> sender_to_subscribers_; }; diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index efe775eb6bad..6de20bfe34af 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -199,8 +199,8 @@ void RedisRequestContext::RedisResponseFn(struct redisAsyncContext *async_contex }, "RedisRequestContext.Callback"); auto end_time = absl::Now(); - ray::stats::GcsLatency().Record((end_time - request_cxt->start_time_) / - absl::Milliseconds(1)); + ray::stats::GcsLatency().Record( + absl::ToDoubleMilliseconds(end_time - request_cxt->start_time_)); delete request_cxt; } } @@ -215,7 +215,7 @@ void RedisRequestContext::Run() { --pending_retries_; Status status = redis_context_->RedisAsyncCommandArgv( - *(RedisResponseFn), this, argv_.size(), argv_.data(), argc_.data()); + RedisResponseFn, this, argv_.size(), argv_.data(), argc_.data()); if (!status.ok()) { RedisResponseFn(redis_context_->GetRawRedisAsyncContext(), nullptr, this); diff --git a/src/ray/gcs/store_client/observable_store_client.cc b/src/ray/gcs/store_client/observable_store_client.cc index c188e27e57b7..147c9191a824 100644 --- a/src/ray/gcs/store_client/observable_store_client.cc +++ b/src/ray/gcs/store_client/observable_store_client.cc @@ -29,19 +29,19 @@ Status ObservableStoreClient::AsyncPut(const std::string &table_name, std::function callback) { auto start = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_count.Record(1, "Put"); - return delegate_->AsyncPut(table_name, - key, - data, - overwrite, - [start, callback = std::move(callback)](auto result) { - auto end = absl::GetCurrentTimeNanos(); - STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), - "Put"); - if (callback) { - callback(std::move(result)); - } - }); + return delegate_->AsyncPut( + table_name, + key, + data, + overwrite, + [start, callback = std::move(callback)](auto result) { + auto end = absl::GetCurrentTimeNanos(); + STATS_gcs_storage_operation_latency_ms.Record( + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Put"); + if (callback) { + callback(std::move(result)); + } + }); } Status ObservableStoreClient::AsyncGet( @@ -54,7 +54,7 @@ Status ObservableStoreClient::AsyncGet( table_name, key, [start, callback](auto status, auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Get"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Get"); if (callback) { callback(status, std::move(result)); } @@ -69,7 +69,7 @@ Status ObservableStoreClient::AsyncGetAll( return delegate_->AsyncGetAll(table_name, [start, callback](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "GetAll"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "GetAll"); if (callback) { callback(std::move(result)); } @@ -84,7 +84,7 @@ Status ObservableStoreClient::AsyncMultiGet( return delegate_->AsyncMultiGet(table_name, keys, [start, callback](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "MultiGet"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "MultiGet"); if (callback) { callback(std::move(result)); } @@ 
-100,7 +100,7 @@ Status ObservableStoreClient::AsyncDelete(const std::string &table_name, table_name, key, [start, callback = std::move(callback)](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Delete"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Delete"); if (callback) { callback(std::move(result)); } @@ -116,7 +116,7 @@ Status ObservableStoreClient::AsyncBatchDelete(const std::string &table_name, table_name, keys, [start, callback = std::move(callback)](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "BatchDelete"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "BatchDelete"); if (callback) { callback(std::move(result)); } @@ -135,7 +135,7 @@ Status ObservableStoreClient::AsyncGetKeys( table_name, prefix, [start, callback = std::move(callback)](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "GetKeys"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "GetKeys"); if (callback) { callback(std::move(result)); } @@ -151,7 +151,7 @@ Status ObservableStoreClient::AsyncExists(const std::string &table_name, table_name, key, [start, callback = std::move(callback)](auto result) { auto end = absl::GetCurrentTimeNanos(); STATS_gcs_storage_operation_latency_ms.Record( - absl::Nanoseconds(end - start) / absl::Milliseconds(1), "Exists"); + absl::ToDoubleMilliseconds(absl::Nanoseconds(end - start)), "Exists"); if (callback) { callback(std::move(result)); } diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 81d1ce292aa7..8a158e136376 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -19,9 +19,9 @@ #include "absl/strings/match.h" #include "absl/strings/str_cat.h" +#include "ray/common/asio/asio_util.h" #include "ray/gcs/redis_context.h" #include "ray/util/logging.h" - namespace ray { namespace gcs { From c84fd80399c8023a1063898a6ea316e0059acd83 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 18 Sep 2024 14:30:59 -0700 Subject: [PATCH 02/14] move gcs_table_storage_ back to main service. Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_server/gcs_server.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index f9601fcae7f8..08de01b8862a 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -73,16 +73,16 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, periodical_runner_(main_service), is_started_(false), is_stopped_(false) { - // Init GCS table storage. + // Init GCS table storage. Note this is on main_service_, not kv_io_context_, to avoid + // congestion on the kv_io_context_. 
RAY_LOG(INFO) << "GCS storage type is " << storage_type_; switch (storage_type_) { case StorageType::IN_MEMORY: - gcs_table_storage_ = - std::make_shared(kv_io_context_.GetIoService()); + gcs_table_storage_ = std::make_shared(main_service_); break; case StorageType::REDIS_PERSIST: - gcs_table_storage_ = std::make_shared( - GetOrConnectRedis(kv_io_context_.GetIoService())); + gcs_table_storage_ = + std::make_shared(GetOrConnectRedis(main_service_)); break; default: RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_; From 36fc80862de1ba255b40387fd9d8e6e04e5d3603 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 23 Sep 2024 16:31:17 -0700 Subject: [PATCH 03/14] fix cpp test Signed-off-by: Ruiyang Wang --- .../gcs/gcs_server/test/gcs_task_manager_test.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc index 07d4ee662966..939ae13be6fa 100644 --- a/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc @@ -18,6 +18,7 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "ray/common/asio/asio_util.h" #include "ray/gcs/pb_util.h" #include "ray/gcs/test/gcs_test_util.h" @@ -36,9 +37,15 @@ class GcsTaskManagerTest : public ::testing::Test { )"); } - virtual void SetUp() { task_manager.reset(new GcsTaskManager()); } + virtual void SetUp() { + io_context_ = std::make_unique("GcsTaskManagerTest"); + task_manager = std::make_unique(io_context_->GetIoService()); + } - virtual void TearDown() { task_manager->Stop(); } + virtual void TearDown() { + task_manager.reset(); + io_context_.reset(); + } std::vector GenTaskIDs(size_t num_tasks) { std::vector task_ids; @@ -104,7 +111,7 @@ class GcsTaskManagerTest : public ::testing::Test { request.mutable_data()->CopyFrom(events_data); // Dispatch so that it runs in GcsTaskManager's io service. - task_manager->GetIoContext().dispatch( + io_context_->GetIoService().dispatch( [this, &promise, &request, &reply]() { task_manager->HandleAddTaskEventData( request, @@ -161,7 +168,7 @@ class GcsTaskManagerTest : public ::testing::Test { request.mutable_filters()->set_exclude_driver(exclude_driver); - task_manager->GetIoContext().dispatch( + io_context_->GetIoService().dispatch( [this, &promise, &request, &reply]() { task_manager->HandleGetTaskEvents( request, @@ -275,6 +282,7 @@ class GcsTaskManagerTest : public ::testing::Test { } std::unique_ptr task_manager = nullptr; + std::unique_ptr io_context_ = nullptr; }; class GcsTaskManagerMemoryLimitedTest : public GcsTaskManagerTest { From a87c39d2810c8468c8680bfa9422b43c212384ca Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 24 Sep 2024 11:40:12 -0700 Subject: [PATCH 04/14] fix atomics now that we have multiple thread reads... Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_server/gcs_job_manager.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index b42de3f95533..4be8809b6ea6 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -247,11 +247,14 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, // entrypoint script calls ray.init() multiple times). 
   std::unordered_map<std::string, std::vector<int>> job_data_key_to_indices;
 
-  // Create a shared counter for the number of jobs processed
-  std::shared_ptr<int> num_processed_jobs = std::make_shared<int>(0);
+  // Create a shared counter for the number of jobs processed.
+  // This is written in internal_kv_'s thread and read in the main thread.
+  std::shared_ptr<std::atomic<int>> num_processed_jobs =
+      std::make_shared<std::atomic<int>>(0);
 
   // Create a shared boolean flag for the internal KV callback completion
-  std::shared_ptr<bool> kv_callback_done = std::make_shared<bool>(false);
+  std::shared_ptr<std::atomic<bool>> kv_callback_done =
+      std::make_shared<std::atomic<bool>>(false);
 
   // Function to send the reply once all jobs have been processed and KV callback
   // completed

From 7cd77057e697b83fc6b0d2a74856b0c7492039b6 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Tue, 24 Sep 2024 12:17:50 -0700
Subject: [PATCH 05/14] atomics

Signed-off-by: Ruiyang Wang
---
 src/ray/gcs/gcs_server/gcs_job_manager.cc | 93 +++++++++++++----------
 1 file changed, 51 insertions(+), 42 deletions(-)

diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc
index 4be8809b6ea6..ccc7df3ce6ac 100644
--- a/src/ray/gcs/gcs_server/gcs_job_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -247,25 +247,6 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
   // entrypoint script calls ray.init() multiple times).
   std::unordered_map<std::string, std::vector<int>> job_data_key_to_indices;
 
-  // Create a shared counter for the number of jobs processed.
-  // This is written in internal_kv_'s thread and read in the main thread.
-  std::shared_ptr<std::atomic<int>> num_processed_jobs =
-      std::make_shared<std::atomic<int>>(0);
-
-  // Create a shared boolean flag for the internal KV callback completion
-  std::shared_ptr<std::atomic<bool>> kv_callback_done =
-      std::make_shared<std::atomic<bool>>(false);
-
   // Load the job table data into the reply.
   int i = 0;
   for (auto &data : result) {
@@ -289,28 +270,59 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
       job_api_data_keys.push_back(job_data_key);
       job_data_key_to_indices[job_data_key].push_back(i);
     }
+    i++;
+  }
+
+  // Jobs are filtered. Now, optionally populate is_running_tasks and job_info. A
+  // `asyncio.gather` is needed but we are in C++; so we use atomic counters.
 
-    if (!request.skip_is_running_tasks_field()) {
-      JobID job_id = data.first;
-      WorkerID worker_id =
-          WorkerID::FromBinary(data.second.driver_address().worker_id());
+  // Atomic counter of pending async tasks before sending the reply.
+  // Once it reaches total_tasks, the reply is sent.
+  std::shared_ptr<std::atomic<size_t>> num_finished_tasks =
+      std::make_shared<std::atomic<size_t>>(0);
 
-      // If job is not dead, get is_running_tasks from the core worker for the driver.
-      if (data.second.is_dead()) {
+  // N tasks for N jobs; and 1 task for the MultiKVGet. If either is skipped the counter
+  // still increments.
+  const size_t total_tasks = reply->job_info_list_size() + 1;
+
+  // Those async tasks need to atomically read-and-increment the counter, so this
+  // callback can't capture the atomic variable directly. Instead, it asks for a
+  // regular variable argument coming from the read-and-increment caller.
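+  //
+  // In isolation the pattern looks like this (illustrative sketch only; `send` is a
+  // hypothetical stand-in for GCS_RPC_SEND_REPLY and is assumed, not part of this
+  // change):
+  //
+  //   auto counter = std::make_shared<std::atomic<size_t>>(0);
+  //   // Run at the end of each async task. fetch_add returns the previous value,
+  //   // so exactly one caller observes the final count and sends the reply.
+  //   auto on_task_done = [counter, total_tasks, send]() {
+  //     if (counter->fetch_add(1) + 1 == total_tasks) {
+  //       send();
+  //     }
+  //   };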
+ auto try_send_reply = + [reply, send_reply_callback, total_tasks](size_t finished_tasks) { + if (finished_tasks == total_tasks) { + RAY_LOG(DEBUG) << "Finished getting all job info."; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + } + }; + + if (request.skip_is_running_tasks_field()) { + // Skipping RPCs to workers, just mark all job tasks as done. + const size_t job_count = reply->job_info_list_size(); + size_t updated_finished_tasks = + num_finished_tasks->fetch_add(job_count) + job_count; + try_send_reply(updated_finished_tasks); + } else { + for (const auto &data : reply->job_info_list()) { + auto job_id = JobID::FromBinary(data.job_id()); + WorkerID worker_id = WorkerID::FromBinary(data.driver_address().worker_id()); + + // If job is dead, no need to get. + if (data.is_dead()) { reply->mutable_job_info_list(i)->set_is_running_tasks(false); core_worker_clients_.Disconnect(worker_id); - (*num_processed_jobs)++; - try_send_reply(); + size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1; + try_send_reply(updated_finished_tasks); } else { // Get is_running_tasks from the core worker for the driver. - auto client = core_worker_clients_.GetOrConnect(data.second.driver_address()); + auto client = core_worker_clients_.GetOrConnect(data.driver_address()); auto request = std::make_unique(); constexpr int64_t kNumPendingTasksRequestTimeoutMs = 1000; RAY_LOG(DEBUG) << "Send NumPendingTasksRequest to worker " << worker_id << ", timeout " << kNumPendingTasksRequestTimeoutMs << " ms."; client->NumPendingTasks( std::move(request), - [job_id, worker_id, reply, i, num_processed_jobs, try_send_reply]( + [job_id, worker_id, reply, i, num_finished_tasks, try_send_reply]( const Status &status, const rpc::NumPendingTasksReply &num_pending_tasks_reply) { RAY_LOG(DEBUG).WithField(worker_id) @@ -324,25 +336,25 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, bool is_running_tasks = num_pending_tasks_reply.num_pending_tasks() > 0; reply->mutable_job_info_list(i)->set_is_running_tasks(is_running_tasks); } - (*num_processed_jobs)++; - try_send_reply(); + size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1; + try_send_reply(updated_finished_tasks); }, kNumPendingTasksRequestTimeoutMs); } - } else { - (*num_processed_jobs)++; - try_send_reply(); } - i++; } - if (!request.skip_submission_job_info_field()) { + if (request.skip_submission_job_info_field()) { + // Skipping MultiKVGet, just mark the counter. + size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1; + try_send_reply(updated_finished_tasks); + } else { // Load the JobInfo for jobs submitted via the Ray Job API. 
auto kv_multi_get_callback = [reply, send_reply_callback, job_data_key_to_indices, - kv_callback_done, + num_finished_tasks, try_send_reply](std::unordered_map &&result) { for (const auto &data : result) { const std::string &job_data_key = data.first; @@ -365,13 +377,10 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, } } } - *kv_callback_done = true; - try_send_reply(); + size_t updated_finished_tasks = num_finished_tasks->fetch_add(1) + 1; + try_send_reply(updated_finished_tasks); }; internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback); - } else { - *kv_callback_done = true; - try_send_reply(); } }; Status status = gcs_table_storage_->JobTable().GetAll(on_done); From bbf02cd4a3504d06e3b2f730128fafd24825609a Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 24 Sep 2024 16:06:50 -0700 Subject: [PATCH 06/14] fix Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_server/gcs_job_manager.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index ccc7df3ce6ac..8146295e60ab 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -303,7 +303,8 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, num_finished_tasks->fetch_add(job_count) + job_count; try_send_reply(updated_finished_tasks); } else { - for (const auto &data : reply->job_info_list()) { + for (size_t i = 0; i < reply->job_info_list_size(); i++) { + const auto &data = reply->job_info_list(i); auto job_id = JobID::FromBinary(data.job_id()); WorkerID worker_id = WorkerID::FromBinary(data.driver_address().worker_id()); From 99f7ba9f698415619528b85f956536050feab1da Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 25 Sep 2024 10:23:26 -0700 Subject: [PATCH 07/14] size_t -> int for proto Signed-off-by: Ruiyang Wang --- src/ray/gcs/gcs_server/gcs_job_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index 8146295e60ab..d3d044c8c7fa 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -303,7 +303,7 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, num_finished_tasks->fetch_add(job_count) + job_count; try_send_reply(updated_finished_tasks); } else { - for (size_t i = 0; i < reply->job_info_list_size(); i++) { + for (int i = 0; i < reply->job_info_list_size(); i++) { const auto &data = reply->job_info_list(i); auto job_id = JobID::FromBinary(data.job_id()); WorkerID worker_id = WorkerID::FromBinary(data.driver_address().worker_id()); From a1ab6c613e10d1fcd5c9058ad09a1abc5f19a86c Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 25 Sep 2024 14:19:59 -0700 Subject: [PATCH 08/14] fix atomics in periodical_runner Signed-off-by: Ruiyang Wang --- src/ray/common/asio/periodical_runner.cc | 65 ++++++++++-------------- src/ray/common/asio/periodical_runner.h | 8 +-- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/src/ray/common/asio/periodical_runner.cc b/src/ray/common/asio/periodical_runner.cc index 7de7fafc82c6..4c1babadc293 100644 --- a/src/ray/common/asio/periodical_runner.cc +++ b/src/ray/common/asio/periodical_runner.cc @@ -20,7 +20,7 @@ namespace ray { PeriodicalRunner::PeriodicalRunner(instrumented_io_context &io_service) - : io_service_(io_service), mutex_(), stopped_(std::make_shared(false)) {} + : 
io_service_(io_service) {} PeriodicalRunner::~PeriodicalRunner() { RAY_LOG(DEBUG) << "PeriodicalRunner is destructed"; @@ -29,7 +29,7 @@ PeriodicalRunner::~PeriodicalRunner() { void PeriodicalRunner::Clear() { absl::MutexLock lock(&mutex_); - *stopped_ = true; + stopped_ = true; for (const auto &timer : timers_) { timer->cancel(); } @@ -38,8 +38,8 @@ void PeriodicalRunner::Clear() { void PeriodicalRunner::RunFnPeriodically(std::function fn, uint64_t period_ms, - const std::string name) { - *stopped_ = false; + const std::string &name) { + stopped_ = false; if (period_ms > 0) { auto timer = std::make_shared(io_service_); { @@ -47,13 +47,8 @@ void PeriodicalRunner::RunFnPeriodically(std::function fn, timers_.push_back(timer); } io_service_.post( - [this, - stopped = stopped_, - fn = std::move(fn), - period_ms, - name, - timer = std::move(timer)]() { - if (*stopped) { + [this, fn = std::move(fn), period_ms, name, timer = std::move(timer)]() { + if (this->stopped_) { return; } if (RayConfig::instance().event_stats()) { @@ -74,28 +69,27 @@ void PeriodicalRunner::DoRunFnPeriodically( fn(); absl::MutexLock lock(&mutex_); timer->expires_from_now(period); - timer->async_wait( - [this, stopped = stopped_, fn = std::move(fn), period, timer = std::move(timer)]( - const boost::system::error_code &error) { - if (*stopped) { - return; - } - if (error == boost::asio::error::operation_aborted) { - // `operation_aborted` is set when `timer` is canceled or destroyed. - // The Monitor lifetime may be short than the object who use it. (e.g. - // gcs_server) - return; - } - RAY_CHECK(!error) << error.message(); - DoRunFnPeriodically(fn, period, timer); - }); + timer->async_wait([this, fn, period, timer = std::move(timer)]( + const boost::system::error_code &error) { + if (stopped_) { + return; + } + if (error == boost::asio::error::operation_aborted) { + // `operation_aborted` is set when `timer` is canceled or destroyed. + // The Monitor lifetime may be short than the object who use it. (e.g. + // gcs_server) + return; + } + RAY_CHECK(!error) << error.message(); + DoRunFnPeriodically(fn, period, timer); + }); } void PeriodicalRunner::DoRunFnPeriodicallyInstrumented( const std::function &fn, boost::posix_time::milliseconds period, std::shared_ptr timer, - const std::string name) { + const std::string &name) { fn(); absl::MutexLock lock(&mutex_); timer->expires_from_now(period); @@ -104,24 +98,17 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented( // event loop. 
   auto stats_handle = io_service_.stats().RecordStart(name, period.total_nanoseconds());
   timer->async_wait([this,
-                     fn = std::move(fn),
-                     stopped = stopped_,
+                     fn,
                      period,
                      timer = std::move(timer),
                      stats_handle = std::move(stats_handle),
                      name](const boost::system::error_code &error) {
-    if (*stopped) {
+    if (this->stopped_) {
       return;
     }
     io_service_.stats().RecordExecution(
-        [this,
-         stopped = stopped_,
-         fn = std::move(fn),
-         error,
-         period,
-         timer = std::move(timer),
-         name]() {
-          if (*stopped) {
+        [this, fn, error, period, timer, name]() {
+          if (this->stopped_) {
            return;
          }
          if (error == boost::asio::error::operation_aborted) {
@@ -133,7 +120,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
          RAY_CHECK(!error) << error.message();
          DoRunFnPeriodicallyInstrumented(fn, period, timer, name);
        },
-        std::move(stats_handle));
+        stats_handle);
  });
 }
diff --git a/src/ray/common/asio/periodical_runner.h b/src/ray/common/asio/periodical_runner.h
index c67b02620330..c4ae74dcafa3 100644
--- a/src/ray/common/asio/periodical_runner.h
+++ b/src/ray/common/asio/periodical_runner.h
@@ -30,7 +30,7 @@ namespace ray {
 /// All registered functions will stop running once this object is destructed.
 class PeriodicalRunner {
  public:
-  PeriodicalRunner(instrumented_io_context &io_service);
+  explicit PeriodicalRunner(instrumented_io_context &io_service);
 
   ~PeriodicalRunner();
 
@@ -38,7 +38,7 @@ class PeriodicalRunner {
 
   void RunFnPeriodically(std::function<void()> fn,
                          uint64_t period_ms,
-                         const std::string name) ABSL_LOCKS_EXCLUDED(mutex_);
+                         const std::string &name) ABSL_LOCKS_EXCLUDED(mutex_);
 
  private:
   void DoRunFnPeriodically(const std::function<void()> &fn,
@@ -49,14 +49,14 @@ class PeriodicalRunner {
   void DoRunFnPeriodicallyInstrumented(const std::function<void()> &fn,
                                        boost::posix_time::milliseconds period,
                                        std::shared_ptr<boost::asio::deadline_timer> timer,
-                                       const std::string name)
+                                       const std::string &name)
       ABSL_LOCKS_EXCLUDED(mutex_);
 
   instrumented_io_context &io_service_;
   mutable absl::Mutex mutex_;
   std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_ ABSL_GUARDED_BY(mutex_);
-  std::shared_ptr<bool> stopped_;
+  std::atomic<bool> stopped_ = false;
 };
 
 }  // namespace ray

From cf3f343f1f6a8af75977a03adc482250ce2ecc76 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Wed, 25 Sep 2024 14:33:23 -0700
Subject: [PATCH 09/14] update doc

Signed-off-by: Ruiyang Wang
---
 src/ray/gcs/gcs_server/gcs_job_manager.cc | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc
index d3d044c8c7fa..7b9f8af23da8 100644
--- a/src/ray/gcs/gcs_server/gcs_job_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -273,8 +273,16 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
     i++;
   }
 
-  // Jobs are filtered. Now, optionally populate is_running_tasks and job_info. A
-  // `asyncio.gather` is needed but we are in C++; so we use atomic counters.
+  // Jobs are filtered. Now, optionally populate is_running_tasks and job_info. We
+  // do async calls to:
+  //
+  // - N outbound RPCs, one to each job's core worker, on GcsServer::main_service_.
+  // - One InternalKV MultiGet call on GcsServer::kv_service_.
+  //
+  // We then wait for all of them by examining an atomic num_finished_tasks counter
+  // and then reply. The wait counter is written from two different threads, which
+  // requires an atomic read-and-increment. Each thread performs read-and-increment,
+  // and checks the atomic readout to ensure try_send_reply is executed exactly once.
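+  //
+  // Sketch of the exactly-once property (illustrative; assumes total_tasks == 2):
+  //
+  //   Thread A: n = num_finished_tasks->fetch_add(1) + 1;  // n == 1 -> no reply
+  //   Thread B: n = num_finished_tasks->fetch_add(1) + 1;  // n == 2 -> reply
+  //
+  // Only the thread that performs the final increment observes n == total_tasks,
+  // so try_send_reply sends exactly once regardless of interleaving.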
// Atomic counter of pending async tasks before sending the reply. // Once it reaches total_tasks, the reply is sent. @@ -284,10 +292,6 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request, // N tasks for N jobs; and 1 task for the MultiKVGet. If either is skipped the counter // still increments. const size_t total_tasks = reply->job_info_list_size() + 1; - - // Those async tasks need to atomically read-and-increment the counter, so this - // callback can't capture the atomic variable directly. Instead, it asks for an - // regular variable argument coming from the read-and-increment caller. auto try_send_reply = [reply, send_reply_callback, total_tasks](size_t finished_tasks) { if (finished_tasks == total_tasks) { From 6d006e9de9abc508f6f8b438c858bba4d0daf89b Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 26 Sep 2024 14:12:18 -0700 Subject: [PATCH 10/14] stopped -> shared_ptr> Signed-off-by: Ruiyang Wang --- src/ray/common/asio/periodical_runner.cc | 24 +++++++++++++++--------- src/ray/common/asio/periodical_runner.h | 4 +++- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/ray/common/asio/periodical_runner.cc b/src/ray/common/asio/periodical_runner.cc index 4c1babadc293..855590445e98 100644 --- a/src/ray/common/asio/periodical_runner.cc +++ b/src/ray/common/asio/periodical_runner.cc @@ -29,7 +29,7 @@ PeriodicalRunner::~PeriodicalRunner() { void PeriodicalRunner::Clear() { absl::MutexLock lock(&mutex_); - stopped_ = true; + *stopped_ = true; for (const auto &timer : timers_) { timer->cancel(); } @@ -39,7 +39,7 @@ void PeriodicalRunner::Clear() { void PeriodicalRunner::RunFnPeriodically(std::function fn, uint64_t period_ms, const std::string &name) { - stopped_ = false; + *stopped_ = false; if (period_ms > 0) { auto timer = std::make_shared(io_service_); { @@ -47,8 +47,13 @@ void PeriodicalRunner::RunFnPeriodically(std::function fn, timers_.push_back(timer); } io_service_.post( - [this, fn = std::move(fn), period_ms, name, timer = std::move(timer)]() { - if (this->stopped_) { + [this, + stopped = stopped_, + fn = std::move(fn), + period_ms, + name, + timer = std::move(timer)]() { + if (*stopped) { return; } if (RayConfig::instance().event_stats()) { @@ -69,9 +74,9 @@ void PeriodicalRunner::DoRunFnPeriodically( fn(); absl::MutexLock lock(&mutex_); timer->expires_from_now(period); - timer->async_wait([this, fn, period, timer = std::move(timer)]( + timer->async_wait([this, stopped = stopped_, fn, period, timer = std::move(timer)]( const boost::system::error_code &error) { - if (stopped_) { + if (*stopped) { return; } if (error == boost::asio::error::operation_aborted) { @@ -99,16 +104,17 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented( auto stats_handle = io_service_.stats().RecordStart(name, period.total_nanoseconds()); timer->async_wait([this, fn, + stopped = stopped_, period, timer = std::move(timer), stats_handle = std::move(stats_handle), name](const boost::system::error_code &error) { - if (this->stopped_) { + if (*stopped) { return; } io_service_.stats().RecordExecution( - [this, fn, error, period, timer, name]() { - if (this->stopped_) { + [this, stopped = stopped, fn, error, period, timer, name]() { + if (*stopped) { return; } if (error == boost::asio::error::operation_aborted) { diff --git a/src/ray/common/asio/periodical_runner.h b/src/ray/common/asio/periodical_runner.h index c4ae74dcafa3..ec469f6c1352 100644 --- a/src/ray/common/asio/periodical_runner.h +++ b/src/ray/common/asio/periodical_runner.h @@ -56,7 +56,9 @@ 
class PeriodicalRunner { mutable absl::Mutex mutex_; std::vector> timers_ ABSL_GUARDED_BY(mutex_); - std::atomic stopped_ = false; + // `stopped_` is copied to the timer callback, and may outlive `this`. + std::shared_ptr> stopped_ = + std::make_shared>(false); }; } // namespace ray From 110ae3ec03ffa76bf26e010b8ac40068c52b0891 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 26 Sep 2024 17:33:19 -0700 Subject: [PATCH 11/14] rename Signed-off-by: Ruiyang Wang --- src/ray/common/asio/asio_util.h | 14 +++++++------- src/ray/gcs/gcs_client/gcs_client.cc | 2 +- src/ray/gcs/gcs_server/gcs_server.h | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h index 6dc8bbe8cd8b..0dae0a972a6b 100644 --- a/src/ray/common/asio/asio_util.h +++ b/src/ray/common/asio/asio_util.h @@ -45,13 +45,13 @@ std::shared_ptr execute_after( * The constructor takes a thread name and starts the thread. * The destructor stops the io_service and joins the thread. */ -class InstrumentedIoContextWithThread { +class InstrumentedIOContextWithThread { public: /** * Constructor. * @param thread_name The name of the thread. */ - explicit InstrumentedIoContextWithThread(const std::string &thread_name) + explicit InstrumentedIOContextWithThread(const std::string &thread_name) : io_service_(), work_(io_service_) { io_thread_ = std::thread([this, thread_name] { SetThreadName(thread_name); @@ -59,14 +59,14 @@ class InstrumentedIoContextWithThread { }); } - ~InstrumentedIoContextWithThread() { Stop(); } + ~InstrumentedIOContextWithThread() { Stop(); } // Non-movable and non-copyable. - InstrumentedIoContextWithThread(const InstrumentedIoContextWithThread &) = delete; - InstrumentedIoContextWithThread &operator=(const InstrumentedIoContextWithThread &) = + InstrumentedIOContextWithThread(const InstrumentedIOContextWithThread &) = delete; + InstrumentedIOContextWithThread &operator=(const InstrumentedIOContextWithThread &) = delete; - InstrumentedIoContextWithThread(InstrumentedIoContextWithThread &&) = delete; - InstrumentedIoContextWithThread &operator=(InstrumentedIoContextWithThread &&) = delete; + InstrumentedIOContextWithThread(InstrumentedIOContextWithThread &&) = delete; + InstrumentedIOContextWithThread &operator=(InstrumentedIOContextWithThread &&) = delete; instrumented_io_context &GetIoService() { return io_service_; } diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 8825dd0ab73f..e46007f819ba 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -719,7 +719,7 @@ std::unordered_map PythonGetNodeLabels( } Status ConnectOnSingletonIoContext(GcsClient &gcs_client, int64_t timeout_ms) { - static InstrumentedIoContextWithThread io_context("gcs_client_io_service"); + static InstrumentedIOContextWithThread io_context("gcs_client_io_service"); instrumented_io_context &io_service = io_context.GetIoService(); return gcs_client.Connect(io_service, timeout_ms); } diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 2cf994cddb2f..59a45a95ea31 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -213,13 +213,13 @@ class GcsServer { /// The main io service to drive event posted from grpc threads. instrumented_io_context &main_service_; /// The io service used by Pubsub, for isolation from other workload. 
-  InstrumentedIoContextWithThread pubsub_io_context_;
+  InstrumentedIOContextWithThread pubsub_io_context_;
   // The io service used by internal KV service, table storage and the StoreClient.
-  InstrumentedIoContextWithThread kv_io_context_;
+  InstrumentedIOContextWithThread kv_io_context_;
   // The io service used by task manager.
-  InstrumentedIoContextWithThread task_io_context_;
+  InstrumentedIOContextWithThread task_io_context_;
   // The io service used by ray syncer.
-  InstrumentedIoContextWithThread ray_syncer_io_context_;
+  InstrumentedIOContextWithThread ray_syncer_io_context_;
   /// The grpc server
   rpc::GrpcServer rpc_server_;
   /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.

From 3461330a962efc7235f585f29d4235e6005aaaa9 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Thu, 26 Sep 2024 17:37:26 -0700
Subject: [PATCH 12/14] fix lint

Signed-off-by: Ruiyang Wang
---
 src/ray/gcs/store_client/redis_store_client.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc
index 4836b34e31d9..0a6e53f66f48 100644
--- a/src/ray/gcs/store_client/redis_store_client.cc
+++ b/src/ray/gcs/store_client/redis_store_client.cc
@@ -22,10 +22,10 @@
 #include "absl/cleanup/cleanup.h"
 #include "absl/strings/match.h"
 #include "absl/strings/str_cat.h"
-#include "ray/common/asio/asio_util.h"
 #include "ray/gcs/redis_context.h"
 #include "ray/util/container_util.h"
 #include "ray/util/logging.h"
+
 namespace ray {
 namespace gcs {

From a6075288d902f25ab55f0103689bbeafb3bf4a7f Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Tue, 29 Oct 2024 15:42:03 -0700
Subject: [PATCH 13/14] type traits and policy for kv

Signed-off-by: Ruiyang Wang
---
 src/ray/common/asio/asio_util.h               | 14 +++++++--
 src/ray/gcs/gcs_server/gcs_server.cc          | 14 ++++-----
 .../gcs_server/gcs_server_io_context_policy.h |  8 +++--
 src/ray/util/type_traits.h                    | 31 +++++++++++++++++++
 4 files changed, 56 insertions(+), 11 deletions(-)
 create mode 100644 src/ray/util/type_traits.h

diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h
index c7df71405758..963ac9296ddd 100644
--- a/src/ray/common/asio/asio_util.h
+++ b/src/ray/common/asio/asio_util.h
@@ -23,6 +23,7 @@
 
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/util/array.h"
+#include "ray/util/type_traits.h"
 #include "ray/util/util.h"
 
 template <typename R, typename P>
@@ -133,15 +134,24 @@ class IOContextProvider {
     }
   }
 
+  template <int N>
+  struct Wrapper {
+    static constexpr int value = N;
+  };
+
   // Gets IOContext registered for type T. If the type is not registered in
   // Policy::kAllDedicatedIOContextNames, it's a compile error.
   template <typename T>
   instrumented_io_context &GetIOContext() const {
     constexpr int index = Policy::template GetDedicatedIOContextIndex<T>();
     static_assert(
-        index >= -1 && index < Policy::kAllDedicatedIOContextNames.size(),
+        (index == -1) ||
+            (index >= 0 &&
+             static_cast<size_t>(index) < Policy::kAllDedicatedIOContextNames.size()) ||
+            // To show index in compile error...
+            ray::AlwaysFalseValue<int, index>,
        "index out of bound, invalid GetDedicatedIOContextIndex implementation! Index "
Index " - "can only be -1 or within range of kAllDedicatedIOContextNames"); + "can only be -1 or within range of kAllDedicatedIOContextNames: "); if constexpr (index == -1) { return default_io_context_; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 719eba869837..aed5210dd838 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -69,8 +69,8 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, periodical_runner_(io_context_provider_.GetDefaultIOContext()), is_started_(false), is_stopped_(false) { - // Init GCS table storage. Note this is on main_service_, not kv_io_context_, to avoid - // congestion on the kv_io_context_. + // Init GCS table storage. Note this is on the default io context, not the one with + // GcsInternalKVManager, to avoid congestion on the latter. RAY_LOG(INFO) << "GCS storage type is " << storage_type_; switch (storage_type_) { case StorageType::IN_MEMORY: @@ -78,8 +78,8 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, io_context_provider_.GetDefaultIOContext()); break; case StorageType::REDIS_PERSIST: - gcs_table_storage_ = - std::make_shared(GetOrConnectRedis(main_service_)); + gcs_table_storage_ = std::make_shared( + GetOrConnectRedis(io_context_provider_.GetDefaultIOContext())); break; default: RAY_LOG(FATAL) << "Unexpected storage type: " << storage_type_; @@ -560,12 +560,12 @@ void GcsServer::InitKVManager() { switch (storage_type_) { case (StorageType::REDIS_PERSIST): instance = std::make_unique(std::make_unique( - GetOrConnectRedis(kv_io_context_.GetIoService()))); + GetOrConnectRedis(io_context_provider_.GetIOContext()))); break; case (StorageType::IN_MEMORY): instance = std::make_unique( std::make_unique(std::make_unique( - io_context_provider_.GetDefaultIOContext()))); + io_context_provider_.GetIOContext()))); break; default: RAY_LOG(FATAL) << "Unexpected storage type! " << storage_type_; @@ -578,7 +578,7 @@ void GcsServer::InitKVManager() { void GcsServer::InitKVService() { RAY_CHECK(kv_manager_); kv_service_ = std::make_unique( - io_context_provider_.GetDefaultIOContext(), *kv_manager_); + io_context_provider_.GetIOContext(), *kv_manager_); // Register service. rpc_server_.RegisterService(*kv_service_, false /* token_auth */); } diff --git a/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h b/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h index 9c146d811353..f4e45f3e2acd 100644 --- a/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h +++ b/src/ray/gcs/gcs_server/gcs_server_io_context_policy.h @@ -22,6 +22,7 @@ #include "ray/gcs/gcs_server/gcs_task_manager.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/util/array.h" +#include "ray/util/type_traits.h" namespace ray { namespace gcs { @@ -40,10 +41,13 @@ struct GcsServerIOContextPolicy { return IndexOf("pubsub_io_context"); } else if constexpr (std::is_same_v) { return IndexOf("ray_syncer_io_context"); + } else if constexpr (std::is_same_v) { + // default io context + return -1; } else { // Due to if-constexpr limitations, this have to be in an else block. - // Using this tuple_size_v to put T into compile error message. - static_assert(std::tuple_size_v> == 0, "unknown type"); + // Using this template to put T into compile error message. 
+      static_assert(AlwaysFalse<T>, "unknown type");
     }
   }
diff --git a/src/ray/util/type_traits.h b/src/ray/util/type_traits.h
new file mode 100644
index 000000000000..dc5e366af05f
--- /dev/null
+++ b/src/ray/util/type_traits.h
@@ -0,0 +1,31 @@
+// Copyright 2024 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+namespace ray {
+
+template <typename... Ts>
+constexpr bool AlwaysFalse = false;
+
+template <typename... Ts>
+constexpr bool AlwaysTrue = true;
+
+template <typename T, T... values>
+constexpr bool AlwaysFalseValue = false;
+
+template <typename T, T... values>
+constexpr bool AlwaysTrueValue = true;
+
+}  // namespace ray

From 698cbe9c187a47c194e6cc424deb2d934525a8c2 Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Tue, 29 Oct 2024 15:51:39 -0700
Subject: [PATCH 14/14] remove temp code

Signed-off-by: Ruiyang Wang
---
 src/ray/common/asio/asio_util.h | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h
index 963ac9296ddd..4e8beea50f3e 100644
--- a/src/ray/common/asio/asio_util.h
+++ b/src/ray/common/asio/asio_util.h
@@ -134,11 +134,6 @@ class IOContextProvider {
     }
   }
 
-  template <int N>
-  struct Wrapper {
-    static constexpr int value = N;
-  };
-
   // Gets IOContext registered for type T. If the type is not registered in
   // Policy::kAllDedicatedIOContextNames, it's a compile error.
   template <typename T>
@@ -151,7 +146,7 @@ class IOContextProvider {
             (index >= 0 &&
              static_cast<size_t>(index) < Policy::kAllDedicatedIOContextNames.size()) ||
             // To show index in compile error...
             ray::AlwaysFalseValue<int, index>,
        "index out of bound, invalid GetDedicatedIOContextIndex implementation! Index "
-        "can only be -1 or within range of kAllDedicatedIOContextNames: ");
+        "can only be -1 or within range of kAllDedicatedIOContextNames");
 
     if constexpr (index == -1) {
       return default_io_context_;
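
For reference, the two utilities the series ends with compose as follows; a minimal
usage sketch (the "kv_io_context" thread label and task name here are hypothetical,
not code from the series):

    #include "ray/common/asio/asio_util.h"

    // One dedicated event-loop thread. Stop() (also run by the destructor)
    // stops the loop and joins the thread; it is idempotent.
    InstrumentedIOContextWithThread kv_thread("kv_io_context");
    instrumented_io_context &io = kv_thread.GetIoService();
    io.post([] { /* runs on the kv_io_context thread */ }, "example_event");

    // With IOContextProvider (PATCH 13), callers instead select an io_context by
    // type; the policy maps the type to a dedicated thread at compile time, or to
    // the shared default io_context when GetDedicatedIOContextIndex() returns -1:
    //   io_context_provider_.GetIOContext<GcsInternalKVManager>();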