From 3060f93b1cd8b6d853f17c826d7d1a3ebf6bec57 Mon Sep 17 00:00:00 2001
From: Khoa Thien Le <46258781+khoa165@users.noreply.github.com>
Date: Tue, 21 Jan 2025 18:12:37 -0500
Subject: [PATCH] GRPC endpoint placeholder for compact segments (#324)

* GRPC endpoint placeholder for compact segments

* Validation for segments in collection

* Validation to require at least 2 segments in input
---
 rs/index_server/src/index_server.rs | 56 +++++++++++++++++++++++++-
 rs/proto/proto/muopdb.proto         | 11 +++++
 rs/proto/src/muopdb.rs              | 62 +++++++++++++++++++++++++++++
 3 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/rs/index_server/src/index_server.rs b/rs/index_server/src/index_server.rs
index 6e603e7..410856b 100644
--- a/rs/index_server/src/index_server.rs
+++ b/rs/index_server/src/index_server.rs
@@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer;
 use proto::muopdb::{
     CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
     GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse,
-    InsertRequest, InsertResponse, SearchRequest, SearchResponse,
+    InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse,
 };
 use tokio::sync::Mutex;
 use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs};
@@ -392,4 +392,58 @@ impl IndexServer for IndexServerImpl {
             )),
         }
     }
+
+    async fn compact_segments(
+        &self,
+        request: tonic::Request<CompactSegmentsRequest>,
+    ) -> Result<tonic::Response<CompactSegmentsResponse>, tonic::Status> {
+        let start = std::time::Instant::now();
+        let req = request.into_inner();
+        let collection_name = req.collection_name;
+        let segment_names = req.segment_names;
+
+        let collection_opt = self
+            .collection_catalog
+            .lock()
+            .await
+            .get_collection(&collection_name)
+            .await;
+
+        match collection_opt {
+            Some(collection) => {
+                // Validation that segments exist in the collection
+                let segments = collection.get_all_segment_names();
+                let missing_segments: Vec<String> = segment_names
+                    .iter()
+                    .filter(|segment_name| !segments.contains(segment_name))
+                    .cloned()
+                    .collect();
+                if !missing_segments.is_empty() {
+                    return Err(tonic::Status::new(
+                        tonic::Code::NotFound,
+                        format!("Segments not found: {:?}", missing_segments),
+                    ));
+                }
+
+                if segment_names.len() <= 1 {
+                    return Err(tonic::Status::new(
+                        tonic::Code::InvalidArgument,
+                        "Require at least 2 segments to compact",
+                    ));
+                }
+
+                // TODO- khoa165: Logic to compact segments here
+
+                let end = std::time::Instant::now();
+                let duration = end.duration_since(start);
+                info!("[{}] Compacted {} segments in {:?}", collection_name, segment_names.len(), duration);
+
+                Ok(tonic::Response::new(CompactSegmentsResponse {}))
+            }
+            None => Err(tonic::Status::new(
+                tonic::Code::NotFound,
+                "Collection not found",
+            )),
+        }
+    }
 }
diff --git a/rs/proto/proto/muopdb.proto b/rs/proto/proto/muopdb.proto
index b1cb5d1..dc59497 100644
--- a/rs/proto/proto/muopdb.proto
+++ b/rs/proto/proto/muopdb.proto
@@ -47,6 +47,8 @@ service IndexServer {
   rpc Flush(FlushRequest) returns (FlushResponse) {}
 
   rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}
+
+  rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {}
 }
 
 message GetSegmentsRequest {
@@ -57,6 +59,15 @@ message GetSegmentsResponse {
   repeated string segment_names = 1;
 }
 
+message CompactSegmentsRequest {
+  string collection_name = 1;
+  repeated string segment_names = 2;
+}
+
+message CompactSegmentsResponse {
+}
+
+
 message CreateCollectionRequest {
   string collection_name = 1;
 
diff --git a/rs/proto/src/muopdb.rs b/rs/proto/src/muopdb.rs
index 5f1453b..f707ae3 100644
--- a/rs/proto/src/muopdb.rs
+++ b/rs/proto/src/muopdb.rs
@@ -42,6 +42,17 @@ pub struct GetSegmentsResponse {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CompactSegmentsRequest {
+    #[prost(string, tag = "1")]
+    pub collection_name: ::prost::alloc::string::String,
+    #[prost(string, repeated, tag = "2")]
+    pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CompactSegmentsResponse {}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateCollectionRequest {
     #[prost(string, tag = "1")]
     pub collection_name: ::prost::alloc::string::String,
@@ -494,6 +505,20 @@ pub mod index_server_client {
             let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
             self.inner.unary(request.into_request(), path, codec).await
         }
+        pub async fn compact_segments(
+            &mut self,
+            request: impl tonic::IntoRequest<super::CompactSegmentsRequest>,
+        ) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status> {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::new(
+                    tonic::Code::Unknown,
+                    format!("Service was not ready: {}", e.into()),
+                )
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments");
+            self.inner.unary(request.into_request(), path, codec).await
+        }
     }
 }
 /// Generated server implementations.
@@ -658,6 +683,10 @@ pub mod index_server_server {
             &self,
             request: tonic::Request<super::GetSegmentsRequest>,
         ) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
+        async fn compact_segments(
+            &self,
+            request: tonic::Request<super::CompactSegmentsRequest>,
+        ) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status>;
     }
     #[derive(Debug)]
     pub struct IndexServerServer<T: IndexServer> {
@@ -902,6 +931,39 @@ pub mod index_server_server {
                     };
                     Box::pin(fut)
                 }
+                "/muopdb.IndexServer/CompactSegments" => {
+                    #[allow(non_camel_case_types)]
+                    struct CompactSegmentsSvc<T: IndexServer>(pub Arc<T>);
+                    impl<T: IndexServer> tonic::server::UnaryService<super::CompactSegmentsRequest>
+                        for CompactSegmentsSvc<T>
+                    {
+                        type Response = super::CompactSegmentsResponse;
+                        type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
+                        fn call(
+                            &mut self,
+                            request: tonic::Request<super::CompactSegmentsRequest>,
+                        ) -> Self::Future {
+                            let inner = self.0.clone();
+                            let fut = async move { (*inner).compact_segments(request).await };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let inner = inner.0;
+                        let method = CompactSegmentsSvc(inner);
+                        let codec = tonic::codec::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
+                            accept_compression_encodings,
+                            send_compression_encodings,
+                        );
+                        let res = grpc.unary(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
                 _ => Box::pin(async move {
                     Ok(http::Response::builder()
                         .status(200)