From 28c7bb2e31c60d54ee80f2702879774fca984bbf Mon Sep 17 00:00:00 2001
From: Michael Orlov <michael.orlov@apex.ai>
Date: Sun, 21 May 2023 17:55:34 -0700
Subject: [PATCH] Gracefully handle CTRL+C and CTR+Break events on Windows

- Add handler for the native Windows CTRL_C_EVENT, CTRL_BREAK_EVENT and
CTRL_CLOSE_EVENT in rosbag2 recorder and player.
- Map SIGINT signal to the native Windows CTRL_C_EVENT in the
 stop_execution(handle, signum) version for Windows. To be able
 correctly use start and stop execution routines from unit tests.
- Enable integration tests which was disabled for Windows due to the
incorrect sending and handling of the SIGINT event.
- Got rid of the `finalize_metadata_kludge(..)` helper function.

Signed-off-by: Michael <michael.orlov@apex.ai>
---
 rosbag2_py/src/rosbag2_py/_transport.cpp      | 88 ++++++++++++++++++-
 .../process_execution_helpers_windows.hpp     | 54 +++++++++---
 .../test/rosbag2_tests/record_fixture.hpp     | 40 +--------
 .../test_rosbag2_play_end_to_end.cpp          |  2 -
 .../test_rosbag2_record_end_to_end.cpp        | 19 +---
 .../rosbag2_transport_test_fixture.hpp        |  5 --
 6 files changed, 129 insertions(+), 79 deletions(-)

diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp
index e5a4fc8f35..a58b6300df 100644
--- a/rosbag2_py/src/rosbag2_py/_transport.cpp
+++ b/rosbag2_py/src/rosbag2_py/_transport.cpp
@@ -31,6 +31,11 @@
 
 #include "./pybind11.hpp"
 
+#ifdef _WIN32
+#include <windows.h>
+#include <iostream>
+#endif
+
 namespace py = pybind11;
 typedef std::unordered_map<std::string, rclcpp::QoS> QoSMap;
 
@@ -132,7 +137,49 @@ class Player
 public:
   Player()
   {
-    rclcpp::init(0, nullptr);
+    auto init_options = rclcpp::InitOptions();
+    init_options.shutdown_on_signal = false;
+    rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
+    std::signal(
+      SIGTERM, [](int /* signal */) {
+        rclcpp::shutdown();
+      });
+    std::signal(
+      SIGINT, [](int /* signal */) {
+        rclcpp::shutdown();
+      });
+#ifdef _WIN32
+    // According to the
+    // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170
+    // SIGINT is not supported for any Win32 application. When a CTRL+C interrupt occurs,
+    // Win32 operating systems generate a new thread to specifically handle that interrupt.
+    // Therefore, need to use native Windows control event handler as analog for the SIGINT.
+
+    // Enable default CTRL+C handler first. This is workaround and needed for the cases when
+    // process created with CREATE_NEW_PROCESS_GROUP flag. Without it, installing custom Ctrl+C
+    // handler will not work.
+    if (!SetConsoleCtrlHandler(nullptr, false)) {
+      std::cerr << "Rosbag2 player error: Failed to enable default CTL+C handler.\n";
+    }
+    // Installing our own control handler
+    auto CtrlHandler = [](DWORD fdwCtrlType) -> BOOL {
+        switch (fdwCtrlType) {
+          case CTRL_C_EVENT:
+          case CTRL_BREAK_EVENT:
+          case CTRL_CLOSE_EVENT:
+            rclcpp::shutdown();
+            return TRUE;
+          case CTRL_SHUTDOWN_EVENT:
+            rclcpp::shutdown();
+            return FALSE;
+          default:
+            return FALSE;
+        }
+      };
+    if (!SetConsoleCtrlHandler(CtrlHandler, TRUE)) {
+      std::cerr << "Rosbag2 player error: Could not set control handler.\n";
+    }
+#endif  // #ifdef _WIN32
   }
 
   virtual ~Player()
@@ -157,7 +204,7 @@ class Player
     player->play();
 
     exec.cancel();
-    spin_thread.join();
+    if (spin_thread.joinable()) {spin_thread.join();}
   }
 
   void burst(
@@ -182,8 +229,8 @@ class Player
     player->burst(num_messages);
 
     exec.cancel();
-    spin_thread.join();
-    play_thread.join();
+    if (spin_thread.joinable()) {spin_thread.join();}
+    if (play_thread.joinable()) {play_thread.join();}
   }
 };
 
@@ -208,6 +255,39 @@ class Recorder
       SIGINT, [](int /* signal */) {
         rosbag2_py::Recorder::cancel();
       });
