Skip to content

Commit 3c49f41

Browse files
author
Rafał Hibner
committed
log throttle
1 parent 5726a8f commit 3c49f41

1 file changed

Lines changed: 19 additions & 0 deletions

File tree

cpp/src/arrow/dataset/dataset_writer.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,20 @@ struct DatasetWriterState {
103103

104104
// Returns true once the current staged row count has reached `max_rows_staged`.
bool StagingFull() const {
  const auto staged_now = staged_rows_count.load();
  return staged_now >= max_rows_staged;
}
105105

106+
struct debugOp{
107+
char origin{'-'};
108+
char op{'-'};
109+
uint64_t val=0;
110+
std::shared_ptr<RecordBatch> batch;
111+
};
112+
int idx{0};
113+
std::array<debugOp,128> debugLog_;
114+
115+
void debugLog(debugOp op){
116+
debugLog_[idx++]=op;
117+
idx%=128;
118+
}
119+
106120
// Throttle for how many rows the dataset writer will allow to be in process memory
107121
// When this is exceeded the dataset writer will pause / apply backpressure
108122
Throttle rows_in_flight_throttle;
@@ -246,6 +260,7 @@ class DatasetWriterFileQueue
246260
[self = shared_from_this(), batch = std::move(next)]() {
247261
int64_t rows_to_release = batch->num_rows();
248262
Status status = self->writer_->Write(batch);
263+
self->writer_state_->debugLog({'n','R',rows_to_release,batch});
249264
self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
250265
return status;
251266
}));
@@ -679,6 +694,8 @@ class DatasetWriter::DatasetWriterImpl {
679694
}
680695
continue;
681696
}
697+
698+
writer_state_->debugLog({'d','A',next_chunk->num_rows(),batch});
682699
backpressure =
683700
writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
684701
if (!backpressure.is_finished()) {
@@ -689,6 +706,7 @@ class DatasetWriter::DatasetWriterImpl {
689706
backpressure = writer_state_->open_files_throttle.Acquire(1);
690707
if (!backpressure.is_finished()) {
691708
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
709+
writer_state_->debugLog({'d','R',next_chunk->num_rows(),batch});
692710
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
693711
RETURN_NOT_OK(TryCloseLargestFile());
694712
break;
@@ -701,6 +719,7 @@ class DatasetWriter::DatasetWriterImpl {
701719
//
702720
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
703721
// so we don't need to release it here.
722+
writer_state_->debugLog({'e','R',next_chunk->num_rows(),batch});
704723
writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
705724
return s;
706725
}

0 commit comments

Comments
 (0)