Skip to content

Commit 05a4b04

Browse files
committed
Fix problem writing sliced BooleanBuffers in the fast-encoding format
1 parent 89bfa1e commit 05a4b04

File tree

1 file changed

+38
-3
lines changed
  • native/core/src/execution/shuffle

1 file changed

+38
-3
lines changed

native/core/src/execution/shuffle/codec.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl<W: Write> BatchWriter<W> {
181181
// be determined from the data buffer size (length is in bits rather than bytes)
182182
self.write_all(&arr.len().to_le_bytes())?;
183183
// write data buffer
184-
self.write_buffer(arr.values().inner())?;
184+
self.write_boolean_buffer(arr.values())?;
185185
// write null buffer
186186
self.write_null_buffer(arr.nulls())?;
187187
}
@@ -300,8 +300,7 @@ impl<W: Write> BatchWriter<W> {
300300
// write null buffer length in bits
301301
self.write_all(&buffer.len().to_le_bytes())?;
302302
// write null buffer
303-
let buffer = buffer.inner();
304-
self.write_buffer(buffer)?;
303+
self.write_boolean_buffer(buffer)?;
305304
} else {
306305
self.inner.write_all(&0_usize.to_le_bytes())?;
307306
}
@@ -315,6 +314,19 @@ impl<W: Write> BatchWriter<W> {
315314
self.inner.write_all(buffer.as_slice())
316315
}
317316

317+
/// Writes the bits covered by `buffer` through `write_buffer`.
///
/// If `buffer` starts at bit offset 0 and its backing buffer holds exactly the
/// number of bytes needed for `buffer.len()` bits, the backing buffer is written
/// as-is. Otherwise the buffer returned by `BooleanBuffer::sliced` is written
/// instead, so the serialized data covers only this buffer's bits.
///
/// # Errors
///
/// Propagates any I/O error returned by `write_buffer`.
fn write_boolean_buffer(&mut self, buffer: &BooleanBuffer) -> std::io::Result<()> {
    let inner_buffer = buffer.inner();
    // `BooleanBuffer::len` is a bit count while `Buffer::len` is a byte count, so
    // round the bit length up to whole bytes before comparing the two.
    let required_bytes = buffer.len().div_ceil(8);
    if buffer.offset() == 0 && inner_buffer.len() == required_bytes {
        // Not a sliced buffer, write the inner buffer directly
        self.write_buffer(inner_buffer)?;
    } else {
        // Sliced buffer, create and write the sliced buffer
        let sliced = buffer.sliced();
        self.write_buffer(&sliced)?;
    }
    Ok(())
}
329+
318330
/// Consumes this writer and returns the wrapped `inner` value of type `W`.
pub fn inner(self) -> W {
319331
self.inner
320332
}
@@ -621,6 +633,29 @@ mod test {
621633
assert_eq!(batch, batch2);
622634
}
623635

636+
/// Round-trips every consecutive 128-row slice of an 8192-row batch through
/// `BatchWriter` and `BatchReader`, asserting that each decoded batch equals
/// the slice that was written.
#[test]
fn roundtrip_sliced() {
    const SLICE_ROWS: usize = 128;

    let full_batch = create_batch(8192, true);
    let total_rows = full_batch.num_rows();

    for offset in (0..total_rows).step_by(SLICE_ROWS) {
        // The final window is clamped to the rows that remain.
        let length = SLICE_ROWS.min(total_rows - offset);
        let expected = full_batch.slice(offset, length);

        // Encode the slice into an in-memory byte vector.
        let mut writer = BatchWriter::new(Vec::new());
        writer.write_partial_schema(&expected.schema()).unwrap();
        writer.write_batch(&expected).unwrap();
        let encoded = writer.inner();

        // Decode it again and compare with what was written.
        let mut reader = BatchReader::new(&encoded);
        let decoded = reader.read_batch().unwrap();
        assert_eq!(expected, decoded);
    }
}
658+
624659
fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
625660
let schema = Arc::new(Schema::new(vec![
626661
Field::new("bool", DataType::Boolean, true),

0 commit comments

Comments
 (0)