Skip to content

Commit dd28398

Browse files
committed
introduce specialization for API v1
Signed-off-by: iosmanthus <[email protected]>
1 parent 503047f commit dd28398

15 files changed

+430
-243
lines changed

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
//! # })}
9191
//! ```
9292
93+
#![feature(specialization)]
9394
#[macro_use]
9495
pub mod request;
9596
#[macro_use]

Diff for: src/pd/client.rs

+57-39
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
11
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use crate::{
4-
compat::stream_fn,
5-
kv::codec,
6-
pd::{retry::RetryClientTrait, RetryClient},
7-
region::{RegionId, RegionVerId, RegionWithLeader},
8-
region_cache::RegionCache,
9-
store::RegionStore,
10-
BoundRange, Config, Key, Result, SecurityManager, Timestamp,
11-
};
3+
use std::{collections::HashMap, sync::Arc, thread};
4+
use std::marker::PhantomData;
5+
126
use async_trait::async_trait;
137
use futures::{prelude::*, stream::BoxStream};
148
use grpcio::{EnvBuilder, Environment};
159
use slog::Logger;
16-
use std::{collections::HashMap, sync::Arc, thread};
10+
use tokio::sync::RwLock;
11+
1712
use tikv_client_pd::Cluster;
1813
use tikv_client_proto::{kvrpcpb, metapb};
1914
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
20-
use tokio::sync::RwLock;
15+
16+
use crate::{
17+
BoundRange,
18+
compat::stream_fn,
19+
Config,
20+
Key,
21+
kv::codec,
22+
pd::{retry::RetryClientTrait, RetryClient},
23+
region::{RegionId, RegionVerId, RegionWithLeader}, region_cache::RegionCache, Result, SecurityManager, store::RegionStore, Timestamp,
24+
};
25+
use crate::request::request_codec::RequestCodec;
2126

2227
const CQ_COUNT: usize = 1;
2328
const CLIENT_PREFIX: &str = "tikv-client";
@@ -42,6 +47,7 @@ const CLIENT_PREFIX: &str = "tikv-client";
4247
#[async_trait]
4348
pub trait PdClient: Send + Sync + 'static {
4449
type KvClient: KvClient + Send + Sync + 'static;
50+
type RequestCodec: RequestCodec;
4551

4652
/// In transactional API, `region` is decoded (keys in raw format).
4753
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore>;
@@ -69,11 +75,11 @@ pub trait PdClient: Send + Sync + 'static {
6975

7076
fn group_keys_by_region<K, K2>(
7177
self: Arc<Self>,
72-
keys: impl Iterator<Item = K> + Send + Sync + 'static,
78+
keys: impl Iterator<Item=K> + Send + Sync + 'static,
7379
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
74-
where
75-
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
76-
K2: Send + Sync + 'static,
80+
where
81+
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
82+
K2: Send + Sync + 'static,
7783
{
7884
let keys = keys.peekable();
7985
stream_fn(keys, move |mut keys| {
@@ -95,7 +101,7 @@ pub trait PdClient: Send + Sync + 'static {
95101
}
96102
}
97103
})
98-
.boxed()
104+
.boxed()
99105
}
100106

101107
/// Returns a Stream which iterates over the contexts for each region covered by range.
@@ -126,7 +132,7 @@ pub trait PdClient: Send + Sync + 'static {
126132
Ok(Some((Some(region_end), store)))
127133
}
128134
})
129-
.boxed()
135+
.boxed()
130136
}
131137

132138
/// Returns a Stream which iterates over the contexts for ranges in the same region.
@@ -190,7 +196,7 @@ pub trait PdClient: Send + Sync + 'static {
190196
}
191197
}
192198
})
193-
.boxed()
199+
.boxed()
194200
}
195201

196202
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
@@ -204,22 +210,27 @@ pub trait PdClient: Send + Sync + 'static {
204210
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
205211

206212
async fn invalidate_region_cache(&self, ver_id: RegionVerId);
213+
214+
fn get_request_codec(&self) -> Self::RequestCodec;
207215
}
208216

209217
/// This client converts requests for the logical TiKV cluster into requests
210218
/// for a single TiKV store using PD and internal logic.
211-
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
219+
pub struct PdRpcClient<C, KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
212220
pd: Arc<RetryClient<Cl>>,
213221
kv_connect: KvC,
214222
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
215223
enable_codec: bool,
216224
region_cache: RegionCache<RetryClient<Cl>>,
217225
logger: Logger,
226+
// TODO: change to a real codec.
227+
_phantom: PhantomData<C>,
218228
}
219229

