Skip to content

Commit 143127a

Browse files
committed
fix(epoxy): use vbare
1 parent 896a1bb commit 143127a

File tree

22 files changed

+152
-83
lines changed

22 files changed

+152
-83
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/epoxy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing.workspace = true
3535
universaldb.workspace = true
3636
url.workspace = true
3737
uuid.workspace = true
38+
vbare.workspace = true
3839

3940
[dev-dependencies]
4041
gas.workspace = true

engine/packages/epoxy/src/keys/keys.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,9 @@
11
use anyhow::Result;
2-
use epoxy_protocol::protocol;
2+
use epoxy_protocol::{PROTOCOL_VERSION, protocol, versioned};
33
use serde::{Deserialize, Serialize};
44
use universaldb::prelude::*;
55
use universaldb::tuple::Versionstamp;
6-
7-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
8-
pub struct CommittedValue {
9-
// NOTE: An empty value may exist for cached entries to denote the value was not found on any datacenter
10-
// and cached as such.
11-
pub value: Vec<u8>,
12-
pub version: u64,
13-
pub mutable: bool,
14-
}
6+
use vbare::OwnedVersionedData;
157

168
/// In-flight accepted proposal state stored under `kv/{key}/accepted`.
179
///
@@ -44,14 +36,15 @@ impl KvValueKey {
4436
}
4537

4638
impl FormalKey for KvValueKey {
47-
type Value = CommittedValue;
39+
type Value = protocol::CommittedValue;
4840

4941
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
50-
serde_bare::from_slice(raw).map_err(Into::into)
42+
versioned::CommittedValue::deserialize_with_embedded_version(raw)
5143
}
5244

5345
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
54-
serde_bare::to_vec(&value).map_err(Into::into)
46+
versioned::CommittedValue::wrap_latest(value)
47+
.serialize_with_embedded_version(PROTOCOL_VERSION)
5548
}
5649
}
5750

@@ -260,14 +253,14 @@ impl KvOptimisticCacheKey {
260253
}
261254

262255
impl FormalKey for KvOptimisticCacheKey {
263-
type Value = CommittedValue;
256+
type Value = protocol::CachedValue;
264257

265258
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
266-
serde_bare::from_slice(raw).map_err(Into::into)
259+
versioned::CachedValue::deserialize_with_embedded_version(raw)
267260
}
268261

269262
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
270-
serde_bare::to_vec(&value).map_err(Into::into)
263+
versioned::CachedValue::wrap_latest(value).serialize_with_embedded_version(PROTOCOL_VERSION)
271264
}
272265
}
273266

engine/packages/epoxy/src/keys/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ pub mod keys;
55
pub mod replica;
66

77
pub use self::keys::{
8-
ChangelogKey, CommittedValue, KvAcceptedKey, KvAcceptedValue, KvBallotKey,
9-
KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
8+
ChangelogKey, KvAcceptedKey, KvAcceptedValue, KvBallotKey, KvOptimisticCacheKey, KvValueKey,
9+
LegacyCommittedValueKey,
1010
};
1111
pub use self::replica::ConfigKey;
1212

engine/packages/epoxy/src/ops/kv/get_local.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use anyhow::Result;
2-
use epoxy_protocol::protocol::ReplicaId;
2+
use epoxy_protocol::protocol::{CachedValue, CommittedValue, ReplicaId};
33
use gas::prelude::*;
44
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};
55

6-
use crate::keys::{
7-
self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
8-
};
6+
use crate::keys::{self, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey};
97

