Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add prefetch support to InvertedIndexFooterReader for reduced I/O time #5146

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/index/src/inverted_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid footer payload size"))]
InvalidFooterPayloadSize {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))]
UnexpectedFooterPayloadSize {
max_payload_size: u64,
Expand Down Expand Up @@ -229,7 +235,8 @@ impl ErrorExt for Error {
| KeysApplierUnexpectedPredicates { .. }
| CommonIo { .. }
| UnknownIntermediateCodecMagic { .. }
| FstCompile { .. } => StatusCode::Unexpected,
| FstCompile { .. }
| InvalidFooterPayloadSize { .. } => StatusCode::Unexpected,

ParseRegex { .. }
| ParseDFA { .. }
Expand Down
6 changes: 4 additions & 2 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};

use super::footer::DEFAULT_PREFETCH_SIZE;
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
use crate::inverted_index::format::reader::footer::InvertedIndexFooterReader;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::MIN_BLOB_SIZE;

Expand Down Expand Up @@ -72,7 +73,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?;

let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
let mut footer_reader = InvertedIndexFooterReader::new(&mut self.source, blob_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
footer_reader.metadata().await.map(Arc::new)
}
}
Expand Down
132 changes: 90 additions & 42 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,83 @@ use prost::Message;
use snafu::{ensure, ResultExt};

use crate::inverted_index::error::{
CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu,
CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;

/// InvertedIndeFooterReader is for reading the footer section of the blob.
pub struct InvertedIndeFooterReader<R> {
pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB

/// InvertedIndexFooterReader is for reading the footer section of the blob.
pub struct InvertedIndexFooterReader<R> {
source: R,
blob_size: u64,
prefetch_size: Option<u64>,
}

impl<R> InvertedIndeFooterReader<R> {
impl<R> InvertedIndexFooterReader<R> {
pub fn new(source: R, blob_size: u64) -> Self {
Self { source, blob_size }
Self {
source,
blob_size,
prefetch_size: None,
}
}
}

impl<R: RangeReader> InvertedIndeFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let payload_size = self.read_payload_size().await?;
let metas = self.read_payload(payload_size).await?;
Ok(metas)
/// Set the prefetch size for the footer reader.
pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
self
}

pub fn prefetch_size(&self) -> u64 {
self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
}
}

async fn read_payload_size(&mut self) -> Result<u64> {
let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize];
let end = self.blob_size;
let start = end - FOOTER_PAYLOAD_SIZE_SIZE;
self.source
.read_into(start..end, &mut &mut size_buf[..])
impl<R: RangeReader> InvertedIndexFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
let suffix = self
.source
.read(footer_start..self.blob_size)
.await
.context(CommonIoSnafu)?;
let suffix_len = suffix.len();
let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
self.validate_payload_size(length)?;

let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;

// Did not fetch the entire file metadata in the initial read, need to make a second request.
if length > suffix_len as u64 - footer_size {
let metadata_start = self.blob_size - length - footer_size;
let meta = self
.source
.read(metadata_start..self.blob_size - footer_size)
.await
.context(CommonIoSnafu)?;
self.parse_payload(&meta, length)
} else {
let metadata_start = self.blob_size - length - footer_size - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
self.parse_payload(meta, length)
}
}

let payload_size = u32::from_le_bytes(size_buf) as u64;
self.validate_payload_size(payload_size)?;
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
let suffix_len = suffix.len();
ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
let mut bytes = [0; 4];
bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);

Ok(payload_size)
Ok(bytes)
}

async fn read_payload(&mut self, payload_size: u64) -> Result<InvertedIndexMetas> {
let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
let start = end - payload_size;
let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?;

let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?;
fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
self.validate_metas(&metas, payload_size)?;

Ok(metas)
}

Expand Down Expand Up @@ -113,9 +143,12 @@ impl<R: RangeReader> InvertedIndeFooterReader<R> {

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use prost::Message;

use super::*;
use crate::inverted_index::error::Error;

fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
let mut metas = InvertedIndexMetas {
Expand All @@ -141,14 +174,18 @@ mod tests {

let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
let metas = reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
}
}

#[tokio::test]
Expand All @@ -157,14 +194,20 @@ mod tests {
name: "test".to_string(),
..Default::default()
};

let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

let result = reader.metadata().await;
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
}
}

#[tokio::test]
Expand All @@ -178,10 +221,15 @@ mod tests {

let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
assert!(payload_result.is_err());
for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size);
if prefetch > 0 {
reader = reader.with_prefetch_size(prefetch);
}

let result = reader.metadata().await;
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
}
}
}
1 change: 1 addition & 0 deletions src/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
#![feature(assert_matches)]

pub mod fulltext_index;
pub mod inverted_index;
Loading