Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRPC endpoint placeholder for compact segments #324

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion rs/index_server/src/index_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer;
use proto::muopdb::{
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse,
};
use tokio::sync::Mutex;
use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs};
Expand Down Expand Up @@ -392,4 +392,58 @@ impl IndexServer for IndexServerImpl {
)),
}
}

async fn compact_segments(
&self,
request: tonic::Request<CompactSegmentsRequest>,
) -> Result<tonic::Response<CompactSegmentsResponse>, tonic::Status> {
let start = std::time::Instant::now();
let req = request.into_inner();
let collection_name = req.collection_name;
let segment_names = req.segment_names;

let collection_opt = self
.collection_catalog
.lock()
.await
.get_collection(&collection_name)
.await;

match collection_opt {
Some(collection) => {
// Validation that segments exist in the collection
let segments = collection.get_all_segment_names();
let missing_segments: Vec<String> = segment_names
.iter()
.filter(|segment_name| !segments.contains(segment_name))
.cloned()
.collect();
if !missing_segments.is_empty() {
return Err(tonic::Status::new(
tonic::Code::NotFound,
format!("Segments not found: {:?}", missing_segments),
));
}

if segment_names.len() <= 1 {
return Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Require at least 2 segments to compact",
));
}

// TODO- khoa165: Logic to compact segments here

let end = std::time::Instant::now();
let duration = end.duration_since(start);
info!("[{}] Compacted {} segments in {:?}", collection_name, segment_names.len(), duration);

Ok(tonic::Response::new(CompactSegmentsResponse {}))
}
None => Err(tonic::Status::new(
tonic::Code::NotFound,
"Collection not found",
)),
}
}
}
11 changes: 11 additions & 0 deletions rs/proto/proto/muopdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ service IndexServer {
rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}

rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {}
}

message GetSegmentsRequest {
Expand All @@ -57,6 +59,15 @@ message GetSegmentsResponse {
repeated string segment_names = 1;
}

message CompactSegmentsRequest {
string collection_name = 1;
repeated string segment_names = 2;
}

message CompactSegmentsResponse {
}


message CreateCollectionRequest {
string collection_name = 1;

Expand Down
62 changes: 62 additions & 0 deletions rs/proto/src/muopdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ pub struct GetSegmentsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "2")]
pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCollectionRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -494,6 +505,20 @@ pub mod index_server_client {
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
self.inner.unary(request.into_request(), path, codec).await
}
pub async fn compact_segments(
&mut self,
request: impl tonic::IntoRequest<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments");
self.inner.unary(request.into_request(), path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -658,6 +683,10 @@ pub mod index_server_server {
&self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
async fn compact_segments(
&self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct IndexServerServer<T: IndexServer> {
Expand Down Expand Up @@ -902,6 +931,39 @@ pub mod index_server_server {
};
Box::pin(fut)
}
"/muopdb.IndexServer/CompactSegments" => {
#[allow(non_camel_case_types)]
struct CompactSegmentsSvc<T: IndexServer>(pub Arc<T>);
impl<T: IndexServer> tonic::server::UnaryService<super::CompactSegmentsRequest>
for CompactSegmentsSvc<T>
{
type Response = super::CompactSegmentsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Self::Future {
let inner = self.0.clone();
let fut = async move { (*inner).compact_segments(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CompactSegmentsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
Expand Down
Loading