Skip to content

Commit 22f31f5

Browse files
authored
chore: paginated query region stats (GreptimeTeam#4942)
1 parent 5d20acc commit 22f31f5

File tree

4 files changed

+76
-10
lines changed

4 files changed

+76
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta-client/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ common-grpc.workspace = true
1515
common-macro.workspace = true
1616
common-meta.workspace = true
1717
common-telemetry.workspace = true
18+
futures.workspace = true
19+
futures-util.workspace = true
1820
humantime-serde.workspace = true
1921
rand.workspace = true
2022
serde.workspace = true

src/meta-client/src/client.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ mod cluster;
2121
mod store;
2222
mod util;
2323

24+
use std::sync::Arc;
25+
2426
use api::v1::meta::{ProcedureDetailResponse, Role};
2527
use cluster::Client as ClusterClient;
2628
use common_error::ext::BoxedError;
@@ -30,7 +32,8 @@ use common_meta::cluster::{
3032
};
3133
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
3234
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
33-
use common_meta::error::{self as meta_error, Result as MetaResult};
35+
use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult};
36+
use common_meta::range_stream::PaginationStream;
3437
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
3538
use common_meta::rpc::procedure::{
3639
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
@@ -40,8 +43,10 @@ use common_meta::rpc::store::{
4043
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
4144
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
4245
};
46+
use common_meta::rpc::KeyValue;
4347
use common_meta::ClusterId;
4448
use common_telemetry::info;
49+
use futures::TryStreamExt;
4550
use heartbeat::Client as HeartbeatClient;
4651
use procedure::Client as ProcedureClient;
4752
use snafu::{OptionExt, ResultExt};
@@ -314,16 +319,15 @@ impl ClusterInfo for MetaClient {
314319
}
315320

316321
async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
317-
let cluster_client = self.cluster_client()?;
322+
let cluster_kv_backend = Arc::new(self.cluster_client()?);
318323
let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0);
319324
let req = RangeRequest::new().with_prefix(range_prefix);
320-
let mut datanode_stats = cluster_client
321-
.range(req)
322-
.await?
323-
.kvs
324-
.into_iter()
325-
.map(|kv| DatanodeStatValue::try_from(kv.value).context(ConvertMetaRequestSnafu))
326-
.collect::<Result<Vec<_>>>()?;
325+
let stream = PaginationStream::new(cluster_kv_backend, req, 256, Arc::new(decode_stats))
326+
.into_stream();
327+
let mut datanode_stats = stream
328+
.try_collect::<Vec<_>>()
329+
.await
330+
.context(ConvertMetaResponseSnafu)?;
327331
let region_stats = datanode_stats
328332
.iter_mut()
329333
.flat_map(|datanode_stat| {
@@ -336,6 +340,12 @@ impl ClusterInfo for MetaClient {
336340
}
337341
}
338342

343+
fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
344+
DatanodeStatValue::try_from(kv.value)
345+
.map_err(BoxedError::new)
346+
.context(ExternalSnafu)
347+
}
348+
339349
impl MetaClient {
340350
pub fn new(id: Id) -> Self {
341351
Self {

src/meta-client/src/client/cluster.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::any::Any;
1516
use std::future::Future;
1617
use std::sync::Arc;
1718

1819
use api::greptime_proto::v1;
1920
use api::v1::meta::cluster_client::ClusterClient;
2021
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
22+
use common_error::ext::BoxedError;
2123
use common_grpc::channel_manager::ChannelManager;
22-
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
24+
use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult};
25+
use common_meta::kv_backend::{KvBackend, TxnService};
26+
use common_meta::rpc::store::{
27+
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
28+
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
29+
RangeRequest, RangeResponse,
30+
};
2331
use common_telemetry::{info, warn};
2432
use snafu::{ensure, ResultExt};
2533
use tokio::sync::RwLock;
@@ -79,6 +87,51 @@ impl Client {
7987
}
8088
}
8189

90+
impl TxnService for Client {
91+
type Error = MetaError;
92+
}
93+
94+
#[async_trait::async_trait]
95+
impl KvBackend for Client {
96+
fn name(&self) -> &str {
97+
"ClusterClientKvBackend"
98+
}
99+
100+
fn as_any(&self) -> &dyn Any {
101+
self
102+
}
103+
104+
async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
105+
self.range(req)
106+
.await
107+
.map_err(BoxedError::new)
108+
.context(ExternalSnafu)
109+
}
110+
111+
async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
112+
unimplemented!("`put` is not supported in cluster client kv backend")
113+
}
114+
115+
async fn batch_put(&self, _: BatchPutRequest) -> MetaResult<BatchPutResponse> {
116+
unimplemented!("`batch_put` is not supported in cluster client kv backend")
117+
}
118+
119+
async fn batch_get(&self, req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
120+
self.batch_get(req)
121+
.await
122+
.map_err(BoxedError::new)
123+
.context(ExternalSnafu)
124+
}
125+
126+
async fn delete_range(&self, _: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
127+
unimplemented!("`delete_range` is not supported in cluster client kv backend")
128+
}
129+
130+
async fn batch_delete(&self, _: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
131+
unimplemented!("`batch_delete` is not supported in cluster client kv backend")
132+
}
133+
}
134+
82135
#[derive(Debug)]
83136
struct Inner {
84137
id: Id,

0 commit comments

Comments
 (0)