+#ifdef _WIN32
+    // According to the
+    // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170
+    // SIGINT is not supported for any Win32 application. When a CTRL+C interrupt occurs,
+    // Win32 operating systems generate a new thread to specifically handle that interrupt.
+    // Therefore, need to use native Windows control event handler as analog for the SIGINT.
+
+    // Enable default CTRL+C handler first. This is workaround and needed for the cases when
+    // process created with CREATE_NEW_PROCESS_GROUP flag. Without it, installing custom Ctrl+C
+    // handler will not work.
+    if (!SetConsoleCtrlHandler(nullptr, false)) {
+      std::cerr << "Rosbag2 recorder error: Failed to enable default CTL+C handler.\n";
+    }
+    // Installing our own control handler
+    auto CtrlHandler = [](DWORD fdwCtrlType) -> BOOL {
+        switch (fdwCtrlType) {
+          case CTRL_C_EVENT:
+          case CTRL_BREAK_EVENT:
+          case CTRL_CLOSE_EVENT:
+            rosbag2_py::Recorder::cancel();
+            return TRUE;
+          case CTRL_SHUTDOWN_EVENT:
+            rosbag2_py::Recorder::cancel();
+            rclcpp::shutdown();
+            return FALSE;
+          default:
+            return FALSE;
+        }
+      };
+    if (!SetConsoleCtrlHandler(CtrlHandler, TRUE)) {
+      std::cerr << "Rosbag2 recorder error: Could not set control handler.\n";
+    }
+#endif  // #ifdef _WIN32
   }
 
   virtual ~Recorder()
diff --git a/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp b/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp
index eb840c481d..5bc83bf196 100644
--- a/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp
+++ b/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp
@@ -18,7 +18,7 @@
 #include <gmock/gmock.h>
 
 #include <direct.h>
-#include <Windows.h>
+#include <windows.h>
 
 #include <chrono>
 #include <csignal>
@@ -46,7 +46,10 @@ PROCESS_INFORMATION create_process(TCHAR * command, const char * path = nullptr)
     nullptr,
     nullptr,
     false,
-    0,
+    // Create process suspended and resume it after adding to the newly created job. Otherwise,
+    // there is a potential race condition where newly created process starts a subprocess
+    // before we've called AssignProcessToJobObject();
+    CREATE_NEW_PROCESS_GROUP | CREATE_SUSPENDED,
     nullptr,
     path,
     &start_up_info,
@@ -73,8 +76,9 @@ int execute_and_wait_until_completion(const std::string & command, const std::st
   const_char_to_tchar(command.c_str(), command_char);
 
   auto process = create_process(command_char, path.c_str());
-  DWORD exit_code = 259;  // 259 is the code one gets if the process is still active.
-  while (exit_code == 259) {
+  ResumeThread(process.hThread);
+  DWORD exit_code = STILL_ACTIVE;
+  while (exit_code == STILL_ACTIVE) {
     EXPECT_TRUE(GetExitCodeProcess(process.hProcess, &exit_code));
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
   }
@@ -97,6 +101,7 @@ ProcessHandle start_execution(const std::string & command)
   auto process_info = create_process(command_char);
 
   AssignProcessToJobObject(h_job, process_info.hProcess);
+  ResumeThread(process_info.hThread);
   Process process;
   process.process_info = process_info;
   process.job_handle = h_job;
@@ -117,12 +122,12 @@ bool wait_until_completion(
   DWORD exit_code = 0;
   std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
   EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
-  // 259 indicates that the process is still active
-  while (exit_code == 259 && std::chrono::steady_clock::now() - start < timeout) {
+  while (exit_code == STILL_ACTIVE && std::chrono::steady_clock::now() - start < timeout) {
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
     EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
   }
-  return exit_code != 259;
+  EXPECT_EQ(exit_code, 0);
+  return exit_code != STILL_ACTIVE;
 }
 
 /// @brief Force to stop process with signal if it's currently running
@@ -135,16 +140,37 @@ void stop_execution(
   std::chrono::duration<double> timeout = std::chrono::seconds(10))
 {
   // Match the Unix version by allowing for int signum argument - however Windows does not have
-  // Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGTERM
-  DWORD exit_code;
+  // Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGINT or SIGTERM
+  DWORD exit_code = STILL_ACTIVE;
   EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
-  // 259 indicates that the process is still active: we want to make sure that the process is
-  // still running properly before killing it.
-  if (exit_code == 259) {
-    EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId));
+  // Make sure that the process is still running properly before stopping it.
+  if (exit_code == STILL_ACTIVE) {
+    switch (signum) {
+      // According to the
+      // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170
+      // SIGINT and SIGBREAK is not supported for any Win32 application.
+      // Need to use native Windows control event instead.
+      case SIGINT:
+        EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwProcessId));
+        break;
+      case SIGBREAK:
+        EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, handle.process_info.dwProcessId));
+        break;
+      case SIGTERM:
+      // The CTRL_CLOSE_EVENT is analog of the SIGTERM from POSIX. Windows sends CTRL_CLOSE_EVENT
+      // to all processes attached to a console when the user closes the console (either by
+      // clicking Close on the console window's window menu, or by clicking the End Task
+      // button command from Task Manager). Although according to the
+      // https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent the
+      // GenerateConsoleCtrlEvent doesn't support sending CTRL_CLOSE_EVENT. There are no way to
+      // directly send CTRL_CLOSE_EVENT to the process in the same console application.
+      // Therefore, adding SIGTERM to the unsupported events.
+      default:
+        throw std::runtime_error("Unsupported signum: " + std::to_string(signum));
+    }
     bool process_finished = wait_until_completion(handle, timeout);
     if (!process_finished) {
-      std::cerr << "Testing process " << handle.process_info.hProcess <<
+      std::cerr << "Testing process " << handle.process_info.dwProcessId <<
         " hangout. Killing it with TerminateProcess(..) \n";
       EXPECT_TRUE(TerminateProcess(handle.process_info.hProcess, 2));
       EXPECT_TRUE(process_finished);
diff --git a/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp b/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp
index 694e6e4973..a3f4158b05 100644
--- a/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp
+++ b/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp
@@ -126,7 +126,7 @@ class RecordFixture : public ParametrizedTemporaryDirectoryFixture
       std::this_thread::sleep_for(50ms);
     }
     ASSERT_EQ(metadata_io.metadata_file_exists(bag_path), true)
