diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 07a42b8b8767..430cd632e212 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -76,6 +76,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid footer payload size"))] + InvalidFooterPayloadSize { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unexpected inverted index footer payload size, max: {max_payload_size}, actual: {actual_payload_size}"))] UnexpectedFooterPayloadSize { max_payload_size: u64, @@ -229,7 +235,8 @@ impl ErrorExt for Error { | KeysApplierUnexpectedPredicates { .. } | CommonIo { .. } | UnknownIntermediateCodecMagic { .. } - | FstCompile { .. } => StatusCode::Unexpected, + | FstCompile { .. } + | InvalidFooterPayloadSize { .. } => StatusCode::Unexpected, ParseRegex { .. } | ParseDFA { .. } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index ace0e5c48536..201813bf5fdf 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -19,6 +19,7 @@ use common_base::range_read::RangeReader; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; +use super::footer::DEFAULT_PREFETCH_SIZE; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; use crate::inverted_index::format::reader::InvertedIndexReader; @@ -72,7 +73,8 @@ impl InvertedIndexReader for InvertedIndexBlobReader { let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; - let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); + let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size) + .with_prefetch_size(DEFAULT_PREFETCH_SIZE); footer_reader.metadata().await.map(Arc::new) } } diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 1f35237711ce..6cc18488325d 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -18,53 +18,83 @@ use prost::Message; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ - CommonIoSnafu, DecodeProtoSnafu, Result, UnexpectedFooterPayloadSizeSnafu, - UnexpectedOffsetSizeSnafu, UnexpectedZeroSegmentRowCountSnafu, + CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result, + UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu, + UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; +pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB + /// InvertedIndeFooterReader is for reading the footer section of the blob. pub struct InvertedIndeFooterReader { source: R, blob_size: u64, + prefetch_size: Option, } impl InvertedIndeFooterReader { pub fn new(source: R, blob_size: u64) -> Self { - Self { source, blob_size } + Self { + source, + blob_size, + prefetch_size: None, + } + } + + /// Set the prefetch size for the footer reader. + pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self { + self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE)); + self + } + + pub fn prefetch_size(&self) -> u64 { + self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE) } } impl InvertedIndeFooterReader { pub async fn metadata(&mut self) -> Result { - let payload_size = self.read_payload_size().await?; - let metas = self.read_payload(payload_size).await?; - Ok(metas) - } - - async fn read_payload_size(&mut self) -> Result { - let mut size_buf = [0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize]; - let end = self.blob_size; - let start = end - FOOTER_PAYLOAD_SIZE_SIZE; - self.source - .read_into(start..end, &mut &mut size_buf[..]) + let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); + let suffix = self + .source + .read(footer_start..self.blob_size) .await .context(CommonIoSnafu)?; + let suffix_len = suffix.len(); + let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64; + self.validate_payload_size(length)?; + + let footer_size = FOOTER_PAYLOAD_SIZE_SIZE; + + // Did not fetch the entire file metadata in the initial read, need to make a second request. + if length > suffix_len as u64 - footer_size { + let metadata_start = self.blob_size - length - footer_size; + let meta = self + .source + .read(metadata_start..self.blob_size - footer_size) + .await + .context(CommonIoSnafu)?; + self.parse_payload(&meta, length) + } else { + let metadata_start = self.blob_size - length - footer_size - footer_start; + let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; + self.parse_payload(meta, length) + } + } - let payload_size = u32::from_le_bytes(size_buf) as u64; - self.validate_payload_size(payload_size)?; + fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { + let suffix_len = suffix.len(); + ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu); + let mut bytes = [0; 4]; + bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]); - Ok(payload_size) + Ok(bytes) } - async fn read_payload(&mut self, payload_size: u64) -> Result { - let end = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE; - let start = end - payload_size; - let bytes = self.source.read(start..end).await.context(CommonIoSnafu)?; - - let metas = InvertedIndexMetas::decode(&*bytes).context(DecodeProtoSnafu)?; + fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result { + let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?; self.validate_metas(&metas, payload_size)?; - Ok(metas) } @@ -113,9 +143,12 @@ impl InvertedIndeFooterReader { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + use prost::Message; use super::*; + use crate::inverted_index::error::Error; fn create_test_payload(meta: InvertedIndexMeta) -> Vec { let mut metas = InvertedIndexMetas { @@ -141,14 +174,18 @@ mod tests { let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size = reader.read_payload_size().await.unwrap(); - let metas = reader.read_payload(payload_size).await.unwrap(); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } - assert_eq!(metas.metas.len(), 1); - let index_meta = &metas.metas.get("test").unwrap(); - assert_eq!(index_meta.name, "test"); + let metas = reader.metadata().await.unwrap(); + assert_eq!(metas.metas.len(), 1); + let index_meta = &metas.metas.get("test").unwrap(); + assert_eq!(index_meta.name, "test"); + } } #[tokio::test] @@ -157,14 +194,20 @@ mod tests { name: "test".to_string(), ..Default::default() }; - let mut payload_buf = create_test_payload(meta); payload_buf.push(0xff); // Add an extra byte to corrupt the footer let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size_result = reader.read_payload_size().await; - assert!(payload_size_result.is_err()); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let blob_size = payload_buf.len() as u64; + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } + + let result = reader.metadata().await; + assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. })); + } } #[tokio::test] @@ -178,10 +221,15 @@ mod tests { let mut payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); - let payload_size = reader.read_payload_size().await.unwrap(); - let payload_result = reader.read_payload(payload_size).await; - assert!(payload_result.is_err()); + for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] { + let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size); + if prefetch > 0 { + reader = reader.with_prefetch_size(prefetch); + } + + let result = reader.metadata().await; + assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. })); + } } } diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index 197fc01818c0..5e2e41166863 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(iter_partition_in_place)] +#![feature(assert_matches)] pub mod fulltext_index; pub mod inverted_index;