Skip to content

Commit

Permalink
feat: add prefetch support to PuffinFileFooterReader for reduced I/…
Browse files Browse the repository at this point in the history
…O time (#5145)

* feat: introduce `PuffinFileFooterReader`

* refactor: remove `SyncReader` trait and impl

* refactor: replace `FooterParser` with `PuffinFileFooterReader`

* chore: remove unused errors
  • Loading branch information
WenyXu authored Dec 12, 2024
1 parent a801214 commit e2a41cc
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 523 deletions.
11 changes: 1 addition & 10 deletions src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ use crate::inverted_index::search::predicate::Predicate;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to seek"))]
Seek {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to read"))]
Read {
#[snafu(source)]
Expand Down Expand Up @@ -215,8 +207,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Seek { .. }
| Read { .. }
Read { .. }
| Write { .. }
| Flush { .. }
| Close { .. }
Expand Down
52 changes: 8 additions & 44 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ use snafu::{Location, Snafu};
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to seek"))]
Seek {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to read"))]
Read {
#[snafu(source)]
Expand Down Expand Up @@ -119,14 +111,6 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert bytes to integer"))]
BytesToInteger {
#[snafu(source)]
error: std::array::TryFromSliceError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unsupported decompression: {}", decompression))]
UnsupportedDecompression {
decompression: String,
Expand All @@ -150,17 +134,15 @@ pub enum Error {
location: Location,
},

#[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))]
ParseStageNotMatch {
expected: String,
actual: String,
#[snafu(display("Unexpected footer payload size: {}", size))]
UnexpectedFooterPayloadSize {
size: i32,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected footer payload size: {}", size))]
UnexpectedFooterPayloadSize {
size: i32,
#[snafu(display("Invalid puffin footer"))]
InvalidPuffinFooter {
#[snafu(implicit)]
location: Location,
},
Expand All @@ -177,20 +159,6 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))]
InvalidBlobOffset {
offset: i64,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))]
InvalidBlobAreaEnd {
offset: u64,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to compress lz4"))]
Lz4Compression {
#[snafu(source)]
Expand Down Expand Up @@ -262,8 +230,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Seek { .. }
| Read { .. }
Read { .. }
| MagicNotMatched { .. }
| DeserializeJson { .. }
| Write { .. }
Expand All @@ -275,18 +242,15 @@ impl ErrorExt for Error {
| Remove { .. }
| Rename { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
| UnexpectedFooterPayloadSize { .. }
| UnexpectedPuffinFileSize { .. }
| InvalidBlobOffset { .. }
| InvalidBlobAreaEnd { .. }
| Lz4Compression { .. }
| Lz4Decompression { .. }
| BlobNotFound { .. }
| BlobIndexOutOfBound { .. }
| FileKeyNotMatch { .. }
| WalkDir { .. } => StatusCode::Unexpected,
| WalkDir { .. }
| InvalidPuffinFooter { .. } => StatusCode::Unexpected,

UnsupportedCompression { .. } | UnsupportedDecompression { .. } => {
StatusCode::Unsupported
Expand Down
14 changes: 1 addition & 13 deletions src/puffin/src/file_format/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,9 @@ use common_base::range_read::RangeReader;
use crate::blob_metadata::BlobMetadata;
use crate::error::Result;
pub use crate::file_format::reader::file::PuffinFileReader;
pub use crate::file_format::reader::footer::PuffinFileFooterReader;
use crate::file_metadata::FileMetadata;

/// `SyncReader` defines a synchronous reader for puffin data.
pub trait SyncReader<'a> {
type Reader: std::io::Read + std::io::Seek;

/// Fetches the FileMetadata.
fn metadata(&'a mut self) -> Result<FileMetadata>;

/// Reads particular blob data based on given metadata.
///
/// Data read from the reader is compressed leaving the caller to decompress the data.
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}

/// `AsyncReader` defines an asynchronous reader for puffin data.
#[async_trait]
pub trait AsyncReader<'a> {
Expand Down
73 changes: 7 additions & 66 deletions src/puffin/src/file_format/reader/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::{self, SeekFrom};

use async_trait::async_trait;
use common_base::range_read::RangeReader;
use snafu::{ensure, ResultExt};

use crate::blob_metadata::BlobMetadata;
use crate::error::{
MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu,
UnsupportedDecompressionSnafu,
};
use crate::file_format::reader::footer::FooterParser;
use crate::file_format::reader::{AsyncReader, SyncReader};
use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu};
use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE;
use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader};
use crate::file_format::MIN_FILE_SIZE;
use crate::file_metadata::FileMetadata;
use crate::partial_reader::PartialReader;

Expand Down Expand Up @@ -72,45 +67,6 @@ impl<R> PuffinFileReader<R> {
}
}

impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;

fn metadata(&mut self) -> Result<FileMetadata> {
if let Some(metadata) = &self.metadata {
return Ok(metadata.clone());
}

// check the magic
let mut magic = [0; MAGIC_SIZE as usize];
self.source.read_exact(&mut magic).context(ReadSnafu)?;
ensure!(magic == MAGIC, MagicNotMatchedSnafu);

let file_size = self.get_file_size_sync()?;

// parse the footer
let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?;
self.metadata = Some(metadata.clone());
Ok(metadata)
}

fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
// TODO(zhongzc): support decompression
let compression = blob_metadata.compression_codec.as_ref();
ensure!(
compression.is_none(),
UnsupportedDecompressionSnafu {
decompression: compression.unwrap().to_string()
}
);

Ok(PartialReader::new(
&mut self.source,
blob_metadata.offset as _,
blob_metadata.length as _,
))
}
}

#[async_trait]
impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;
Expand All @@ -119,17 +75,10 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
if let Some(metadata) = &self.metadata {
return Ok(metadata.clone());
}

// check the magic
let magic = self.source.read(0..MAGIC_SIZE).await.context(ReadSnafu)?;
ensure!(*magic == MAGIC, MagicNotMatchedSnafu);

let file_size = self.get_file_size_async().await?;

// parse the footer
let metadata = FooterParser::new(&mut self.source, file_size)
.parse_async()
.await?;
let mut reader = PuffinFileFooterReader::new(&mut self.source, file_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
let metadata = reader.metadata().await?;
self.metadata = Some(metadata.clone());
Ok(metadata)
}
Expand All @@ -143,14 +92,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
}
}

impl<R: io::Read + io::Seek> PuffinFileReader<R> {
fn get_file_size_sync(&mut self) -> Result<u64> {
let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?;
Self::validate_file_size(file_size)?;
Ok(file_size)
}
}

impl<R: RangeReader> PuffinFileReader<R> {
async fn get_file_size_async(&mut self) -> Result<u64> {
let file_size = self
Expand Down
Loading

0 comments on commit e2a41cc

Please sign in to comment.