diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 43895374aa7..7845f488219 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -131,28 +131,38 @@ Result> OpenWriter( {write_options.filesystem, filename}); } -class DatasetWriterFileQueue { +class DatasetWriterFileQueue + : public std::enable_shared_from_this { public: explicit DatasetWriterFileQueue(const std::shared_ptr& schema, const FileSystemDatasetWriteOptions& options, - DatasetWriterState* writer_state) - : options_(options), schema_(schema), writer_state_(writer_state) {} + std::shared_ptr writer_state) + : options_(options), schema_(schema), writer_state_(std::move(writer_state)) {} - void Start(util::AsyncTaskScheduler* file_tasks, const std::string& filename) { - file_tasks_ = file_tasks; + void Start(std::unique_ptr file_tasks, + std::string filename) { + file_tasks_ = std::move(file_tasks); // Because the scheduler runs one task at a time we know the writer will // be opened before any attempt to write file_tasks_->AddSimpleTask( - [this, filename] { - Executor* io_executor = options_.filesystem->io_context().executor(); - return DeferNotOk(io_executor->Submit([this, filename]() { - ARROW_ASSIGN_OR_RAISE(writer_, OpenWriter(options_, schema_, filename)); + [self = shared_from_this(), filename = std::move(filename)] { + Executor* io_executor = self->options_.filesystem->io_context().executor(); + return DeferNotOk(io_executor->Submit([self, filename = std::move(filename)]() { + ARROW_ASSIGN_OR_RAISE(self->writer_, + OpenWriter(self->options_, self->schema_, filename)); return Status::OK(); })); }, "DatasetWriter::OpenWriter"sv); } + void Abort() { + // The scheduler may be keeping this object alive through shared_ptr references + // in async closures. Make sure we break any reference cycles by losing our + // reference to the scheduler. + file_tasks_.reset(); + } + Result> PopStagedBatch() { std::vector> batches_to_write; uint64_t num_rows = 0; @@ -184,7 +194,7 @@ class DatasetWriterFileQueue { void ScheduleBatch(std::shared_ptr batch) { file_tasks_->AddSimpleTask( - [self = this, batch = std::move(batch)]() { + [self = shared_from_this(), batch = std::move(batch)]() { return self->WriteNext(std::move(batch)); }, "DatasetWriter::WriteBatch"sv); @@ -217,13 +227,18 @@ class DatasetWriterFileQueue { Status Finish() { writer_state_->staged_rows_count -= rows_currently_staged_; while (!staged_batches_.empty()) { - RETURN_NOT_OK(PopAndDeliverStagedBatch()); + auto st = PopAndDeliverStagedBatch().status(); + if (!st.ok()) { + file_tasks_.reset(); + return st; + } } // At this point all write tasks have been added. Because the scheduler // is a 1-task FIFO we know this task will run at the very end and can // add it now. - file_tasks_->AddSimpleTask([this] { return DoFinish(); }, + file_tasks_->AddSimpleTask([self = shared_from_this()] { return self->DoFinish(); }, "DatasetWriter::FinishFile"sv); + file_tasks_.reset(); return Status::OK(); } @@ -231,7 +246,7 @@ class DatasetWriterFileQueue { Future<> WriteNext(std::shared_ptr next) { // May want to prototype / measure someday pushing the async write down further return DeferNotOk(options_.filesystem->io_context().executor()->Submit( - [self = this, batch = std::move(next)]() { + [self = shared_from_this(), batch = std::move(next)]() { int64_t rows_to_release = batch->num_rows(); Status status = self->writer_->Write(batch); self->writer_state_->rows_in_flight_throttle.Release(rows_to_release); @@ -244,21 +259,22 @@ class DatasetWriterFileQueue { std::lock_guard lg(writer_state_->visitors_mutex); RETURN_NOT_OK(options_.writer_pre_finish(writer_.get())); } - return writer_->Finish().Then([this]() { - std::lock_guard lg(writer_state_->visitors_mutex); - return options_.writer_post_finish(writer_.get()); - }); + return writer_->Finish().Then( + [self = shared_from_this(), writer_post_finish = options_.writer_post_finish]() { + std::lock_guard lg(self->writer_state_->visitors_mutex); + return writer_post_finish(self->writer_.get()); + }); } const FileSystemDatasetWriteOptions& options_; const std::shared_ptr& schema_; - DatasetWriterState* writer_state_; + std::shared_ptr writer_state_; std::shared_ptr writer_; // Batches are accumulated here until they are large enough to write out at which // point they are merged together and added to write_queue_ std::deque> staged_batches_; uint64_t rows_currently_staged_ = 0; - util::AsyncTaskScheduler* file_tasks_ = nullptr; + std::unique_ptr file_tasks_; }; struct WriteTask { @@ -266,18 +282,25 @@ struct WriteTask { uint64_t num_rows; }; -class DatasetWriterDirectoryQueue { +class DatasetWriterDirectoryQueue + : public std::enable_shared_from_this { public: DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory, std::string prefix, std::shared_ptr schema, const FileSystemDatasetWriteOptions& write_options, - DatasetWriterState* writer_state) + std::shared_ptr writer_state) : scheduler_(std::move(scheduler)), directory_(std::move(directory)), prefix_(std::move(prefix)), schema_(std::move(schema)), write_options_(write_options), - writer_state_(writer_state) {} + writer_state_(std::move(writer_state)) {} + + ~DatasetWriterDirectoryQueue() { + if (latest_open_file_) { + latest_open_file_->Abort(); + } + } Result> NextWritableChunk( std::shared_ptr batch, std::shared_ptr* remainder, @@ -330,32 +353,27 @@ class DatasetWriterDirectoryQueue { Status FinishCurrentFile() { if (latest_open_file_) { - ARROW_RETURN_NOT_OK(latest_open_file_->Finish()); - latest_open_file_tasks_.reset(); - latest_open_file_ = nullptr; + auto file = std::move(latest_open_file_); + ARROW_RETURN_NOT_OK(file->Finish()); } rows_written_ = 0; return GetNextFilename().Value(¤t_filename_); } Status OpenFileQueue(const std::string& filename) { - auto file_queue = - std::make_unique(schema_, write_options_, writer_state_); - latest_open_file_ = file_queue.get(); - // Create a dedicated throttle for write jobs to this file and keep it alive until we - // are finished and have closed the file. - auto file_finish_task = [this, file_queue = std::move(file_queue)] { - writer_state_->open_files_throttle.Release(1); + latest_open_file_.reset( + new DatasetWriterFileQueue(schema_, write_options_, writer_state_)); + auto file_finish_task = [self = shared_from_this()] { + self->writer_state_->open_files_throttle.Release(1); return Status::OK(); }; - latest_open_file_tasks_ = util::MakeThrottledAsyncTaskGroup( - scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task)); + auto file_tasks = util::MakeThrottledAsyncTaskGroup(scheduler_, 1, /*queue=*/nullptr, + std::move(file_finish_task)); if (init_future_.is_valid()) { - latest_open_file_tasks_->AddSimpleTask( - [init_future = init_future_]() { return init_future; }, - "DatasetWriter::WaitForDirectoryInit"sv); + file_tasks->AddSimpleTask([init_future = init_future_]() { return init_future; }, + "DatasetWriter::WaitForDirectoryInit"sv); } - latest_open_file_->Start(latest_open_file_tasks_.get(), filename); + latest_open_file_->Start(std::move(file_tasks), filename); return Status::OK(); } @@ -398,14 +416,14 @@ class DatasetWriterDirectoryQueue { "DatasetWriter::InitializeDirectory"sv); } - static Result> Make( + static Result> Make( util::AsyncTaskScheduler* scheduler, const FileSystemDatasetWriteOptions& write_options, - DatasetWriterState* writer_state, std::shared_ptr schema, + std::shared_ptr writer_state, std::shared_ptr schema, std::string directory, std::string prefix) { - auto dir_queue = std::make_unique( + auto dir_queue = std::make_shared( scheduler, std::move(directory), std::move(prefix), std::move(schema), - write_options, writer_state); + write_options, std::move(writer_state)); dir_queue->PrepareDirectory(); ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, dir_queue->GetNextFilename()); return dir_queue; @@ -413,26 +431,31 @@ class DatasetWriterDirectoryQueue { Status Finish() { if (latest_open_file_) { - ARROW_RETURN_NOT_OK(latest_open_file_->Finish()); - latest_open_file_tasks_.reset(); - latest_open_file_ = nullptr; + auto file = std::move(latest_open_file_); + ARROW_RETURN_NOT_OK(file->Finish()); } used_filenames_.clear(); return Status::OK(); } + void Abort() { + if (latest_open_file_) { + latest_open_file_->Abort(); + latest_open_file_.reset(); + } + } + private: util::AsyncTaskScheduler* scheduler_ = nullptr; std::string directory_; std::string prefix_; std::shared_ptr schema_; const FileSystemDatasetWriteOptions& write_options_; - DatasetWriterState* writer_state_; + std::shared_ptr writer_state_; Future<> init_future_; std::string current_filename_; std::unordered_set used_filenames_; - DatasetWriterFileQueue* latest_open_file_ = nullptr; - std::unique_ptr latest_open_file_tasks_; + std::shared_ptr latest_open_file_; uint64_t rows_written_ = 0; uint32_t file_counter_ = 0; }; @@ -520,11 +543,26 @@ class DatasetWriter::DatasetWriterImpl { return Status::OK(); })), write_options_(std::move(write_options)), - writer_state_(max_rows_queued, write_options_.max_open_files, - CalculateMaxRowsStaged(max_rows_queued)), + writer_state_(std::make_shared( + max_rows_queued, write_options_.max_open_files, + CalculateMaxRowsStaged(max_rows_queued))), pause_callback_(std::move(pause_callback)), resume_callback_(std::move(resume_callback)) {} + ~DatasetWriterImpl() { + // In case something went wrong (e.g. an IO error occurred), some tasks + // may be left dangling in a ThrottledAsyncTaskScheduler and that may + // lead to memory leaks via shared_ptr reference cycles (this can show up + // in some unit tests under Valgrind). + // To prevent this, explicitly break reference cycles at DatasetWriter + // destruction. + // The alternative is to use weak_from_this() thoroughly in async callbacks, + // but that makes for less readable code. + for (const auto& directory_queue : directory_queues_) { + directory_queue.second->Abort(); + } + } + Future<> WriteAndCheckBackpressure(std::shared_ptr batch, const std::string& directory, const std::string& prefix) { @@ -592,8 +630,10 @@ class DatasetWriter::DatasetWriterImpl { "DatasetWriter::FinishAll"sv); // Reset write_tasks_ to signal that we are done adding tasks, this will allow // us to invoke the finish callback once the tasks wrap up. - std::lock_guard lg(mutex_); - write_tasks_.reset(); + { + std::lock_guard lg(mutex_); + write_tasks_.reset(); + } } protected: @@ -621,7 +661,7 @@ class DatasetWriter::DatasetWriterImpl { &directory_queues_, directory + prefix, [this, &batch, &directory, &prefix](const std::string& key) { return DatasetWriterDirectoryQueue::Make(scheduler_, write_options_, - &writer_state_, batch->schema(), + writer_state_, batch->schema(), directory, prefix); })); std::shared_ptr dir_queue = dir_queue_itr->second; @@ -643,16 +683,16 @@ class DatasetWriter::DatasetWriterImpl { continue; } backpressure = - writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); + writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); break; } if (will_open_file) { - backpressure = writer_state_.open_files_throttle.Acquire(1); + backpressure = writer_state_->open_files_throttle.Acquire(1); if (!backpressure.is_finished()) { EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); - writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); + writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows()); RETURN_NOT_OK(TryCloseLargestFile()); break; } @@ -664,7 +704,7 @@ class DatasetWriter::DatasetWriterImpl { // // `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue` // so we don't need to release it here. - writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); + writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows()); return s; } batch = std::move(remainder); @@ -685,7 +725,7 @@ class DatasetWriter::DatasetWriterImpl { std::unique_ptr write_tasks_; Future<> finish_fut_ = Future<>::Make(); FileSystemDatasetWriteOptions write_options_; - DatasetWriterState writer_state_; + std::shared_ptr writer_state_; std::function pause_callback_; std::function resume_callback_; // Map from directory + prefix to the queue for that directory diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 46825c35da0..f8b979a3f56 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -316,15 +316,11 @@ class ThrottledAsyncTaskSchedulerImpl #endif queue_->Push(std::move(task)); lk.unlock(); - maybe_backoff->AddCallback( - [weak_self = std::weak_ptr( - shared_from_this())](const Status& st) { - if (st.ok()) { - if (auto self = weak_self.lock()) { - self->ContinueTasks(); - } - } - }); + maybe_backoff->AddCallback([weak_self = weak_from_this()](const Status& st) { + if (auto self = weak_self.lock(); self && st.ok()) { + self->ContinueTasks(); + } + }); return true; } else { lk.unlock(); @@ -350,8 +346,9 @@ class ThrottledAsyncTaskSchedulerImpl self = shared_from_this()]() mutable -> Result> { ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)()); if (!inner_fut.TryAddCallback([&] { - return [latched_cost, self = std::move(self)](const Status& st) -> void { - if (st.ok()) { + return [latched_cost, + weak_self = self->weak_from_this()](const Status& st) -> void { + if (auto self = weak_self.lock(); self && st.ok()) { self->throttle_->Release(latched_cost); self->ContinueTasks(); } @@ -360,6 +357,7 @@ class ThrottledAsyncTaskSchedulerImpl // If the task is already finished then don't run ContinueTasks // if we are already running it so we can avoid stack overflow self->throttle_->Release(latched_cost); + inner_task.reset(); if (!in_continue) { self->ContinueTasks(); } @@ -377,8 +375,8 @@ class ThrottledAsyncTaskSchedulerImpl if (maybe_backoff) { lk.unlock(); if (!maybe_backoff->TryAddCallback([&] { - return [self = shared_from_this()](const Status& st) { - if (st.ok()) { + return [weak_self = weak_from_this()](const Status& st) { + if (auto self = weak_self.lock(); self && st.ok()) { self->ContinueTasks(); } };