220230
#[async_trait]
221-
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
231+
impl<C: RequestCodec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<C, KvC> {
222232
type KvClient = KvC::KvClient;
233+
type RequestCodec = C;
223234

224235
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
225236
let store_id = region.get_store_id()?;
@@ -260,15 +271,19 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
260271
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
261272
self.region_cache.invalidate_region_cache(ver_id).await
262273
}
274+
275+
fn get_request_codec(&self) -> Self::RequestCodec {
276+
todo!()
277+
}
263278
}
264279

265-
impl PdRpcClient<TikvConnect, Cluster> {
280+
impl<C> PdRpcClient<C, TikvConnect, Cluster> {
266281
pub async fn connect(
267282
pd_endpoints: &[String],
268283
config: Config,
269284
enable_codec: bool,
270285
logger: Logger,
271-
) -> Result<PdRpcClient> {
286+
) -> Result<PdRpcClient<C, TikvConnect, Cluster>> {
272287
PdRpcClient::new(
273288
config.clone(),
274289
|env, security_mgr| TikvConnect::new(env, security_mgr, config.timeout),
@@ -278,7 +293,7 @@ impl PdRpcClient<TikvConnect, Cluster> {
278293
enable_codec,
279294
logger,
280295
)
281-
.await
296+
.await
282297
}
283298
}
284299

@@ -291,18 +306,18 @@ fn thread_name(prefix: &str) -> String {
291306
.unwrap_or_else(|| prefix.to_owned())
292307
}
293308

294-
impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
309+
impl<C, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<C, KvC, Cl> {
295310
pub async fn new<PdFut, MakeKvC, MakePd>(
296311
config: Config,
297312
kv_connect: MakeKvC,
298313
pd: MakePd,
299314
enable_codec: bool,
300315
logger: Logger,
301-
) -> Result<PdRpcClient<KvC, Cl>>
302-
where
303-
PdFut: Future<Output = Result<RetryClient<Cl>>>,
304-
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
305-
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
316+
) -> Result<PdRpcClient<C, KvC, Cl>>
317+
where
318+
PdFut: Future<Output=Result<RetryClient<Cl>>>,
319+
MakeKvC: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> KvC,
320+
MakePd: FnOnce(Arc<Environment>, Arc<SecurityManager>) -> PdFut,
306321
{
307322
let env = Arc::new(
308323
EnvBuilder::new()
@@ -312,7 +327,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
312327
);
313328
let security_mgr = Arc::new(
314329
if let (Some(ca_path), Some(cert_path), Some(key_path)) =
315-
(&config.ca_path, &config.cert_path, &config.key_path)
330+
(&config.ca_path, &config.cert_path, &config.key_path)
316331
{
317332
SecurityManager::load(ca_path, cert_path, key_path)?
318333
} else {
@@ -329,6 +344,8 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
329344
enable_codec,
330345
region_cache: RegionCache::new(pd),
331346
logger,
347+
// TODO
348+
_phantom: PhantomData,
332349
})
333350
}
334351

@@ -352,10 +369,11 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
352369

353370
#[cfg(test)]
354371
pub mod test {
355-
use super::*;
372+
use futures::{executor, executor::block_on};
373+
356374
use crate::mock::*;
357375

358-
use futures::{executor, executor::block_on};
376+
use super::*;
359377

360378
#[tokio::test]
361379
async fn test_kv_client_caching() {
@@ -396,7 +414,7 @@ pub mod test {
396414
vec![1].into(),
397415
vec![2].into(),
398416
vec![3].into(),
399-
vec![5, 2].into()
417+
vec![5, 2].into(),
400418
]
401419
);
402420
assert_eq!(
@@ -458,36 +476,36 @@ pub mod test {
458476
vec![
459477
kvrpcpb::KeyRange {
460478
start_key: k1.clone(),
461-
end_key: k2.clone()
479+
end_key: k2.clone(),
462480
},
463481
kvrpcpb::KeyRange {
464482
start_key: k1,
465-
end_key: k_split.clone()
466-
}
483+
end_key: k_split.clone(),
484+
},
467485
]
468486
);
469487
assert_eq!(ranges2.0, 2);
470488
assert_eq!(
471489
ranges2.1,
472490
vec![kvrpcpb::KeyRange {
473491
start_key: k_split.clone(),
474-
end_key: k3
492+
end_key: k3,
475493
}]
476494
);
477495
assert_eq!(ranges3.0, 1);
478496
assert_eq!(
479497
ranges3.1,
480498
vec![kvrpcpb::KeyRange {
481499
start_key: k2,
482-
end_key: k_split.clone()
500+
end_key: k_split.clone(),
483501
}]
484502
);
485503
assert_eq!(ranges4.0, 2);
486504
assert_eq!(
487505
ranges4.1,
488506
vec![kvrpcpb::KeyRange {
489507
start_key: k_split,
490-
end_key: k4
508+
end_key: k4,
491509
}]
492510
);
493511
assert!(stream.next().is_none());

Diff for: src/raw/client.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::{
1515
request::{Collect, CollectSingle, Plan},
1616
Backoff, BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1717
};
18+
use crate::request::request_codec::RequestCodec;
1819

1920
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2021

@@ -26,15 +27,16 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2627
/// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
2728
/// awaited to execute.
2829
#[derive(Clone)]
29-
pub struct Client<PdC: PdClient = PdRpcClient> {
30+
pub struct Client<C, PdC: PdClient = PdRpcClient<C>> {
3031
rpc: Arc<PdC>,
3132
cf: Option<ColumnFamily>,
3233
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
3334
atomic: bool,
3435
logger: Logger,
36+
_phantom: std::marker::PhantomData<C>,
3537
}
3638

37-
impl Client<PdRpcClient> {
39+
impl<C: RequestCodec> Client<C, PdRpcClient<C>> {
3840
/// Create a raw [`Client`] and connect to the TiKV cluster.
3941
///
4042
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -158,7 +160,7 @@ impl Client<PdRpcClient> {
158160
}
159161
}
160162

161-
impl<PdC: PdClient> Client<PdC> {
163+
impl<C:RequestCodec, PdC: PdClient> Client<C, PdC> {
162164
/// Create a new 'get' request.
163165
///
164166
/// Once resolved this request will result in the fetching of the value associated with the

Diff for: src/raw/lowering.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
//! generated protobuf code, then calls the low-level ctor functions in the requests module.
66
77
use std::{iter::Iterator, ops::Range, sync::Arc};
8+
use std::marker::PhantomData;
89

910
use tikv_client_proto::{kvrpcpb, metapb};
1011

11-
use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
12+
use crate::{BoundRange, ColumnFamily, Key, KvPair, raw::requests, Value};
1213

1314
pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
1415
requests::new_raw_get_request(key.into(), cf)
1516
}
1617

1718
pub fn new_raw_batch_get_request(
18-
keys: impl Iterator<Item = Key>,
19+
keys: impl Iterator<Item=Key>,
1920
cf: Option<ColumnFamily>,
2021
) -> kvrpcpb::RawBatchGetRequest {
2122
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
@@ -31,7 +32,7 @@ pub fn new_raw_put_request(
3132
}
3233

3334
pub fn new_raw_batch_put_request(
34-
pairs: impl Iterator<Item = KvPair>,
35+
pairs: impl Iterator<Item=KvPair>,
3536
cf: Option<ColumnFamily>,
3637
atomic: bool,
3738
) -> kvrpcpb::RawBatchPutRequest {
@@ -47,7 +48,7 @@ pub fn new_raw_delete_request(
4748
}
4849

4950
pub fn new_raw_batch_delete_request(
50-
keys: impl Iterator<Item = Key>,
51+
keys: impl Iterator<Item=Key>,
5152
cf: Option<ColumnFamily>,
5253
) -> kvrpcpb::RawBatchDeleteRequest {
5354
requests::new_raw_batch_delete_request(keys.map(Into::into).collect(), cf)
@@ -78,7 +79,7 @@ pub fn new_raw_scan_request(
7879
}
7980

8081
pub fn new_raw_batch_scan_request(
81-
ranges: impl Iterator<Item = BoundRange>,
82+
ranges: impl Iterator<Item=BoundRange>,
8283
each_limit: u32,
8384
key_only: bool,
8485
cf: Option<ColumnFamily>,
@@ -98,7 +99,7 @@ pub fn new_cas_request(
9899
pub fn new_raw_coprocessor_request(
99100
copr_name: String,
100101
copr_version_req: String,
101-
ranges: impl Iterator<Item = BoundRange>,
102+
ranges: impl Iterator<Item=BoundRange>,
102103
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
103104
) -> requests::RawCoprocessorRequest {
104105
requests::new_raw_coprocessor_request(

0 commit comments

Comments
 (0)