Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
GRPC endpoint placeholder for compact segments
  • Loading branch information
khoa165 committed Jan 20, 2025
commit f0f4731560bea8e45e8d0892ad74b67c71a7d236
35 changes: 34 additions & 1 deletion rs/index_server/src/index_server.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use proto::muopdb::index_server_server::IndexServer;
use proto::muopdb::{
CreateCollectionRequest, CreateCollectionResponse, FlushRequest, FlushResponse,
GetSegmentsRequest, GetSegmentsResponse, InsertPackedRequest, InsertPackedResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse,
InsertRequest, InsertResponse, SearchRequest, SearchResponse, CompactSegmentsRequest, CompactSegmentsResponse,
};
use tokio::sync::Mutex;
use utils::mem::{lows_and_highs_to_u128s, transmute_u8_to_slice, u128s_to_lows_highs};
@@ -392,4 +392,37 @@ impl IndexServer for IndexServerImpl {
)),
}
}

async fn compact_segments(
&self,
request: tonic::Request<CompactSegmentsRequest>,
) -> Result<tonic::Response<CompactSegmentsResponse>, tonic::Status> {
let start = std::time::Instant::now();
let req = request.into_inner();
let collection_name = req.collection_name;
let _segment_names = req.segment_names;

let collection_opt = self
.collection_catalog
.lock()
.await
.get_collection(&collection_name)
.await;

match collection_opt {
Some(_collection) => {
// Validation that segments exist in the collection
// Logic to compact segments here
let end = std::time::Instant::now();
let duration = end.duration_since(start);
info!("[{}] Compacted segments in {:?}", collection_name, duration);

Ok(tonic::Response::new(CompactSegmentsResponse {}))
}
None => Err(tonic::Status::new(
tonic::Code::NotFound,
"Collection not found",
)),
}
}
}
11 changes: 11 additions & 0 deletions rs/proto/proto/muopdb.proto
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@ service IndexServer {
rpc Flush(FlushRequest) returns (FlushResponse) {}

rpc GetSegments(GetSegmentsRequest) returns (GetSegmentsResponse) {}

rpc CompactSegments(CompactSegmentsRequest) returns (CompactSegmentsResponse) {}
}

message GetSegmentsRequest {
@@ -57,6 +59,15 @@ message GetSegmentsResponse {
repeated string segment_names = 1;
}

message CompactSegmentsRequest {
string collection_name = 1;
repeated string segment_names = 2;
}

message CompactSegmentsResponse {
}


message CreateCollectionRequest {
string collection_name = 1;

62 changes: 62 additions & 0 deletions rs/proto/src/muopdb.rs
Original file line number Diff line number Diff line change
@@ -42,6 +42,17 @@ pub struct GetSegmentsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "2")]
pub segment_names: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CompactSegmentsResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCollectionRequest {
#[prost(string, tag = "1")]
pub collection_name: ::prost::alloc::string::String,
@@ -494,6 +505,20 @@ pub mod index_server_client {
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/GetSegments");
self.inner.unary(request.into_request(), path, codec).await
}
pub async fn compact_segments(
&mut self,
request: impl tonic::IntoRequest<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/muopdb.IndexServer/CompactSegments");
self.inner.unary(request.into_request(), path, codec).await
}
}
}
/// Generated server implementations.
@@ -658,6 +683,10 @@ pub mod index_server_server {
&self,
request: tonic::Request<super::GetSegmentsRequest>,
) -> Result<tonic::Response<super::GetSegmentsResponse>, tonic::Status>;
async fn compact_segments(
&self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Result<tonic::Response<super::CompactSegmentsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct IndexServerServer<T: IndexServer> {
@@ -902,6 +931,39 @@ pub mod index_server_server {
};
Box::pin(fut)
}
"/muopdb.IndexServer/CompactSegments" => {
#[allow(non_camel_case_types)]
struct CompactSegmentsSvc<T: IndexServer>(pub Arc<T>);
impl<T: IndexServer> tonic::server::UnaryService<super::CompactSegmentsRequest>
for CompactSegmentsSvc<T>
{
type Response = super::CompactSegmentsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::CompactSegmentsRequest>,
) -> Self::Future {
let inner = self.0.clone();
let fut = async move { (*inner).compact_segments(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CompactSegmentsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)