From f0f4731560bea8e45e8d0892ad74b67c71a7d236 Mon Sep 17 00:00:00 2001 From: Khoa Le Date: Mon, 20 Jan 2025 12:45:02 -0500 Subject: [PATCH 1/3] GRPC endpoint placeholder for compact segments --- rs/index_server/src/index_server.rs | 35 +++++++++++++++- rs/proto/proto/muopdb.proto | 11 +++++ rs/proto/src/muopdb.rs | 62 +++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 1 deletion(-) diff --git a/rs/index_server/src/index_server.rs b/rs/index_server/src/index_server.rs index 6e603e7..4ac57a6 100644 --- a/rs/index_server/src/index_server.rs +++ b/rs/index_server/src/index_server.rs @@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer; use proto::muopdb::{ CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse, GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse, - InsertRequest, InsertResponse, SearchRequest, SearchResponse, + InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse, }; use tokio::sync::Mutex; use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs}; @@ -392,4 +392,37 @@ impl IndexServer for IndexServerImpl { )), } } + + async fn compact_segments( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let start = std::time::Instant::now(); + let req = request.into_inner(); + let collection_name = req.collection_name; + let _segment_names = req.segment_names; + + let collection_opt = self + .collection_catalog + .lock() + .await + .get_collection(&collection_name) + .await; + + match collection_opt { + Some(_collection) => { + // Validation that segments exist in the collection + // Logic to compact segments here + let end = std::time::Instant::now(); + let duration = end.duration_since(start); + info!("[{}] Compacted segments in {:?}", collection_name, duration); + + Ok(tonic::Response::new(CompactSegmentsResponse {})) + } + None => Err(tonic::Status::new( + tonic::Code::NotFound, + "Collection not found", + )), + } + } } diff --git a/rs/proto/proto/muopdb.proto b/rs/proto/proto/muopdb.proto index b1cb5d1..dc59497 100644 --- a/rs/proto/proto/muopdb.proto +++ b/rs/proto/proto/muopdb.proto @@ -47,6 +47,8 @@ service IndexServer { rpc Flush(FlushRequest) returns (FlushResponse) {} rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {} + + rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {} } message GetSegmentsRequest { @@ -57,6 +59,15 @@ message GetSegmentsResponse { repeated string segment_names = 1; } +message CompactSegmentsRequest { + string collection_name = 1; + repeated string segment_names = 2; +} + +message CompactSegmentsResponse { +} + + message CreateCollectionRequest { string collection_name = 1; diff --git a/rs/proto/src/muopdb.rs b/rs/proto/src/muopdb.rs index 5f1453b..f707ae3 100644 --- a/rs/proto/src/muopdb.rs +++ b/rs/proto/src/muopdb.rs @@ -42,6 +42,17 @@ pub struct GetSegmentsResponse { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct CompactSegmentsRequest { + #[prost(string, tag = "1")] + pub collection_name: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "2")] + pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CompactSegmentsResponse {} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CreateCollectionRequest { #[prost(string, tag = "1")] pub collection_name: ::prost::alloc::string::String, @@ -494,6 +505,20 @@ pub mod index_server_client { let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments"); self.inner.unary(request.into_request(), path, codec).await } + pub async fn compact_segments( + &mut self, + request: impl tonic::IntoRequest, + ) -> Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments"); + self.inner.unary(request.into_request(), path, codec).await + } } } /// Generated server implementations. @@ -658,6 +683,10 @@ pub mod index_server_server { &self, request: tonic::Request, ) -> Result, tonic::Status>; + async fn compact_segments( + &self, + request: tonic::Request, + ) -> Result, tonic::Status>; } #[derive(Debug)] pub struct IndexServerServer { @@ -902,6 +931,39 @@ pub mod index_server_server { }; Box::pin(fut) } + "/muopdb.IndexServer/CompactSegments" => { + #[allow(non_camel_case_types)] + struct CompactSegmentsSvc(pub Arc); + impl tonic::server::UnaryService + for CompactSegmentsSvc + { + type Response = super::CompactSegmentsResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).compact_segments(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = CompactSegmentsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { Ok(http::Response::builder() .status(200) From 17721dcc33959c029e0863ce29c34d89276f77f9 Mon Sep 17 00:00:00 2001 From: Khoa Le Date: Mon, 20 Jan 2025 13:04:24 -0500 Subject: [PATCH 2/3] Validation for segments in collection --- rs/index_server/src/index_server.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/rs/index_server/src/index_server.rs b/rs/index_server/src/index_server.rs index 4ac57a6..bd8fdbe 100644 --- a/rs/index_server/src/index_server.rs +++ b/rs/index_server/src/index_server.rs @@ -400,7 +400,7 @@ impl IndexServer for IndexServerImpl { let start = std::time::Instant::now(); let req = request.into_inner(); let collection_name = req.collection_name; - let _segment_names = req.segment_names; + let segment_names = req.segment_names; let collection_opt = self .collection_catalog @@ -410,12 +410,26 @@ impl IndexServer for IndexServerImpl { .await; match collection_opt { - Some(_collection) => { + Some(collection) => { // Validation that segments exist in the collection - // Logic to compact segments here + let segments = collection.get_all_segment_names(); + let missing_segments: Vec = segment_names + .iter() + .filter(|segment_name| !segments.contains(segment_name)) + .cloned() + .collect(); + if !missing_segments.is_empty() { + return Err(tonic::Status::new( + tonic::Code::NotFound, + format!("Segments not found: {:?}", missing_segments), + )); + } + + // TODO- khoa165: Logic to compact segments here + let end = std::time::Instant::now(); let duration = end.duration_since(start); - info!("[{}] Compacted segments in {:?}", collection_name, duration); + info!("[{}] Compacted {} segments in {:?}", collection_name, segment_names.len(), duration); Ok(tonic::Response::new(CompactSegmentsResponse {})) } From 369ab73e4280ef7888235d74130929f4b10219b0 Mon Sep 17 00:00:00 2001 From: Khoa Le Date: Mon, 20 Jan 2025 14:15:30 -0500 Subject: [PATCH 3/3] Validation to require at least 2 segments in input --- rs/index_server/src/index_server.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rs/index_server/src/index_server.rs b/rs/index_server/src/index_server.rs index bd8fdbe..410856b 100644 --- a/rs/index_server/src/index_server.rs +++ b/rs/index_server/src/index_server.rs @@ -425,6 +425,13 @@ impl IndexServer for IndexServerImpl { )); } + if segment_names.len() <= 1 { + return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Require at least 2 segments to compact", + )); + } + // TODO- khoa165: Logic to compact segments here let end = std::time::Instant::now();