108
#[derive(Debug)]
119
pub struct Input {
@@ -26,7 +24,7 @@ pub async fn epoxy_kv_get_local(
2624
#[derive(Debug)]
2725
pub(crate) struct LocalValueRead {
2826
pub value: Option<CommittedValue>,
29-
pub cache_value: Option<CommittedValue>,
27+
pub cache_value: Option<CachedValue>,
3028
}
3129

3230
/// Reads a committed value from the local replica with dual-read fallback.
@@ -95,18 +93,12 @@ pub(crate) async fn read_local_value(
9593
if let Some(value) = cache_value {
9694
let cache_value = cache_key.deserialize(&value)?;
9795

98-
// Special case with empty values. These are inserted in kv_get_optimistic with `save_empty`
99-
if cache_value.value.is_empty() {
96+
if let Some(value) = cache_value.value {
10097
return Ok(LocalValueRead {
10198
value: None,
102-
cache_value: None,
99+
cache_value: Some(cache_key.deserialize(&value)?),
103100
});
104101
}
105-
106-
return Ok(LocalValueRead {
107-
value: None,
108-
cache_value: Some(cache_key.deserialize(&value)?),
109-
});
110102
}
111103

112104
Ok(LocalValueRead {

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
use anyhow::*;
2-
use epoxy_protocol::protocol::{self, ReplicaId};
2+
use epoxy_protocol::protocol::{self, CachedValue, CommittedValue, ReplicaId};
33
use gas::prelude::*;
44
use rivet_api_builder::ApiCtx;
55
use universaldb::prelude::*;
66

7-
use crate::{
8-
http_client,
9-
keys::{self, CommittedValue},
10-
utils,
11-
};
7+
use crate::{http_client, keys, utils};
128

139
use super::get_local::read_local_value;
1410

@@ -68,10 +64,12 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
6864
input.caching_behavior == protocol::CachingBehavior::Optimistic,
6965
)
7066
.await?;
71-
if let Some(value) = local_read.value.or(local_read.cache_value) {
72-
return Ok(Output {
73-
value: Some(value.value),
74-
});
67+
if let Some(value) = local_read
68+
.value
69+
.map(|v| v.value)
70+
.or(local_read.cache_value.and_then(|v| v.value))
71+
{
72+
return Ok(Output { value: Some(value) });
7573
}
7674

7775
// Request fanout to other datacenters, return first datacenter with any non-none value
@@ -169,7 +167,13 @@ async fn cache_fanout_value(
169167
}
170168
}
171169

172-
tx.write(&cache_key, value_to_cache.clone())?;
170+
tx.write(
171+
&cache_key,
172+
CachedValue {
173+
value: Some(value_to_cache.value.clone()),
174+
version: value_to_cache.version,
175+
},
176+
)?;
173177
Ok(())
174178
}
175179
})
@@ -189,11 +193,9 @@ async fn cache_empty_value(ctx: &OperationCtx, replica_id: ReplicaId, key: &[u8]
189193

190194
tx.write(
191195
&cache_key,
192-
CommittedValue {
193-
value: Vec::new(),
196+
CachedValue {
197+
value: None,
194198
version: 0,
195-
// TODO: What should this be set to?
196-
mutable: true,
197199
},
198200
)?;
199201
Ok(())

engine/packages/epoxy/src/ops/propose.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::{Context, Result, bail};
2-
use epoxy_protocol::protocol::{self, ReplicaId};
2+
use epoxy_protocol::protocol::{self, CommittedValue, ReplicaId};
33
use futures_util::{StreamExt, stream::FuturesUnordered};
44
use gas::prelude::*;
55
use rand::Rng;
@@ -8,9 +8,7 @@ use serde::{Deserialize, Serialize};
88
use std::time::{Duration, Instant};
99

1010
use crate::{
11-
http_client,
12-
keys::CommittedValue,
13-
metrics,
11+
http_client, metrics,
1412
replica::{
1513
ballot::{self, Ballot, BallotSelection},
1614
commit_kv::{self, CommitKvOutcome},

engine/packages/epoxy/src/replica/ballot.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use anyhow::{Context, Result};
2-
use epoxy_protocol::protocol;
2+
use epoxy_protocol::protocol::{self, CommittedValue};
33
use std::cmp::Ordering;
44
use universaldb::Transaction;
55
use universaldb::utils::{FormalKey, IsolationLevel::Serializable};
66

7-
use crate::keys::{self, CommittedValue, KvBallotKey, KvValueKey, LegacyCommittedValueKey};
7+
use crate::keys::{self, KvBallotKey, KvValueKey, LegacyCommittedValueKey};
88

99
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1010
pub struct Ballot {

engine/packages/epoxy/src/replica/changelog.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::{Context, Result, bail};
2-
use epoxy_protocol::protocol;
2+
use epoxy_protocol::protocol::{self, CommittedValue};
33
use futures_util::TryStreamExt;
44
use universaldb::{
55
KeySelector, RangeOption, Transaction,
@@ -10,8 +10,7 @@ use universaldb::{
1010
};
1111

1212
use crate::keys::{
13-
self, ChangelogKey, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey,
14-
KvValueKey,
13+
self, ChangelogKey, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey,
1514
};
1615
use crate::metrics;
1716

engine/packages/epoxy/src/replica/commit_kv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use anyhow::Result;
2-
use epoxy_protocol::protocol;
2+
use epoxy_protocol::protocol::{self, CommittedValue};
33
use universaldb::{Transaction, utils::IsolationLevel::Serializable};
44

55
use crate::{
6-
keys::{self, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey},
6+
keys::{self, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey},
77
replica::ballot::Ballot,
88
};
99

0 commit comments

Comments
 (0)