Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion rs/index_server/src/index_server.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer;
use proto::muopdb::{
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse,
};
use tokio::sync::Mutex;
use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs};
@@ -392,4 +392,58 @@ impl IndexServer for IndexServerImpl {
)),
}
}

async fn compact_segments(
&self,
request: tonic::Request<CompactSegmentsRequest>,
) -> Result<tonic::Response<CompactSegmentsResponse>, tonic::Status> {
let start = std::time::Instant::now();
let req = request.into_inner();
let collection_name = req.collection_name;
let segment_names = req.segment_names;

let collection_opt = self
.collection_catalog
.lock()
.await
.get_collection(&collection_name)
.await;

match collection_opt {
Some(collection) => {
// Validation that segments exist in the collection
let segments = collection.get_all_segment_names();
let missing_segments: Vec<String> = segment_names
.iter()
.filter(|segment_name| !segments.contains(segment_name))
.cloned()
.collect();
if !missing_segments.is_empty() {
return Err(tonic::Status::new(
tonic::Code::NotFound,
format!("Segments not found: {:?}", missing_segments),
));
}

if segment_names.len() <= 1 {
return Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Require at least 2 segments to compact",
));
}

// TODO- khoa165: Logic to compact segments here

let end = std::time::Instant::now();
let duration = end.duration_since(start);
info!("[{}] Compacted {} segments in {:?}", collection_name, segment_names.len(), duration);

Ok(tonic::Response::new(CompactSegmentsResponse {}))
}
None => Err(tonic::Status::new(
tonic::Code::NotFound,
"Collection not found",
)),
}
}
}
11 changes: 11 additions & 0 deletions rs/proto/proto/muopdb.proto
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@ service IndexServer {
rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}

rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {}
}

message GetSegmentsRequest {
@@ -57,6 +59,15 @@ message GetSegmentsResponse {
repeated string segment_names = 1;
}

message CompactSegmentsRequest {
string collection_name = 1;
repeated string segment_names = 2;
}

message CompactSegmentsResponse {
}


message CreateCollectionRequest {
string collection_name = 1;

62 changes: 62 additions & 0 deletions rs/proto/src/muopdb.rs
Original file line number Diff line number Diff line change
@@ -42,6 +42,17 @@ pub struct GetSegmentsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "2")]
pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCollectionRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
@@ -494,6 +505,20 @@ pub mod index_server_client {
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
self.inner.unary(request.into_request(), path, codec).await
}
pub async fn compact_segments(
&mut self,
request: impl tonic::IntoRequest<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments");
self.inner.unary(request.into_request(), path, codec).await
}
}
}
/// Generated server implementations.
@@ -658,6 +683,10 @@ pub mod index_server_server {
&self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
async fn compact_segments(
&self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct IndexServerServer<T: IndexServer> {
@@ -902,6 +931,39 @@ pub mod index_server_server {
};
Box::pin(fut)
}
"/muopdb.IndexServer/CompactSegments" => {
#[allow(non_camel_case_types)]
struct CompactSegmentsSvc<T: IndexServer>(pub Arc<T>);
impl<T: IndexServer> tonic::server::UnaryService<super::CompactSegmentsRequest>
for CompactSegmentsSvc<T>
{
type Response = super::CompactSegmentsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Self::Future {
let inner = self.0.clone();
let fut = async move { (*inner).compact_segments(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CompactSegmentsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)