Skip to content

Commit 46afbc7

Browse files
committed
wip: integration tests with tikv-service
Signed-off-by: iosmanthus <[email protected]>
1 parent 182a457 commit 46afbc7

File tree

5 files changed

+37
-8
lines changed

5 files changed

+37
-8
lines changed

src/raw/requests.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use std::{any::Any, ops::Range, sync::Arc};
4+
use std::fmt::Formatter;
45

56
use async_trait::async_trait;
67
use futures::stream::BoxStream;
@@ -345,6 +346,12 @@ pub struct RawCoprocessorRequest {
345346
data_builder: RawCoprocessorRequestDataBuilder,
346347
}
347348

349+
impl std::fmt::Debug for RawCoprocessorRequest {
350+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
351+
Ok(())
352+
}
353+
}
354+
348355
#[async_trait]
349356
impl Request for RawCoprocessorRequest {
350357
async fn dispatch(&self, client: &TikvClient, options: CallOption) -> Result<Box<dyn Any>> {

src/request/codec.rs

+17-4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ pub trait RequestCodecExt: RequestCodec {
7474
keys.into_iter().map(|key| self.encode_key(key)).collect()
7575
}
7676

77+
fn encode_secondaries(&self, secondaries: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
78+
self.encode_keys(secondaries)
79+
}
80+
7781
fn encode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Vec<kvrpcpb::KvPair> {
7882
for pair in pairs.iter_mut() {
7983
*pair.mut_key() = self.encode_key(pair.take_key());
@@ -217,18 +221,18 @@ pub trait Mode: Clone + Copy + Sync + Send + 'static {
217221
const MAX_KEY: &'static [u8] = &[Self::PREFIX + 1, 0, 0, 0];
218222
}
219223

220-
#[derive(Clone, Copy)]
224+
#[derive(Default, Clone, Copy)]
221225
pub struct RawMode;
222226

223-
#[derive(Clone, Copy)]
227+
#[derive(Default, Clone, Copy)]
224228
pub struct TxnMode;
225229

226230
impl Mode for RawMode {
227231
const PREFIX: u8 = b'r';
228232
}
229233

230234
impl Mode for TxnMode {
231-
const PREFIX: u8 = b't';
235+
const PREFIX: u8 = b'x';
232236
}
233237

234238
#[derive(Clone)]
@@ -263,12 +267,21 @@ impl RequestCodec for ApiV1<TxnMode> {
263267

264268
impl TxnCodec for ApiV1<TxnMode> {}
265269

266-
#[derive(Clone, Copy, Default)]
270+
#[derive(Clone, Copy)]
267271
pub struct KeySpace<M: Mode> {
268272
id: KeySpaceId,
269273
_phantom: PhantomData<M>,
270274
}
271275

276+
impl<M: Mode> Default for KeySpace<M> {
277+
fn default() -> Self {
278+
KeySpace {
279+
id: KeySpaceId::default(),
280+
_phantom: PhantomData,
281+
}
282+
}
283+
}
284+
272285
impl<M: Mode> From<KeySpace<M>> for Prefix {
273286
fn from(s: KeySpace<M>) -> Self {
274287
[M::PREFIX, s.id[0], s.id[1], s.id[2]]

src/request/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub mod codec;
2828

2929
/// Abstracts any request sent to a TiKV server.
3030
#[async_trait]
31-
pub trait KvRequest<C>: Request + Sized + Clone + Sync + Send + 'static {
31+
pub trait KvRequest<C>: Request + Sized + Clone + Sync + Send + std::fmt::Debug + 'static {
3232
/// The expected response to the request.
33-
type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + 'static;
33+
type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + std::fmt::Debug +'static;
3434

3535
fn encode_request(self, _codec: &C) -> Self {
3636
self

src/request/plan.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
5757
async fn execute(&self) -> Result<Self::Result> {
5858
let req = self.codec.encode_request(&self.request);
5959

60+
let time = std::time::Instant::now();
61+
62+
println!("{:?}: sending: {:?}", time ,req);
63+
6064
let stats = tikv_stats(self.request.label());
6165
let result = self
6266
.kv_client
@@ -71,7 +75,9 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
7175
.downcast()
7276
.expect("Downcast failed: request and response type mismatch");
7377

74-
self.codec.decode_response(req.as_ref(), resp)
78+
let resp = self.codec.decode_response(req.as_ref(), resp);
79+
println!("{:?}: recv(decode): {:?}",time, resp);
80+
resp
7581
})
7682
}
7783
}

src/transaction/requests.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,10 @@ pub fn new_pessimistic_prewrite_request(
233233
req
234234
}
235235

236-
impl_kv_request!(kvrpcpb::PrewriteRequest, mutations; kvrpcpb::PrewriteResponse, errors);
236+
impl_kv_request!(
237+
kvrpcpb::PrewriteRequest, mutations, primary_lock, secondaries;
238+
kvrpcpb::PrewriteResponse, errors
239+
);
237240

238241
impl Shardable for kvrpcpb::PrewriteRequest {
239242
type Shard = Vec<kvrpcpb::Mutation>;

0 commit comments

Comments
 (0)