Skip to content

Commit 48ff9aa

Browse files
committed
chunk read_at
1 parent e764a7a commit 48ff9aa

File tree

1 file changed

+37
-7
lines changed

1 file changed

+37
-7
lines changed

vortex-io/src/object_store.rs

+37-7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ impl ObjectStoreReadAt {
3636
}
3737
}
3838

39+
/// Blocking reads to read at most this size before yielding.
40+
const FILE_CHUNK_SIZE: u64 = 2 << 20; // 1MB
41+
3942
impl VortexReadAt for ObjectStoreReadAt {
4043
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(size = range.end - range.start)))]
4144
async fn read_byte_range(
@@ -65,14 +68,41 @@ impl VortexReadAt for ObjectStoreReadAt {
6568
.await?;
6669

6770
let buffer = match response.payload {
68-
GetResultPayload::File(file, _) => {
71+
GetResultPayload::File(mut file, _) => {
72+
let mut bytes_read = 0u64;
6973
unsafe { buffer.set_len(len) };
70-
tokio::task::spawn_blocking(move || {
71-
file.read_exact_at(&mut buffer, range.start)?;
72-
Ok::<_, io::Error>(buffer)
73-
})
74-
.await
75-
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))??
74+
75+
while bytes_read < len as u64 {
76+
let current_chunk_end =
77+
(len as u64).min(range.start + bytes_read + FILE_CHUNK_SIZE);
78+
let (read, b, f) = tokio::task::spawn_blocking(move || {
79+
let (Ok(bytes_read), Ok(current_chunk_end)) =
80+
(bytes_read.try_into(), current_chunk_end.try_into())
81+
else {
82+
return Err(io::Error::new(
83+
io::ErrorKind::FileTooLarge,
84+
"can not read from u64 offset",
85+
));
86+
};
87+
let slice = &mut buffer[bytes_read..current_chunk_end];
88+
let read = file.read_at(slice, range.start + bytes_read as u64)?;
89+
Ok::<_, io::Error>((read, buffer, file))
90+
})
91+
.await
92+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))??;
93+
94+
if read == 0 {
95+
return Err(io::Error::new(
96+
io::ErrorKind::UnexpectedEof,
97+
"premature eof",
98+
));
99+
}
100+
101+
bytes_read += read as u64;
102+
buffer = b;
103+
file = f;
104+
}
105+
buffer
76106
}
77107
GetResultPayload::Stream(mut byte_stream) => {
78108
while let Some(bytes) = byte_stream.next().await {

0 commit comments

Comments
 (0)