diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index f75fd970a1ee..0c15f6f18444 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -119,7 +119,6 @@ #include "arrow/util/string.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" -#include "arrow/util/value_parsing.h" namespace arrow::fs { @@ -3579,9 +3578,10 @@ S3GlobalOptions S3GlobalOptions::Defaults() { log_level = S3LogLevel::Off; } - value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1"); - if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) { - num_event_loop_threads = u; + auto maybe_num_threads = + arrow::internal::GetEnvVarInteger("ARROW_S3_THREADS", /*min_value=*/1); + if (maybe_num_threads.ok()) { + num_event_loop_threads = static_cast(*maybe_num_threads); } return S3GlobalOptions{log_level, num_event_loop_threads}; } diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 12c124ce213f..cdd2470b629c 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -390,23 +390,15 @@ namespace { constexpr int kDefaultNumIoThreads = 8; std::shared_ptr MakeIOThreadPool() { - int threads = 0; - auto maybe_env_var = ::arrow::internal::GetEnvVar("ARROW_IO_THREADS"); - if (maybe_env_var.ok()) { - auto str = *std::move(maybe_env_var); - if (!str.empty()) { - try { - threads = std::stoi(str); - } catch (...) { - } - if (threads <= 0) { - ARROW_LOG(WARNING) - << "ARROW_IO_THREADS does not contain a valid number of threads " - "(should be an integer > 0)"; - } - } + int threads = kDefaultNumIoThreads; + auto maybe_num_threads = ::arrow::internal::GetEnvVarInteger( + "ARROW_IO_THREADS", /*min_value=*/1, /*max_value=*/std::numeric_limits::max()); + if (maybe_num_threads.ok()) { + threads = static_cast(*maybe_num_threads); + } else if (!maybe_num_threads.status().IsKeyError()) { + maybe_num_threads.status().Warn(); } - auto maybe_pool = ThreadPool::MakeEternal(threads > 0 ? threads : kDefaultNumIoThreads); + auto maybe_pool = ThreadPool::MakeEternal(threads); if (!maybe_pool.ok()) { maybe_pool.status().Abort("Failed to create global IO thread pool"); } diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 1acc47a99d4d..0e2cbdb644ac 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -660,21 +660,24 @@ LocaleGuard::LocaleGuard(const char* new_locale) : impl_(new Impl(new_locale)) { LocaleGuard::~LocaleGuard() {} -EnvVarGuard::EnvVarGuard(const std::string& name, const std::string& value) - : name_(name) { - auto maybe_value = arrow::internal::GetEnvVar(name); +EnvVarGuard::EnvVarGuard(std::string name, std::optional value) + : name_(std::move(name)) { + auto maybe_value = arrow::internal::GetEnvVar(name_); if (maybe_value.ok()) { - was_set_ = true; old_value_ = *std::move(maybe_value); } else { - was_set_ = false; + old_value_ = std::nullopt; + } + if (value.has_value()) { + ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *value)); + } else { + ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_)); } - ARROW_CHECK_OK(arrow::internal::SetEnvVar(name, value)); } EnvVarGuard::~EnvVarGuard() { - if (was_set_) { - ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, old_value_)); + if (old_value_.has_value()) { + ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *old_value_)); } else { ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_)); } diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 62bf907a2d89..b84d253a89e8 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -418,9 +418,11 @@ ARROW_TESTING_EXPORT void AssertChildExit(int child_pid, int expected_exit_status = 0); #endif -// A RAII-style object that switches to a new locale, and switches back -// to the old locale when going out of scope. Doesn't do anything if the -// new locale doesn't exist on the local machine. +// A RAII-style object that temporarily switches to a new locale +// +// The guard switches back to the old locale when going out of scope. +// It doesn't do anything if the new locale doesn't exist on the local machine. +// // ATTENTION: may crash with an assertion failure on Windows debug builds. // See ARROW-6108, also https://gerrit.libreoffice.org/#/c/54110/ class ARROW_TESTING_EXPORT LocaleGuard { @@ -433,15 +435,20 @@ class ARROW_TESTING_EXPORT LocaleGuard { std::unique_ptr impl_; }; +// A RAII-style object that temporarily sets an environment variable +// +// The guard restores the variable's previous value when going out of scope, +// or deletes the variable if it was not initially set. +// The environment variable can also be temporarily deleted if std::nullopt +// is passed instead of a string value. class ARROW_TESTING_EXPORT EnvVarGuard { public: - EnvVarGuard(const std::string& name, const std::string& value); + EnvVarGuard(std::string name, std::optional value); ~EnvVarGuard(); protected: - const std::string name_; - std::string old_value_; - bool was_set_; + std::string name_; + std::optional old_value_; }; namespace internal { diff --git a/cpp/src/arrow/util/atfork_internal.cc b/cpp/src/arrow/util/atfork_internal.cc index 7772f1c62bea..fa3a09d0a2bd 100644 --- a/cpp/src/arrow/util/atfork_internal.cc +++ b/cpp/src/arrow/util/atfork_internal.cc @@ -34,6 +34,22 @@ namespace internal { namespace { +bool IsAtForkEnabled() { + static bool is_enabled = [] { + auto maybe_value = + GetEnvVarInteger("ARROW_REGISTER_ATFORK", /*min_value=*/0, /*max_value=*/1); + if (maybe_value.ok()) { + return *maybe_value != 0; + } + if (!maybe_value.status().IsKeyError()) { + maybe_value.status().Warn(); + } + // Enabled by default + return true; + }(); + return is_enabled; +} + // Singleton state for at-fork management. // We do not use global variables because of initialization order issues (ARROW-18383). // Instead, a function-local static ensures the state is initialized @@ -147,7 +163,11 @@ AtForkState* GetAtForkState() { }; // namespace void RegisterAtFork(std::weak_ptr weak_handler) { - GetAtForkState()->RegisterAtFork(std::move(weak_handler)); + // Only fetch the atfork state (and thus lazily call pthread_atfork) if enabled at all, + // to minimize potential nastiness with fork and threads. + if (IsAtForkEnabled()) { + GetAtForkState()->RegisterAtFork(std::move(weak_handler)); + } } } // namespace internal diff --git a/cpp/src/arrow/util/atfork_test.cc b/cpp/src/arrow/util/atfork_test.cc index 97910f9539c0..ea9bdca53602 100644 --- a/cpp/src/arrow/util/atfork_test.cc +++ b/cpp/src/arrow/util/atfork_test.cc @@ -190,6 +190,9 @@ TEST_F(TestAtFork, SingleThread) { ASSERT_THAT(child_after_, ElementsAre()); } +// XXX we would like to test the ARROW_REGISTER_ATFORK environment variable, +// but that would require spawning a test subprocess + # if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ defined(THREAD_SANITIZER)) diff --git a/cpp/src/arrow/util/fuzz_internal.cc b/cpp/src/arrow/util/fuzz_internal.cc index 935089b2bc96..28d210333dda 100644 --- a/cpp/src/arrow/util/fuzz_internal.cc +++ b/cpp/src/arrow/util/fuzz_internal.cc @@ -36,17 +36,16 @@ MemoryPool* fuzzing_memory_pool() { void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) { static const int kVerbosity = []() { - auto maybe_env_value = GetEnvVar("ARROW_FUZZING_VERBOSITY"); - if (maybe_env_value.status().IsKeyError()) { - return 0; + auto maybe_env_value = + GetEnvVarInteger("ARROW_FUZZING_VERBOSITY", /*min_value=*/0, /*max_value=*/1); + if (maybe_env_value.ok()) { + return static_cast(*maybe_env_value); } - auto env_value = std::move(maybe_env_value).ValueOrDie(); - int32_t value; - if (!ParseValue(env_value.data(), env_value.length(), &value)) { - Status::Invalid("Invalid value for ARROW_FUZZING_VERBOSITY: '", env_value, "'") - .Abort(); + if (!maybe_env_value.status().IsKeyError()) { + maybe_env_value.status().Abort(); } - return value; + // Quiet by default + return 0; }(); if (!st.ok() && kVerbosity >= 1) { diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index b3ef48d29651..03acd8297d41 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -99,6 +99,7 @@ #include "arrow/util/io_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/mutex.h" +#include "arrow/util/value_parsing.h" // For filename conversion #if defined(_WIN32) @@ -1762,19 +1763,28 @@ Result GetEnvVar(std::string_view name) { #ifdef _WIN32 // On Windows, getenv() reads an early copy of the process' environment // which doesn't get updated when SetEnvironmentVariable() is called. - constexpr int32_t bufsize = 2000; - char c_str[bufsize]; - auto res = GetEnvironmentVariableA(name.data(), c_str, bufsize); - if (res >= bufsize) { - return Status::CapacityError("environment variable value too long"); - } else if (res == 0) { - return Status::KeyError("environment variable '", name, "'undefined"); - } - return std::string(c_str); + std::string value(100, '\0'); + + uint32_t res = GetEnvironmentVariableA(name.data(), value.data(), + static_cast(value.size())); + if (res >= value.size()) { + // Value buffer too small, need to upsize + // (`res` includes the null-terminating character in this case) + value.resize(res); + res = GetEnvironmentVariableA(name.data(), value.data(), + static_cast(value.size())); + } + if (res == 0) { + return Status::KeyError("environment variable '", name, "' undefined"); + } + // On success, `res` does not include the null-terminating character + DCHECK_EQ(value.data()[res], 0); + value.resize(res); + return value; #else char* c_str = getenv(name.data()); if (c_str == nullptr) { - return Status::KeyError("environment variable '", name, "'undefined"); + return Status::KeyError("environment variable '", name, "' undefined"); } return std::string(c_str); #endif @@ -1782,18 +1792,25 @@ Result GetEnvVar(std::string_view name) { #ifdef _WIN32 Result GetEnvVarNative(std::string_view name) { - NativePathString w_name; - constexpr int32_t bufsize = 2000; - wchar_t w_str[bufsize]; + ARROW_ASSIGN_OR_RAISE(std::wstring w_name, StringToNative(name)); + std::wstring value(100, '\0'); - ARROW_ASSIGN_OR_RAISE(w_name, StringToNative(name)); - auto res = GetEnvironmentVariableW(w_name.c_str(), w_str, bufsize); - if (res >= bufsize) { - return Status::CapacityError("environment variable value too long"); - } else if (res == 0) { - return Status::KeyError("environment variable '", name, "'undefined"); + uint32_t res = GetEnvironmentVariableW(w_name.data(), value.data(), + static_cast(value.size())); + if (res >= value.size()) { + // Value buffer too small, need to upsize + // (`res` includes the null-terminating character in this case) + value.resize(res); + res = GetEnvironmentVariableW(w_name.data(), value.data(), + static_cast(value.size())); + } + if (res == 0) { + return Status::KeyError("environment variable '", name, "' undefined"); } - return NativePathString(w_str); + // On success, `res` does not include the null-terminating character + DCHECK_EQ(value.data()[res], 0); + value.resize(res); + return value; } #else @@ -1804,6 +1821,18 @@ Result GetEnvVarNative(std::string_view name) { #endif +Result GetEnvVarInteger(std::string_view name, std::optional min_value, + std::optional max_value) { + ARROW_ASSIGN_OR_RAISE(auto env_string, GetEnvVar(name)); + int64_t value; + if (!ParseValue(env_string.data(), env_string.length(), &value) || + (min_value.has_value() && value < *min_value) || + (max_value.has_value() && value > *max_value)) { + return Status::Invalid("Invalid value for ", name, ": '", env_string, "'"); + } + return value; +} + Status SetEnvVar(std::string_view name, std::string_view value) { #ifdef _WIN32 if (SetEnvironmentVariableA(name.data(), value.data())) { diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 56bd4eff3d66..fa53c0dc67a6 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -244,6 +244,12 @@ ARROW_EXPORT Result GetEnvVar(std::string_view name); ARROW_EXPORT Result GetEnvVarNative(std::string_view name); +// Returns KeyError if the environment variable doesn't exist, +// Invalid if it's not a valid integer in the given range. +ARROW_EXPORT +Result GetEnvVarInteger(std::string_view name, + std::optional min_value = {}, + std::optional max_value = {}); ARROW_EXPORT Status SetEnvVar(std::string_view name, std::string_view value); diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index de8458dc1171..44188b3f2ee9 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -1134,5 +1134,44 @@ TEST(CpuAffinity, NumberOfCores) { #endif } +TEST(Environment, GetEnvVar) { + // An environment variable that should exist on roughly all platforms + ASSERT_OK_AND_ASSIGN(auto v, GetEnvVar("PATH")); + ASSERT_FALSE(v.empty()); + ASSERT_OK_AND_ASSIGN(auto w, GetEnvVarNative("PATH")); + ASSERT_FALSE(w.empty()); + // An environment variable that most probably does not exist + ASSERT_RAISES(KeyError, GetEnvVar("BZZT_NONEXISTENT_VAR")); + ASSERT_RAISES(KeyError, GetEnvVarNative("BZZT_NONEXISTENT_VAR")); + // (we try not to rely on EnvVarGuard here as that would be circular) +} + +TEST(Environment, GetEnvVarInteger) { + { + EnvVarGuard guard("FOOBAR", "5"); + ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR")); + ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR", /*min_value=*/5, /*max_value=*/7)); + ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/6, /*max_value=*/7)); + ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/3, /*max_value=*/4)); + } + { + EnvVarGuard guard("FOOBAR", "BAZ"); + ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR")); + } + { + EnvVarGuard guard("FOOBAR", std::nullopt); + ASSERT_RAISES(KeyError, GetEnvVarInteger("FOOBAR")); + } +} + +TEST(Environment, SetEnvVar) { + EnvVarGuard guard("FOOBAR", "one"); + ASSERT_OK_AND_EQ("one", GetEnvVar("FOOBAR")); + ASSERT_OK(SetEnvVar("FOOBAR", "two")); + ASSERT_OK_AND_EQ("two", GetEnvVar("FOOBAR")); + ASSERT_OK(DelEnvVar("FOOBAR")); + ASSERT_RAISES(KeyError, GetEnvVar("FOOBAR")); +} + } // namespace internal } // namespace arrow diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 20df98c5eccf..6ee6993e2ba7 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -87,6 +87,18 @@ that changing their value later will have an effect. ``libhdfs.dylib`` on macOS, ``libhdfs.so`` on other platforms). Alternatively, one can set :envvar:`HADOOP_HOME`. +.. envvar:: ARROW_REGISTER_ATFORK + + **Experimental**. An integer value to enable or disable the registration + of at-fork handlers. These are enabled by default or explicitly using the + value "1"; use "0" to disable. + + If enabled, at-fork handlers make Arrow C++ compatible with the use of the + ``fork()`` system call, such as by Python's :python:mod:`multiprocessing`, + but at the expense of executing + `potentially unsafe code `__ + in a forked child process if the parent process is multi-threaded. + .. envvar:: ARROW_S3_LOG_LEVEL Controls the verbosity of logging produced by S3 calls. Defaults to ``FATAL`` diff --git a/python/pyarrow/tests/test_misc.py b/python/pyarrow/tests/test_misc.py index 64f45d8bed85..d6a2fe6a2765 100644 --- a/python/pyarrow/tests/test_misc.py +++ b/python/pyarrow/tests/test_misc.py @@ -80,8 +80,7 @@ def run_with_env_var(env_var): for v in ('-1', 'z'): out, err = run_with_env_var(v) assert out.strip() == '8' # default value - assert ("ARROW_IO_THREADS does not contain a valid number of threads" - in err.strip()) + assert "Invalid value for ARROW_IO_THREADS" in err.strip() def test_build_info():