Skip to content

Commit 174853d

Browse files
debugging data loss in parquet conversion
1 parent 64a945f commit 174853d

File tree

2 files changed

+94
-9
lines changed

2 files changed

+94
-9
lines changed

src/staging/reader.rs

+87-9
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,37 @@ pub struct MergedReverseRecordReader {
8484
impl MergedReverseRecordReader {
8585
pub fn try_new(files: &[PathBuf]) -> Self {
8686
let mut readers = Vec::with_capacity(files.len());
87-
for file in files {
88-
let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
89-
error!("Invalid file detected, ignoring it: {:?}", file);
90-
continue;
91-
};
9287

93-
readers.push(reader);
88+
for file in files {
89+
match File::open(file) {
90+
Ok(file) => {
91+
// Create two readers - one for counting and one for keeping
92+
match (get_reverse_reader(file.try_clone().unwrap()), get_reverse_reader(file.try_clone().unwrap())) {
93+
(Ok(count_reader), Ok(keep_reader)) => {
94+
// Count records from the first reader
95+
let count: usize = count_reader
96+
.flat_map(|r| r.ok())
97+
.map(|b| b.num_rows()).count();
98+
99+
println!("File {:?} has {} records", file, count);
100+
101+
// Keep the second reader for actual processing
102+
readers.push(keep_reader);
103+
}
104+
_ => {
105+
error!("Invalid file detected, ignoring it: {:?}", file);
106+
continue;
107+
}
108+
}
109+
}
110+
Err(e) => {
111+
error!("Failed to open file {:?}: {}", file, e);
112+
continue;
113+
}
114+
}
94115
}
95-
116+
117+
println!("Total valid readers: {}", readers.len());
96118
Self { readers }
97119
}
98120

@@ -248,18 +270,30 @@ pub fn get_reverse_reader<T: Read + Seek>(
248270
) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
249271
let mut offset = 0;
250272
let mut messages = Vec::new();
273+
let mut record_count = 0;
251274

252275
while let Some(res) = find_limit_and_type(&mut reader).transpose() {
253276
match res {
254277
Ok((header, size)) => {
278+
// Log message type and size
279+
// println!("Message type: {:?}, size: {}", header, size);
255280
messages.push((header, offset, size));
281+
if header == MessageHeader::RecordBatch {
282+
record_count += 1;
283+
}
256284
offset += size;
257285
}
258-
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
286+
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
287+
println!("Reached EOF after {} record batches", record_count);
288+
break;
289+
}
259290
Err(err) => return Err(err),
260291
}
261292
}
262293

294+
println!("Total record batches found: {}", record_count);
295+
println!("Total messages found: {}", messages.len());
296+
263297
// reverse everything leaving the first because it has schema message.
264298
messages[1..].reverse();
265299
let messages = messages
@@ -273,14 +307,58 @@ pub fn get_reverse_reader<T: Read + Seek>(
273307
Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
274308
}
275309

276-
// return limit for
277310
fn find_limit_and_type(
278311
reader: &mut (impl Read + Seek),
279312
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
280313
let mut size = 0;
281314
let marker = reader.read_u32::<LittleEndian>()?;
282315
size += 4;
283316

317+
if marker != 0xFFFFFFFF {
318+
println!("Invalid marker: {:x}", marker);
319+
return Err(io::Error::new(
320+
io::ErrorKind::InvalidData,
321+
format!("Invalid Continuation Marker: {:x}", marker),
322+
));
323+
}
324+
325+
let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
326+
size += 4;
327+
328+
if metadata_size == 0x00000000 {
329+
println!("Found end of stream (metadata_size = 0)");
330+
return Ok(None);
331+
}
332+
333+
// println!("Metadata size: {}", metadata_size);
334+
335+
let mut message = vec![0u8; metadata_size];
336+
reader.read_exact(&mut message)?;
337+
size += metadata_size;
338+
339+
let message = unsafe { root_as_message_unchecked(&message) };
340+
let header = message.header_type();
341+
let message_size = message.bodyLength();
342+
343+
// println!("Message header: {:?}, body length: {}", header, message_size);
344+
345+
size += message_size as usize;
346+
347+
let padding = (8 - (size % 8)) % 8;
348+
reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
349+
size += padding;
350+
351+
Ok(Some((header, size)))
352+
}
353+
354+
// return limit for
355+
fn find_limit_and_type1(
356+
reader: &mut (impl Read + Seek),
357+
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
358+
let mut size = 0;
359+
let marker = reader.read_u32::<LittleEndian>()?;
360+
size += 4;
361+
284362
if marker != 0xFFFFFFFF {
285363
return Err(io::Error::new(
286364
io::ErrorKind::InvalidData,

src/staging/streams.rs

+7
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,12 @@ impl<'a> Stream<'a> {
406406
}
407407

408408
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
409+
println!("Number of valid readers: {}", record_reader.readers.len());
410+
409411
if record_reader.readers.is_empty() {
410412
continue;
411413
}
414+
412415
let merged_schema = record_reader.merged_schema();
413416

414417
let props = parquet_writer_props(
@@ -428,9 +431,13 @@ impl<'a> Stream<'a> {
428431
.open(&part_path)
429432
.map_err(|_| StagingError::Create)?;
430433
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
434+
let mut input_count = 0;
431435
for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
436+
let batch_size = record.num_rows();
432437
writer.write(record)?;
438+
input_count += batch_size;
433439
}
440+
println!("Total input count: {}", input_count);
434441
writer.close()?;
435442

436443
if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {

0 commit comments

Comments
 (0)