Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 209 additions & 10 deletions stacker/src/expull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use common::serialize_vint_u32;
use crate::fastcpy::fast_short_slice_copy;
use crate::{Addr, MemoryArena};

// Block numbering starts here; the number of allocated blocks is
// block_num - FIRST_BLOCK_NUM. u32 (rather than u16) so very large
// lists (> 65,535 blocks) do not overflow the counter.
const FIRST_BLOCK_NUM: u32 = 2;

/// An exponential unrolled link.
///
Expand Down Expand Up @@ -33,8 +33,8 @@ pub struct ExpUnrolledLinkedList {
// u16, since the max size of each block is (1<<next_cap_pow_2)
// Limited to 15, so we don't overflow remaining_cap.
remaining_cap: u16,
// To get the current number of blocks: block_num - FIRST_BLOCK_NUM
block_num: u16,
// Tracks the number of blocks allocated: block_num - FIRST_BLOCK_NUM
block_num: u32,
head: Addr,
tail: Addr,
}
Expand Down Expand Up @@ -110,16 +110,27 @@ impl ExpUnrolledLinkedListWriter<'_> {
}
}

// The block size is 2^block_num + 2, but max 2^15= 32k
// Initial size is 8, for the first block => block_num == 1
// The block size is 2^block_num, but max 2^15 = 32KB
// Initial size is 8 bytes (2^3), for the first block => block_num == 2
// Block size caps at 32KB (2^15) regardless of how high block_num goes
/// Returns the byte size of the block identified by `block_num`.
///
/// Block sizes grow exponentially as `2^block_num`, capped at
/// `2^15 = 32 KiB`, so the result always fits in a `u16` even though
/// `block_num` itself is a `u32` and may grow far beyond 15.
#[inline]
fn get_block_size(block_num: u32) -> u16 {
    // Cap the exponent at 15: block_num can be arbitrarily large, but
    // the block size maxes out at 32 KiB (and a larger shift on u32
    // would overflow / not fit into the u16 return type).
    (1u32 << block_num.min(15)) as u16
}

