Skip to content

Commit 75eea1e

Browse files
committed
[storage] Draft for Envelope v2
# Summary - Separate header, keys, and other envelope related information from the actual payload of the envelope - Support different encoding for the payload (flexbuffers, or bilrost) - Delay decoding of payload until needed # TODO: - [ ] Decode v1 envelope into v2 - [ ] Use Envelope v2 in code
1 parent 7903fea commit 75eea1e

File tree

14 files changed

+994
-10
lines changed

14 files changed

+994
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/encoding/src/bilrost_encodings/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
mod arc_encodings;
1414
mod nonzero;
15+
mod phantom_data;
1516
mod range;
1617

1718
pub mod display_from_str;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::marker::PhantomData;
12+
13+
use bilrost::{
14+
DecodeErrorKind,
15+
encoding::{EmptyState, ForOverwrite, Proxiable},
16+
};
17+
18+
use crate::bilrost_encodings::RestateEncoding;
19+
20+
struct PhantomDataTag;
21+
22+
impl<T> Proxiable<PhantomDataTag> for PhantomData<T> {
23+
type Proxy = ();
24+
25+
fn encode_proxy(&self) -> Self::Proxy {}
26+
27+
fn decode_proxy(&mut self, _: Self::Proxy) -> Result<(), DecodeErrorKind> {
28+
Ok(())
29+
}
30+
}
31+
32+
impl<T> ForOverwrite<RestateEncoding, PhantomData<T>> for () {
33+
fn for_overwrite() -> PhantomData<T> {
34+
PhantomData
35+
}
36+
}
37+
38+
impl<T> EmptyState<RestateEncoding, PhantomData<T>> for () {
39+
fn empty() -> PhantomData<T> {
40+
PhantomData
41+
}
42+
43+
fn is_empty(_: &PhantomData<T>) -> bool {
44+
true
45+
}
46+
47+
fn clear(_: &mut PhantomData<T>) {}
48+
}
49+
50+
bilrost::delegate_proxied_encoding!(
51+
use encoding (::bilrost::encoding::General)
52+
to encode proxied type (PhantomData<T>)
53+
using proxy tag (PhantomDataTag)
54+
with encoding (RestateEncoding)
55+
with generics (T)
56+
);

crates/invoker-api/src/effects.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ use restate_types::invocation::InvocationEpoch;
1616
use restate_types::journal::EntryIndex;
1717
use restate_types::journal::enriched::EnrichedRawEntry;
1818
use restate_types::journal_events::raw::RawEvent;
19-
use restate_types::journal_v2;
2019
use restate_types::journal_v2::CommandIndex;
2120
use restate_types::journal_v2::raw::RawEntry;
2221
use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader};
2322
use restate_types::time::MillisSinceEpoch;
23+
use restate_types::{flexbuffers_storage_encode_decode, journal_v2};
2424
use std::collections::HashSet;
2525

2626
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -35,6 +35,8 @@ pub struct Effect {
3535
pub kind: EffectKind,
3636
}
3737

38+
flexbuffers_storage_encode_decode!(Effect);
39+
3840
#[derive(Debug, Clone, PartialEq, Eq)]
3941
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
4042
// todo: fix this and box the large variant (EffectKind is 320 bytes)

crates/types/src/identifiers.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,26 @@ impl From<InvocationUuid> for opentelemetry::trace::SpanId {
349349
/// Services are isolated by key. This means that there cannot be two concurrent
350350
/// invocations for the same service instance (service name, key).
351351
#[derive(
352-
Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize,
352+
Eq,
353+
Hash,
354+
PartialEq,
355+
PartialOrd,
356+
Ord,
357+
Clone,
358+
Debug,
359+
serde::Serialize,
360+
serde::Deserialize,
361+
bilrost::Message,
353362
)]
354363
pub struct ServiceId {
355364
// TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows)
356365
/// Identifies the grpc service
366+
#[bilrost(1)]
357367
pub service_name: ByteString,
358368
/// Identifies the service instance for the given service name
369+
#[bilrost(2)]
359370
pub key: ByteString,
360-
371+
#[bilrost(3)]
361372
partition_key: PartitionKey,
362373
}
363374