-      << "Could not find metadata file.";
+      << "Could not find " << bag_path << " metadata file.";
   }
 
   void wait_for_storage_file(std::chrono::duration<float> timeout = std::chrono::seconds(10))
@@ -181,44 +181,6 @@ class RecordFixture : public ParametrizedTemporaryDirectoryFixture
     return topic_it->serialization_format;
   }
 
-  void finalize_metadata_kludge(
-    int expected_splits = 0,
-    const std::string & compression_format = "",
-    const std::string & compression_mode = "")
-  {
-    // TODO(ros-tooling): Find out how to correctly send a Ctrl-C signal on Windows
-    // This is necessary as the process is killed hard on Windows and doesn't write a metadata file
-  #ifdef _WIN32
-    rosbag2_storage::BagMetadata metadata{};
-    metadata.storage_identifier = rosbag2_storage::get_default_storage_id();
-    for (int i = 0; i <= expected_splits; i++) {
-      rcpputils::fs::path bag_file_path;
-      if (!compression_format.empty()) {
-        bag_file_path = get_bag_file_path(i);
-      } else {
-        bag_file_path = get_compressed_bag_file_path(i);
-      }
-
-      if (rcpputils::fs::exists(bag_file_path)) {
-        metadata.relative_file_paths.push_back(bag_file_path.string());
-      }
-    }
-    metadata.duration = std::chrono::nanoseconds(0);
-    metadata.starting_time =
-      std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::nanoseconds(0));
-    metadata.message_count = 0;
-    metadata.compression_mode = compression_mode;
-    metadata.compression_format = compression_format;
-
-    rosbag2_storage::MetadataIo metadata_io;
-    metadata_io.write_metadata(root_bag_path_.string(), metadata);
-  #else
-    (void)expected_splits;
-    (void)compression_format;
-    (void)compression_mode;
-  #endif
-  }
-
   // relative path to the root of the bag file.
   rcpputils::fs::path root_bag_path_;
 
diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp
index de3ce26812..4315198dc7 100644
--- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp
+++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp
@@ -271,7 +271,6 @@ TEST_P(PlayEndToEndTestFixture, play_filters_by_topic) {
   }
 }
 
