diff --git a/modules/EnergyManagement/EEBUS/CMakeLists.txt b/modules/EnergyManagement/EEBUS/CMakeLists.txt index 4d27488fec..1e2f290d93 100644 --- a/modules/EnergyManagement/EEBUS/CMakeLists.txt +++ b/modules/EnergyManagement/EEBUS/CMakeLists.txt @@ -33,6 +33,7 @@ target_link_libraries(${MODULE_NAME} proto_control_service proto_cs_lpc_service everest::run_application + everest::io ) add_dependencies(${MODULE_NAME} eebus_grpc_api_cmd) target_include_directories(${MODULE_NAME} diff --git a/modules/EnergyManagement/EEBUS/ConfigValidator.cpp b/modules/EnergyManagement/EEBUS/ConfigValidator.cpp index 2801b20c45..131ac08a29 100644 --- a/modules/EnergyManagement/EEBUS/ConfigValidator.cpp +++ b/modules/EnergyManagement/EEBUS/ConfigValidator.cpp @@ -95,6 +95,14 @@ int ConfigValidator::get_max_nominal_power() const { return this->config.max_nominal_power_W; } +int ConfigValidator::get_restart_delay_s() const { + return this->config.restart_delay_s; +} + +int ConfigValidator::get_reconnect_delay_s() const { + return this->config.reconnect_delay_s; +} + bool ConfigValidator::validate_eebus_service_port() const { if (this->config.eebus_service_port < 0) { EVLOG_error << "eebus service port is negative"; diff --git a/modules/EnergyManagement/EEBUS/EEBUS.cpp b/modules/EnergyManagement/EEBUS/EEBUS.cpp index d550f3745a..fc57ffe7cc 100644 --- a/modules/EnergyManagement/EEBUS/EEBUS.cpp +++ b/modules/EnergyManagement/EEBUS/EEBUS.cpp @@ -41,67 +41,70 @@ void EEBUS::init() { } if (this->config.manage_eebus_grpc_api_binary) { - if (config_validator->validate()) { - this->eebus_grpc_api_thread_active.store(true); - this->eebus_grpc_api_thread = - std::thread(&EEBUS::start_eebus_grpc_api, this, config_validator->get_eebus_grpc_api_binary_path(), - config_validator->get_grpc_port(), config_validator->get_certificate_path(), - config_validator->get_private_key_path()); - } else { - EVLOG_critical << "Could not validate config for starting eebus_grpc_api binary"; - } + this->eebus_grpc_api_thread_active.store(true); + this->eebus_grpc_api_thread = std::thread( + &EEBUS::start_eebus_grpc_api, this, config_validator->get_eebus_grpc_api_binary_path(), + config_validator->get_grpc_port(), config_validator->get_certificate_path(), + config_validator->get_private_key_path(), config_validator->get_restart_delay_s()); } this->connection_handler = std::make_unique(config_validator); + event_handler.register_event_handler(this->connection_handler.get()); - if (!this->connection_handler->initialize_connection()) { - EVLOG_critical << "Failed to initialize connection to EEBUS service"; - return; - } - - if (!this->connection_handler->add_lpc_use_case(this->callbacks)) { - EVLOG_critical << "Failed to add LPC use case"; - return; - } - - this->connection_handler->subscribe_to_events(); + this->connection_handler->add_use_case(eebus::EEBusUseCase::LPC, this->callbacks); + this->connection_handler->done_adding_use_case(); } void EEBUS::start_eebus_grpc_api(const std::filesystem::path& binary_path, int port, - const std::filesystem::path& cert_file, const std::filesystem::path& key_file) { - try { - std::vector args; - constexpr int num_args = 6; - args.reserve(num_args); - args.emplace_back("-port"); - args.emplace_back(std::to_string(port)); - args.emplace_back("-certificate-path"); - args.emplace_back(cert_file.string()); - args.emplace_back("-private-key-path"); - args.emplace_back(key_file.string()); - everest::run_application::CmdOutput output = everest::run_application::run_application( - binary_path.string(), args, [this](const std::string& output_line) { - if (!output_line.empty()) { - EVLOG_debug << "eebus-grpc: " << output_line; - } - if (not this->eebus_grpc_api_thread_active) { - return everest::run_application::CmdControl::Terminate; - } - return everest::run_application::CmdControl::Continue; - }); - EVLOG_info << "eebus-grpc exit code: " << output.exit_code; - } catch (const std::exception& e) { - EVLOG_critical << "start_eebus_grpc_api thread caught exception: " << e.what(); - } catch (...) { - EVLOG_critical << "start_eebus_grpc_api thread caught unknown exception."; + const std::filesystem::path& cert_file, const std::filesystem::path& key_file, + int restart_delay_s) { + std::vector args; + constexpr int num_args = 6; + args.reserve(num_args); + args.emplace_back("-port"); + args.emplace_back(std::to_string(port)); + args.emplace_back("-certificate-path"); + args.emplace_back(cert_file.string()); + args.emplace_back("-private-key-path"); + args.emplace_back(key_file.string()); + + while (this->eebus_grpc_api_thread_active) { + try { + EVLOG_info << "Starting eebus_grpc_api binary..."; + everest::run_application::CmdOutput output = everest::run_application::run_application( + binary_path.string(), args, [this](const std::string& output_line) { + if (!output_line.empty()) { + EVLOG_debug << "eebus-grpc: " << output_line; + } + if (not this->eebus_grpc_api_thread_active) { + return everest::run_application::CmdControl::Terminate; + } + return everest::run_application::CmdControl::Continue; + }); + EVLOG_warning << "eebus-grpc process exited with code: " << output.exit_code; + } catch (const std::exception& e) { + EVLOG_critical << "start_eebus_grpc_api thread caught exception: " << e.what(); + } catch (...) { + EVLOG_critical << "start_eebus_grpc_api thread caught unknown exception."; + } + + if (this->eebus_grpc_api_thread_active) { + EVLOG_info << "Restarting eebus_grpc_api binary in " << restart_delay_s << " seconds..."; + std::this_thread::sleep_for(std::chrono::seconds(restart_delay_s)); + } } + EVLOG_info << "eebus_grpc_api monitoring thread is stopping."; } void EEBUS::ready() { invoke_ready(*p_main); - if (!this->connection_handler->start_service()) { - EVLOG_critical << "Failed to start EEBUS service"; - return; + + // Start the event handler in its own thread + event_handler_thread = std::thread([this]() { event_handler.run(running_flag); }); + + // Main application loop + while (running_flag) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } diff --git a/modules/EnergyManagement/EEBUS/EEBUS.hpp b/modules/EnergyManagement/EEBUS/EEBUS.hpp index ecb7223df6..e1863cbb7f 100644 --- a/modules/EnergyManagement/EEBUS/EEBUS.hpp +++ b/modules/EnergyManagement/EEBUS/EEBUS.hpp @@ -22,6 +22,8 @@ #include #include +#include +#include // ev@4bf81b14-a215-475c-a1d3-0a484ae48918:v1 namespace module { @@ -40,6 +42,8 @@ struct Conf { std::string serial_number; int failsafe_control_limit_W; int max_nominal_power_W; + int restart_delay_s; + int reconnect_delay_s; }; class EEBUS : public Everest::ModuleBase { @@ -72,13 +76,18 @@ class EEBUS : public Everest::ModuleBase { // ev@211cfdbe-f69a-4cd6-a4ec-f8aaa3d1b6c8:v1 void start_eebus_grpc_api(const std::filesystem::path& binary_path, int port, - const std::filesystem::path& cert_file, const std::filesystem::path& key_file); + const std::filesystem::path& cert_file, const std::filesystem::path& key_file, + int restart_delay_s); std::thread eebus_grpc_api_thread; std::atomic eebus_grpc_api_thread_active; std::unique_ptr connection_handler; eebus::EEBusCallbacks callbacks{}; + everest::lib::io::event::fd_event_handler event_handler; + std::thread event_handler_thread; + std::atomic_bool running_flag{true}; + std::shared_ptr config_validator; // ev@211cfdbe-f69a-4cd6-a4ec-f8aaa3d1b6c8:v1 }; diff --git a/modules/EnergyManagement/EEBUS/EebusConnectionHandler.cpp b/modules/EnergyManagement/EEBUS/EebusConnectionHandler.cpp index 2fb021c332..8f704a2bfd 100644 --- a/modules/EnergyManagement/EEBUS/EebusConnectionHandler.cpp +++ b/modules/EnergyManagement/EEBUS/EebusConnectionHandler.cpp @@ -4,6 +4,7 @@ #include #include +#include namespace module { @@ -12,22 +13,84 @@ constexpr auto CHANNEL_READY_TIMEOUT = std::chrono::seconds(60); constexpr auto HEARTBEAT_TIMEOUT_SECONDS = 60; } // namespace -EebusConnectionHandler::EebusConnectionHandler(std::shared_ptr config) : config(std::move(config)) { +EebusConnectionHandler::EebusConnectionHandler(std::shared_ptr config) : + config(std::move(config)), state(State::INIT) { + if (!this->initialize_connection()) { + this->m_handler.add_action([this] { this->handle_event(EebusConnectionEvents::DISCONNECTED); }); + } +} + +everest::lib::io::event::sync_status EebusConnectionHandler::sync() { + m_handler.run_once(); + return everest::lib::io::event::sync_status::ok; +} + +int EebusConnectionHandler::get_poll_fd() { + return m_handler.get_poll_fd(); } EebusConnectionHandler::~EebusConnectionHandler() { this->stop(); } +void EebusConnectionHandler::reconnect() { + EVLOG_info << "Attempting to reconnect to EEBUS gRPC server..."; + if (this->initialize_connection()) { + this->m_handler.unregister_event_handler(&this->reconnection_timer); + EVLOG_info << "Reconnected successfully."; + if (this->use_case_added) { + this->add_use_case(this->last_use_case, this->last_callbacks); + this->done_adding_use_case(); + } + } else { + EVLOG_warning << "Reconnect attempt failed. Will try again in " << this->config->get_reconnect_delay_s() + << " seconds."; + this->reconnection_timer.set_timeout(std::chrono::seconds(this->config->get_reconnect_delay_s())); + } +} + +void EebusConnectionHandler::reset() { + if (this->event_reader) { + this->event_reader->stop(); + this->event_reader.reset(); + } + if (this->lpc_handler) { + this->lpc_handler.reset(); + } + if (this->control_service_stub) { + this->control_service_stub.reset(); + } + if (this->control_service_channel) { + this->control_service_channel.reset(); + } +} + bool EebusConnectionHandler::initialize_connection() { + if (!this->create_channel_and_stub()) { + EVLOG_error << "Connection to EEBUS gRPC server failed, stopping."; + return false; + } + + if (!this->configure_service()) { + EVLOG_error << "Failed to configure EEBUS gRPC service, stopping."; + return false; + } + + m_handler.add_action([this] { handle_event(EebusConnectionEvents::CONNECTED); }); + return true; +} + +bool EebusConnectionHandler::create_channel_and_stub() { const auto address = "localhost:" + std::to_string(this->config->get_grpc_port()); this->control_service_channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); if (!EebusConnectionHandler::wait_for_channel_ready(this->control_service_channel, CHANNEL_READY_TIMEOUT)) { - EVLOG_error << "Connection to EEBUS gRPC server failed."; return false; } this->control_service_stub = control_service::ControlService::NewStub(this->control_service_channel); + return true; +} +bool EebusConnectionHandler::configure_service() { control_service::SetConfigRequest set_config_request = control_service::CreateSetConfigRequest( this->config->get_eebus_service_port(), this->config->get_vendor_code(), this->config->get_device_brand(), this->config->get_device_model(), this->config->get_serial_number(), @@ -48,8 +111,9 @@ bool EebusConnectionHandler::initialize_connection() { control_service::CallStartSetup(this->control_service_stub, start_setup_request, &start_setup_response); if (!start_setup_status.ok()) { EVLOG_warning << "start_setup failed: " << start_setup_status.error_message(); - return false; + // This is not considered a fatal error } + control_service::RegisterRemoteSkiRequest register_ski_request; register_ski_request.set_remote_ski(this->config->get_eebus_ems_ski()); control_service::EmptyResponse register_ski_response; @@ -63,63 +127,108 @@ bool EebusConnectionHandler::initialize_connection() { return true; } -bool EebusConnectionHandler::start_service() { +void EebusConnectionHandler::start_service() { control_service::EmptyRequest request; control_service::EmptyResponse response; auto status = control_service::CallStartService(this->control_service_stub, request, &response); if (!status.ok()) { EVLOG_error << "start_service failed: " << status.error_message(); - return false; + m_handler.add_action([this] { handle_event(EebusConnectionEvents::DISCONNECTED); }); } - this->lpc_handler->start(); + if (lpc_handler) { + this->lpc_handler->start(); + } - return true; + m_handler.add_action([this] { handle_event(EebusConnectionEvents::STARTED); }); } -bool EebusConnectionHandler::add_lpc_use_case(const eebus::EEBusCallbacks& callbacks) { - this->lpc_handler = std::make_unique(this->config, callbacks); - - control_service::UseCase use_case = LpcUseCaseHandler::get_use_case_info(); - common_types::EntityAddress entity_address = common_types::CreateEntityAddress({1}); - control_service::AddUseCaseRequest request = control_service::CreateAddUseCaseRequest(&entity_address, &use_case); - control_service::AddUseCaseResponse response; - auto status = control_service::CallAddUseCase(this->control_service_stub, request, &response); - std::ignore = request.release_entity_address(); - std::ignore = request.release_use_case(); - if (!status.ok() || response.endpoint().empty()) { - EVLOG_error << "add_use_case failed: " << status.error_message(); - return false; - } +bool EebusConnectionHandler::add_use_case(eebus::EEBusUseCase use_case, const eebus::EEBusCallbacks& callbacks) { + this->last_use_case = use_case; + this->last_callbacks = callbacks; + this->use_case_added = true; + + switch (use_case) { + case eebus::EEBusUseCase::LPC: + this->lpc_handler = std::make_unique(this->config, callbacks); + + control_service::UseCase use_case_info = LpcUseCaseHandler::get_use_case_info(); + common_types::EntityAddress entity_address = common_types::CreateEntityAddress({1}); + control_service::AddUseCaseRequest request = + control_service::CreateAddUseCaseRequest(&entity_address, &use_case_info); + control_service::AddUseCaseResponse response; + auto status = control_service::CallAddUseCase(this->control_service_stub, request, &response); + std::ignore = request.release_entity_address(); + std::ignore = request.release_use_case(); + if (!status.ok() || response.endpoint().empty()) { + EVLOG_error << "add_use_case failed: " << status.error_message(); + return false; + } - auto lpc_channel = grpc::CreateChannel(response.endpoint(), grpc::InsecureChannelCredentials()); - if (!EebusConnectionHandler::wait_for_channel_ready(lpc_channel, CHANNEL_READY_TIMEOUT)) { - EVLOG_error << "Connection to LPC use case gRPC server failed."; - return false; - } + auto lpc_channel = grpc::CreateChannel(response.endpoint(), grpc::InsecureChannelCredentials()); + if (!EebusConnectionHandler::wait_for_channel_ready(lpc_channel, CHANNEL_READY_TIMEOUT)) { + EVLOG_error << "Connection to LPC use case gRPC server failed."; + return false; + } - this->lpc_handler->set_stub(cs_lpc::ControllableSystemLPCControl::NewStub(lpc_channel)); - this->lpc_handler->configure_use_case(); + this->lpc_handler->set_stub(cs_lpc::ControllableSystemLPCControl::NewStub(lpc_channel)); + this->lpc_handler->configure_use_case(); + this->event_reader = + std::make_unique(this->control_service_stub, [this](const auto& event) { + m_handler.add_action([this, event] { this->lpc_handler->handle_event(event); }); + }); + + common_types::EntityAddress entity_address_for_event_reader = common_types::CreateEntityAddress({1}); + this->event_reader->start(entity_address_for_event_reader, use_case_info); + break; + } return true; } -void EebusConnectionHandler::subscribe_to_events() { - this->event_reader = std::make_unique( - this->control_service_stub, [this](const auto& event) { this->lpc_handler->handle_event(event); }); +void EebusConnectionHandler::handle_event(EebusConnectionEvents event) { + switch (event) { + case EebusConnectionEvents::CONNECTED: + if (State::INIT == state || State::DISCONNECTED == state) { + state = State::ADDING_USE_CASES; + } + break; + case EebusConnectionEvents::DISCONNECTED: + this->state = State::DISCONNECTED; + this->reset(); + EVLOG_info << "Disconnected from EEBUS gRPC service. Will try to reconnect in " + << this->config->get_reconnect_delay_s() << " seconds."; + this->reconnection_timer.set_timeout(std::chrono::seconds(this->config->get_reconnect_delay_s())); + this->m_handler.register_event_handler(&this->reconnection_timer, [this](auto&) { this->reconnect(); }); + break; + case EebusConnectionEvents::STARTED: + if (State::READY_TO_START == state) { + state = State::RUNNING; + this->state_machine_timer.set_timeout(std::chrono::seconds(1)); + this->m_handler.register_event_handler(&state_machine_timer, [this](auto&) { + if (this->lpc_handler) { + this->lpc_handler->run_state_machine(); + } + }); + } + break; + case EebusConnectionEvents::DONE_ADDING_USE_CASES: + if (State::ADDING_USE_CASES == state) { + state = State::READY_TO_START; + m_handler.add_action([this] { start_service(); }); + } + break; + } +} - common_types::EntityAddress entity_address = common_types::CreateEntityAddress({1}); - control_service::UseCase use_case = LpcUseCaseHandler::get_use_case_info(); - this->event_reader->start(entity_address, use_case); +void EebusConnectionHandler::done_adding_use_case() { + m_handler.add_action([this] { this->handle_event(EebusConnectionEvents::DONE_ADDING_USE_CASES); }); } void EebusConnectionHandler::stop() { if (this->event_reader) { this->event_reader->stop(); } - if (this->lpc_handler) { - this->lpc_handler->stop(); - } } bool EebusConnectionHandler::wait_for_channel_ready(const std::shared_ptr& channel, diff --git a/modules/EnergyManagement/EEBUS/LpcUseCaseHandler.cpp b/modules/EnergyManagement/EEBUS/LpcUseCaseHandler.cpp index 1148c777ec..41fcbb4001 100644 --- a/modules/EnergyManagement/EEBUS/LpcUseCaseHandler.cpp +++ b/modules/EnergyManagement/EEBUS/LpcUseCaseHandler.cpp @@ -19,197 +19,177 @@ LpcUseCaseHandler::LpcUseCaseHandler(std::shared_ptr config, ee heartbeat_timeout(HEARTBEAT_TIMEOUT), failsafe_duration_timeout(std::chrono::hours(2)), // Default to 2 hours, as per spec failsafe_control_limit(this->config->get_failsafe_control_limit()) { + this->initialize_event_handlers(); } -LpcUseCaseHandler::~LpcUseCaseHandler() { - stop(); +void LpcUseCaseHandler::initialize_event_handlers() { + this->event_handlers = { + {"DataUpdateHeartbeat", &LpcUseCaseHandler::handle_data_update_heartbeat}, + {"DataUpdateLimit", &LpcUseCaseHandler::handle_data_update_limit}, + {"DataUpdateFailsafeDurationMinimum", &LpcUseCaseHandler::handle_data_update_failsafe_duration_minimum}, + {"DataUpdateFailsafeConsumptionActivePowerLimit", + &LpcUseCaseHandler::handle_data_update_failsafe_consumption_active_power_limit}, + {"WriteApprovalRequired", &LpcUseCaseHandler::handle_write_approval_required}, + {"UseCaseSupportUpdate", &LpcUseCaseHandler::handle_use_case_support_update}, + }; } void LpcUseCaseHandler::start() { this->running = true; this->init_timestamp = std::chrono::steady_clock::now(); - this->worker_thread = std::thread(&LpcUseCaseHandler::run, this); + this->last_heartbeat_timestamp = std::chrono::steady_clock::now(); // Simulate initial heartbeat this->start_heartbeat(); } -void LpcUseCaseHandler::stop() { - if (running.exchange(false)) { - cv.notify_all(); - if (worker_thread.joinable()) { - worker_thread.join(); - } - } -} - void LpcUseCaseHandler::set_stub(std::shared_ptr stub) { this->stub = std::move(stub); } void LpcUseCaseHandler::handle_event(const control_service::UseCaseEvent& event) { - std::lock_guard lock(this->queue_mutex); - this->event_queue.push(event); - this->cv.notify_one(); + const auto& event_str = event.event(); + + auto it = this->event_handlers.find(event_str); + if (it != this->event_handlers.end()) { + (this->*(it->second))(); + } else if (!event_str.empty()) { + EVLOG_error << "Unknown event received: " << event_str; + } + + this->run_state_machine(); } -void LpcUseCaseHandler::run() { - this->set_state(State::Init); +void LpcUseCaseHandler::handle_data_update_heartbeat() { + this->last_heartbeat_timestamp = std::chrono::steady_clock::now(); + EVLOG_debug << "Heartbeat received"; +} + +void LpcUseCaseHandler::handle_data_update_limit() { + update_limit_from_event(); +} - { - std::lock_guard lock(this->heartbeat_mutex); - this->last_heartbeat_timestamp = std::chrono::steady_clock::now(); // Simulate initial heartbeat +void LpcUseCaseHandler::handle_data_update_failsafe_duration_minimum() { + cs_lpc::FailsafeDurationMinimumRequest read_duration_req; + cs_lpc::FailsafeDurationMinimumResponse read_duration_res; + auto read_status = cs_lpc::CallFailsafeDurationMinimum(this->stub, read_duration_req, &read_duration_res); + if (read_status.ok()) { + this->failsafe_duration_timeout = std::chrono::nanoseconds(read_duration_res.duration_nanoseconds()); + EVLOG_info << "FailsafeDurationMinimum updated to " << read_duration_res.duration_nanoseconds() << "ns"; + } else { + EVLOG_warning << "Could not re-read FailsafeDurationMinimum after update event: " << read_status.error_message(); } +} - while (this->running) { - std::unique_lock lock(this->queue_mutex); - if (cv.wait_for(lock, std::chrono::seconds(1), [this] { return !this->event_queue.empty(); })) { - // Event received - control_service::UseCaseEvent event = this->event_queue.front(); - this->event_queue.pop(); - lock.unlock(); - - const auto& event_str = event.event(); - - if (event_str == "DataUpdateHeartbeat") { - std::lock_guard heartbeat_lock(this->heartbeat_mutex); - this->last_heartbeat_timestamp = std::chrono::steady_clock::now(); - EVLOG_debug << "Heartbeat received"; - } else if (event_str == "DataUpdateLimit") { - update_limit_from_event(); - } else if (event_str == "DataUpdateFailsafeDurationMinimum") { - cs_lpc::FailsafeDurationMinimumRequest read_duration_req; - cs_lpc::FailsafeDurationMinimumResponse read_duration_res; - auto read_status = - cs_lpc::CallFailsafeDurationMinimum(this->stub, read_duration_req, &read_duration_res); - if (read_status.ok()) { - this->failsafe_duration_timeout = - std::chrono::nanoseconds(read_duration_res.duration_nanoseconds()); - EVLOG_info << "FailsafeDurationMinimum updated to " << read_duration_res.duration_nanoseconds() - << "ns"; - } else { - EVLOG_warning << "Could not re-read FailsafeDurationMinimum after update event: " - << read_status.error_message(); - } - } else if (event_str == "DataUpdateFailsafeConsumptionActivePowerLimit") { - cs_lpc::FailsafeConsumptionActivePowerLimitRequest read_limit_req; - cs_lpc::FailsafeConsumptionActivePowerLimitResponse read_limit_res; - auto read_status = - cs_lpc::CallFailsafeConsumptionActivePowerLimit(this->stub, read_limit_req, &read_limit_res); - if (read_status.ok()) { - this->failsafe_control_limit = read_limit_res.limit(); - EVLOG_info << "FailsafeConsumptionActivePowerLimit updated to " << this->failsafe_control_limit; - } else { - EVLOG_warning << "Could not re-read FailsafeConsumptionActivePowerLimit after update event: " - << read_status.error_message(); - } - } else if (event_str == "WriteApprovalRequired") { - approve_pending_writes(); - } else if (event_str == "UseCaseSupportUpdate") { - // ignore - } else { - EVLOG_error << "Unknown event received: " << event_str; - } - } else { - // Timeout - lock.unlock(); - } +void LpcUseCaseHandler::handle_data_update_failsafe_consumption_active_power_limit() { + cs_lpc::FailsafeConsumptionActivePowerLimitRequest read_limit_req; + cs_lpc::FailsafeConsumptionActivePowerLimitResponse read_limit_res; + auto read_status = cs_lpc::CallFailsafeConsumptionActivePowerLimit(this->stub, read_limit_req, &read_limit_res); + if (read_status.ok()) { + this->failsafe_control_limit = read_limit_res.limit(); + EVLOG_info << "FailsafeConsumptionActivePowerLimit updated to " << this->failsafe_control_limit; + } else { + EVLOG_warning << "Could not re-read FailsafeConsumptionActivePowerLimit after update event: " + << read_status.error_message(); + } +} + +void LpcUseCaseHandler::handle_write_approval_required() { + approve_pending_writes(); +} + +void LpcUseCaseHandler::handle_use_case_support_update() { + // ignore +} - auto now = std::chrono::steady_clock::now(); +void LpcUseCaseHandler::run_state_machine() { + auto now = std::chrono::steady_clock::now(); - // Heartbeat check - bool heartbeat_has_timeout = false; - { - std::lock_guard heartbeat_lock(this->heartbeat_mutex); - heartbeat_has_timeout = (now - this->last_heartbeat_timestamp) > this->heartbeat_timeout; + // Heartbeat check + bool heartbeat_has_timeout = (now - this->last_heartbeat_timestamp) > this->heartbeat_timeout; + std::optional limit; + limit = this->current_limit; + const bool limit_is_active = limit.has_value() && limit.value().is_active(); + const bool limit_is_deactivated = limit.has_value() && !limit.value().is_active(); + const bool limit_expired = + limit.has_value() && !limit->delete_duration() && limit->duration_nanoseconds() != 0 && + now >= (std::chrono::nanoseconds(limit->duration_nanoseconds()) + this->last_limit_received_timestamp); + + switch (this->state) { + case State::Init: + if (heartbeat_has_timeout) { + set_state(State::Failsafe); + break; } - std::optional limit; - { - std::lock_guard limit_lock(this->limit_mutex); - limit = this->current_limit; + if ((now - this->init_timestamp) > LPC_TIMEOUT) { + set_state(State::UnlimitedAutonomous); + break; } - const bool limit_is_active = limit.has_value() && limit.value().is_active(); - const bool limit_is_deactivated = limit.has_value() && !limit.value().is_active(); - const bool limit_expired = - limit.has_value() && !limit->delete_duration() && limit->duration_nanoseconds() != 0 && - now >= (std::chrono::nanoseconds(limit->duration_nanoseconds()) + this->last_limit_received_timestamp); - - switch (this->state.load()) { - case State::Init: - if (heartbeat_has_timeout) { - set_state(State::Failsafe); - break; - } - if ((now - this->init_timestamp) > LPC_TIMEOUT) { - set_state(State::UnlimitedAutonomous); - break; - } - if (limit_is_active) { - set_state(State::Limited); - break; - } - if (limit_is_deactivated) { - set_state(State::UnlimitedControlled); - } + if (limit_is_active) { + set_state(State::Limited); break; - case State::Limited: - if (heartbeat_has_timeout) { - set_state(State::Failsafe); - break; - } - if (limit_is_deactivated or limit_expired) { - set_state(State::UnlimitedControlled); - } - // duration of limit expired, deactivated power limit received -> UnlimitedControlled + } + if (limit_is_deactivated) { + set_state(State::UnlimitedControlled); + } + break; + case State::Limited: + if (heartbeat_has_timeout) { + set_state(State::Failsafe); break; - case State::UnlimitedControlled: - if (heartbeat_has_timeout) { - set_state(State::Failsafe); - break; - } - if (limit_is_active) { - set_state(State::Limited); - } + } + if (limit_is_deactivated or limit_expired) { + set_state(State::UnlimitedControlled); + } + // duration of limit expired, deactivated power limit received -> UnlimitedControlled + break; + case State::UnlimitedControlled: + if (heartbeat_has_timeout) { + set_state(State::Failsafe); break; - case State::UnlimitedAutonomous: - if (heartbeat_has_timeout) { - set_state(State::Failsafe); - break; - } - if (limit_is_deactivated or limit_expired) { - set_state(State::UnlimitedControlled); - break; - } + } + if (limit_is_active) { + set_state(State::Limited); + } + break; + case State::UnlimitedAutonomous: + if (heartbeat_has_timeout) { + set_state(State::Failsafe); + break; + } + if (limit_is_deactivated or limit_expired) { + set_state(State::UnlimitedControlled); + break; + } + if (limit_is_active) { + set_state(State::Limited); + } + break; + case State::Failsafe: + // No heartbeat stay in failsafe + if (heartbeat_has_timeout) { + break; + } + // Received heartbeat with a new limit + if (this->last_limit_received_timestamp >= this->failsafe_entry_timestamp) { if (limit_is_active) { set_state(State::Limited); + } else if (limit_is_deactivated) { + set_state(State::UnlimitedControlled); } break; - case State::Failsafe: - // No heartbeat stay in failsafe - if (heartbeat_has_timeout) { - break; - } - // Received heartbeat with a new limit - if (this->last_limit_received_timestamp >= this->failsafe_entry_timestamp) { - if (limit_is_active) { - set_state(State::Limited); - } else if (limit_is_deactivated) { - set_state(State::UnlimitedControlled); - } - break; - } - // Received heartbeat, but no new limit within failsafe duration timeout + 120 seconds spec timeout - if (now > (this->failsafe_entry_timestamp + this->failsafe_duration_timeout + - std::chrono::seconds(LPC_TIMEOUT))) { - set_state(State::UnlimitedAutonomous); - break; - } + } + // Received heartbeat, but no new limit within failsafe duration timeout + 120 seconds spec timeout + if (now > + (this->failsafe_entry_timestamp + this->failsafe_duration_timeout + std::chrono::seconds(LPC_TIMEOUT))) { + set_state(State::UnlimitedAutonomous); break; } + break; + } - const bool state_has_changed = this->state_changed.exchange(false); - const bool limit_has_changed = this->limit_value_changed.exchange(false); - if (state_has_changed || limit_has_changed) { - this->apply_limit_for_current_state(); - } + if (this->state_changed || this->limit_value_changed) { + this->state_changed = false; + this->limit_value_changed = false; + this->apply_limit_for_current_state(); } } @@ -289,11 +269,8 @@ void LpcUseCaseHandler::update_limit_from_event() { EVLOG_error << "ConsumptionLimit failed: " << status.error_message(); return; } - { - std::lock_guard lock(this->limit_mutex); - this->current_limit = response.load_limit(); - this->last_limit_received_timestamp = std::chrono::steady_clock::now(); - } + this->current_limit = response.load_limit(); + this->last_limit_received_timestamp = std::chrono::steady_clock::now(); this->limit_value_changed = true; } @@ -313,11 +290,8 @@ void LpcUseCaseHandler::start_heartbeat() { void LpcUseCaseHandler::apply_limit_for_current_state() { types::energy::ExternalLimits limits; - State state_copy = this->state.load(); - - switch (state_copy) { + switch (this->state) { case State::Limited: { - std::lock_guard lock(this->limit_mutex); if (this->current_limit.has_value()) { limits = translate_to_external_limits(this->current_limit.value()); } @@ -331,33 +305,29 @@ void LpcUseCaseHandler::apply_limit_for_current_state() { break; } case State::UnlimitedControlled: - case State::UnlimitedAutonomous: { + case State::UnlimitedAutonomous: + case State::Init: { common_types::LoadLimit unlimited_limit; unlimited_limit.set_is_active(false); limits = translate_to_external_limits(unlimited_limit); break; } - case State::Init: - // TODO(mlitre): Check what to apply - break; } this->callbacks.update_limits_callback(limits); } void LpcUseCaseHandler::set_state(State new_state) { - State old_state = this->state.load(); + State old_state = this->state; if (old_state != new_state) { EVLOG_info << "LPC Use Case Handler changing state from " << state_to_string(old_state) << " to " << state_to_string(new_state); - this->state.store(new_state); + this->state = new_state; this->state_changed = true; if (new_state == State::Failsafe) { this->failsafe_entry_timestamp = std::chrono::steady_clock::now(); } - - this->cv.notify_all(); } } diff --git a/modules/EnergyManagement/EEBUS/doc.rst b/modules/EnergyManagement/EEBUS/doc.rst index bb5cb06216..7d01575b08 100644 --- a/modules/EnergyManagement/EEBUS/doc.rst +++ b/modules/EnergyManagement/EEBUS/doc.rst @@ -44,16 +44,18 @@ The module's main class is ``EEBUS``. Its ``init()`` method orchestrates the set 4. **Use Case Logic**: For the LPC use case, an ``LpcUseCaseHandler`` is created. This class implements the LPC state machine as defined by the EEBUS specification. -5. **Event Handling**: To receive events (like new power limits or heartbeats) from the gRPC service, a ``UseCaseEventReader`` is started. It runs in the background, listens for incoming events, and pushes them into a queue within the ``LpcUseCaseHandler``. +6. **Event Handling**: The main module's event loop (`event_handler`) periodically calls the `sync()` method of the `EebusConnectionHandler`. This `sync()` method, in turn, runs the `EebusConnectionHandler`'s internal event loop (`m_handler`) once. This internal loop is responsible for handling all subsequent events. -6. **State Machine and Limit Calculation**: The ``LpcUseCaseHandler`` processes these events in a worker thread. Based on the events and its internal state (e.g., ``Limited``, ``Failsafe``), it determines the current power limit. +7. **Receiving gRPC Events**: A `UseCaseEventReader` runs in the background listening for incoming events from the gRPC service. When an event is received, it invokes a callback that posts an action to the `EebusConnectionHandler`'s internal event loop (`m_handler`). -7. **Publishing Limits**: The calculated limits are then translated into an EVerest ``ExternalLimits`` schedule using the ``helper::translate_to_external_limits`` function. This schedule is published to the ``EnergyManager`` (or another connected module) via the ``eebus_energy_sink`` required interface. +8. **State Machine and Limit Calculation**: The `m_handler` event loop executes the queued action, which calls the `LpcUseCaseHandler` to handle the event. The handler processes the event, runs its internal state machine, and determines the current power limit. The state machine is also periodically triggered by a timer within this same internal event loop. + +9. **Publishing Limits**: The calculated limits are then translated into an EVerest ``ExternalLimits`` schedule using the ``helper::translate_to_external_limits`` function. This schedule is published to the ``EnergyManager`` (or another connected module) via the ``eebus_energy_sink`` required interface. State Machine Diagram ===================== -The following diagram shows the state machine of the `LpcUseCaseHandler`. +The following diagram shows the state machine of the `LpcUseCaseHandler`, which is responsible for the "Limitation of Power Consumption" (LPC) logic. .. code-block:: plantuml @@ -96,6 +98,7 @@ The handler processes the following events: - ``DataUpdateFailsafeDurationMinimum``: Update of the minimum failsafe duration. - ``DataUpdateFailsafeConsumptionActivePowerLimit``: Update of the failsafe power limit. - ``WriteApprovalRequired``: The handler needs to approve pending writes from the EEBUS service. +- ``UseCaseSupportUpdate``: The handler receives an update about use case support. This is currently ignored. Based on its state and the received limits, the module publishes ``ExternalLimits`` to the ``eebus_energy_sink``, which is typically connected to an ``EnergyManager`` module. @@ -115,41 +118,49 @@ This sequence diagram illustrates the code flow when an event is received from t participant UseCaseEventReader participant "gRPC Service" as GrpcService - EEBUS -> EebusConnectionHandler : create - EebusConnectionHandler -> LpcUseCaseHandler : create - EebusConnectionHandler -> UseCaseEventReader : create - EEBUS -> EebusConnectionHandler : initialize_connection() - EEBUS -> EebusConnectionHandler : add_lpc_use_case() - EEBUS -> EebusConnectionHandler : subscribe_to_events() + EEBUS ->> EebusConnectionHandler : create() + activate EebusConnectionHandler + EebusConnectionHandler -> EebusConnectionHandler : initialize_connection() + deactivate EebusConnectionHandler + + EEBUS ->> EebusConnectionHandler : add_use_case(LPC) activate EebusConnectionHandler - EebusConnectionHandler -> UseCaseEventReader : start() + EebusConnectionHandler ->> LpcUseCaseHandler : create() + EebusConnectionHandler ->> UseCaseEventReader : create() + EebusConnectionHandler ->> GrpcService : AddUseCase() + EebusConnectionHandler ->> UseCaseEventReader : start() activate UseCaseEventReader - UseCaseEventReader -> GrpcService : SubscribeUseCaseEvents() + UseCaseEventReader ->> GrpcService : SubscribeUseCaseEvents() deactivate UseCaseEventReader deactivate EebusConnectionHandler - EEBUS -> EebusConnectionHandler : start_service() + EEBUS ->> EebusConnectionHandler : done_adding_use_case() activate EebusConnectionHandler - EebusConnectionHandler -> LpcUseCaseHandler : start() - activate LpcUseCaseHandler - deactivate LpcUseCaseHandler + EebusConnectionHandler -> EebusConnectionHandler : start_service() + EebusConnectionHandler ->> GrpcService : StartService() + EebusConnectionHandler ->> LpcUseCaseHandler : start() deactivate EebusConnectionHandler ... event arrives ... - GrpcService -> UseCaseEventReader : OnReadDone() + GrpcService ->> UseCaseEventReader : OnReadDone(event) activate UseCaseEventReader - UseCaseEventReader -> LpcUseCaseHandler : handle_event(event) - - LpcUseCaseHandler -> LpcUseCaseHandler : push event to queue & notify + UseCaseEventReader ->> EebusConnectionHandler : event_callback(event) deactivate UseCaseEventReader + activate EebusConnectionHandler + EebusConnectionHandler -> EebusConnectionHandler : m_handler.add_action() + note right: Event is queued in the event loop - LpcUseCaseHandler -> LpcUseCaseHandler : run() loop wakes up + ... event loop runs ... + + EebusConnectionHandler ->> LpcUseCaseHandler : handle_event(event) activate LpcUseCaseHandler - LpcUseCaseHandler -> LpcUseCaseHandler : process event, set_state() + LpcUseCaseHandler -> LpcUseCaseHandler : run_state_machine() LpcUseCaseHandler -> LpcUseCaseHandler : apply_limit_for_current_state() - LpcUseCaseHandler -> EEBUS : callbacks.update_limits_callback() + LpcUseCaseHandler ->> EEBUS : callbacks.update_limits_callback() deactivate LpcUseCaseHandler + deactivate EebusConnectionHandler + @enduml @@ -170,6 +181,8 @@ This diagram shows the main classes within the EEBUS module and their relationsh - eebus_grpc_api_thread: thread - connection_handler: unique_ptr - callbacks: eebus::EEBusCallbacks + - event_handler: fd_event_handler + - event_handler_thread: thread + init() + ready() } @@ -179,11 +192,19 @@ This diagram shows the main classes within the EEBUS module and their relationsh - lpc_handler: unique_ptr - event_reader: unique_ptr - control_service_stub: shared_ptr - + initialize_connection() - + start_service() - + add_lpc_use_case() - + subscribe_to_events() + - m_handler: fd_event_handler + - state_machine_timer: timer_fd + - reconnection_timer: timer_fd + - last_use_case: EEBusUseCase + - last_callbacks: EEBusCallbacks + - use_case_added: bool + + add_use_case() + stop() + - initialize_connection() + - configure_service() + - create_channel_and_stub() + - reconnect() + - reset() } class LpcUseCaseHandler { @@ -191,11 +212,9 @@ This diagram shows the main classes within the EEBUS module and their relationsh - callbacks: eebus::EEBusCallbacks - stub: shared_ptr - state: State - - event_queue: queue + start() - + stop() + handle_event() - - run() + + run_state_machine() - set_state() - apply_limit_for_current_state() } @@ -224,10 +243,20 @@ This diagram shows the main classes within the EEBUS module and their relationsh EebusConnectionHandler *-- ConfigValidator LpcUseCaseHandler *-- ConfigValidator LpcUseCaseHandler ..> eebus.EEBusCallbacks - UseCaseEventReader ..> LpcUseCaseHandler : event_callback + UseCaseEventReader ..> EebusConnectionHandler : event_callback + EebusConnectionHandler ..> LpcUseCaseHandler @enduml +Robustness +========== + +The module includes several features to make it resilient against connection losses and process crashes. + +- **gRPC Process Restart**: If the module is configured to manage the ``eebus_grpc_api`` binary (via ``manage_eebus_grpc_api_binary: true``), it will automatically restart the binary if it crashes or exits unexpectedly. There is a 5-second delay between restart attempts. + +- **gRPC Reconnection**: The ``EebusConnectionHandler`` will automatically try to reconnect to the gRPC service if the connection is lost. It will attempt to reconnect every 5 seconds. Once reconnected, it will re-establish the configured use cases. + Configuration ============= @@ -238,19 +267,19 @@ Configuration * - Key - Description * - ``manage_eebus_grpc_api_binary`` - - (boolean) Whether the module should manage the eebus grpc api binary. Default: ``true`` + - (boolean) Whether the module should manage the ``eebus_grpc_api`` binary. Default: ``true`` * - ``eebus_service_port`` - - (integer) Port for the control service, this will be sent in the SetConfig call. Default: ``4715`` + - (integer) Port for the control service, this will be sent in the ``SetConfig`` call. Default: ``4715`` * - ``grpc_port`` - - (integer) Port for grpc control service connection. This is the port on which we will create our control service channel and start the grpc binary with. Default: ``50051`` + - (integer) Port for gRPC control service connection. Required if ``manage_eebus_grpc_api_binary`` is true. Default: ``50051`` * - ``eebus_ems_ski`` - - (string, required) EEBUS EMS SKI. + - (string, required) The SKI of the EEBUS energy management system (e.g. HEMS) to connect to. * - ``certificate_path`` - - (string) Path to the certificate file used by eebus go client. If relative will be prefixed with everest prefix + etc/everest/certs. Otherwise absolute file path is used. Default: ``eebus/evse_cert`` + - (string) Path to the certificate file. If relative, it will be prefixed with ``/everest/certs``. Required if ``manage_eebus_grpc_api_binary`` is true. Default: ``eebus/evse_cert`` * - ``private_key_path`` - - (string) Path to the private key file used by eebus go client. If relative will be prefixed with everest prefix + etc/everest/certs. Otherwise absolute file path is used. Default: ``eebus/evse_key`` + - (string) Path to the private key file. If relative, it will be prefixed with ``/everest/certs``. Required if ``manage_eebus_grpc_api_binary`` is true. Default: ``eebus/evse_key`` * - ``eebus_grpc_api_binary_path`` - - (string) Path to the eebus grpc api binary. If relative will be prefixed with everest prefix + libexec. Otherwise absolute file path is used. Default: ``eebus_grpc_api`` + - (string) Path to the ``eebus_grpc_api`` binary. If relative, it will be prefixed with ````. Required if ``manage_eebus_grpc_api_binary`` is true. Default: ``eebus_grpc_api`` * - ``vendor_code`` - (string, required) Vendor code for the configuration of the control service. * - ``device_brand`` @@ -259,10 +288,10 @@ Configuration - (string, required) Device model for the configuration of the control service. * - ``serial_number`` - (string, required) Serial number for the configuration of the control service. - * - ``failsafe_control_limit`` - - (integer) Failsafe control limit for LPC use case. This will also be used for the default consumption limit, unit is Watts. Default: ``4200`` - * - ``max_nominal_power`` - - (integer) Maximum nominal power of the charging station. This is the max power the CS can consume. Default: ``32000`` + * - ``failsafe_control_limit_W`` + - (integer) Failsafe control limit for the LPC use case in Watts. This is also used for the default consumption limit. Default: ``4200`` + * - ``max_nominal_power_W`` + - (integer) Maximum nominal power of the charging station in Watts. This is the maximum power the CS can consume. Default: ``32000`` Provided and required interfaces ================================ diff --git a/modules/EnergyManagement/EEBUS/helper.cpp b/modules/EnergyManagement/EEBUS/helper.cpp index 2b533ba945..8f11e17525 100644 --- a/modules/EnergyManagement/EEBUS/helper.cpp +++ b/modules/EnergyManagement/EEBUS/helper.cpp @@ -2,53 +2,41 @@ namespace module { -bool compare_use_case(const control_service::UseCase& uc1, const control_service::UseCase& uc2) { - if (uc1.actor() != uc2.actor()) { - return false; - } - if (uc1.name() != uc2.name()) { - return false; - } - return true; -} - -types::energy::ScheduleReqEntry create_active_schedule_req_entry(std::chrono::time_point timestamp, - double total_power_W) { - types::energy::ScheduleReqEntry schedule_req_entry; - types::energy::LimitsReq limits_req; - schedule_req_entry.timestamp = Everest::Date::to_rfc3339(timestamp); - types::energy::NumberWithSource total_power; - total_power.value = total_power_W; - total_power.source = "EEBUS LPC"; - limits_req.total_power_W = total_power; - schedule_req_entry.limits_to_leaves = limits_req; - schedule_req_entry.limits_to_root = limits_req; - return schedule_req_entry; -} - -types::energy::ScheduleReqEntry create_inactive_schedule_req_entry(std::chrono::time_point timestamp) { - types::energy::ScheduleReqEntry schedule_req_entry; - schedule_req_entry.timestamp = Everest::Date::to_rfc3339(timestamp); - schedule_req_entry.limits_to_leaves = types::energy::LimitsReq(); - schedule_req_entry.limits_to_root = types::energy::LimitsReq(); - return schedule_req_entry; -} - types::energy::ExternalLimits translate_to_external_limits(const common_types::LoadLimit& load_limit) { types::energy::ExternalLimits limits; std::vector schedule_import; + + auto create_active_req = [](std::chrono::time_point timestamp, double total_power_W) { + types::energy::ScheduleReqEntry schedule_req_entry; + types::energy::LimitsReq limits_req; + schedule_req_entry.timestamp = Everest::Date::to_rfc3339(timestamp); + types::energy::NumberWithSource total_power; + total_power.value = total_power_W; + total_power.source = "EEBUS LPC"; + limits_req.total_power_W = total_power; + schedule_req_entry.limits_to_leaves = limits_req; + schedule_req_entry.limits_to_root = limits_req; + return schedule_req_entry; + }; + + auto create_inactive_req = [](std::chrono::time_point timestamp) { + types::energy::ScheduleReqEntry schedule_req_entry; + schedule_req_entry.timestamp = Everest::Date::to_rfc3339(timestamp); + schedule_req_entry.limits_to_leaves = types::energy::LimitsReq(); + schedule_req_entry.limits_to_root = types::energy::LimitsReq(); + return schedule_req_entry; + }; + + const auto now = date::utc_clock::from_sys(std::chrono::system_clock::now()); + if (load_limit.is_active()) { - schedule_import.push_back(create_active_schedule_req_entry( - date::utc_clock::from_sys(std::chrono::system_clock::now()), load_limit.value())); + schedule_import.push_back(create_active_req(now, load_limit.value())); if (load_limit.duration_nanoseconds() > 0) { - std::chrono::time_point timestamp; - timestamp = date::utc_clock::from_sys(std::chrono::system_clock::now()); - timestamp += std::chrono::nanoseconds(load_limit.duration_nanoseconds()); - schedule_import.push_back(create_inactive_schedule_req_entry(timestamp)); + const auto timestamp = now + std::chrono::nanoseconds(load_limit.duration_nanoseconds()); + schedule_import.push_back(create_inactive_req(timestamp)); } } else { - schedule_import.push_back( - create_inactive_schedule_req_entry(date::utc_clock::from_sys(std::chrono::system_clock::now()))); + schedule_import.push_back(create_inactive_req(now)); } limits.schedule_import = schedule_import; return limits; diff --git a/modules/EnergyManagement/EEBUS/include/ConfigValidator.hpp b/modules/EnergyManagement/EEBUS/include/ConfigValidator.hpp index c9c55f0c0e..1818a8cf21 100644 --- a/modules/EnergyManagement/EEBUS/include/ConfigValidator.hpp +++ b/modules/EnergyManagement/EEBUS/include/ConfigValidator.hpp @@ -27,6 +27,8 @@ class ConfigValidator { int get_failsafe_control_limit() const; int get_max_nominal_power() const; std::string get_eebus_ems_ski() const; + int get_restart_delay_s() const; + int get_reconnect_delay_s() const; private: bool validate_eebus_service_port() const; diff --git a/modules/EnergyManagement/EEBUS/include/EebusConnectionHandler.hpp b/modules/EnergyManagement/EEBUS/include/EebusConnectionHandler.hpp index 475c0482b5..744a72f43d 100644 --- a/modules/EnergyManagement/EEBUS/include/EebusConnectionHandler.hpp +++ b/modules/EnergyManagement/EEBUS/include/EebusConnectionHandler.hpp @@ -10,26 +10,59 @@ #include #include +#include +#include +#include #include namespace module { -class EebusConnectionHandler { +namespace eebus { +enum class EEBusUseCase { + LPC +}; +} // namespace eebus + +enum class EebusConnectionEvents { + CONNECTED, + DISCONNECTED, + DONE_ADDING_USE_CASES, + STARTED +}; + +enum class State { + INIT, + CONNECTED, + DISCONNECTED, + ADDING_USE_CASES, + READY_TO_START, + RUNNING +}; + +class EebusConnectionHandler : public everest::lib::io::event::fd_event_sync_interface { public: explicit EebusConnectionHandler(std::shared_ptr config); - ~EebusConnectionHandler(); + ~EebusConnectionHandler() override; EebusConnectionHandler(const EebusConnectionHandler&) = delete; EebusConnectionHandler& operator=(const EebusConnectionHandler&) = delete; - EebusConnectionHandler(EebusConnectionHandler&&) = default; - EebusConnectionHandler& operator=(EebusConnectionHandler&&) = default; + EebusConnectionHandler(EebusConnectionHandler&&) = delete; + EebusConnectionHandler& operator=(EebusConnectionHandler&&) = delete; - bool initialize_connection(); - bool start_service(); - bool add_lpc_use_case(const eebus::EEBusCallbacks& callbacks); - void subscribe_to_events(); + everest::lib::io::event::sync_status sync() override; + int get_poll_fd() override; + + bool add_use_case(eebus::EEBusUseCase use_case, const eebus::EEBusCallbacks& callbacks); + void done_adding_use_case(); void stop(); private: + void start_service(); + void handle_event(EebusConnectionEvents event); + bool initialize_connection(); + bool create_channel_and_stub(); + bool configure_service(); + void reconnect(); + void reset(); static bool wait_for_channel_ready(const std::shared_ptr& channel, std::chrono::milliseconds timeout); @@ -40,6 +73,16 @@ class EebusConnectionHandler { std::shared_ptr control_service_channel; std::shared_ptr control_service_stub; + everest::lib::io::event::fd_event_handler m_handler; + State state; + + everest::lib::io::event::timer_fd state_machine_timer; + everest::lib::io::event::timer_fd reconnection_timer; + + // store last added use case for reconnection + eebus::EEBusUseCase last_use_case; + eebus::EEBusCallbacks last_callbacks; + bool use_case_added{false}; }; } // namespace module diff --git a/modules/EnergyManagement/EEBUS/include/LpcUseCaseHandler.hpp b/modules/EnergyManagement/EEBUS/include/LpcUseCaseHandler.hpp index ee4a4d6165..091ddc85f5 100644 --- a/modules/EnergyManagement/EEBUS/include/LpcUseCaseHandler.hpp +++ b/modules/EnergyManagement/EEBUS/include/LpcUseCaseHandler.hpp @@ -29,22 +29,24 @@ class LpcUseCaseHandler { }; LpcUseCaseHandler(std::shared_ptr config, eebus::EEBusCallbacks callbacks); - ~LpcUseCaseHandler(); + ~LpcUseCaseHandler() = default; LpcUseCaseHandler(const LpcUseCaseHandler&) = delete; LpcUseCaseHandler& operator=(const LpcUseCaseHandler&) = delete; LpcUseCaseHandler(LpcUseCaseHandler&&) = delete; LpcUseCaseHandler& operator=(LpcUseCaseHandler&&) = delete; void start(); - void stop(); void set_stub(std::shared_ptr stub); void handle_event(const control_service::UseCaseEvent& event); + void run_state_machine(); static control_service::UseCase get_use_case_info(); void configure_use_case(); private: - void run(); + using event_handler_t = void (LpcUseCaseHandler::*)(); + + void initialize_event_handlers(); void set_state(State new_state); static std::string state_to_string(State state); @@ -53,25 +55,25 @@ class LpcUseCaseHandler { void apply_limit_for_current_state(); void start_heartbeat(); + void handle_data_update_heartbeat(); + void handle_data_update_limit(); + void handle_data_update_failsafe_duration_minimum(); + void handle_data_update_failsafe_consumption_active_power_limit(); + void handle_write_approval_required(); + void handle_use_case_support_update(); + std::shared_ptr config; eebus::EEBusCallbacks callbacks; std::shared_ptr stub; - std::thread worker_thread; std::atomic running{false}; - std::atomic state; - std::atomic state_changed{false}; - std::condition_variable cv; + State state; + bool state_changed{false}; - std::queue event_queue; - std::mutex queue_mutex; - - std::atomic limit_value_changed{false}; + bool limit_value_changed{false}; std::optional current_limit; - std::mutex limit_mutex; std::chrono::time_point last_heartbeat_timestamp; - std::mutex heartbeat_mutex; std::chrono::seconds heartbeat_timeout; std::chrono::time_point init_timestamp; @@ -79,6 +81,8 @@ class LpcUseCaseHandler { std::chrono::time_point last_limit_received_timestamp; std::chrono::nanoseconds failsafe_duration_timeout; double failsafe_control_limit; + + std::map event_handlers; }; } // namespace module diff --git a/modules/EnergyManagement/EEBUS/include/helper.hpp b/modules/EnergyManagement/EEBUS/include/helper.hpp index 6d18c4a863..07b5d427ab 100644 --- a/modules/EnergyManagement/EEBUS/include/helper.hpp +++ b/modules/EnergyManagement/EEBUS/include/helper.hpp @@ -13,8 +13,6 @@ namespace module { -bool compare_use_case(const control_service::UseCase& uc1, const control_service::UseCase& uc2); - types::energy::ExternalLimits translate_to_external_limits(const common_types::LoadLimit& load_limit); } // namespace module diff --git a/modules/EnergyManagement/EEBUS/manifest.yaml b/modules/EnergyManagement/EEBUS/manifest.yaml index fb6345ad0d..ebef25fadf 100644 --- a/modules/EnergyManagement/EEBUS/manifest.yaml +++ b/modules/EnergyManagement/EEBUS/manifest.yaml @@ -55,6 +55,14 @@ config: description: Maximum nominal power of the charging station. This is the max power the CS can consume. type: integer default: 32000 + restart_delay_s: + description: "Delay in seconds before restarting the eebus_grpc_api binary." + type: integer + default: 5 + reconnect_delay_s: + description: "Delay in seconds before trying to reconnect to the gRPC service." + type: integer + default: 5 provides: main: description: main