Skip to content

Commit

Permalink
GRPC endpoint placeholder for compact segments (#324)
Browse files Browse the repository at this point in the history
* GRPC endpoint placeholder for compact segments

* Validation for segments in collection

* Validation to require at least 2 segments in input
  • Loading branch information
khoa165 authored Jan 21, 2025
1 parent c46fdae commit 3060f93
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 1 deletion.
56 changes: 55 additions & 1 deletion rs/index_server/src/index_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer;
use proto::muopdb::{
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse,
};
use tokio::sync::Mutex;
use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs};
Expand Down Expand Up @@ -392,4 +392,58 @@ impl IndexServer for IndexServerImpl {
)),
}
}

async fn compact_segments(
&self,
request: tonic::Request<CompactSegmentsRequest>,
) -> Result<tonic::Response<CompactSegmentsResponse>, tonic::Status> {
let start = std::time::Instant::now();
let req = request.into_inner();
let collection_name = req.collection_name;
let segment_names = req.segment_names;

let collection_opt = self
.collection_catalog
.lock()
.await
.get_collection(&collection_name)
.await;

match collection_opt {
Some(collection) => {
// Validation that segments exist in the collection
let segments = collection.get_all_segment_names();
let missing_segments: Vec<String> = segment_names
.iter()
.filter(|segment_name| !segments.contains(segment_name))
.cloned()
.collect();
if !missing_segments.is_empty() {
return Err(tonic::Status::new(
tonic::Code::NotFound,
format!("Segments not found: {:?}", missing_segments),
));
}

if segment_names.len() <= 1 {
return Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Require at least 2 segments to compact",
));
}

// TODO- khoa165: Logic to compact segments here

let end = std::time::Instant::now();
let duration = end.duration_since(start);
info!("[{}] Compacted {} segments in {:?}", collection_name, segment_names.len(), duration);

Ok(tonic::Response::new(CompactSegmentsResponse {}))
}
None => Err(tonic::Status::new(
tonic::Code::NotFound,
"Collection not found",
)),
}
}
}
11 changes: 11 additions & 0 deletions rs/proto/proto/muopdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ service IndexServer {
rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}

rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {}
}

message GetSegmentsRequest {
Expand All @@ -57,6 +59,15 @@ message GetSegmentsResponse {
repeated string segment_names = 1;
}

message CompactSegmentsRequest {
string collection_name = 1;
repeated string segment_names = 2;
}

message CompactSegmentsResponse {
}


message CreateCollectionRequest {
string collection_name = 1;

Expand Down
62 changes: 62 additions & 0 deletions rs/proto/src/muopdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub struct GetSegmentsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "2")]
pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCollectionRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -494,6 +505,20 @@ pub mod index_server_client {
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
self.inner.unary(request.into_request(), path, codec).await
}
pub async fn compact_segments(
&mut self,
request: impl tonic::IntoRequest<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments");
self.inner.unary(request.into_request(), path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -658,6 +683,10 @@ pub mod index_server_server {
&self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
async fn compact_segments(
&self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct IndexServerServer<T: IndexServer> {
Expand Down Expand Up @@ -902,6 +931,39 @@ pub mod index_server_server {
};
Box::pin(fut)
}
"/muopdb.IndexServer/CompactSegments" => {
#[allow(non_camel_case_types)]
struct CompactSegmentsSvc<T: IndexServer>(pub Arc<T>);
impl<T: IndexServer> tonic::server::UnaryService<super::CompactSegmentsRequest>
for CompactSegmentsSvc<T>
{
type Response = super::CompactSegmentsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Self::Future {
let inner = self.0.clone();
let fut = async move { (*inner).compact_segments(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CompactSegmentsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
Expand Down

0 comments on commit 3060f93

Please sign in to comment.