Skip to content

Commit e68c174

Browse files
committed
fix(epoxy): add list operation
1 parent cd89bc9 commit e68c174

File tree

8 files changed

+492
-142
lines changed

8 files changed

+492
-142
lines changed
Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,110 @@
1-
use anyhow::*;
1+
use anyhow::Result;
22
use epoxy_protocol::protocol::ReplicaId;
33
use gas::prelude::*;
4+
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};
45

5-
use super::read_value;
6+
use crate::keys::{
7+
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
8+
};
69

710
#[derive(Debug)]
811
pub struct Input {
912
pub replica_id: ReplicaId,
1013
pub key: Vec<u8>,
1114
}
1215

16+
#[operation]
17+
pub async fn epoxy_kv_get_local(
18+
ctx: &OperationCtx,
19+
input: &Input,
20+
) -> Result<Option<CommittedValue>> {
21+
Ok(read_local_value(ctx, input.replica_id, &input.key, false)
22+
.await?
23+
.value)
24+
}
25+
1326
#[derive(Debug)]
14-
pub struct Output {
15-
pub value: Option<Vec<u8>>,
16-
pub version: Option<u64>,
17-
pub mutable: bool,
27+
pub(crate) struct LocalValueRead {
28+
pub value: Option<CommittedValue>,
29+
pub cache_value: Option<CommittedValue>,
1830
}
1931

20-
#[operation]
21-
pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
22-
let committed_value =
23-
read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
24-
.await?
25-
.value;
26-
27-
Ok(Output {
28-
value: committed_value.as_ref().map(|value| value.value.clone()),
29-
version: committed_value.as_ref().map(|value| value.version),
30-
mutable: committed_value
31-
.as_ref()
32-
.map(|value| value.mutable)
33-
.unwrap_or(false),
34-
})
32+
/// Reads a committed value from the local replica with dual-read fallback.
33+
///
34+
/// This performs a cascading lookup across storage generations so that values written
35+
/// before the v2 migration remain readable without a full data migration:
36+
///
37+
/// 1. **V2 value** (`EPOXY_V2/replica/{id}/kv/{key}/value`). The current write path.
38+
/// 2. **Legacy committed value** (`EPOXY_V1/replica/{id}/kv/{key}/committed_value`). Written by
39+
/// the original EPaxos protocol. Deserialized as raw bytes with version 0 and mutable=false.
40+
/// 3. **Optimistic cache** (`EPOXY_V2/replica/{id}/kv/{key}/cache`). Only checked when
41+
/// `include_cache` is true. Contains values fetched from remote replicas for the optimistic
42+
/// read path.
43+
///
44+
/// The first path that returns a value wins. This lets the background backfill migrate data
45+
/// at its own pace without blocking reads.
46+
pub(crate) async fn read_local_value(
47+
ctx: &OperationCtx,
48+
replica_id: ReplicaId,
49+
key: &[u8],
50+
include_cache: bool,
51+
) -> Result<LocalValueRead> {
52+
ctx.udb()?
53+
.run(|tx| {
54+
async move {
55+
let value_key = KvValueKey::new(key.to_vec());
56+
let legacy_value_key = LegacyCommittedValueKey::new(key.to_vec());
57+
let cache_key = KvOptimisticCacheKey::new(key.to_vec());
58+
let packed_value_key = keys::subspace(replica_id).pack(&value_key);
59+
let packed_legacy_value_key =
60+
keys::legacy_subspace(replica_id).pack(&legacy_value_key);
61+
let packed_cache_key = keys::subspace(replica_id).pack(&cache_key);
62+
63+
let (local_value, legacy_value, cache_value) = tokio::try_join!(
64+
tx.get(&packed_value_key, Serializable),
65+
tx.get(&packed_legacy_value_key, Serializable),
66+
async {
67+
if include_cache {
68+
tx.get(&packed_cache_key, Serializable).await
69+
} else {
70+
Ok(None)
71+
}
72+
},
73+
)?;
74+
75+
// V2 committed value (current write path)
76+
if let Some(value) = local_value {
77+
return Ok(LocalValueRead {
78+
value: Some(value_key.deserialize(&value)?),
79+
cache_value: None,
80+
});
81+
}
82+
83+
// Legacy committed value (original EPaxos raw bytes)
84+
if let Some(value) = legacy_value {
85+
return Ok(LocalValueRead {
86+
value: Some(CommittedValue {
87+
value: legacy_value_key.deserialize(&value)?,
88+
version: 0,
89+
mutable: false,
90+
}),
91+
cache_value: None,
92+
});
93+
}
94+
95+
if let Some(value) = cache_value {
96+
return Ok(LocalValueRead {
97+
value: None,
98+
cache_value: Some(cache_key.deserialize(&value)?),
99+
});
100+
}
101+
102+
Ok(LocalValueRead {
103+
value: None,
104+
cache_value: None,
105+
})
106+
}
107+
})
108+
.custom_instrument(tracing::info_span!("read_local_value_tx"))
109+
.await
35110
}

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
utils,
1010
};
1111