impl ExpUnrolledLinkedList {
#[inline(always)]
pub fn increment_num_blocks(&mut self) {
self.block_num += 1;
// Add overflow check as a safety measure
// With u32, we can handle up to ~4 billion blocks before overflow
// At 32KB per block (max size), that's 128 TB of data
self.block_num = self
.block_num
.checked_add(1)
.expect("ExpUnrolledLinkedList block count overflow - exceeded 4 billion blocks");
}

#[inline]
Expand All @@ -132,9 +143,26 @@ impl ExpUnrolledLinkedList {
if addr.is_null() {
return;
}
let last_block_len = get_block_size(self.block_num) as usize - self.remaining_cap as usize;

// Full Blocks
// Calculate last block length with bounds checking to prevent underflow
let block_size = get_block_size(self.block_num) as usize;
let last_block_len = block_size.saturating_sub(self.remaining_cap as usize);

// Safety check: if remaining_cap > block_size, the metadata is corrupted
assert!(
self.remaining_cap as usize <= block_size,
"ExpUnrolledLinkedList metadata corruption detected: remaining_cap ({}) > block_size \
({}). This indicates a serious bug, please report! (block_num={}, head={:?}, \
tail={:?})",
self.remaining_cap,
block_size,
self.block_num,
self.head,
self.tail
);

// Full Blocks (iterate through all blocks except the last one)
// Note: Blocks are numbered starting from FIRST_BLOCK_NUM+1 (=3) after first allocation
for block_num in FIRST_BLOCK_NUM + 1..self.block_num {
let cap = get_block_size(block_num) as usize;
let data = arena.slice(addr, cap);
Expand Down Expand Up @@ -259,6 +287,177 @@ mod tests {
assert_eq!(&vec1[..], &res1[..]);
assert_eq!(&vec2[..], &res2[..]);
}

// Tests for u32 block_num fix (issue with large arrays)

#[test]
fn test_block_num_exceeds_u16_max() {
    // Regression test for the old u16 counter: more than 65,535 blocks
    // must now be representable without overflow.
    let mut eull = ExpUnrolledLinkedList::default();

    let extra_blocks = 70_000u32; // exceeds u16::MAX (65,535)
    for _ in 0..extra_blocks {
        eull.increment_num_blocks();
    }

    // Counter starts at FIRST_BLOCK_NUM and grows by one per call.
    assert_eq!(eull.block_num, FIRST_BLOCK_NUM + extra_blocks);

    // Block size must still be obtainable, capped at 32 KiB.
    assert_eq!(get_block_size(eull.block_num), 1 << 15);
}

#[test]
fn test_large_dataset_simulation() {
    // Simulate a large write workload: ~100 MB of payload, enough to
    // require thousands of 32 KiB blocks, while keeping the test fast.
    let mut arena = MemoryArena::default();
    let mut eull = ExpUnrolledLinkedList::default();

    const BYTES_PER_WRITE: usize = 10_000;
    const NUM_WRITES: usize = 10_000; // 10k * 10k = 100 MB

    let chunk: Vec<u8> = (0..BYTES_PER_WRITE).map(|i| (i % 256) as u8).collect();
    for _ in 0..NUM_WRITES {
        eull.writer(&mut arena).extend_from_slice(&chunk);
    }

    // This much data must have allocated blocks in the thousands.
    assert!(
        eull.block_num > 1000,
        "block_num ({}) should be > 1000 for this much data",
        eull.block_num
    );

    // Round-trip: total length is preserved...
    let mut buffer = Vec::new();
    eull.read_to_end(&arena, &mut buffer);
    assert_eq!(buffer.len(), BYTES_PER_WRITE * NUM_WRITES);

    // ...and the first chunk matches byte-for-byte (integrity sample).
    for (i, &byte) in buffer.iter().take(BYTES_PER_WRITE).enumerate() {
        assert_eq!(byte, (i % 256) as u8);
    }
}

#[test]
fn test_get_block_size_with_large_block_num() {
    // Below the cap: size is exactly 2^block_num.
    for (block_num, expected) in [(2u32, 4u16), (3, 8), (10, 1024)] {
        assert_eq!(get_block_size(block_num), expected);
    }

    // At the cap: 2^15 = 32 KiB.
    assert_eq!(get_block_size(15), 32768);

    // Beyond the cap — including values past the old u16::MAX limit —
    // the size must stay pinned at 32 KiB.
    for block_num in [16u32, 100, 65_536, 100_000, 1_000_000] {
        assert_eq!(get_block_size(block_num), 32768);
    }
}

#[test]
fn test_increment_blocks_near_u16_boundary() {
    // Walk the counter across the old u16::MAX boundary, where the
    // previous u16 representation would have overflowed.
    let mut eull = ExpUnrolledLinkedList::default();

    // Land just before the old limit.
    for _ in 0..65_533 {
        eull.increment_num_blocks();
    }
    assert_eq!(eull.block_num, FIRST_BLOCK_NUM + 65_533);

    // Step through 65,534 -> 65,537; crossing 65,536 used to overflow.
    for _ in 0..4 {
        eull.increment_num_blocks();
    }
    assert_eq!(eull.block_num, FIRST_BLOCK_NUM + 65_537);
}

#[test]
fn test_write_and_read_with_many_blocks() {
    // A single large write spanning many blocks must round-trip intact.
    let mut arena = MemoryArena::default();
    let mut eull = ExpUnrolledLinkedList::default();

    let expected: Vec<u8> = (0..50_000).map(|i| (i % 256) as u8).collect();
    eull.writer(&mut arena).extend_from_slice(&expected);

    let mut actual = Vec::new();
    eull.read_to_end(&arena, &mut actual);

    // Length and content must both match the original payload.
    assert_eq!(actual.len(), expected.len());
    assert_eq!(&actual[..], &expected[..]);
}

#[test]
fn test_multiple_eull_with_large_block_counts() {
    // Two lists sharing one arena (as parallel columnar writers do)
    // must not interfere with each other's data.
    let mut arena = MemoryArena::default();
    let mut eull1 = ExpUnrolledLinkedList::default();
    let mut eull2 = ExpUnrolledLinkedList::default();

    const NUM_VALUES: u32 = 10_000;
    for i in 0..NUM_VALUES {
        eull1.writer(&mut arena).write_u32_vint(i);
        eull2.writer(&mut arena).write_u32_vint(i * 2);
    }

    let mut buf1 = Vec::new();
    let mut buf2 = Vec::new();
    eull1.read_to_end(&arena, &mut buf1);
    eull2.read_to_end(&arena, &mut buf2);

    // Decode both streams and verify every value landed in the right list.
    let mut cursor1 = &buf1[..];
    let mut cursor2 = &buf2[..];
    for i in 0..NUM_VALUES {
        assert_eq!(read_u32_vint(&mut cursor1), i);
        assert_eq!(read_u32_vint(&mut cursor2), i * 2);
    }
}

#[test]
fn test_block_size_stays_capped() {
    // Even with a huge block counter, the block size never grows past 32 KiB.
    let mut eull = ExpUnrolledLinkedList::default();

    for _ in 0..200_000 {
        eull.increment_num_blocks();
    }

    assert_eq!(
        get_block_size(eull.block_num),
        32768,
        "Block size should be capped at 32KB"
    );
}

#[test]
#[should_panic(expected = "ExpUnrolledLinkedList block count overflow")]
fn test_increment_overflow_protection() {
    // Incrementing past u32::MAX must panic with the dedicated message
    // rather than wrap silently. Reaching this point for real would
    // require ~128 TB of data, so it only guards against bugs.
    let mut eull = ExpUnrolledLinkedList::default();
    eull.block_num = u32::MAX;
    eull.increment_num_blocks();
}
}

#[cfg(all(test, feature = "unstable"))]
Expand Down
Loading