-#ifndef _WIN32
 TEST_P(PlayEndToEndTestFixture, play_end_to_end_exits_gracefully_on_sigint) {
   sub_->add_subscription<test_msgs::msg::BasicTypes>("/test_topic", 3, sub_qos_);
   sub_->add_subscription<test_msgs::msg::Arrays>("/array_topic", 2, sub_qos_);
@@ -293,7 +292,6 @@ TEST_P(PlayEndToEndTestFixture, play_end_to_end_exits_gracefully_on_sigint) {
   stop_execution(process_id, SIGINT);
   cleanup_process_handle.cancel();
 }
-#endif  // #ifndef _WIN32
 
 INSTANTIATE_TEST_SUITE_P(
   TestPlayEndToEnd,
diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp
index 74277e4965..e12a0f0da7 100644
--- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp
+++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp
@@ -58,7 +58,6 @@ std::shared_ptr<test_msgs::msg::Strings> create_string_message(
 }
 }  // namespace
 
-#ifndef _WIN32
 TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression) {
   constexpr const char topic_name[] = "/test_topic";
   const auto compression_format = "zstd";
@@ -118,7 +117,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression) {
     EXPECT_EQ(message->string_value, "test");
   }
 }
-#endif
 
 TEST_P(RecordFixture, record_end_to_end_test) {
   auto message = get_messages_strings()[0];
@@ -149,7 +147,6 @@ TEST_P(RecordFixture, record_end_to_end_test) {
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge();
   wait_for_metadata();
   auto test_topic_messages = get_messages_for_topic<test_msgs::msg::Strings>("/test_topic");
   EXPECT_THAT(test_topic_messages, SizeIs(Ge(expected_test_messages)));
@@ -191,12 +188,14 @@ TEST_P(RecordFixture, record_end_to_end_test_start_paused) {
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge();
   wait_for_metadata();
   auto test_topic_messages = get_messages_for_topic<test_msgs::msg::Strings>("/test_topic");
   EXPECT_THAT(test_topic_messages, IsEmpty());
 }
 
+#ifndef _WIN32
+// This test is disabled for Windows platform because there are no way to send SIGTERM or its
+// alternative CTRL_CLOSE_EVENT directly to the child process in the same console application.
 TEST_P(RecordFixture, record_end_to_end_exits_gracefully_on_sigterm) {
   const std::string topic_name = "/test_sigterm";
   auto message = get_messages_strings()[0];
@@ -214,11 +213,8 @@ TEST_P(RecordFixture, record_end_to_end_exits_gracefully_on_sigterm) {
   stop_execution(process_handle, SIGTERM);
   wait_for_metadata();
 }
+#endif  // #ifndef _WIN32
 
-// TODO(zmichaels11): Fix and enable this test on Windows.
-// This tests depends on the ability to read the metadata file.
-// Stopping the process on Windows does a hard kill and the metadata file is not written.
-#ifndef _WIN32
 TEST_P(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_topics) {
   constexpr const int bagfile_split_size = 4 * 1024 * 1024;  // 4MB.
   constexpr const char first_topic_name[] = "/test_topic0";
@@ -273,7 +269,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_top
   EXPECT_NE(topic_names.end(), topic_names.find(first_topic_name));
   EXPECT_NE(topic_names.end(), topic_names.find(second_topic_name));
 }
-#endif
 
 TEST_P(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least_specified_size) {
   constexpr const char topic_name[] = "/test_topic";
@@ -307,7 +302,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least
 
   pub_manager.run_publishers();
 
-  finalize_metadata_kludge(expected_splits);
   wait_for_metadata();
   rosbag2_storage::MetadataIo metadata_io;
   const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
@@ -362,7 +356,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) {
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge();
   wait_for_metadata();
   rosbag2_storage::MetadataIo metadata_io;
   const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
@@ -413,7 +406,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) {
   cleanup_process_handle.cancel();
 
   wait_for_metadata();
-  finalize_metadata_kludge(expected_splits);
   rosbag2_storage::MetadataIo metadata_io;
   const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
 
@@ -459,7 +451,6 @@ TEST_P(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile)
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge();
   wait_for_metadata();
   rosbag2_storage::MetadataIo metadata_io;
   const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
@@ -506,7 +497,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge(expected_splits);
   wait_for_metadata();
   rosbag2_storage::MetadataIo metadata_io;
   const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
@@ -594,7 +584,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_cache) {
   stop_execution(process_handle);
   cleanup_process_handle.cancel();
 
-  finalize_metadata_kludge();
   wait_for_metadata();
   auto test_topic_messages =
     get_messages_for_topic<test_msgs::msg::Strings>(topic_name);
diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp
index 9e75cc84fa..42f8f8f761 100644
--- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp
+++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp
@@ -26,11 +26,6 @@
 #include "rosbag2_cpp/types.hpp"
 #include "rosbag2_cpp/writer.hpp"
 
-#ifdef _WIN32
-# include <direct.h>
-# include <Windows.h>
-#endif
-
 #include "rosbag2_storage/storage_options.hpp"
 
 #include "rosbag2_transport/play_options.hpp"