12-
use super::read_value;
12+
use super::get_local::read_local_value;
1313

1414
#[derive(Debug)]
1515
pub struct Input {
@@ -50,20 +50,14 @@ pub struct Output {
5050
/// best-effort lookup.
5151
#[operation]
5252
pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Result<Output> {
53-
let local_read = read_value::read_local_value(
53+
let local_read = read_local_value(
5454
ctx,
5555
input.replica_id,
56-
input.key.clone(),
56+
&input.key,
5757
input.caching_behavior == protocol::CachingBehavior::Optimistic,
5858
)
5959
.await?;
60-
if local_read.value.is_some() {
61-
return Ok(Output {
62-
value: local_read.value.map(|value| value.value),
63-
});
64-
}
65-
66-
if let Some(value) = local_read.cache_value {
60+
if let Some(value) = local_read.value.or(local_read.cache_value) {
6761
return Ok(Output {
6862
value: Some(value.value),
6963
});
@@ -114,22 +108,21 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
114108
)
115109
.await?;
116110

117-
for response in responses {
118-
if let Some(value) = response {
119-
let value = CommittedValue {
120-
value: value.value,
121-
version: value.version,
122-
mutable: value.mutable,
123-
};
124-
125-
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
126-
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
127-
}
111+
// Should only have 1 response
112+
if let Some(value) = responses.first().and_then(|r| r.response) {
113+
let value = CommittedValue {
114+
value: value.value,
115+
version: value.version,
116+
mutable: value.mutable,
117+
};
128118

129-
return Ok(Output {
130-
value: Some(value.value),
131-
});
119+
if input.caching_behavior == protocol::CachingBehavior::Optimistic {
120+
cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
132121
}
122+
123+
return Ok(Output {
124+
value: Some(value.value),
125+
});
133126
}
134127

135128
// No value found in any datacenter
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use anyhow::Result;
2+
use epoxy_protocol::protocol::ReplicaId;
3+
use futures_util::{StreamExt, TryStreamExt, future::Either};
4+
use gas::prelude::*;
5+
use std::collections::HashMap;
6+
use universaldb::prelude::*;
7+
8+
use crate::{
9+
keys::{self, CommittedValue, KvOptimisticCacheKey, KvValueKey},
10+
ops::kv::get_local::LocalValueRead,
11+
};
12+
13+
#[derive(Debug)]
14+
pub struct Input {
15+
pub replica_id: ReplicaId,
16+
pub begin_key: Vec<u8>,
17+
pub end_key: Vec<u8>,
18+
pub limit: Option<usize>,
19+
pub reverse: bool,
20+
}
21+
22+
#[derive(Debug)]
23+
pub struct Output {
24+
pub entries: Vec<Entry>,
25+
}
26+
27+
#[derive(Debug)]
28+
pub struct Entry {
29+
pub key: Vec<u8>,
30+
pub value: Vec<u8>,
31+
pub version: u64,
32+
pub mutable: bool,
33+
}
34+
35+
#[operation]
36+
pub async fn epoxy_kv_list_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
37+
let entries = list_local_values(
38+
ctx,
39+
input.replica_id,
40+
&input.begin_key,
41+
&input.end_key,
42+
input.limit,
43+
input.reverse,
44+
false,
45+
)
46+
.await?;
47+
48+
Ok(Output {
49+
entries: entries
50+
.into_iter()
51+
.filter_map(|e| {
52+
if let Some(value) = e.value {
53+
Some(Entry {
54+
key: e.key,
55+
value: value.value,
56+
version: value.version,
57+
mutable: value.mutable,
58+
})
59+
} else {
60+
None
61+
}
62+
})
63+
.collect(),
64+
})
65+
}
66+
67+
#[derive(Debug)]
68+
pub(crate) struct LocalEntryRead {
69+
pub key: Vec<u8>,
70+
pub value: Option<CommittedValue>,
71+
pub cache_value: Option<CommittedValue>,
72+
}
73+
74+
pub(crate) async fn list_local_values(
75+
ctx: &OperationCtx,
76+
replica_id: ReplicaId,
77+
begin_key: &[u8],
78+
end_key: &[u8],
79+
limit: Option<usize>,
80+
reverse: bool,
81+
include_cache: bool,
82+
) -> Result<Vec<LocalEntryRead>> {
83+
ctx.udb()?
84+
.run(|tx| async move {
85+
let value_begin_key = KvValueKey::new(begin_key.to_vec());
86+
let cache_begin_key = KvOptimisticCacheKey::new(end_key.to_vec());
87+
let packed_value_begin_key = keys::subspace(replica_id).pack(&value_begin_key);
88+
let packed_cache_begin_key = keys::subspace(replica_id).pack(&cache_begin_key);
89+
let value_end_key = KvValueKey::new(end_key.to_vec());
90+
let cache_end_key = KvOptimisticCacheKey::new(end_key.to_vec());
91+
let packed_value_end_key = keys::subspace(replica_id).pack(&value_end_key);
92+
let packed_cache_end_key = keys::subspace(replica_id).pack(&cache_end_key);
93+
94+
let mut value_stream = tx.get_ranges_keyvalues(
95+
RangeOption {
96+
mode: StreamingMode::WantAll,
97+
limit,
98+
reverse,
99+
..(packed_value_begin_key, packed_value_end_key).into()
100+
},
101+
Serializable,
102+
);
103+
104+
let mut entries = HashMap::new();
105+
106+
if include_cache {
107+
let cache_stream = tx.get_ranges_keyvalues(
108+
RangeOption {
109+
mode: StreamingMode::WantAll,
110+
..(packed_cache_begin_key, packed_cache_end_key).into()
111+
},
112+
Serializable,
113+
);
114+
// Combine streams
115+
let mut stream = futures_util::stream::select(
116+
value_stream.map(|x| x.map(Either::Left)),
117+
cache_stream.map(|x| x.map(Either::Right)),
118+
);
119+
120+
while let Some(entry) = stream.try_next().await? {
121+
match entry {
122+
Either::Left(entry) => {
123+
let value_key = KvValueKey::new(entry.key().to_vec());
124+
entries.insert(
125+
entry.key().to_vec(),
126+
LocalValueRead {
127+
value: Some(value_key.deserialize(entry.value())?),
128+
cache_value: None,
129+
},
130+
);
131+
}
132+
Either::Right(entry) => {
133+
if !entries.contains_key(entry.key()) {
134+
let value_key = KvValueKey::new(entry.key().to_vec());
135+
entries.insert(
136+
entry.key().to_vec(),
137+
LocalValueRead {
138+
value: None,
139+
cache_value: Some(value_key.deserialize(entry.value())?),
140+
},
141+
);
142+
}
143+
}
144+
}
145+
}
146+
} else {
147+
while let Some(entry) = value_stream.try_next().await? {
148+
let value_key = KvValueKey::new(entry.key().to_vec());
149+
entries.insert(
150+
entry.key().to_vec(),
151+
LocalValueRead {
152+
value: Some(value_key.deserialize(entry.value())?),
153+
cache_value: None,
154+
},
155+
);
156+
}
157+
}
158+
159+
Ok(entries
160+
.into_iter()
161+
.map(|(key, value)| LocalEntryRead {
162+
key,
163+
value: value.value,
164+
cache_value: value.cache_value,
165+
})
166+
.collect::<Vec<_>>())
167+
})
168+
.custom_instrument(tracing::info_span!("list_local_value_tx"))
169+
.await
170+
}

0 commit comments

Comments
 (0)