crates/types/src/invocation/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use crate::identifiers::{
1919
};
2020
use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
2121
use crate::time::MillisSinceEpoch;
22-
use crate::{GenerationalNodeId, RestateVersion};
22+
use crate::{
23+
GenerationalNodeId, RestateVersion, bilrost_storage_encode_decode,
24+
flexbuffers_storage_encode_decode,
25+
};
2326

2427
use bytes::Bytes;
2528
use bytestring::ByteString;
@@ -435,6 +438,8 @@ pub struct ServiceInvocation {
435438
pub restate_version: RestateVersion,
436439
}
437440

441+
flexbuffers_storage_encode_decode!(ServiceInvocation);
442+
438443
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
439444
#[serde(
440445
from = "serde_hacks::SubmitNotificationSink",
@@ -577,6 +582,8 @@ pub struct InvocationResponse {
577582
pub result: ResponseResult,
578583
}
579584

585+
flexbuffers_storage_encode_decode!(InvocationResponse);
586+
580587
impl WithInvocationId for InvocationResponse {
581588
fn invocation_id(&self) -> InvocationId {
582589
self.target.invocation_id()
@@ -623,6 +630,8 @@ pub struct GetInvocationOutputResponse {
623630
pub result: GetInvocationOutputResult,
624631
}
625632

633+
bilrost_storage_encode_decode!(GetInvocationOutputResponse);
634+
626635
impl WithInvocationId for GetInvocationOutputResponse {
627636
fn invocation_id(&self) -> InvocationId {
628637
self.target.invocation_id()
@@ -944,6 +953,8 @@ pub struct InvocationTermination {
944953
pub response_sink: Option<InvocationMutationResponseSink>,
945954
}
946955

956+
flexbuffers_storage_encode_decode!(InvocationTermination);
957+
947958
/// Flavor of the termination. Can be kill (hard stop) or graceful cancel.
948959
#[derive(
949960
Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Enumeration,
@@ -963,6 +974,8 @@ pub struct PurgeInvocationRequest {
963974
pub response_sink: Option<InvocationMutationResponseSink>,
964975
}
965976

977+
flexbuffers_storage_encode_decode!(PurgeInvocationRequest);
978+
966979
impl WithInvocationId for PurgeInvocationRequest {
967980
fn invocation_id(&self) -> InvocationId {
968981
self.invocation_id
@@ -979,6 +992,8 @@ pub struct ResumeInvocationRequest {
979992
pub response_sink: Option<InvocationMutationResponseSink>,
980993
}
981994

995+
flexbuffers_storage_encode_decode!(ResumeInvocationRequest);
996+
982997
impl WithInvocationId for ResumeInvocationRequest {
983998
fn invocation_id(&self) -> InvocationId {
984999
self.invocation_id
@@ -1001,6 +1016,8 @@ pub struct RestartAsNewInvocationRequest {
10011016
pub response_sink: Option<InvocationMutationResponseSink>,
10021017
}
10031018

1019+
flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequest);
1020+
10041021
impl WithInvocationId for RestartAsNewInvocationRequest {
10051022
fn invocation_id(&self) -> InvocationId {
10061023
self.invocation_id
@@ -1320,6 +1337,8 @@ pub struct AttachInvocationRequest {
13201337
pub response_sink: ServiceInvocationResponseSink,
13211338
}
13221339

1340+
flexbuffers_storage_encode_decode!(AttachInvocationRequest);
1341+
13231342
impl WithPartitionKey for AttachInvocationRequest {
13241343
fn partition_key(&self) -> PartitionKey {
13251344
self.invocation_query.partition_key()
@@ -1333,6 +1352,8 @@ pub struct NotifySignalRequest {
13331352
pub signal: Signal,
13341353
}
13351354

1355+
flexbuffers_storage_encode_decode!(NotifySignalRequest);
1356+
13361357
impl WithInvocationId for NotifySignalRequest {
13371358
fn invocation_id(&self) -> InvocationId {
13381359
self.invocation_id

crates/types/src/message.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
//! This module defines types used for the internal messaging between Restate components.
1212
13-
use crate::identifiers::PartitionId;
13+
use crate::{bilrost_storage_encode_decode, identifiers::PartitionId};
1414

1515
/// Wrapper that extends a message with its target peer to which the message should be sent.
1616
pub type PartitionTarget<Msg> = (PartitionId, Msg);
@@ -29,3 +29,11 @@ pub enum AckKind {
2929
last_known_seq_number: MessageIndex,
3030
},
3131
}
32+
33+
#[derive(Debug, Clone, Copy, bilrost::Message)]
34+
pub struct MessageIndexRecrod {
35+
#[bilrost(1)]
36+
pub index: MessageIndex,
37+
}
38+
39+
bilrost_storage_encode_decode!(MessageIndexRecrod);

crates/types/src/state_mut.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,27 @@ use bytes::Bytes;
1616
use serde_with::serde_as;
1717
use sha2::{Digest, Sha256};
1818

19+
use crate::bilrost_storage_encode_decode;
1920
use crate::identifiers::ServiceId;
2021

2122
#[serde_as]
2223
/// ExternalStateMutation
2324
///
2425
/// represents an external request to mutate a user's state.
25-
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
26+
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)]
2627
pub struct ExternalStateMutation {
28+
#[bilrost(1)]
2729
pub service_id: ServiceId,
30+
#[bilrost(2)]
2831
pub version: Option<String>,
2932
// flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
33+
#[bilrost(3)]
3034
#[serde_as(as = "serde_with::Seq<(_, _)>")]
3135
pub state: HashMap<Bytes, Bytes>,
3236
}
3337

38+
bilrost_storage_encode_decode!(ExternalStateMutation);
39+
3440
/// # StateMutationVersion
3541
///
3642
/// This type represents a user state version. This implementation hashes canonically the raw key-value

crates/types/src/storage.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry};
2727
use crate::journal_v2::{Decoder, EntryMetadata, EntryType};
2828
use crate::time::MillisSinceEpoch;
2929

30+
pub use tracing;
31+
3032
#[derive(Debug, thiserror::Error)]
3133
pub enum StorageEncodeError {
3234
#[error("encoding failed: {0}")]
@@ -187,7 +189,7 @@ macro_rules! flexbuffers_storage_encode_decode {
187189
Self: Sized,
188190
{
189191
$crate::storage::decode::decode_serde(buf, kind).map_err(|err| {
190-
::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
192+
$crate::storage::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
191193
err
192194
})
193195

@@ -196,6 +198,40 @@ macro_rules! flexbuffers_storage_encode_decode {
196198
};
197199
}
198200

201+
/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing
202+
/// type using [`bilrost`].
203+
#[macro_export]
204+
macro_rules! bilrost_storage_encode_decode {
205+
($name:tt) => {
206+
impl $crate::storage::StorageEncode for $name {
207+
fn default_codec(&self) -> $crate::storage::StorageCodecKind {
208+
$crate::storage::StorageCodecKind::Bilrost
209+
}
210+
211+
fn encode(
212+
&self,
213+
buf: &mut ::bytes::BytesMut,
214+
) -> Result<(), $crate::storage::StorageEncodeError> {
215+
bytes::BufMut::put(buf, $crate::storage::encode::encode_bilrost(self));
216+
Ok(())
217+
}
218+
}
219+
220+
impl $crate::storage::StorageDecode for $name {
221+
fn decode<B: ::bytes::Buf>(
222+
buf: &mut B,
223+
kind: $crate::storage::StorageCodecKind,
224+
) -> Result<Self, $crate::storage::StorageDecodeError>
225+
where
226+
Self: Sized,
227+
{
228+
debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost);
229+
$crate::storage::decode::decode_bilrost(buf)
230+
}
231+
}
232+
};
233+
}
234+
199235
/// A polymorphic container of a buffer or a cached storage-encodeable object
200236
#[derive(Clone, derive_more::Debug, BilrostAs)]
201237
#[bilrost_as(dto::PolyBytes)]

crates/wal-protocol/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ serde = ["dep:serde", "enum-map/serde", "bytestring/serde", "restate-invoker-api
1313

1414
[dependencies]
1515
restate-workspace-hack = { workspace = true }
16-
16+
restate-encoding = { workspace = true }
1717
restate-invoker-api = { workspace = true }
1818
restate-storage-api = { workspace = true }
1919
restate-types = { workspace = true }

0 commit comments

Comments
 (0)