From 0a6bfbddf515c3532460b697b71fb10e6a8ddb46 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 14:32:25 +0000 Subject: [PATCH 1/3] feat: add prefetch support to `InvertedIndeFooterReader` --- src/index/src/inverted_index/error.rs | 9 +- .../src/inverted_index/format/reader/blob.rs | 4 +- .../inverted_index/format/reader/footer.rs | 124 ++++++++++++------ src/index/src/lib.rs | 1 + 4 files changed, 98 insertions(+), 40 deletions(-) diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 07a42b8b8767..430cd632e212 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -76,6 +76,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid footer payload size"))] + InvalidFooterPayloadSize { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))] UnexpectedFooterPayloadSize { max_payload_size: u64, @@ -229,7 +235,8 @@ impl ErrorExt for Error { | KeysApplierUnexpectedPredicates { .. } | CommonIo { .. } | UnknownIntermediateCodecMagic { .. } - | FstCompile { .. } => StatusCode::Unexpected, + | FstCompile { .. } + | InvalidFooterPayloadSize { .. } => StatusCode::Unexpected, ParseRegex { .. } | ParseDFA { .. } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index ace0e5c48536..201813bf5fdf 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -19,6 +19,7 @@ use common_base::range_read::RangeReader; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; +use super::footer::DEFAULT_PREFETCH_SIZE; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; use crate::inverted_index::format::reader::InvertedIndexReader; @@ -72,7 +73,8 @@ impl InvertedIndexReader for InvertedIndexBlobReader { let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; - let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); + let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size) + .with_prefetch_size(DEFAULT_PREFETCH_SIZE); footer_reader.metadata().await.map(Arc::new) } } diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 1f35237711ce..6cc18488325d 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -18,53 +18,83 @@ use prost::Message; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ - CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu, - UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu, + CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result, + UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu, + UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; +pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB + /// InvertedIndeFooterReader is for reading the footer section of the blob. pub struct InvertedIndeFooterReader { source: R, blob_size: u64, + prefetch_size: Option, } impl InvertedIndeFooterReader { pub fn new(source: R, blob_size: u64) -> Self { - Self { source, blob_size } + Self { + source, + blob_size, + prefetch_size: None, + } + } + + /// Set the prefetch size for the footer reader. + pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self { + self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE)); + self + } + + pub fn prefetch_size(&self) -> u64 { + self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE) } } impl InvertedIndeFooterReader { pub async fn metadata(&mut self) -> Result { - let payload_size = self.read_payload_size().await?; - let metas = self.read_payload(payload_size).await?; - Ok(metas) - } - - async fn read_payload_size(&mut self) -> Result { - let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize]; - let end = self.blob_size; - let start = end - FOOTER_PAYLOAD_SIZE_SIZE; - self.source - .read_into(start..end, &mut &mut size_buf[..]) + let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); + let suffix = self + .source + .read(footer_start..self.blob_size) .await .context(CommonIoSnafu)?; + let suffix_len = suffix.len(); + let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64; + self.validate_payload_size(length)?; + + let footer_size = FOOTER_PAYLOAD_SIZE_SIZE; + + // Did not fetch the entire file metadata in the initial read, need to make a second request. + if length > suffix_len as u64 - footer_size { + let metadata_start = self.blob_size - length - footer_size; + let meta = self + .source + .read(metadata_start..self.blob_size - footer_size) + .await + .context(CommonIoSnafu)?; + self.parse_payload(&meta, length) + } else { + let metadata_start = self.blob_size - length - footer_size - footer_start; + let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; + self.parse_payload(meta, length) + } + } - let payload_size = u32::from_le_bytes(size_buf) as u64; - self.validate_payload_size(payload_size)?; + fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { + let suffix_len = suffix.len(); + ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu); + let mut bytes = [0; 4]; + bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]); - Ok(payload_size) + Ok(bytes) } - async fn read_payload(&mut self, payload_size: u64) -> Result { - let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; - let start = end - payload_size; - let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?; - - let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?; + fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result { + let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?; self.validate_metas(&metas, payload_size)?; - Ok(metas) } @@ -113,9 +143,12 @@ impl InvertedIndeFooterReader { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use prost::Message; use super::*; + use crate::inverted_index::error::Error; fn create_test_payload(meta: InvertedIndexMeta) -> Vec { let mut metas = InvertedIndexMetas { @@ -141,14 +174,18 @@ mod tests { let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size = reader.read_payload_size().await.unwrap(); - let metas = reader.read_payload(payload_size).await.unwrap(); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } - assert_eq!(metas.metas.len(), 1); - let index_meta = &metas.metas.get("test").unwrap(); - assert_eq!(index_meta.name, "test"); + let metas = reader.metadata().await.unwrap(); + assert_eq!(metas.metas.len(), 1); + let index_meta = &metas.metas.get("test").unwrap(); + assert_eq!(index_meta.name, "test"); + } } #[tokio::test] @@ -157,14 +194,20 @@ mod tests { name: "test".to_string(), ..Default::default() }; - let mut payload_buf = create_test_payload(meta); payload_buf.push(0xff); // Add an extra byte to corrupt the footer let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size_result = reader.read_payload_size().await; - assert!(payload_size_result.is_err()); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let blob_size = payload_buf.len() as u64; + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } + + let result = reader.metadata().await; + assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. })); + } } #[tokio::test] @@ -178,10 +221,15 @@ mod tests { let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size = reader.read_payload_size().await.unwrap(); - let payload_result = reader.read_payload(payload_size).await; - assert!(payload_result.is_err()); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } + + let result = reader.metadata().await; + assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. })); + } } } diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index 197fc01818c0..5e2e41166863 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(iter_partition_in_place)] +#![feature(assert_matches)] pub mod fulltext_index; pub mod inverted_index; From 3abb7dc2072c5b006d7ea5b4ebf0eded1ac86ee4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 14:51:51 +0000 Subject: [PATCH 2/3] chore: correct struct name --- src/index/src/inverted_index/format/reader/blob.rs | 4 ++-- .../src/inverted_index/format/reader/footer.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 201813bf5fdf..de34cd36f849 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt}; use super::footer::DEFAULT_PREFETCH_SIZE; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; -use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; +use crate::inverted_index::format::reader::footer::InvertedIndexFooterReader; use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::MIN_BLOB_SIZE; @@ -73,7 +73,7 @@ impl InvertedIndexReader for InvertedIndexBlobReader { let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; - let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size) + let mut footer_reader = InvertedIndexFooterReader::new(&mut self.source, blob_size) .with_prefetch_size(DEFAULT_PREFETCH_SIZE); footer_reader.metadata().await.map(Arc::new) } diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 6cc18488325d..e3dc83cddc1d 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -26,14 +26,14 @@ use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB -/// InvertedIndeFooterReader is for reading the footer section of the blob. -pub struct InvertedIndeFooterReader { +/// InvertedIndexFooterReader is for reading the footer section of the blob. +pub struct InvertedIndexFooterReader { source: R, blob_size: u64, prefetch_size: Option, } -impl InvertedIndeFooterReader { +impl InvertedIndexFooterReader { pub fn new(source: R, blob_size: u64) -> Self { Self { source, @@ -53,7 +53,7 @@ impl InvertedIndeFooterReader { } } -impl InvertedIndeFooterReader { +impl InvertedIndexFooterReader { pub async fn metadata(&mut self) -> Result { let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); let suffix = self @@ -176,7 +176,7 @@ mod tests { let blob_size = payload_buf.len() as u64; for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size); if prefetch > 0 { reader = reader.with_prefetch_size(prefetch); } @@ -200,7 +200,7 @@ mod tests { for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size); if prefetch > 0 { reader = reader.with_prefetch_size(prefetch); } @@ -223,7 +223,7 @@ mod tests { let blob_size = payload_buf.len() as u64; for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + let mut reader = InvertedIndexFooterReader::new(&mut payload_buf, blob_size); if prefetch > 0 { reader = reader.with_prefetch_size(prefetch); } From c14ab76e546e16326f4a1fdc6b48ea017a810eb4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 12 Dec 2024 03:25:20 +0000 Subject: [PATCH 3/3] chore: apply suggestions from CR --- src/index/src/inverted_index/error.rs | 9 ++++++++- src/index/src/inverted_index/format/reader/footer.rs | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 430cd632e212..de05b6232b6a 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -76,6 +76,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Blob size too small"))] + BlobSizeTooSmall { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid footer payload size"))] InvalidFooterPayloadSize { #[snafu(implicit)] @@ -236,7 +242,8 @@ impl ErrorExt for Error { | CommonIo { .. } | UnknownIntermediateCodecMagic { .. } | FstCompile { .. } - | InvalidFooterPayloadSize { .. } => StatusCode::Unexpected, + | InvalidFooterPayloadSize { .. } + | BlobSizeTooSmall { .. } => StatusCode::Unexpected, ParseRegex { .. } | ParseDFA { .. } diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index e3dc83cddc1d..c025ecf52ecd 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -18,7 +18,7 @@ use prost::Message; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ - CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result, + BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result, UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu, }; @@ -55,6 +55,11 @@ impl InvertedIndexFooterReader { impl InvertedIndexFooterReader { pub async fn metadata(&mut self) -> Result { + ensure!( + self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE, + BlobSizeTooSmallSnafu + ); + let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); let suffix = self .source