156 changes: 98 additions & 58 deletions cpp/src/arrow/dataset/dataset_writer.cc
@@ -131,28 +131,38 @@ Result<std::shared_ptr<FileWriter>> OpenWriter(
{write_options.filesystem, filename});
}

class DatasetWriterFileQueue {
class DatasetWriterFileQueue
: public std::enable_shared_from_this<DatasetWriterFileQueue> {
public:
explicit DatasetWriterFileQueue(const std::shared_ptr<Schema>& schema,
const FileSystemDatasetWriteOptions& options,
DatasetWriterState* writer_state)
: options_(options), schema_(schema), writer_state_(writer_state) {}
std::shared_ptr<DatasetWriterState> writer_state)
: options_(options), schema_(schema), writer_state_(std::move(writer_state)) {}

void Start(util::AsyncTaskScheduler* file_tasks, const std::string& filename) {
file_tasks_ = file_tasks;
void Start(std::unique_ptr<util::ThrottledAsyncTaskScheduler> file_tasks,
std::string filename) {
file_tasks_ = std::move(file_tasks);
// Because the scheduler runs one task at a time we know the writer will
// be opened before any attempt to write
file_tasks_->AddSimpleTask(
[this, filename] {
Executor* io_executor = options_.filesystem->io_context().executor();
return DeferNotOk(io_executor->Submit([this, filename]() {
ARROW_ASSIGN_OR_RAISE(writer_, OpenWriter(options_, schema_, filename));
[self = shared_from_this(), filename = std::move(filename)] {
Executor* io_executor = self->options_.filesystem->io_context().executor();
return DeferNotOk(io_executor->Submit([self, filename = std::move(filename)]() {
ARROW_ASSIGN_OR_RAISE(self->writer_,
OpenWriter(self->options_, self->schema_, filename));
return Status::OK();
}));
},
"DatasetWriter::OpenWriter"sv);
}

void Abort() {
// The scheduler may be keeping this object alive through shared_ptr references
// in async closures. Make sure we break any reference cycles by losing our
// reference to the scheduler.
file_tasks_.reset();
}

Result<std::shared_ptr<RecordBatch>> PopStagedBatch() {
std::vector<std::shared_ptr<RecordBatch>> batches_to_write;
uint64_t num_rows = 0;
@@ -184,7 +194,7 @@ class DatasetWriterFileQueue {

void ScheduleBatch(std::shared_ptr<RecordBatch> batch) {
file_tasks_->AddSimpleTask(
[self = this, batch = std::move(batch)]() {
[self = shared_from_this(), batch = std::move(batch)]() {
return self->WriteNext(std::move(batch));
},
"DatasetWriter::WriteBatch"sv);
@@ -217,21 +227,26 @@ class DatasetWriterFileQueue {
Status Finish() {
writer_state_->staged_rows_count -= rows_currently_staged_;
while (!staged_batches_.empty()) {
RETURN_NOT_OK(PopAndDeliverStagedBatch());
auto st = PopAndDeliverStagedBatch().status();
if (!st.ok()) {
file_tasks_.reset();
return st;
}
Comment on lines +230 to +234

Contributor:
Suggested change: the new error path

    auto st = PopAndDeliverStagedBatch().status();
    if (!st.ok()) {
      file_tasks_.reset();
      return st;
    }

could be collapsed into

    RETURN_NOT_OK_ELSE(PopAndDeliverStagedBatch(), file_tasks_.reset());

Member Author:
I had forgotten about this little-used macro. I honestly find it a bit confusing; I had to look up the definition to make sure I understood the semantics correctly.

Contributor:
I agree the name is confusing, but given a less confusing name this could be a genuinely useful macro. How about renaming it to something like RETURN_NOT_OK_AFTER (in a follow-up PR)?

Member Author:
Well, I'm not sure RETURN_NOT_OK_AFTER would be much less confusing, honestly.
I wonder if we could add some kind of functional API to Status instead, for example:

    RETURN_NOT_OK(PopAndDeliverStagedBatch().OrElse([&] { file_tasks_.reset(); }));
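For reference, a minimal self-contained sketch (C++17) of what such an OrElse combinator could look like: run a cleanup only on error, then pass the status through unchanged. The Status type here is a toy stand-in, and the combinator itself is hypothetical; nothing like it exists in Arrow or in this PR.

    #include <string>
    #include <utility>

    // Toy stand-in for arrow::Status, only so the sketch compiles on its own.
    struct Status {
      bool ok_ = true;
      std::string msg_;

      static Status OK() { return {}; }
      static Status IOError(std::string m) { return {false, std::move(m)}; }
      bool ok() const { return ok_; }

      // Hypothetical combinator: run `cleanup` only when the status is an
      // error, then return the status unchanged so it can still feed
      // RETURN_NOT_OK at the call site.
      template <typename Cleanup>
      Status OrElse(Cleanup&& cleanup) && {
        if (!ok_) std::forward<Cleanup>(cleanup)();
        return std::move(*this);
      }
    };

    int main() {
      int cleanups = 0;
      Status st = Status::IOError("disk full").OrElse([&] { ++cleanups; });
      return (!st.ok() && cleanups == 1) ? 0 : 1;  // cleanup ran exactly once
    }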

}
// At this point all write tasks have been added. Because the scheduler
// is a 1-task FIFO we know this task will run at the very end and can
// add it now.
file_tasks_->AddSimpleTask([this] { return DoFinish(); },
file_tasks_->AddSimpleTask([self = shared_from_this()] { return self->DoFinish(); },
"DatasetWriter::FinishFile"sv);
file_tasks_.reset();
return Status::OK();
}

private:
Future<> WriteNext(std::shared_ptr<RecordBatch> next) {
// May want to prototype / measure someday pushing the async write down further
return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
[self = this, batch = std::move(next)]() {
[self = shared_from_this(), batch = std::move(next)]() {
int64_t rows_to_release = batch->num_rows();
Status status = self->writer_->Write(batch);
self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
@@ -244,40 +259,48 @@ class DatasetWriterFileQueue {
std::lock_guard<std::mutex> lg(writer_state_->visitors_mutex);
RETURN_NOT_OK(options_.writer_pre_finish(writer_.get()));
}
return writer_->Finish().Then([this]() {
std::lock_guard<std::mutex> lg(writer_state_->visitors_mutex);
return options_.writer_post_finish(writer_.get());
});
return writer_->Finish().Then(
[self = shared_from_this(), writer_post_finish = options_.writer_post_finish]() {
std::lock_guard<std::mutex> lg(self->writer_state_->visitors_mutex);
return writer_post_finish(self->writer_.get());
});
}

const FileSystemDatasetWriteOptions& options_;
const std::shared_ptr<Schema>& schema_;
DatasetWriterState* writer_state_;
std::shared_ptr<DatasetWriterState> writer_state_;
std::shared_ptr<FileWriter> writer_;
// Batches are accumulated here until they are large enough to write out at which
// point they are merged together and added to write_queue_
std::deque<std::shared_ptr<RecordBatch>> staged_batches_;
uint64_t rows_currently_staged_ = 0;
util::AsyncTaskScheduler* file_tasks_ = nullptr;
std::unique_ptr<util::ThrottledAsyncTaskScheduler> file_tasks_;
};

struct WriteTask {
std::string filename;
uint64_t num_rows;
};

class DatasetWriterDirectoryQueue {
class DatasetWriterDirectoryQueue
: public std::enable_shared_from_this<DatasetWriterDirectoryQueue> {
public:
DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory,
std::string prefix, std::shared_ptr<Schema> schema,
const FileSystemDatasetWriteOptions& write_options,
DatasetWriterState* writer_state)
std::shared_ptr<DatasetWriterState> writer_state)
: scheduler_(std::move(scheduler)),
directory_(std::move(directory)),
prefix_(std::move(prefix)),
schema_(std::move(schema)),
write_options_(write_options),
writer_state_(writer_state) {}
writer_state_(std::move(writer_state)) {}

~DatasetWriterDirectoryQueue() {
if (latest_open_file_) {
latest_open_file_->Abort();
}
}

Result<std::shared_ptr<RecordBatch>> NextWritableChunk(
std::shared_ptr<RecordBatch> batch, std::shared_ptr<RecordBatch>* remainder,
@@ -330,32 +353,27 @@ class DatasetWriterDirectoryQueue {

Status FinishCurrentFile() {
if (latest_open_file_) {
ARROW_RETURN_NOT_OK(latest_open_file_->Finish());
latest_open_file_tasks_.reset();
latest_open_file_ = nullptr;
auto file = std::move(latest_open_file_);
ARROW_RETURN_NOT_OK(file->Finish());
}
rows_written_ = 0;
return GetNextFilename().Value(&current_filename_);
}

Status OpenFileQueue(const std::string& filename) {
auto file_queue =
std::make_unique<DatasetWriterFileQueue>(schema_, write_options_, writer_state_);
latest_open_file_ = file_queue.get();
// Create a dedicated throttle for write jobs to this file and keep it alive until we
// are finished and have closed the file.
auto file_finish_task = [this, file_queue = std::move(file_queue)] {
writer_state_->open_files_throttle.Release(1);
latest_open_file_.reset(
new DatasetWriterFileQueue(schema_, write_options_, writer_state_));
auto file_finish_task = [self = shared_from_this()] {
self->writer_state_->open_files_throttle.Release(1);
return Status::OK();
};
latest_open_file_tasks_ = util::MakeThrottledAsyncTaskGroup(
scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task));
auto file_tasks = util::MakeThrottledAsyncTaskGroup(scheduler_, 1, /*queue=*/nullptr,
std::move(file_finish_task));
if (init_future_.is_valid()) {
latest_open_file_tasks_->AddSimpleTask(
[init_future = init_future_]() { return init_future; },
"DatasetWriter::WaitForDirectoryInit"sv);
file_tasks->AddSimpleTask([init_future = init_future_]() { return init_future; },
"DatasetWriter::WaitForDirectoryInit"sv);
}
latest_open_file_->Start(latest_open_file_tasks_.get(), filename);
latest_open_file_->Start(std::move(file_tasks), filename);
return Status::OK();
}

@@ -398,41 +416,46 @@ class DatasetWriterDirectoryQueue {
"DatasetWriter::InitializeDirectory"sv);
}

static Result<std::unique_ptr<DatasetWriterDirectoryQueue>> Make(
static Result<std::shared_ptr<DatasetWriterDirectoryQueue>> Make(
util::AsyncTaskScheduler* scheduler,
const FileSystemDatasetWriteOptions& write_options,
DatasetWriterState* writer_state, std::shared_ptr<Schema> schema,
std::shared_ptr<DatasetWriterState> writer_state, std::shared_ptr<Schema> schema,
std::string directory, std::string prefix) {
auto dir_queue = std::make_unique<DatasetWriterDirectoryQueue>(
auto dir_queue = std::make_shared<DatasetWriterDirectoryQueue>(
scheduler, std::move(directory), std::move(prefix), std::move(schema),
write_options, writer_state);
write_options, std::move(writer_state));
dir_queue->PrepareDirectory();
ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, dir_queue->GetNextFilename());
return dir_queue;
}

Status Finish() {
if (latest_open_file_) {
ARROW_RETURN_NOT_OK(latest_open_file_->Finish());
latest_open_file_tasks_.reset();
latest_open_file_ = nullptr;
auto file = std::move(latest_open_file_);
ARROW_RETURN_NOT_OK(file->Finish());
}
used_filenames_.clear();
return Status::OK();
}

void Abort() {
if (latest_open_file_) {
latest_open_file_->Abort();
latest_open_file_.reset();
}
}

private:
util::AsyncTaskScheduler* scheduler_ = nullptr;
std::string directory_;
std::string prefix_;
std::shared_ptr<Schema> schema_;
const FileSystemDatasetWriteOptions& write_options_;
DatasetWriterState* writer_state_;
std::shared_ptr<DatasetWriterState> writer_state_;
Future<> init_future_;
std::string current_filename_;
std::unordered_set<std::string> used_filenames_;
DatasetWriterFileQueue* latest_open_file_ = nullptr;
std::unique_ptr<util::ThrottledAsyncTaskScheduler> latest_open_file_tasks_;
std::shared_ptr<DatasetWriterFileQueue> latest_open_file_;
uint64_t rows_written_ = 0;
uint32_t file_counter_ = 0;
};
@@ -520,11 +543,26 @@ class DatasetWriter::DatasetWriterImpl {
return Status::OK();
})),
write_options_(std::move(write_options)),
writer_state_(max_rows_queued, write_options_.max_open_files,
CalculateMaxRowsStaged(max_rows_queued)),
writer_state_(std::make_shared<DatasetWriterState>(
max_rows_queued, write_options_.max_open_files,
CalculateMaxRowsStaged(max_rows_queued))),
pause_callback_(std::move(pause_callback)),
resume_callback_(std::move(resume_callback)) {}

~DatasetWriterImpl() {
// In case something went wrong (e.g. an IO error occurred), some tasks
// may be left dangling in a ThrottledAsyncTaskScheduler and that may
Contributor:
How are such tasks left dangling exactly?

Member Author:
Honestly, I haven't tried to analyze this very deeply; it might be the file_finish_task, actually. In any case, this is necessary to suppress the memory leaks in error cases.
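To make the cycle concrete, here is a hedged, self-contained illustration with toy types (not Arrow code). It assumes the shape Owner -> owned scheduler -> queued closure -> shared_ptr<Owner>, which is the pattern the surrounding destructor comment is guarding against:

    #include <functional>
    #include <memory>
    #include <vector>

    // Toy stand-in for a scheduler that still holds queued (never-run) tasks.
    struct Scheduler {
      std::vector<std::function<void()>> pending;
    };

    struct Owner : std::enable_shared_from_this<Owner> {
      std::unique_ptr<Scheduler> scheduler = std::make_unique<Scheduler>();

      void AddTask() {
        // The queued closure keeps the Owner alive through the scheduler the
        // Owner itself owns: Owner -> Scheduler -> closure -> shared_ptr<Owner>.
        scheduler->pending.push_back([self = shared_from_this()] { (void)self; });
      }

      // Analogue of Abort(): dropping the scheduler destroys the queued
      // closures and releases the captured shared_ptrs, breaking the cycle.
      void Abort() { scheduler.reset(); }
    };

    int main() {
      auto leaky = std::make_shared<Owner>();
      leaky->AddTask();
      std::weak_ptr<Owner> probe1 = leaky;
      leaky.reset();
      bool pinned = !probe1.expired();  // true: the cycle keeps the object alive

      auto fixed = std::make_shared<Owner>();
      fixed->AddTask();
      std::weak_ptr<Owner> probe2 = fixed;
      fixed->Abort();
      fixed.reset();
      bool freed = probe2.expired();    // true: breaking the cycle frees it
      return (pinned && freed) ? 0 : 1;
    }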

// lead to memory leaks via shared_ptr reference cycles (this can show up
// in some unit tests under Valgrind).
// To prevent this, explicitly break reference cycles at DatasetWriter
// destruction.
// The alternative is to use weak_from_this() thoroughly in async callbacks,
// but that makes for less readable code.
for (const auto& directory_queue : directory_queues_) {
directory_queue.second->Abort();
}
}

Future<> WriteAndCheckBackpressure(std::shared_ptr<RecordBatch> batch,
const std::string& directory,
const std::string& prefix) {
@@ -592,8 +630,10 @@ class DatasetWriter::DatasetWriterImpl {
"DatasetWriter::FinishAll"sv);
// Reset write_tasks_ to signal that we are done adding tasks, this will allow
// us to invoke the finish callback once the tasks wrap up.
std::lock_guard lg(mutex_);
write_tasks_.reset();
{
std::lock_guard lg(mutex_);
write_tasks_.reset();
}
}

protected:
@@ -621,7 +661,7 @@ class DatasetWriter::DatasetWriterImpl {
&directory_queues_, directory + prefix,
[this, &batch, &directory, &prefix](const std::string& key) {
return DatasetWriterDirectoryQueue::Make(scheduler_, write_options_,
&writer_state_, batch->schema(),
writer_state_, batch->schema(),
directory, prefix);
}));
std::shared_ptr<DatasetWriterDirectoryQueue> dir_queue = dir_queue_itr->second;
@@ -643,16 +683,16 @@ class DatasetWriter::DatasetWriterImpl {
continue;
}
backpressure =
writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows());
writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
break;
}
if (will_open_file) {
backpressure = writer_state_.open_files_throttle.Acquire(1);
backpressure = writer_state_->open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
RETURN_NOT_OK(TryCloseLargestFile());
break;
}
@@ -664,7 +704,7 @@ class DatasetWriter::DatasetWriterImpl {
//
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
// so we don't need to release it here.
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
return s;
}
batch = std::move(remainder);
@@ -685,7 +725,7 @@ class DatasetWriter::DatasetWriterImpl {
std::unique_ptr<util::ThrottledAsyncTaskScheduler> write_tasks_;
Future<> finish_fut_ = Future<>::Make();
FileSystemDatasetWriteOptions write_options_;
DatasetWriterState writer_state_;
std::shared_ptr<DatasetWriterState> writer_state_;
std::function<void()> pause_callback_;
std::function<void()> resume_callback_;
// Map from directory + prefix to the queue for that directory
24 changes: 11 additions & 13 deletions cpp/src/arrow/util/async_util.cc
@@ -316,15 +316,11 @@ class ThrottledAsyncTaskSchedulerImpl
#endif
queue_->Push(std::move(task));
lk.unlock();
maybe_backoff->AddCallback(
[weak_self = std::weak_ptr<ThrottledAsyncTaskSchedulerImpl>(
shared_from_this())](const Status& st) {
if (st.ok()) {
if (auto self = weak_self.lock()) {
self->ContinueTasks();
}
}
});
maybe_backoff->AddCallback([weak_self = weak_from_this()](const Status& st) {
if (auto self = weak_self.lock(); self && st.ok()) {
self->ContinueTasks();
}
});
return true;
} else {
lk.unlock();
@@ -350,8 +346,9 @@ class ThrottledAsyncTaskSchedulerImpl
self = shared_from_this()]() mutable -> Result<Future<>> {
ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)());
if (!inner_fut.TryAddCallback([&] {
return [latched_cost, self = std::move(self)](const Status& st) -> void {
if (st.ok()) {
return [latched_cost,
weak_self = self->weak_from_this()](const Status& st) -> void {
if (auto self = weak_self.lock(); self && st.ok()) {
self->throttle_->Release(latched_cost);
self->ContinueTasks();
}
@@ -360,6 +357,7 @@ class ThrottledAsyncTaskSchedulerImpl
// If the task is already finished then don't run ContinueTasks
// if we are already running it so we can avoid stack overflow
self->throttle_->Release(latched_cost);
inner_task.reset();
if (!in_continue) {
self->ContinueTasks();
}
@@ -377,8 +375,8 @@ class ThrottledAsyncTaskSchedulerImpl
if (maybe_backoff) {
lk.unlock();
if (!maybe_backoff->TryAddCallback([&] {
return [self = shared_from_this()](const Status& st) {
if (st.ok()) {
return [weak_self = weak_from_this()](const Status& st) {
Contributor:
Ditto.
if (auto self = weak_self.lock(); self && st.ok()) {
self->ContinueTasks();
}
};