13 changes: 10 additions & 3 deletions cpp/src/arrow/csv/writer.cc
@@ -541,7 +541,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
for (auto maybe_slice : iterator) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(WriteAndClearBuffer());
stats_.num_record_batches++;
}
return Status::OK();
@@ -554,7 +554,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
RETURN_NOT_OK(reader.ReadNext(&batch));
while (batch != nullptr) {
RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(WriteAndClearBuffer());
RETURN_NOT_OK(reader.ReadNext(&batch));
stats_.num_record_batches++;
}
@@ -590,6 +590,13 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
return Status::OK();
}

+  // GH-36889: Write buffer to sink and clear it to avoid stale content
+  // being written again if the next batch is empty.
+  Status WriteAndClearBuffer() {
Review comment (Member): nit: perhaps rename to FlushToSink, which is shorter and implies that the internal buffer is flushed and cleared.

+    RETURN_NOT_OK(sink_->Write(data_buffer_));
+    return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
+  }

int64_t CalculateHeaderSize(QuotingStyle quoting_style) const {
int64_t header_length = 0;
for (int col = 0; col < schema_->num_fields(); col++) {
@@ -654,7 +661,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
next += options_.eol.size();
DCHECK_EQ(reinterpret_cast<uint8_t*>(next),
data_buffer_->data() + data_buffer_->size());
-    return sink_->Write(data_buffer_);
+    return WriteAndClearBuffer();
}

Status TranslateMinimalBatch(const RecordBatch& batch) {
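For context, the stale-buffer issue described in the new comment can be exercised from Python roughly as follows. This is an illustrative sketch mirroring the tests added below, not part of the PR; with the fix in place the assertion holds, while previously a leading empty batch could result in the header being written twice.

import io

import pyarrow as pa
from pyarrow import csv

table = pa.table({"col1": ["a", "b"]})
empty = table.schema.empty_table()
combined = pa.concat_tables([empty, table])  # leading empty batch

buf = io.BytesIO()
csv.write_csv(combined, buf)

# With the fix, exactly one header line is written.
assert buf.getvalue() == b'"col1"\n"a"\n"b"\n'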
42 changes: 42 additions & 0 deletions cpp/src/arrow/csv/writer_test.cc
@@ -28,6 +28,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
+#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/type.h"
@@ -405,5 +406,46 @@ INSTANTIATE_TEST_SUITE_P(
"\n2016-02-29 10:42:23-0700,2016-02-29 17:42:23Z\n")));
#endif

// GH-36889: Empty batches at the start should not cause duplicate headers
TEST(TestWriteCSV, EmptyBatchAtStart) {
auto schema = arrow::schema({field("col1", utf8())});
auto empty_batch = RecordBatchFromJSON(schema, "[]");
auto data_batch = RecordBatchFromJSON(schema, R"([{"col1": "a"}, {"col1": "b"}])");

// Concatenate empty table with data table
ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
ASSERT_OK_AND_ASSIGN(auto data_table, Table::FromRecordBatches(schema, {data_batch}));
ASSERT_OK_AND_ASSIGN(auto combined_table, ConcatenateTables({empty_table, data_table}));

ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());

std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
// Should have exactly one header, not two
EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
}

// GH-36889: Empty batches in the middle should not cause issues
TEST(TestWriteCSV, EmptyBatchInMiddle) {
auto schema = arrow::schema({field("col1", utf8())});
auto batch1 = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
auto empty_batch = RecordBatchFromJSON(schema, "[]");
auto batch2 = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");

ASSERT_OK_AND_ASSIGN(auto table1, Table::FromRecordBatches(schema, {batch1}));
ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
ASSERT_OK_AND_ASSIGN(auto table2, Table::FromRecordBatches(schema, {batch2}));
ASSERT_OK_AND_ASSIGN(auto combined_table,
ConcatenateTables({table1, empty_table, table2}));

ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());

std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
}
Review comment (Member) on lines +409 to +448 — suggested change (replacing the two tests above with a single parametrized test):
TEST(TestWriteCSV, EmptyBatchShouldNotPolluteOutput) {
  auto schema = arrow::schema({field("col1", utf8())});
  auto empty_batch = RecordBatchFromJSON(schema, "[]");
  auto batch_a = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
  auto batch_b = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");

  struct TestParam {
    std::shared_ptr<Table> table;
    std::string expected_output;
  };
  std::vector<TestParam> test_params = {
      // Empty batch in the beginning
      {Table::FromRecordBatches(schema, {empty_batch, batch_a, batch_b}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
      // Empty batch in the middle
      {Table::FromRecordBatches(schema, {batch_a, empty_batch, batch_b}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
      // Empty batch in the end
      {Table::FromRecordBatches(schema, {batch_a, batch_b, empty_batch}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
  };

  for (const auto& param : test_params) {
    ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
    ASSERT_OK(WriteCSV(*param.table, WriteOptions::Defaults(), out.get()));
    ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
    EXPECT_EQ(buffer->ToString(), param.expected_output);
  }
}

These two cases are very similar, so I'd recommend combining them with different params, as in the suggestion above.


} // namespace csv
} // namespace arrow
33 changes: 33 additions & 0 deletions python/pyarrow/tests/test_csv.py
@@ -2065,3 +2065,36 @@ def readinto(self, *args):
    for i in range(20):
        with pytest.raises(pa.ArrowInvalid):
            read_csv(MyBytesIO(data))


def test_write_csv_empty_batch_no_duplicate_header():
Review comment (Member): I'm not an expert in pyarrow, but I'd recommend consolidating similar cases (see the sketch after these tests).

    # GH-36889: Empty batches at the start should not cause duplicate headers
    table = pa.table({"col1": ["a", "b", "c"]})

    # Concatenate empty table with data table
    empty_table = table.schema.empty_table()
    combined = pa.concat_tables([empty_table, table])

    buf = io.BytesIO()
    write_csv(combined, buf)
    buf.seek(0)
    result = buf.read()

    # Should have exactly one header, not two
    assert result == b'"col1"\n"a"\n"b"\n"c"\n'


def test_write_csv_empty_batch_in_middle():
    # GH-36889: Empty batches in the middle should not cause issues
    table1 = pa.table({"col1": ["a"]})
    table2 = pa.table({"col1": ["b"]})
    empty_table = table1.schema.empty_table()

    combined = pa.concat_tables([table1, empty_table, table2])

    buf = io.BytesIO()
    write_csv(combined, buf)
    buf.seek(0)
    result = buf.read()

    assert result == b'"col1"\n"a"\n"b"\n'
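A possible consolidation along the lines of the review comment above — a sketch only, not part of the PR; the test name and the use of pytest.mark.parametrize are assumptions:

import io

import pyarrow as pa
import pytest
from pyarrow.csv import write_csv


@pytest.mark.parametrize("position", ["start", "middle", "end"])
def test_write_csv_empty_batch_no_pollution(position):
    # GH-36889: an empty batch anywhere in the table should not leak
    # stale buffer contents (e.g. a duplicate header) into the output.
    table_a = pa.table({"col1": ["a"]})
    table_b = pa.table({"col1": ["b"]})
    empty = table_a.schema.empty_table()

    order = {
        "start": [empty, table_a, table_b],
        "middle": [table_a, empty, table_b],
        "end": [table_a, table_b, empty],
    }[position]
    combined = pa.concat_tables(order)

    buf = io.BytesIO()
    write_csv(combined, buf)
    assert buf.getvalue() == b'"col1"\n"a"\n"b"\n'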