@@ -250,7 +250,9 @@ mod envelope {
250250
251251    use  restate_storage_api:: protobuf_types:: v1 as  protobuf; 
252252    use  restate_types:: storage:: decode:: { decode_bilrost,  decode_serde} ; 
253-     use  restate_types:: storage:: encode:: { encode_bilrost,  encode_serde} ; 
253+     use  restate_types:: storage:: encode:: { 
254+         encode_bilrost,  encode_serde,  estimate_encoded_serde_len, 
255+     } ; 
254256    use  restate_types:: storage:: { 
255257        StorageCodecKind ,  StorageDecode ,  StorageDecodeError ,  StorageEncode ,  StorageEncodeError , 
256258    } ; 
@@ -261,7 +263,9 @@ mod envelope {
261263        fn  encode ( & self ,  buf :  & mut  BytesMut )  -> Result < ( ) ,  StorageEncodeError >  { 
262264            use  bytes:: BufMut ; 
263265            match  self . default_codec ( )  { 
264-                 StorageCodecKind :: FlexbuffersSerde  => encode_serde ( self ,  buf,  self . default_codec ( ) ) , 
266+                 StorageCodecKind :: FlexbuffersSerde  => { 
267+                     encode_serde ( self ,  buf,  StorageCodecKind :: FlexbuffersSerde ) 
268+                 } 
265269                StorageCodecKind :: Custom  => { 
266270                    buf. put_slice ( & encode ( self ) ?) ; 
267271                    Ok ( ( ) ) 
@@ -270,6 +274,19 @@ mod envelope {
270274            } 
271275        } 
272276
277+         fn  estimated_encoded_len ( & self )  -> usize  { 
278+             match  self . default_codec ( )  { 
279+                 StorageCodecKind :: FlexbuffersSerde  => { 
280+                     restate_types:: storage:: encode:: estimate_encoded_serde_len ( 
281+                         self , 
282+                         StorageCodecKind :: FlexbuffersSerde , 
283+                     ) 
284+                 } 
285+                 StorageCodecKind :: Custom  => estimate_custom_encoding_len ( self ) , 
286+                 _ => unreachable ! ( "developer error" ) , 
287+             } 
288+         } 
289+ 
273290        fn  default_codec ( & self )  -> StorageCodecKind  { 
274291            // TODO(azmy): Change to `Custom` in v1.5 
275292            StorageCodecKind :: FlexbuffersSerde 
@@ -359,13 +376,49 @@ mod envelope {
359376            } ) 
360377        } 
361378
379+         fn  serde_encoded_len < T :  serde:: Serialize > ( value :  & T ,  codec :  StorageCodecKind )  -> usize  { 
380+             static  EMPTY :  Field  = Field  { 
381+                 // note: the value of codec is irrelevant for this method, we assume that 
382+                 // it takes a similar amount of space to encode all values of the StorageCodecKind 
383+                 // enum. 
384+                 codec :  Some ( StorageCodecKind :: FlexbuffersSerde ) , 
385+                 bytes :  Bytes :: new ( ) , 
386+             } ; 
387+ 
388+             let  value_len = estimate_encoded_serde_len ( value,  codec) ; 
389+             EMPTY . encoded_len ( )  + value_len
390+         } 
391+ 
392+         fn  bilrost_encoded_len < T :  bilrost:: Message > ( value :  & T )  -> usize  { 
393+             static  EMPTY :  Field  = Field  { 
394+                 codec :  Some ( StorageCodecKind :: Bilrost ) , 
395+                 bytes :  Bytes :: new ( ) , 
396+             } ; 
397+ 
398+             let  value_len = bilrost:: Message :: encoded_len ( value) ; 
399+             EMPTY . encoded_len ( )  + value_len
400+         } 
401+ 
362402        fn  encode_bilrost < T :  bilrost:: Message > ( value :  & T )  -> Result < Self ,  StorageEncodeError >  { 
363403            Ok ( Self  { 
364404                codec :  Some ( StorageCodecKind :: Bilrost ) , 
365405                bytes :  encode_bilrost ( value) , 
366406            } ) 
367407        } 
368408
409+         fn  protobuf_encoded_len < T :  prost:: Message > ( value :  & T )  -> usize  { 
410+             static  EMPTY :  Field  = Field  { 
411+                 // note: the value of codec is irrelevant for this method, we assume that 
412+                 // it takes a similar amount of space to encode all values of the StorageCodecKind 
413+                 // enum. 
414+                 codec :  Some ( StorageCodecKind :: Protobuf ) , 
415+                 bytes :  Bytes :: new ( ) , 
416+             } ; 
417+ 
418+             let  value_len = prost:: Message :: encoded_len ( value) ; 
419+             EMPTY . encoded_len ( )  + value_len
420+         } 
421+ 
369422        fn  encode_protobuf < T :  prost:: Message > ( value :  & T )  -> Result < Self ,  StorageEncodeError >  { 
370423            let  mut  buf = BytesMut :: new ( ) ; 
371424            value
@@ -437,6 +490,71 @@ mod envelope {
437490        } } ; 
438491    } 
439492
493+     pub  fn  estimate_custom_encoding_len ( envelope :  & super :: Envelope )  -> usize  { 
494+         let  command_len = match  & envelope. command  { 
495+             Command :: UpdatePartitionDurability ( value)  => Field :: bilrost_encoded_len ( value) , 
496+             Command :: VersionBarrier ( value)  => Field :: bilrost_encoded_len ( value) , 
497+             Command :: AnnounceLeader ( value)  => { 
498+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
499+             } 
500+             Command :: PatchState ( value)  => { 
501+                 // we are copying because we _assume_ that PatchState is not widely used. 
502+                 // The clone will allocate a new hashmap but kvpairs are Bytes (cheap clones) 
503+                 let  value = protobuf:: StateMutation :: from ( value. clone ( ) ) ; 
504+                 Field :: protobuf_encoded_len ( & value) 
505+             } 
506+             Command :: TerminateInvocation ( value)  => { 
507+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
508+             } 
509+             Command :: PurgeInvocation ( value)  => { 
510+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
511+             } 
512+             Command :: PurgeJournal ( value)  => { 
513+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
514+             } 
515+             Command :: Invoke ( value)  => { 
516+                 let  value = protobuf:: ServiceInvocation :: from ( value. as_ref ( ) ) ; 
517+                 // ideally, the envelope would carry the protobuf wrapper instead of doing the 
518+                 // conversion twice (once for length estimate and another for serialization) 
519+                 Field :: protobuf_encoded_len ( & value) 
520+             } 
521+             Command :: TruncateOutbox ( value)  => { 
522+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
523+             } 
524+             Command :: ProxyThrough ( value)  => { 
525+                 let  value = protobuf:: ServiceInvocation :: from ( value. as_ref ( ) ) ; 
526+                 Field :: protobuf_encoded_len ( & value) 
527+             } 
528+             Command :: AttachInvocation ( value)  => { 
529+                 let  value = protobuf:: outbox_message:: AttachInvocationRequest :: from ( value. clone ( ) ) ; 
530+                 Field :: protobuf_encoded_len ( & value) 
531+             } 
532+             Command :: InvokerEffect ( value)  => { 
533+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
534+             } 
535+             Command :: Timer ( value)  => { 
536+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
537+             } 
538+             Command :: ScheduleTimer ( value)  => { 
539+                 Field :: serde_encoded_len ( value,  StorageCodecKind :: FlexbuffersSerde ) 
540+             } 
541+             Command :: InvocationResponse ( value)  => { 
542+                 let  value =
543+                     protobuf:: outbox_message:: OutboxServiceInvocationResponse :: from ( value. clone ( ) ) ; 
544+                 Field :: protobuf_encoded_len ( & value) 
545+             } 
546+             Command :: NotifyGetInvocationOutputResponse ( value)  => Field :: bilrost_encoded_len ( value) , 
547+             Command :: NotifySignal ( value)  => { 
548+                 let  value = protobuf:: outbox_message:: NotifySignal :: from ( value. clone ( ) ) ; 
549+                 Field :: protobuf_encoded_len ( & value) 
550+             } 
551+         } ; 
552+ 
553+         // Assuming 350 bytes for the header and the envelope type-tag + 8 bytes for the command kind 
554+         // overhead = 358 
555+         358  + command_len
556+     } 
557+ 
440558    pub  fn  encode ( envelope :  & super :: Envelope )  -> Result < Bytes ,  StorageEncodeError >  { 
441559        // todo(azmy): avoid clone? this will require change to `From` implementation 
442560        let  ( command_kind,  command)  = match  & envelope. command  { 
0 commit comments