From 97cc3404e655adb5fa6f1b7a7081b05d96f92214 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Tue, 14 May 2024 20:40:15 +0200 Subject: [PATCH] feat(go): implement `stream` bindgen Signed-off-by: Roman Volosatovs --- crates/wit-bindgen-go/src/interface.rs | 1916 ++++++++++------- .../wit-bindgen-rust/tests/codegen_no_std.rs | 2 +- .../hello/handler/bindings.wrpc.go | 5 +- .../cmd/hello-client-nats/main.go | 5 +- .../hello/handler/bindings.wrpc.go | 17 +- examples/go/hello-server/go.mod | 1 + examples/go/hello-server/go.sum | 2 + examples/go/http-outgoing-nats-server/main.go | 4 +- .../wrpc/keyvalue/store/bindings.wrpc.go | 164 +- examples/go/keyvalue-server/go.mod | 1 + examples/go/keyvalue-server/go.sum | 2 + go/future.go | 6 +- .../http/outgoing_handler/bindings.go | 2 +- go/interface/http/types/bindings.go | 52 +- go/list.go | 2 +- go/option.go | 2 +- go/stream.go | 117 +- go/tuple.go | 4 +- go/wrpc.go | 76 +- tests/go/integration.go | 11 +- tests/go/integration_test.go | 62 +- tests/wit/test.wit | 2 +- 22 files changed, 1553 insertions(+), 902 deletions(-) diff --git a/crates/wit-bindgen-go/src/interface.rs b/crates/wit-bindgen-go/src/interface.rs index b83debb7..f88c5fdf 100644 --- a/crates/wit-bindgen-go/src/interface.rs +++ b/crates/wit-bindgen-go/src/interface.rs @@ -26,16 +26,22 @@ pub struct InterfaceGenerator<'a> { } impl InterfaceGenerator<'_> { - fn print_read_ty(&mut self, ty: &Type, reader: &str, path: &str) { - // NOTE: u{16,32,64} decoding adapted from - // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/encoding/binary/varint.go;l=128-153 - // NOTE: s{16,32,64} decoding adapted from - // https://github.com/go-delve/delve/blob/26799555e5518e8a9fe2d68e02379257ebda4dd2/pkg/dwarf/leb128/decode.go#L51-L81 - match ty { - Type::Id(t) => self.print_read_tyid(*t, reader, path), - Type::Bool => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (bool, error) {{ + // u{16,32,64} decoding adapted from + // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/encoding/binary/varint.go;l=128-153 + // + // s{16,32,64} decoding adapted from + // https://github.com/go-delve/delve/blob/26799555e5518e8a9fe2d68e02379257ebda4dd2/pkg/dwarf/leb128/decode.go#L51-L81 + // + // s{16,32,64} encoding adapted from + // https://github.com/go-delve/delve/blob/26799555e5518e8a9fe2d68e02379257ebda4dd2/pkg/dwarf/leb128/encode.go#L23-L42 + + fn print_read_bool(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!( + self.src, + r#"func(r {wrpc}.ByteReader) (bool, error) {{ {slog}.Debug("reading bool byte") v, err := r.ReadByte() if err != nil {{ @@ -51,13 +57,16 @@ impl InterfaceGenerator<'_> { return false, {fmt}.Errorf("invalid bool value %d", v) }} }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U8 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (uint8, error) {{ + ); + } + + fn print_read_u8(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (uint8, error) {{ {slog}.Debug("reading u8 byte") v, err := r.ReadByte() if err != nil {{ @@ -65,13 +74,17 @@ impl InterfaceGenerator<'_> { }} return v, nil }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U16 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (uint16, error) {{ + ); + } + + fn print_read_u16(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let errors = self.deps.errors(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (uint16, error) {{ var x uint16 var s uint for i := 0; i < 3; i++ {{ @@ -94,15 +107,17 @@ impl InterfaceGenerator<'_> { }} return x, {errors}.New("varint overflows a 16-bit integer") }}({reader})"#, - errors = self.deps.errors(), - fmt = self.deps.fmt(), - io = self.deps.io(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U32 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (uint32, error) {{ + ); + } + + fn print_read_u32(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let errors = self.deps.errors(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (uint32, error) {{ var x uint32 var s uint for i := 0; i < 5; i++ {{ @@ -125,15 +140,17 @@ impl InterfaceGenerator<'_> { }} return x, {errors}.New("varint overflows a 32-bit integer") }}({reader})"#, - errors = self.deps.errors(), - fmt = self.deps.fmt(), - io = self.deps.io(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U64 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (uint64, error) {{ + ); + } + + fn print_read_u64(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let errors = self.deps.errors(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (uint64, error) {{ var x uint64 var s uint for i := 0; i < 10; i++ {{ @@ -156,15 +173,16 @@ impl InterfaceGenerator<'_> { }} return x, {errors}.New("varint overflows a 64-bit integer") }}({reader})"#, - errors = self.deps.errors(), - fmt = self.deps.fmt(), - io = self.deps.io(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S8 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (int8, error) {{ + ); + } + + fn print_read_s8(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (int8, error) {{ {slog}.Debug("reading s8 byte") v, err := r.ReadByte() if err != nil {{ @@ -172,13 +190,16 @@ impl InterfaceGenerator<'_> { }} return int8(v), nil }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S16 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (int16, error) {{ + ); + } + + fn print_read_s16(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (int16, error) {{ var ( b byte err error @@ -205,13 +226,16 @@ impl InterfaceGenerator<'_> { }} return result, nil }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S32 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (int32, error) {{ + ); + } + + fn print_read_s32(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (int32, error) {{ var ( b byte err error @@ -238,13 +262,16 @@ impl InterfaceGenerator<'_> { }} return result, nil }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S64 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (int64, error) {{ + ); + } + + fn print_read_s64(&mut self, reader: &str) { + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (int64, error) {{ var ( b byte err error @@ -271,13 +298,18 @@ impl InterfaceGenerator<'_> { }} return result, nil }}({reader})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::F32 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (float32, error) {{ + ); + } + + fn print_read_f32(&mut self, reader: &str) { + let binary = self.deps.binary(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.Reader) (float32, error) {{ var b [4]byte {slog}.Debug("reading f32 bytes") _, err := r.Read(b[:]) @@ -286,15 +318,18 @@ impl InterfaceGenerator<'_> { }} return {math}.Float32frombits({binary}.LittleEndian.Uint32(b[:])), nil }}({reader})"#, - binary = self.deps.binary(), - fmt = self.deps.fmt(), - math = self.deps.math(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::F64 => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (float64, error) {{ + ); + } + + fn print_read_f64(&mut self, reader: &str) { + let binary = self.deps.binary(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r {io}.Reader) (float64, error) {{ var b [8]byte {slog}.Debug("reading f64 bytes") _, err := r.Read(b[:]) @@ -303,15 +338,18 @@ impl InterfaceGenerator<'_> { }} return {math}.Float64frombits({binary}.LittleEndian.Uint64(b[:])), nil }}({reader})"#, - binary = self.deps.binary(), - fmt = self.deps.fmt(), - math = self.deps.math(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::Char => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (rune, error) {{ + ); + } + + fn print_read_char(&mut self, reader: &str) { + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + let utf8 = self.deps.utf8(); + uwrite!( + self.src, + r#"func(r {io}.ByteReader) (rune, error) {{ var x uint32 var s uint for i := 0; i < 5; i++ {{ @@ -339,16 +377,18 @@ impl InterfaceGenerator<'_> { }} return {utf8}.RuneError, {errors}.New("char overflows a 32-bit integer") }}({reader})"#, - errors = self.deps.errors(), - fmt = self.deps.fmt(), - io = self.deps.io(), - slog = self.deps.slog(), - utf8 = self.deps.utf8(), - wrpc = self.deps.wrpc(), - ), - Type::String => uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (string, error) {{ + ); + } + + fn print_read_string(&mut self, reader: &str) { + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + let utf8 = self.deps.utf8(); + uwrite!( + self.src, + r#"func(r interface {{ {io}.ByteReader; {io}.Reader }}) (string, error) {{ var x uint32 var s uint for i := 0; i < 5; i++ {{ @@ -381,36 +421,80 @@ impl InterfaceGenerator<'_> { }} return "", {errors}.New("string length overflows a 32-bit integer") }}({reader})"#, - errors = self.deps.errors(), - fmt = self.deps.fmt(), - io = self.deps.io(), - slog = self.deps.slog(), - utf8 = self.deps.utf8(), - wrpc = self.deps.wrpc(), - ), - } + ); } - fn print_read_tyid(&mut self, id: TypeId, reader: &str, path: &str) { - let ty = &self.resolve.types[id]; - if let Some(ref name) = ty.name { - let read = self.type_path_with_name(id, format!("Read{}", to_upper_camel_case(name))); - uwrite!(self.src, "{read}({reader})"); - return; - } + fn print_read_byte_list(&mut self, reader: &str) { + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(r interface {{ {io}.ByteReader; {io}.Reader }}) ([]byte, error) {{ + var x uint32 + var s uint + for i := 0; i < 5; i++ {{ + {slog}.Debug("reading byte list length", "i", i) + b, err := r.ReadByte() + if err != nil {{ + if i > 0 && err == {io}.EOF {{ + err = {io}.ErrUnexpectedEOF + }} + return nil, {fmt}.Errorf("failed to read byte list length byte: %w", err) + }} + if b < 0x80 {{ + if i == 4 && b > 1 {{ + return nil, {errors}.New("byte list length overflows a 32-bit integer") + }} + x = x | uint32(b)< { - let fmt = self.deps.fmt(); - let io = self.deps.io(); - let errors = self.deps.errors(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(r {wrpc}.ByteReader) ("); - self.print_list(ty); - uwrite!( - self.src, - r#", error) {{ + fn print_read_list(&mut self, ty: &Type, reader: &str, path: &str) { + { + let mut ty = *ty; + let is_u8 = loop { + match ty { + Type::U8 => break true, + Type::Id(id) => { + if let TypeDefKind::Type(t) = self.resolve.types[id].kind { + ty = t; + } else { + break false; + } + } + _ => break false, + } + }; + if is_u8 { + self.print_read_byte_list(reader); + return; + } + } + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let errors = self.deps.errors(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "func(r {wrpc}.IndexReader) ("); + self.print_list(ty); + uwrite!( + self.src, + r#", error) {{ var x uint32 var s uint for i := 0; i < 5; i++ {{ @@ -428,20 +512,20 @@ impl InterfaceGenerator<'_> { }} x = x | uint32(b)< { }} return nil, {errors}.New("list length overflows a 32-bit integer") }}({reader})"#, - ); - } + ); + } - TypeDefKind::Option(ty) => { - let fmt = self.deps.fmt(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(r {wrpc}.ByteReader) ("); - self.print_option(ty, true); - uwrite!( - self.src, - r#", error) {{ + fn print_read_option(&mut self, ty: &Type, reader: &str, path: &str) { + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "func(r {wrpc}.IndexReader) ("); + self.print_option(ty, true); + uwrite!( + self.src, + r#", error) {{ {slog}.Debug("reading option status byte") status, err := r.ReadByte() if err != nil {{ @@ -475,255 +559,419 @@ impl InterfaceGenerator<'_> { case 1: {slog}.Debug("reading `option::some` payload") v, err := "#, - ); - self.print_read_ty(ty, "r", &format!("{path}, 1")); - self.push_str("\n"); - uwrite!( - self.src, - r#"if err != nil {{ + ); + self.print_read_ty(ty, "r", &format!("{path}, 1")); + self.push_str("\n"); + uwrite!( + self.src, + r#"if err != nil {{ return nil, {fmt}.Errorf("failed to read `option::some` value: %w", err) }} return "#, - ); - self.result_element_ptr(ty, false); - uwrite!( - self.src, - r#"v, nil + ); + self.result_element_ptr(ty, false); + uwrite!( + self.src, + r#"v, nil default: return nil, {fmt}.Errorf("invalid option status byte %d", status) }} }}({reader})"#, - ); - } + ); + } - TypeDefKind::Result(ty) => { - let fmt = self.deps.fmt(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); - uwrite!(self.src, "func(r {wrpc}.ByteReader) (*"); - self.print_result(ty); - uwriteln!( - self.src, - r#", error) {{ + fn print_read_result(&mut self, ty: &Result_, reader: &str, path: &str) { + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "func(r {wrpc}.IndexReader) (*"); + self.print_result(ty); + uwriteln!( + self.src, + r#", error) {{ {slog}.Debug("reading result status byte") status, err := r.ReadByte() if err != nil {{ return nil, {fmt}.Errorf("failed to read result status byte: %w", err) }}"#, - ); - self.push_str("switch status {\n"); - self.push_str("case 0:\n"); - if let Some(ref ty) = ty.ok { - uwriteln!(self.src, r#"{slog}.Debug("reading `result::ok` payload")"#); - self.push_str("v, err := "); - self.print_read_ty(ty, "r", &format!("{path}, 0")); - self.push_str("\n"); - uwriteln!( - self.src, - r#"if err != nil {{ + ); + self.push_str("switch status {\n"); + self.push_str("case 0:\n"); + if let Some(ref ty) = ty.ok { + uwriteln!(self.src, r#"{slog}.Debug("reading `result::ok` payload")"#); + self.push_str("v, err := "); + self.print_read_ty(ty, "r", &format!("{path}, 0")); + self.push_str("\n"); + uwriteln!( + self.src, + r#"if err != nil {{ return nil, fmt.Errorf("failed to read `result::ok` value: %w", err) }}"#, - ); - } else { - self.push_str("var v struct{}\n"); - } - self.push_str("return &"); - self.print_result(ty); - self.push_str("{ Ok: &v }, nil\n"); - self.push_str("case 1:\n"); - if let Some(ref err) = ty.err { - uwriteln!(self.src, r#"{slog}.Debug("reading `result::err` payload")"#); - self.push_str("v, err := "); - self.print_read_ty(err, "r", &format!("{path}, 1")); - self.push_str("\n"); - uwriteln!( - self.src, - r#"if err != nil {{ - return nil, {fmt}.Errorf("failed to read `result::err` value: %w", err) - }}"#, - ); - self.push_str("return &"); - self.print_result(ty); - self.push_str("{ Err: "); - self.result_element_ptr(err, false); - } else { - self.push_str("var v struct{}\n"); - self.push_str("return &"); - self.print_result(ty); - self.push_str("{ Err: &"); - } - uwrite!( - self.src, - r#"v }}, nil + ); + } else { + self.push_str("var v struct{}\n"); + } + self.push_str("return &"); + self.print_result(ty); + self.push_str("{ Ok: &v }, nil\n"); + self.push_str("case 1:\n"); + if let Some(ref err) = ty.err { + uwriteln!(self.src, r#"{slog}.Debug("reading `result::err` payload")"#); + self.push_str("v, err := "); + self.print_read_ty(err, "r", &format!("{path}, 1")); + self.push_str("\n"); + uwriteln!( + self.src, + r#"if err != nil {{ + return nil, {fmt}.Errorf("failed to read `result::err` value: %w", err) + }}"#, + ); + self.push_str("return &"); + self.print_result(ty); + self.push_str("{ Err: "); + self.result_element_ptr(err, false); + } else { + self.push_str("var v struct{}\n"); + self.push_str("return &"); + self.print_result(ty); + self.push_str("{ Err: &"); + } + uwrite!( + self.src, + r#"v }}, nil default: return nil, {fmt}.Errorf("invalid result status byte %d", status) }} }}({reader})"#, - ); - } + ); + } - TypeDefKind::Variant(_) => panic!("unsupported anonymous variant"), + fn print_read_tuple(&mut self, ty: &Tuple, reader: &str, path: &str) { + match ty.types.as_slice() { + [] => self.push_str("struct{}{}, nil"), + [ty] => self.print_read_ty(ty, reader, &format!("{path}, 0")), + _ => { + let wrpc = self.deps.wrpc(); - TypeDefKind::Tuple(ty) => match ty.types.as_slice() { - [] => self.push_str("struct{}{}, nil"), - [ty] => self.print_read_ty(ty, reader, &format!("{path}, 0")), - _ => { - let wrpc = self.deps.wrpc(); - - uwrite!(self.src, "func(r {wrpc}.ByteReader) ("); - self.print_tuple(ty, true); - self.push_str(", error) {\n"); - self.push_str("v := "); - self.print_tuple(ty, false); - self.push_str("{}\n"); - self.push_str("var err error\n"); - for (i, ty) in ty.types.iter().enumerate() { - let fmt = self.deps.fmt(); - let slog = self.deps.slog(); - uwrite!( - self.src, - r#"{slog}.Debug("reading tuple element {i}") + uwrite!(self.src, "func(r {wrpc}.IndexReader) ("); + self.print_tuple(ty, true); + self.push_str(", error) {\n"); + self.push_str("v := "); + self.print_tuple(ty, false); + self.push_str("{}\n"); + self.push_str("var err error\n"); + for (i, ty) in ty.types.iter().enumerate() { + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"{slog}.Debug("reading tuple element {i}") v.V{i}, err = "# - ); - self.print_read_ty(ty, "r", &format!("{path}, {i}")); - self.push_str("\n"); - uwriteln!( - self.src, - r#"if err != nil {{ + ); + self.print_read_ty(ty, "r", &format!("{path}, {i}")); + self.push_str("\n"); + uwriteln!( + self.src, + r#"if err != nil {{ return nil, {fmt}.Errorf("failed to read tuple element {i}: %w", err) }}"# - ); - } - self.push_str("return v, nil\n"); - uwrite!(self.src, "}}({reader})"); + ); } - }, - TypeDefKind::Resource => { - panic!("unsupported anonymous type reference: resource") - } - TypeDefKind::Record(_) => { - panic!("unsupported anonymous type reference: record") - } - TypeDefKind::Flags(_) => { - panic!("unsupported anonymous type reference: flags") + self.push_str("return v, nil\n"); + uwrite!(self.src, "}}({reader})"); } - TypeDefKind::Enum(_) => { - panic!("unsupported anonymous type reference: enum") + } + } + + fn print_read_stream(&mut self, Stream { element, .. }: &Stream, reader: &str, path: &str) { + match element { + Some(Type::U8) => { + let bytes = self.deps.bytes(); + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + + uwriteln!( + self.src, + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReadCompleter, error) {{ + {slog}.Debug("reading byte stream status byte") + status, err := r.ReadByte() + if err != nil {{ + return nil, {fmt}.Errorf("failed to read byte stream status byte: %w", err) + }} + switch status {{ + case 0: + r, err = r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) + }} + return {wrpc}.NewByteStreamReader({wrpc}.NewPendingByteReader(r)), nil + case 1: + {slog}.Debug("reading ready byte stream contents") + buf, err := "# + ); + self.print_read_byte_list("r"); + uwriteln!( + self.src, + r#" + if err != nil {{ + return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) + }} + {slog}.Debug("read ready byte stream contents", "len", len(buf)) + return {wrpc}.NewCompleteReader({bytes}.NewReader(buf)), nil + default: + return nil, {fmt}.Errorf("invalid stream status byte %d", status) + }} +}}({reader}, {path})"# + ); } - TypeDefKind::Future(_ty) => uwrite!( - self.src, - r#"nil, {errors}.New("reading futures not supported yet")"#, - errors = self.deps.errors(), - ), - TypeDefKind::Stream(ty) => match ty.element { - Some(Type::U8) => uwrite!( + Some(ty) => { + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + + uwrite!( self.src, - "{wrpc}.ReadByteStream({reader}, {path})", - wrpc = self.deps.wrpc(), - ), - _ => uwrite!( + r#"func(r {wrpc}.IndexReader, path ...uint32) ({wrpc}.ReceiveCompleter["# + ); + self.print_list(ty); + uwrite!( self.src, - r#"nil, {errors}.New("reading non-byte streams not supported yet")"#, - errors = self.deps.errors(), - ), - }, + r#"], error) {{ + {slog}.Debug("reading stream status byte") + status, err := r.ReadByte() + if err != nil {{ + return nil, {fmt}.Errorf("failed to read stream status byte: %w", err) + }} + switch status {{ + case 0: + r, err = r.Index(path...) + if err != nil {{ + return nil, {fmt}.Errorf("failed to index reader: %w", err) + }} + var total uint32 + return {wrpc}.NewDecodeReceiver(r, func(r {wrpc}.IndexReader) ("# + ); + self.print_list(ty); + uwrite!( + self.src, + r#", error) {{ + {slog}.Debug("reading pending stream chunk length") + n, err := "# + ); + self.print_read_u32("r"); + uwrite!( + self.src, + r#" + if err != nil {{ + return nil, {fmt}.Errorf("failed to read pending stream chunk length: %w", err) + }} + if n == 0 {{ + return nil, {io}.EOF + }} + if {math}.MaxUint32 - n < total {{ + return nil, {errors}.New("total incoming pending stream element count would overflow a 32-bit unsigned integer") + }} + vs := make("# + ); + self.print_list(ty); + uwrite!( + self.src, + r#", n) + for i := range vs {{ + {slog}.Debug("reading pending stream element", "i", total) + v, err := "# + ); + self.print_read_ty(ty, "r", "total"); + uwriteln!( + self.src, + r#" + if err != nil {{ + return nil, {fmt}.Errorf("failed to read pending stream chunk element %d: %w", i, err) + }} + vs[i] = v + total++ + }} + return vs, nil + }}), nil + case 1: + {slog}.Debug("reading ready stream contents") + vs, err := "# + ); + self.print_read_list(ty, "r", path); + uwriteln!( + self.src, + r#" + if err != nil {{ + return nil, {fmt}.Errorf("failed to read ready stream contents: %w", err) + }} + {slog}.Debug("read ready stream contents", "len", len(vs)) + return {wrpc}.NewCompleteReceiver(vs), nil + default: + return nil, {fmt}.Errorf("invalid stream status byte %d", status) + }} +}}({reader}, {path})"# + ); + } + None => panic!("streams with no element types are not supported"), + } + } + + fn print_read_ty(&mut self, ty: &Type, reader: &str, path: &str) { + match ty { + Type::Id(ty) => self.print_read_tyid(*ty, reader, path), + Type::Bool => self.print_read_bool(reader), + Type::U8 => self.print_read_u8(reader), + Type::U16 => self.print_read_u16(reader), + Type::U32 => self.print_read_u32(reader), + Type::U64 => self.print_read_u64(reader), + Type::S8 => self.print_read_s8(reader), + Type::S16 => self.print_read_s16(reader), + Type::S32 => self.print_read_s32(reader), + Type::S64 => self.print_read_s64(reader), + Type::F32 => self.print_read_f32(reader), + Type::F64 => self.print_read_f64(reader), + Type::Char => self.print_read_char(reader), + Type::String => self.print_read_string(reader), + } + } + fn print_read_tyid(&mut self, id: TypeId, reader: &str, path: &str) { + let ty = &self.resolve.types[id]; + if let Some(ref name) = ty.name { + let read = self.type_path_with_name(id, format!("Read{}", to_upper_camel_case(name))); + uwrite!(self.src, "{read}({reader})"); + return; + } + match &ty.kind { + TypeDefKind::Record(_) => panic!("unsupported anonymous type reference: record"), + TypeDefKind::Resource => panic!("unsupported anonymous type reference: resource"), TypeDefKind::Handle(Handle::Own(_ty)) => uwrite!( self.src, r#"0, {errors}.New("reading owned handles not supported yet")"#, errors = self.deps.errors(), ), - TypeDefKind::Handle(Handle::Borrow(_ty)) => uwrite!( self.src, r#"0, {errors}.New("reading borrowed handles not supported yet")"#, errors = self.deps.errors(), ), - + TypeDefKind::Flags(_) => panic!("unsupported anonymous type reference: flags"), + TypeDefKind::Tuple(ty) => self.print_read_tuple(ty, reader, path), + TypeDefKind::Variant(_) => panic!("unsupported anonymous variant"), + TypeDefKind::Enum(_) => panic!("unsupported anonymous type reference: enum"), + TypeDefKind::Option(ty) => self.print_read_option(ty, reader, path), + TypeDefKind::Result(ty) => self.print_read_result(ty, reader, path), + TypeDefKind::List(ty) => self.print_read_list(ty, reader, path), + TypeDefKind::Future(_ty) => uwrite!( + self.src, + r#"nil, {errors}.New("reading futures not supported yet")"#, + errors = self.deps.errors(), + ), + TypeDefKind::Stream(ty) => self.print_read_stream(ty, reader, path), TypeDefKind::Type(t) => self.print_read_ty(t, reader, path), - TypeDefKind::Unknown => unreachable!(), } } - fn print_write_ty(&mut self, ty: &Type, name: &str, writer: &str) { - // NOTE: s{16,32,64} encoding adapted from - // https://github.com/go-delve/delve/blob/26799555e5518e8a9fe2d68e02379257ebda4dd2/pkg/dwarf/leb128/encode.go#L23-L42 - match ty { - Type::Id(t) => self.print_write_tyid(*t, name, writer), - Type::Bool => uwrite!( - self.src, - r#"func(v bool, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + fn print_write_bool(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v bool, w {io}.ByteWriter) error {{ if !v {{ {slog}.Debug("writing `false` byte") - return nil, w.WriteByte(0) + return w.WriteByte(0) }} {slog}.Debug("writing `true` byte") - return nil, w.WriteByte(1) + return w.WriteByte(1) }}({name}, {writer})"#, - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U8 => uwrite!( - self.src, - r#"func(v uint8, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_u8(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v uint8, w {io}.ByteWriter) error {{ {slog}.Debug("writing u8 byte") - return nil, w.WriteByte(v) + return w.WriteByte(v) }}({name}, {writer})"#, - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U16 => uwrite!( - self.src, - r#"func(v uint16, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_u16(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v uint16, w interface {{ {io}.ByteWriter; {io}.Writer }}) (err error) {{ b := make([]byte, {binary}.MaxVarintLen16) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing u16") - _, err := w.Write(b[:i]) - return nil, err + _, err = w.Write(b[:i]) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U32 => uwrite!( - self.src, - r#"func(v uint32, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_u32(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v uint32, w interface {{ {io}.ByteWriter; {io}.Writer }}) (err error) {{ b := make([]byte, {binary}.MaxVarintLen32) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing u32") - _, err := w.Write(b[:i]) - return nil, err + _, err = w.Write(b[:i]) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::U64 => uwrite!( - self.src, - r#"func(v uint64, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_u64(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v uint64, w interface {{ {io}.ByteWriter; {io}.Writer }}) (err error) {{ b := make([]byte, {binary}.MaxVarintLen64) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing u64") - _, err := w.Write(b[:i]) - return nil, err + _, err = w.Write(b[:i]) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S8 => uwrite!( - self.src, - r#"func(v int8, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_s8(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v int8, w {io}.ByteWriter) error {{ {slog}.Debug("writing s8 byte") - return nil, w.WriteByte(byte(v)) + return w.WriteByte(byte(v)) }}({name}, {writer})"#, - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S16 => uwrite!( - self.src, - r#"func(v int16, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_s16(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v int16, w {io}.ByteWriter) (err error) {{ for {{ b := byte(v & 0x7f) v >>= 7 @@ -737,21 +985,24 @@ impl InterfaceGenerator<'_> { b = b | 0x80 }} {slog}.Debug("writing s16 byte") - if err := w.WriteByte(b); err != nil {{ - return nil, {fmt}.Errorf("failed to write `s16` byte: %w", err) + if err = w.WriteByte(b); err != nil {{ + return {fmt}.Errorf("failed to write `s16` byte: %w", err) }} if last {{ - return nil, nil + return nil }} }} }}({name}, {writer})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S32 => uwrite!( - self.src, - r#"func(v int32, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_s32(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v int32, w {io}.ByteWriter) (err error) {{ for {{ b := byte(v & 0x7f) v >>= 7 @@ -765,21 +1016,24 @@ impl InterfaceGenerator<'_> { b = b | 0x80 }} {slog}.Debug("writing s32 byte") - if err := w.WriteByte(b); err != nil {{ - return nil, {fmt}.Errorf("failed to write `s32` byte: %w", err) + if err = w.WriteByte(b); err != nil {{ + return {fmt}.Errorf("failed to write `s32` byte: %w", err) }} if last {{ - return nil, nil + return nil }} }} }}({name}, {writer})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::S64 => uwrite!( - self.src, - r#"func(v int64, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_s64(&mut self, name: &str, writer: &str) { + let io = self.deps.io(); + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v int64, w {io}.ByteWriter) (err error) {{ for {{ b := byte(v & 0x7f) v >>= 7 @@ -793,229 +1047,122 @@ impl InterfaceGenerator<'_> { b = b | 0x80 }} {slog}.Debug("writing s64 byte") - if err := w.WriteByte(b); err != nil {{ - return nil, {fmt}.Errorf("failed to write `s64` byte: %w", err) + if err = w.WriteByte(b); err != nil {{ + return {fmt}.Errorf("failed to write `s64` byte: %w", err) }} if last {{ - return nil, nil + return nil }} }} }}({name}, {writer})"#, - fmt = self.deps.fmt(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::F32 => uwrite!( - self.src, - r#"func(v float32, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_f32(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v float32, w {io}.Writer) (err error) {{ b := make([]byte, 4) {binary}.LittleEndian.PutUint32(b, {math}.Float32bits(v)) {slog}.Debug("writing f32") - _, err := w.Write(b) - return nil, err + _, err = w.Write(b) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - math = self.deps.math(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::F64 => uwrite!( - self.src, - r#"func(v float64, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_f64(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v float64, w {io}.Writer) (err error) {{ b := make([]byte, 8) {binary}.LittleEndian.PutUint64(b, {math}.Float64bits(v)) {slog}.Debug("writing f64") - _, err := w.Write(b) - return nil, err + _, err = w.Write(b) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - math = self.deps.math(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::Char => uwrite!( - self.src, - r#"func(v rune, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + ); + } + + fn print_write_char(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v rune, w {io}.Writer) (err error) {{ b := make([]byte, {binary}.MaxVarintLen32) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing char") - _, err := w.Write(b[:i]) - return nil, err + _, err = w.Write(b[:i]) + return err }}({name}, {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Type::String => { - let binary = self.deps.binary(); - let fmt = self.deps.fmt(); - let math = self.deps.math(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); + ); + } - uwrite!( - self.src, - r#"func(v string, w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + fn print_write_string(&mut self, name: &str, writer: &str) { + let binary = self.deps.binary(); + let io = self.deps.io(); + let fmt = self.deps.fmt(); + let math = self.deps.math(); + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"func(v string, w {io}.Writer) (err error) {{ n := len(v) if n > {math}.MaxUint32 {{ - return nil, {fmt}.Errorf("string byte length of %d overflows a 32-bit integer", n) + return {fmt}.Errorf("string byte length of %d overflows a 32-bit integer", n) }} - if err := func(v int, w {wrpc}.ByteWriter) error {{ + if err = func(v int, w {io}.Writer) error {{ b := make([]byte, {binary}.MaxVarintLen32) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing string byte length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }}(n, w); err != nil {{ - return nil, {fmt}.Errorf("failed to write string length of %d: %w", n, err) + return {fmt}.Errorf("failed to write string byte length of %d: %w", n, err) }} {slog}.Debug("writing string bytes") - _, err := w.Write([]byte(v)) + _, err = w.Write([]byte(v)) if err != nil {{ - return nil, {fmt}.Errorf("failed to write string bytes: %w", err) + return {fmt}.Errorf("failed to write string bytes: %w", err) }} - return nil, nil - }}({name}, {writer})"# - ); - } - } - } - - fn print_read_discriminant(&mut self, repr: Int, reader: &str) { - match repr { - Int::U8 => { - uwrite!( - self.src, - r#"func(r {wrpc}.ByteReader) (uint8, error) {{ - var x uint8 - var s uint - for i := 0; i < 2; i++ {{ - {slog}.Debug("reading u8 discriminant byte", "i", i) - b, err := r.ReadByte() - if err != nil {{ - if i > 0 && err == {io}.EOF {{ - err = {io}.ErrUnexpectedEOF - }} - return x, {fmt}.Errorf("failed to read u8 discriminant byte: %w", err) - }} - if b < 0x80 {{ - if i == 2 && b > 1 {{ - return x, {errors}.New("discriminant overflows a 8-bit integer") - }} - return x | uint8(b)< { - self.print_read_ty(&Type::U16, reader, ""); - } - Int::U32 => { - self.print_read_ty(&Type::U32, reader, ""); - } - Int::U64 => { - self.print_read_ty(&Type::U64, reader, ""); - } - } - } - - fn print_write_discriminant(&mut self, repr: Int, name: &str, writer: &str) { - match repr { - Int::U8 => uwrite!( - self.src, - r#"func(v uint8, w {wrpc}.ByteWriter) error {{ - b := make([]byte, 2) - i := {binary}.PutUvarint(b, uint64(v)) - {slog}.Debug("writing u8 discriminant") - _, err := w.Write(b[:i]) - return err - }}(uint8({name}), {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Int::U16 => uwrite!( - self.src, - r#"func(v uint16, w {wrpc}.ByteWriter) error {{ - b := make([]byte, {binary}.MaxVarintLen16) - i := {binary}.PutUvarint(b, uint64(v)) - {slog}.Debug("writing u16 discriminant") - _, err := w.Write(b[:i]) - return err - }}(uint16({name}), {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Int::U32 => uwrite!( - self.src, - r#"func(v uint32, w {wrpc}.ByteWriter) (any, error) {{ - b := make([]byte, {binary}.MaxVarintLen32) - i := {binary}.PutUvarint(b, uint64(v)) - {slog}.Debug("writing u32 discriminant") - _, err := w.Write(b[:i]) - return err - }}(uint32({name}), {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - Int::U64 => uwrite!( - self.src, - r#"func(v uint64, w {wrpc}.ByteWriter) (any, error) {{ - b := make([]byte, {binary}.MaxVarintLen64) - i := {binary}.PutUvarint(b, uint64(v)) - {slog}.Debug("writing u64 discriminant") - _, err := w.Write(b[:i]) - return err - }}(uint64({name}), {writer})"#, - binary = self.deps.binary(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ), - } + return nil + }}({name}, {writer})"#, + ); } - fn print_write_tyid(&mut self, id: TypeId, name: &str, writer: &str) { - let ty = &self.resolve.types[id]; - if ty.name.is_some() { - // TODO: Support async - uwrite!(self.src, "({name}).WriteToIndex({writer})"); - return; - } - - match &ty.kind { - TypeDefKind::List(ty) => { - let binary = self.deps.binary(); - let errgroup = self.deps.errgroup(); - let fmt = self.deps.fmt(); - let math = self.deps.math(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); + fn print_write_list(&mut self, ty: &Type, name: &str, writer: &str) { + let binary = self.deps.binary(); + let errgroup = self.deps.errgroup(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); - self.push_str("func(v "); - self.print_list(ty); - uwrite!( - self.src, - r#", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + self.push_str("func(v "); + self.print_list(ty); + uwrite!( + self.src, + r#", w {wrpc}.ByteWriter) (write func({wrpc}.IndexWriter) error, err error) {{ n := len(v) if n > {math}.MaxUint32 {{ return nil, {fmt}.Errorf("list length of %d overflows a 32-bit integer", n) }} - if err := func(v int, w {wrpc}.ByteWriter) error {{ + if err = func(v int, w {io}.Writer) error {{ b := make([]byte, {binary}.MaxVarintLen32) i := {binary}.PutUvarint(b, uint64(v)) {slog}.Debug("writing list length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }}(n, w); err != nil {{ return nil, {fmt}.Errorf("failed to write list length of %d: %w", n, err) @@ -1024,11 +1171,11 @@ impl InterfaceGenerator<'_> { writes := make(map[uint32]func({wrpc}.IndexWriter) error, n) for i, e := range v {{ write, err := "# - ); - self.print_write_ty(ty, "e", "w"); - uwrite!( - self.src, - r#" + ); + self.print_write_ty(ty, "e", "w"); + uwrite!( + self.src, + r#" if err != nil {{ return nil, {fmt}.Errorf("failed to write list element %d: %w", i, err) }} @@ -1054,19 +1201,19 @@ impl InterfaceGenerator<'_> { }} return nil, nil }}({name}, {writer})"# - ); - } + ); + } - TypeDefKind::Option(ty) => { - let fmt = self.deps.fmt(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); + fn print_write_option(&mut self, ty: &Type, name: &str, writer: &str) { + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); - self.push_str("func(v "); - self.print_option(ty, true); - uwrite!( - self.src, - r#", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + self.push_str("func(v "); + self.print_option(ty, true); + uwrite!( + self.src, + r#", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ if v == nil {{ {slog}.Debug("writing `option::none` status byte") if err := w.WriteByte(0); err != nil {{ @@ -1080,23 +1227,23 @@ impl InterfaceGenerator<'_> { }} {slog}.Debug("writing `option::some` payload") write, err := "# - ); + ); - let param = match ty { - Type::Id(id) => { - let ty = &self.resolve.types[*id]; - match &ty.kind { - TypeDefKind::Enum(..) => "*v", - TypeDefKind::List(..) => "v", - _ => "*v", - } - } + let param = match ty { + Type::Id(id) => { + let ty = &self.resolve.types[*id]; + match &ty.kind { + TypeDefKind::Enum(..) => "*v", + TypeDefKind::List(..) => "v", _ => "*v", - }; - self.print_write_ty(ty, param, "w"); - uwrite!( - self.src, - r#" + } + } + _ => "*v", + }; + self.print_write_ty(ty, param, "w"); + uwrite!( + self.src, + r#" if err != nil {{ return nil, {fmt}.Errorf("failed to write `option::some` payload: %w", err) }} @@ -1111,45 +1258,45 @@ impl InterfaceGenerator<'_> { }} return nil, nil }}({name}, {writer})"# - ); - } + ); + } - TypeDefKind::Result(ty) => { - let errors = self.deps.errors(); - let fmt = self.deps.fmt(); - let slog = self.deps.slog(); - let wrpc = self.deps.wrpc(); + fn print_write_result(&mut self, ty: &Result_, name: &str, writer: &str) { + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); - self.push_str("func(v *"); - self.print_result(ty); - uwriteln!( - self.src, - r#", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + self.push_str("func(v *"); + self.print_result(ty); + uwriteln!( + self.src, + r#", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ switch {{ case v.Ok == nil && v.Err == nil: return nil, {errors}.New("both result variants cannot be nil") case v.Ok != nil && v.Err != nil: return nil, {errors}.New("exactly one result variant must non-nil")"# - ); - uwriteln!( - self.src, - r#" + ); + uwriteln!( + self.src, + r#" case v.Ok != nil: {slog}.Debug("writing `result::ok` status byte") if err := w.WriteByte(0); err != nil {{ return nil, {fmt}.Errorf("failed to write `result::ok` status byte: %w", err) }}"# - ); - if let Some(ref ty) = ty.ok { - uwrite!( - self.src, - r#"{slog}.Debug("writing `result::ok` payload") + ); + if let Some(ref ty) = ty.ok { + uwrite!( + self.src, + r#"{slog}.Debug("writing `result::ok` payload") write, err := "# - ); - self.print_write_ty(ty, "*v.Ok", "w"); - uwriteln!( - self.src, - r#" + ); + self.print_write_ty(ty, "*v.Ok", "w"); + uwriteln!( + self.src, + r#" if err != nil {{ return nil, {fmt}.Errorf("failed to write `result::ok` payload: %w", err) }} @@ -1162,27 +1309,27 @@ impl InterfaceGenerator<'_> { return write(w) }}, nil }}"# - ); - } - uwriteln!( - self.src, - r#"return nil, nil + ); + } + uwriteln!( + self.src, + r#"return nil, nil default: {slog}.Debug("writing `result::err` status byte") if err := w.WriteByte(1); err != nil {{ return nil, {fmt}.Errorf("failed to write `result::err` status byte: %w", err) }}"# - ); - if let Some(ref ty) = ty.err { - uwrite!( - self.src, - r#"{slog}.Debug("writing `result::err` payload") + ); + if let Some(ref ty) = ty.err { + uwrite!( + self.src, + r#"{slog}.Debug("writing `result::err` payload") write, err := "# - ); - self.print_write_ty(ty, "*v.Err", "w"); - uwriteln!( - self.src, - r#" + ); + self.print_write_ty(ty, "*v.Err", "w"); + uwriteln!( + self.src, + r#" if err != nil {{ return nil, {fmt}.Errorf("failed to write `result::err` payload: %w", err) }} @@ -1195,56 +1342,55 @@ impl InterfaceGenerator<'_> { return write(w) }}, nil }}"# - ); - } - uwrite!( - self.src, - r#"return nil, nil + ); + } + uwrite!( + self.src, + r#"return nil, nil }} }}({name}, {writer})"# - ); - } - - TypeDefKind::Variant(_) => panic!("unsupported anonymous variant"), + ); + } - TypeDefKind::Tuple(ty) => match ty.types.as_slice() { - [] => self.push_str("error(nil), error(nil)"), - [ty] => self.print_write_ty(ty, name, writer), - _ => { - let fmt = self.deps.fmt(); - let errgroup = self.deps.errgroup(); - let wrpc = self.deps.wrpc(); + fn print_write_tuple(&mut self, ty: &Tuple, name: &str, writer: &str) { + match ty.types.as_slice() { + [] => self.push_str("error(nil), error(nil)"), + [ty] => self.print_write_ty(ty, name, writer), + _ => { + let fmt = self.deps.fmt(); + let errgroup = self.deps.errgroup(); + let wrpc = self.deps.wrpc(); - self.push_str("func(v "); - self.print_tuple(ty, true); - uwriteln!( - self.src, - r", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ + self.push_str("func(v "); + self.print_tuple(ty, true); + uwriteln!( + self.src, + r", w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) error, error) {{ writes := make(map[uint32]func({wrpc}.IndexWriter) error, {})", - ty.types.len(), - ); - for (i, ty) in ty.types.iter().enumerate() { - let slog = self.deps.slog(); - uwrite!( - self.src, - r#"{slog}.Debug("writing tuple element {i}") + ty.types.len(), + ); + for (i, ty) in ty.types.iter().enumerate() { + let slog = self.deps.slog(); + uwrite!( + self.src, + r#"{slog}.Debug("writing tuple element {i}") write{i}, err := "# - ); - self.print_write_ty(ty, &format!("v.V{i}"), "w"); - uwriteln!( - self.src, - r#" + ); + self.print_write_ty(ty, &format!("v.V{i}"), "w"); + uwriteln!( + self.src, + r#" if err != nil {{ return nil, {fmt}.Errorf("failed to write tuple element {i}: %w", err) }} if write{i} != nil {{ writes[{i}] = write{i} }}"# - ); - } - uwrite!( - self.src, - r#"if len(writes) > 0 {{ + ); + } + uwrite!( + self.src, + r#"if len(writes) > 0 {{ return func(w {wrpc}.IndexWriter) error {{ var wg {errgroup}.Group for index, write := range writes {{ @@ -1262,32 +1408,24 @@ impl InterfaceGenerator<'_> { }} return nil, nil }}({name}, {writer})"# - ); - } - }, - TypeDefKind::Resource => { - panic!("unsupported anonymous type reference: resource") - } - TypeDefKind::Record(_) => { - panic!("unsupported anonymous type reference: record") - } - TypeDefKind::Flags(_) => { - panic!("unsupported anonymous type reference: flags") - } - TypeDefKind::Enum(_) => { - panic!("unsupported anonymous type reference: enum") + ); } - TypeDefKind::Future(_ty) => uwrite!( - self.src, - r#"0, {errors}.New("writing futures not supported yet")"#, - errors = self.deps.errors(), - ), - TypeDefKind::Stream(ty) => match ty.element { - Some(Type::U8) => { - uwrite!( - self.src, - r#"func(v {wrpc}.ReadyReader, w {wrpc}.ByteWriter) (write func({wrpc}.IndexWriter) error, err error) {{ - if v.Ready() {{ + } + } + + fn print_write_stream(&mut self, Stream { element, .. }: &Stream, name: &str, writer: &str) { + match element { + Some(Type::U8) => { + let bytes = self.deps.bytes(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!( + self.src, + r#"func(v {wrpc}.ReadCompleter, w {wrpc}.ByteWriter) (write func({wrpc}.IndexWriter) error, err error) {{ + if v.IsComplete() {{ defer func() {{ body, ok := v.({io}.Closer) if ok {{ @@ -1301,12 +1439,13 @@ impl InterfaceGenerator<'_> { }} }}() {slog}.Debug("writing byte stream `stream::ready` status byte") - if err := w.WriteByte(1); err != nil {{ + if err = w.WriteByte(1); err != nil {{ return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) }} {slog}.Debug("reading ready byte stream contents") var buf {bytes}.Buffer - n, err := {io}.Copy(&buf, v) + var n int64 + n, err = {io}.Copy(&buf, v) if err != nil {{ return nil, {fmt}.Errorf("failed to read ready byte stream contents: %w", err) }} @@ -1317,7 +1456,7 @@ impl InterfaceGenerator<'_> { return nil, nil }} else {{ {slog}.Debug("writing byte stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil {{ + if err = w.WriteByte(0); err != nil {{ return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) }} return func(w {wrpc}.IndexWriter) (err error) {{ @@ -1365,21 +1504,229 @@ impl InterfaceGenerator<'_> { }}, nil }} }}({name}, {writer})"#, - bytes = self.deps.bytes(), - fmt = self.deps.fmt(), - io = self.deps.io(), - math = self.deps.math(), - slog = self.deps.slog(), - wrpc = self.deps.wrpc(), - ); - } - _ => uwrite!( + ); + } + Some(ty) => { + let errgroup = self.deps.errgroup(); + let errors = self.deps.errors(); + let fmt = self.deps.fmt(); + let io = self.deps.io(); + let math = self.deps.math(); + let slog = self.deps.slog(); + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "func(v {wrpc}.ReceiveCompleter[",); + self.print_list(ty); + uwrite!( self.src, - r#"0, {errors}.New("writing non-byte streams not supported yet")"#, - errors = self.deps.errors(), - ), - }, + r#"], w {wrpc}.ByteWriter) (write func({wrpc}.IndexWriter) error, err error) {{ + if v.IsComplete() {{ + defer func() {{ + body, ok := v.({io}.Closer) + if ok {{ + if cErr := body.Close(); cErr != nil {{ + if err == nil {{ + err = {fmt}.Errorf("failed to close ready stream: %w", cErr) + }} else {{ + slog.Warn("failed to close ready stream", "err", cErr) + }} + }} + }} + }}() + {slog}.Debug("writing stream `stream::ready` status byte") + if err = w.WriteByte(1); err != nil {{ + return nil, {fmt}.Errorf("failed to write `stream::ready` byte: %w", err) + }} + {slog}.Debug("receiving ready stream contents") + vs, err := v.Receive() + if err != nil && err != {io}.EOF {{ + return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) + }} + if err != {io}.EOF && len(vs) > 0 {{ + for {{ + chunk, err := v.Receive() + if err != nil && err != {io}.EOF {{ + return nil, {fmt}.Errorf("failed to receive ready stream contents: %w", err) + }} + if len(chunk) > 0 {{ + vs = append(vs, chunk...) + }} + if err == {io}.EOF {{ + break + }} + }} + }} + {slog}.Debug("writing ready stream contents", "len", len(vs)) + write, err := "#, + ); + self.print_write_list(ty, "vs", "w"); + uwrite!( + self.src, + r#" + if err != nil {{ + return nil, {fmt}.Errorf("failed to write ready stream contents: %w", err) + }} + return write, nil + }} else {{ + {slog}.Debug("writing stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil {{ + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + }} + return func(w {wrpc}.IndexWriter) (err error) {{ + defer func() {{ + body, ok := v.({io}.Closer) + if ok {{ + if cErr := body.Close(); cErr != nil {{ + if err == nil {{ + err = {fmt}.Errorf("failed to close pending stream: %w", cErr) + }} else {{ + {slog}.Warn("failed to close pending stream", "err", cErr) + }} + }} + }} + }}() + var wg {errgroup}.Group + var total uint32 + for {{ + var end bool + {slog}.Debug("receiving outgoing pending stream contents") + chunk, err := v.Receive() + n := len(chunk) + if n == 0 || err == {io}.EOF {{ + end = true + {slog}.Debug("outgoing pending stream reached EOF") + }} else if err != nil {{ + return {fmt}.Errorf("failed to receive outgoing pending stream chunk: %w", err) + }} + if n > {math}.MaxUint32 {{ + return {fmt}.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) + }} + if {math}.MaxUint32 - uint32(n) < total {{ + return {errors}.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") + }} + {slog}.Debug("writing pending stream chunk length", "len", n) + if err = {wrpc}.WriteUint32(uint32(n), w); err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk length of %d: %w", n, err) + }} + for _, v := range chunk {{ + {slog}.Debug("writing pending stream element", "i", total) + write, err :="#, + ); + self.print_write_ty(ty, "v", "w"); + uwrite!( + self.src, + r#" + if err != nil {{ + return {fmt}.Errorf("failed to write pending stream chunk element %d: %w", total, err) + }} + if write != nil {{ + w, err := w.Index(total) + if err != nil {{ + return {fmt}.Errorf("failed to index writer: %w", err) + }} + wg.Go(func() error {{ + return write(w) + }}) + }} + total++ + }} + if end {{ + if err := w.WriteByte(0); err != nil {{ + return {fmt}.Errorf("failed to write pending stream end byte: %w", err) + }} + return wg.Wait() + }} + }} + }}, nil + }} + }}({name}, {writer})"#, + ); + } + None => panic!("streams with no element types are not supported"), + } + } + + fn print_write_ty(&mut self, ty: &Type, name: &str, writer: &str) { + match ty { + Type::Id(t) => self.print_write_tyid(*t, name, writer), + Type::Bool => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_bool(name, writer); + } + Type::U8 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_u8(name, writer); + } + Type::U16 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_u16(name, writer); + } + Type::U32 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_u32(name, writer); + } + Type::U64 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_u64(name, writer); + } + Type::S8 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_s8(name, writer); + } + Type::S16 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_s16(name, writer); + } + Type::S32 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_s32(name, writer); + } + Type::S64 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_s64(name, writer); + } + Type::F32 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_f32(name, writer); + } + Type::F64 => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_f64(name, writer); + } + Type::Char => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_char(name, writer); + } + Type::String => { + let wrpc = self.deps.wrpc(); + uwrite!(self.src, "(func({wrpc}.IndexWriter) error)(nil), "); + self.print_write_string(name, writer); + } + } + } + + fn print_write_tyid(&mut self, id: TypeId, name: &str, writer: &str) { + let ty = &self.resolve.types[id]; + if ty.name.is_some() { + // TODO: Support async + uwrite!(self.src, "({name}).WriteToIndex({writer})"); + return; + } + match &ty.kind { + TypeDefKind::Record(_) => panic!("unsupported anonymous type reference: record"), + TypeDefKind::Resource => panic!("unsupported anonymous type reference: resource"), TypeDefKind::Handle(Handle::Own(_ty)) => uwrite!( self.src, r#"0, {errors}.New("writing owned handles not supported yet")"#, @@ -1391,13 +1738,122 @@ impl InterfaceGenerator<'_> { r#"0, {errors}.New("writing borrowed handles not supported yet")"#, errors = self.deps.errors(), ), - + TypeDefKind::Flags(_) => panic!("unsupported anonymous type reference: flags"), + TypeDefKind::Tuple(ty) => self.print_write_tuple(ty, name, writer), + TypeDefKind::Variant(_) => panic!("unsupported anonymous variant"), + TypeDefKind::Enum(_) => panic!("unsupported anonymous type reference: enum"), + TypeDefKind::Option(ty) => self.print_write_option(ty, name, writer), + TypeDefKind::Result(ty) => self.print_write_result(ty, name, writer), + TypeDefKind::List(ty) => self.print_write_list(ty, name, writer), + TypeDefKind::Future(_ty) => uwrite!( + self.src, + r#"0, {errors}.New("writing futures not supported yet")"#, + errors = self.deps.errors(), + ), + TypeDefKind::Stream(ty) => self.print_write_stream(ty, name, writer), TypeDefKind::Type(ty) => self.print_write_ty(ty, name, writer), - TypeDefKind::Unknown => unreachable!(), } } + fn print_read_discriminant(&mut self, repr: Int, reader: &str) { + match repr { + Int::U8 => { + uwrite!( + self.src, + r#"func(r {wrpc}.ByteReader) (uint8, error) {{ + var x uint8 + var s uint + for i := 0; i < 2; i++ {{ + {slog}.Debug("reading u8 discriminant byte", "i", i) + b, err := r.ReadByte() + if err != nil {{ + if i > 0 && err == {io}.EOF {{ + err = {io}.ErrUnexpectedEOF + }} + return x, {fmt}.Errorf("failed to read u8 discriminant byte: %w", err) + }} + if b < 0x80 {{ + if i == 2 && b > 1 {{ + return x, {errors}.New("discriminant overflows a 8-bit integer") + }} + return x | uint8(b)< self.print_read_u16(reader), + Int::U32 => self.print_read_u32(reader), + Int::U64 => self.print_read_u64(reader), + } + } + + fn print_write_discriminant(&mut self, repr: Int, name: &str, writer: &str) { + match repr { + Int::U8 => uwrite!( + self.src, + r#"func(v uint8, w {wrpc}.ByteWriter) error {{ + b := make([]byte, 2) + i := {binary}.PutUvarint(b, uint64(v)) + {slog}.Debug("writing u8 discriminant") + _, err := w.Write(b[:i]) + return err + }}(uint8({name}), {writer})"#, + binary = self.deps.binary(), + slog = self.deps.slog(), + wrpc = self.deps.wrpc(), + ), + Int::U16 => uwrite!( + self.src, + r#"func(v uint16, w {wrpc}.ByteWriter) error {{ + b := make([]byte, {binary}.MaxVarintLen16) + i := {binary}.PutUvarint(b, uint64(v)) + {slog}.Debug("writing u16 discriminant") + _, err := w.Write(b[:i]) + return err + }}(uint16({name}), {writer})"#, + binary = self.deps.binary(), + slog = self.deps.slog(), + wrpc = self.deps.wrpc(), + ), + Int::U32 => uwrite!( + self.src, + r#"func(v uint32, w {wrpc}.ByteWriter) (any, error) {{ + b := make([]byte, {binary}.MaxVarintLen32) + i := {binary}.PutUvarint(b, uint64(v)) + {slog}.Debug("writing u32 discriminant") + _, err := w.Write(b[:i]) + return err + }}(uint32({name}), {writer})"#, + binary = self.deps.binary(), + slog = self.deps.slog(), + wrpc = self.deps.wrpc(), + ), + Int::U64 => uwrite!( + self.src, + r#"func(v uint64, w {wrpc}.ByteWriter) (any, error) {{ + b := make([]byte, {binary}.MaxVarintLen64) + i := {binary}.PutUvarint(b, uint64(v)) + {slog}.Debug("writing u64 discriminant") + _, err := w.Write(b[:i]) + return err + }}(uint64({name}), {writer})"#, + binary = self.deps.binary(), + slog = self.deps.slog(), + wrpc = self.deps.wrpc(), + ), + } + } + fn async_paths_ty(&mut self, ty: &Type) -> (Vec>>, bool) { if let Type::Id(ty) = ty { self.async_paths_tyid(*ty) @@ -1413,10 +1869,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(None); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![None].into()) + paths.push(vec![None].into()); } (paths, false) } @@ -1425,10 +1881,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(1)); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(1)].into()) + paths.push(vec![Some(1)].into()); } (paths, false) } @@ -1438,20 +1894,20 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(0)); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(0)].into()) + paths.push(vec![Some(0)].into()); } } if let Some(ty) = ty.err.as_ref() { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(1)); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(1)].into()) + paths.push(vec![Some(1)].into()); } } (paths, false) @@ -1463,10 +1919,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(i.try_into().unwrap())); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(i.try_into().unwrap())].into()) + paths.push(vec![Some(i.try_into().unwrap())].into()); } } } @@ -1478,10 +1934,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(i.try_into().unwrap())); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(i.try_into().unwrap())].into()) + paths.push(vec![Some(i.try_into().unwrap())].into()); } } (paths, false) @@ -1492,10 +1948,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(i.try_into().unwrap())); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(i.try_into().unwrap())].into()) + paths.push(vec![Some(i.try_into().unwrap())].into()); } } (paths, false) @@ -1506,10 +1962,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(Some(0)); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![Some(0)].into()) + paths.push(vec![Some(0)].into()); } } (paths, true) @@ -1520,10 +1976,10 @@ impl InterfaceGenerator<'_> { let (nested, fut) = self.async_paths_ty(ty); for mut path in nested { path.push_front(None); - paths.push(path) + paths.push(path); } if fut { - paths.push(vec![None].into()) + paths.push(vec![None].into()); } } (paths, true) @@ -1786,15 +2242,15 @@ impl InterfaceGenerator<'_> { uwrite!(self.src, "{i})"); for p in path { if let Some(p) = p { - uwrite!(self.src, ".Index({p})") + uwrite!(self.src, ".Index({p})"); } else { - self.push_str(".Wildcard()") + self.push_str(".Wildcard()"); } } - self.push_str(", ") + self.push_str(", "); } if fut { - uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i})"); + uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i}), "); } } uwriteln!( @@ -1991,15 +2447,15 @@ impl InterfaceGenerator<'_> { uwrite!(self.src, "{i})"); for p in path { if let Some(p) = p { - uwrite!(self.src, ".Index({p})") + uwrite!(self.src, ".Index({p})"); } else { - self.push_str(".Wildcard()") + self.push_str(".Wildcard()"); } } - self.push_str(", ") + self.push_str(", "); } if fut { - uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i})"); + uwrite!(self.src, "{wrpc}.NewSubscribePath().Index({i}), "); } } self.src.push_str("); err__ != nil {\n"); @@ -2265,8 +2721,9 @@ impl InterfaceGenerator<'_> { fn print_future(&mut self, ty: &Option) { let wrpc = self.deps.wrpc(); self.push_str(wrpc); - self.push_str(".ReadyReceiver["); - self.print_optional_ty(ty.as_ref()); + self.push_str(".ReceiveCompleter["); + let ty = ty.expect("futures with not element types are not supported"); + self.print_opt_ty(&ty, true); self.push_str("]"); } @@ -2275,13 +2732,16 @@ impl InterfaceGenerator<'_> { self.push_str(wrpc); match ty.element { Some(Type::U8) => { - self.push_str(".ReadyReader"); + self.push_str(".ReadCompleter"); } - _ => { - self.push_str(".ReadyReceiver[[]"); - self.print_optional_ty(ty.element.as_ref()); + Some(ty) => { + self.push_str(".ReceiveCompleter["); + self.print_list(&ty); self.push_str("]"); } + None => { + panic!("streams with no element types are not supported") + } } } @@ -2795,7 +3255,7 @@ func (v *{name}) WriteToIndex(w {wrpc}.ByteWriter) (func({wrpc}.IndexWriter) err return write(w) }}, nil }} "# - ) + ); } } self.push_str("default: return nil, "); diff --git a/crates/wit-bindgen-rust/tests/codegen_no_std.rs b/crates/wit-bindgen-rust/tests/codegen_no_std.rs index 368ee9ff..d04feb6a 100644 --- a/crates/wit-bindgen-rust/tests/codegen_no_std.rs +++ b/crates/wit-bindgen-rust/tests/codegen_no_std.rs @@ -1,4 +1,4 @@ -//! Like `codegen_tests` in codegen.rs, but with no_std. +//! Like `codegen_tests` in codegen.rs, but with `no_std`. //! //! We use `std_feature` and don't enable the "std" feature. diff --git a/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go b/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go index 0e5b88ea..0de7d003 100644 --- a/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go +++ b/examples/go/hello-client/bindings/wrpc_examples/hello/handler/bindings.wrpc.go @@ -18,7 +18,10 @@ func Hello(ctx__ context.Context, wrpc__ wrpc.Client) (r0__ string, close__ func if err__ != nil { return fmt.Errorf("failed to write empty parameters: %w", err__) } - r0__, err__ = func(r wrpc.ByteReader) (string, error) { + r0__, err__ = func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { diff --git a/examples/go/hello-client/cmd/hello-client-nats/main.go b/examples/go/hello-client/cmd/hello-client-nats/main.go index 782d2493..43a7cc07 100644 --- a/examples/go/hello-client/cmd/hello-client-nats/main.go +++ b/examples/go/hello-client/cmd/hello-client-nats/main.go @@ -30,11 +30,14 @@ func run() (err error) { for _, prefix := range os.Args[1:] { wrpc := wrpcnats.NewClient(nc, prefix) - greeting, err := handler.Hello(context.Background(), wrpc) + greeting, cleanup, err := handler.Hello(context.Background(), wrpc) if err != nil { return fmt.Errorf("failed to call `wrpc-examples:hello/handler.hello`: %w", err) } fmt.Printf("%s: %s\n", prefix, greeting) + if err := cleanup(); err != nil { + return fmt.Errorf("failed to shutdown `wrpc-examples:hello/handler.hello` invocation: %w", err) + } } return nil } diff --git a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go index 55dd4bb8..7cbcc13c 100644 --- a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go +++ b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go @@ -8,6 +8,7 @@ import ( fmt "fmt" wrpc "github.com/wrpc/wrpc/go" errgroup "golang.org/x/sync/errgroup" + io "io" slog "log/slog" math "math" ) @@ -35,26 +36,26 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) - write0, err := func(v string, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write0, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) + return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) } - if err := func(v int, w wrpc.ByteWriter) error { + if err = func(v int, w io.Writer) error { b := make([]byte, binary.MaxVarintLen32) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing string byte length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }(n, w); err != nil { - return nil, fmt.Errorf("failed to write string length of %d: %w", n, err) + return fmt.Errorf("failed to write string byte length of %d: %w", n, err) } slog.Debug("writing string bytes") - _, err := w.Write([]byte(v)) + _, err = w.Write([]byte(v)) if err != nil { - return nil, fmt.Errorf("failed to write string bytes: %w", err) + return fmt.Errorf("failed to write string bytes: %w", err) } - return nil, nil + return nil }(r0, &buf) if err != nil { return fmt.Errorf("failed to write result value 0: %w", err) diff --git a/examples/go/hello-server/go.mod b/examples/go/hello-server/go.mod index 8ccc633f..52b24a39 100644 --- a/examples/go/hello-server/go.mod +++ b/examples/go/hello-server/go.mod @@ -5,6 +5,7 @@ go 1.22.2 require ( github.com/nats-io/nats.go v1.34.1 github.com/wrpc/wrpc/go v0.0.0-unpublished + golang.org/x/sync v0.7.0 ) require ( diff --git a/examples/go/hello-server/go.sum b/examples/go/hello-server/go.sum index e81e64a2..cf9c014f 100644 --- a/examples/go/hello-server/go.sum +++ b/examples/go/hello-server/go.sum @@ -8,5 +8,7 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/examples/go/http-outgoing-nats-server/main.go b/examples/go/http-outgoing-nats-server/main.go index 6d0c72da..aadf291b 100644 --- a/examples/go/http-outgoing-nats-server/main.go +++ b/examples/go/http-outgoing-nats-server/main.go @@ -79,12 +79,12 @@ type trailerReceiver <-chan []*wrpc.Tuple2[string, [][]byte] func (r trailerReceiver) Receive() ([]*wrpc.Tuple2[string, [][]byte], error) { trailers, ok := <-r if !ok { - return nil, errors.New("trailer receiver channel closed") + return nil, io.EOF } return trailers, nil } -func (r trailerReceiver) Ready() bool { +func (r trailerReceiver) IsComplete() bool { return false } diff --git a/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go b/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go index 59953b61..c13fcd70 100644 --- a/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go +++ b/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go @@ -111,26 +111,26 @@ func (v *Error) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, e if !ok { return nil, errors.New("invalid payload") } - write, err := func(v string, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) + return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) } - if err := func(v int, w wrpc.ByteWriter) error { + if err = func(v int, w io.Writer) error { b := make([]byte, binary.MaxVarintLen32) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing string byte length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }(n, w); err != nil { - return nil, fmt.Errorf("failed to write string length of %d: %w", n, err) + return fmt.Errorf("failed to write string byte length of %d: %w", n, err) } slog.Debug("writing string bytes") - _, err := w.Write([]byte(v)) + _, err = w.Write([]byte(v)) if err != nil { - return nil, fmt.Errorf("failed to write string bytes: %w", err) + return fmt.Errorf("failed to write string bytes: %w", err) } - return nil, nil + return nil }(payload, w) if err != nil { return nil, fmt.Errorf("failed to write payload: %w", err) @@ -183,7 +183,10 @@ func ReadError(r wrpc.ByteReader) (*Error, error) { case ErrorDiscriminant_AccessDenied: return NewError_AccessDenied(), nil case ErrorDiscriminant_Other: - payload, err := func(r wrpc.ByteReader) (string, error) { + payload, err := func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -239,16 +242,16 @@ func (v *KeyResponse) String() string { return "KeyResponse" } func (v *KeyResponse) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "keys") - write0, err := func(v []string, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write0, err := func(v []string, w wrpc.ByteWriter) (write func(wrpc.IndexWriter) error, err error) { n := len(v) if n > math.MaxUint32 { return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) } - if err := func(v int, w wrpc.ByteWriter) error { + if err = func(v int, w io.Writer) error { b := make([]byte, binary.MaxVarintLen32) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing list length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }(n, w); err != nil { return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) @@ -256,26 +259,26 @@ func (v *KeyResponse) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) er slog.Debug("writing list elements") writes := make(map[uint32]func(wrpc.IndexWriter) error, n) for i, e := range v { - write, err := func(v string, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write, err := (func(wrpc.IndexWriter) error)(nil), func(v string, w io.Writer) (err error) { n := len(v) if n > math.MaxUint32 { - return nil, fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) + return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) } - if err := func(v int, w wrpc.ByteWriter) error { + if err = func(v int, w io.Writer) error { b := make([]byte, binary.MaxVarintLen32) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing string byte length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }(n, w); err != nil { - return nil, fmt.Errorf("failed to write string length of %d: %w", n, err) + return fmt.Errorf("failed to write string byte length of %d: %w", n, err) } slog.Debug("writing string bytes") - _, err := w.Write([]byte(v)) + _, err = w.Write([]byte(v)) if err != nil { - return nil, fmt.Errorf("failed to write string bytes: %w", err) + return fmt.Errorf("failed to write string bytes: %w", err) } - return nil, nil + return nil }(e, w) if err != nil { return nil, fmt.Errorf("failed to write list element %d: %w", i, err) @@ -322,13 +325,10 @@ func (v *KeyResponse) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) er return nil, fmt.Errorf("failed to write `option::some` status byte: %w", err) } slog.Debug("writing `option::some` payload") - write, err := func(v uint64, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u64") - _, err := w.Write(b[:i]) - return nil, err - }(*v, w) + write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w interface { + io.ByteWriter + io.Writer + }) (err error) { b := make([]byte, binary.MaxVarintLen64); i := binary.PutUvarint(b, uint64(v)); slog.Debug("writing u64"); _, err = w.Write(b[:i]); return err }(*v, w) if err != nil { return nil, fmt.Errorf("failed to write `option::some` payload: %w", err) } @@ -372,7 +372,7 @@ func ReadKeyResponse(r wrpc.ByteReader) (*KeyResponse, error) { v := &KeyResponse{} var err error slog.Debug("reading field", "name", "keys") - v.Keys, err = func(r wrpc.ByteReader) ([]string, error) { + v.Keys, err = func(r wrpc.IndexReader) ([]string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -392,7 +392,10 @@ func ReadKeyResponse(r wrpc.ByteReader) (*KeyResponse, error) { vs := make([]string, x) for i := range vs { slog.Debug("reading list element", "i", i) - vs[i], err = func(r wrpc.ByteReader) (string, error) { + vs[i], err = func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -440,7 +443,7 @@ func ReadKeyResponse(r wrpc.ByteReader) (*KeyResponse, error) { return nil, fmt.Errorf("failed to read `keys` field: %w", err) } slog.Debug("reading field", "name", "cursor") - v.Cursor, err = func(r wrpc.ByteReader) (*uint64, error) { + v.Cursor, err = func(r wrpc.IndexReader) (*uint64, error) { slog.Debug("reading option status byte") status, err := r.ReadByte() if err != nil { @@ -451,7 +454,7 @@ func ReadKeyResponse(r wrpc.ByteReader) (*KeyResponse, error) { return nil, nil case 1: slog.Debug("reading `option::some` payload") - v, err := func(r wrpc.ByteReader) (uint64, error) { + v, err := func(r io.ByteReader) (uint64, error) { var x uint64 var s uint for i := 0; i < 10; i++ { @@ -567,7 +570,10 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { } stop0, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "get", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { slog.DebugContext(ctx, "reading parameter", "i", 0) - p0, err := func(r wrpc.ByteReader) (string, error) { + p0, err := func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -604,7 +610,10 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) - p1, err := func(r wrpc.ByteReader) (string, error) { + p1, err := func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -674,16 +683,16 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to write `option::some` status byte: %w", err) } slog.Debug("writing `option::some` payload") - write, err := func(v []uint8, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write, err := func(v []uint8, w wrpc.ByteWriter) (write func(wrpc.IndexWriter) error, err error) { n := len(v) if n > math.MaxUint32 { return nil, fmt.Errorf("list length of %d overflows a 32-bit integer", n) } - if err := func(v int, w wrpc.ByteWriter) error { + if err = func(v int, w io.Writer) error { b := make([]byte, binary.MaxVarintLen32) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing list length", "len", n) - _, err := w.Write(b[:i]) + _, err = w.Write(b[:i]) return err }(n, w); err != nil { return nil, fmt.Errorf("failed to write list length of %d: %w", n, err) @@ -691,9 +700,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { slog.Debug("writing list elements") writes := make(map[uint32]func(wrpc.IndexWriter) error, n) for i, e := range v { - write, err := func(v uint8, w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { + write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint8, w io.ByteWriter) error { slog.Debug("writing u8 byte") - return nil, w.WriteByte(v) + return w.WriteByte(v) }(e, w) if err != nil { return nil, fmt.Errorf("failed to write list element %d: %w", i, err) @@ -802,7 +811,10 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { stops = append(stops, stop0) stop1, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "set", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { slog.DebugContext(ctx, "reading parameter", "i", 0) - p0, err := func(r wrpc.ByteReader) (string, error) { + p0, err := func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -839,7 +851,10 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) - p1, err := func(r wrpc.ByteReader) (string, error) { + p1, err := func(r interface { + io.ByteReader + io.Reader + }) (string, error) { var x uint32 var s uint for i := 0; i < 5; i++ { @@ -876,44 +891,38 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 2) - p2, err := func(r wrpc.ByteReader) ([]uint8, error) { + p2, err := func(r interface { + io.ByteReader + io.Reader + }) ([]byte, error) { var x uint32 var s uint for i := 0; i < 5; i++ { - slog.Debug("reading list length byte", "i", i) + slog.Debug("reading byte list length", "i", i) b, err := r.ReadByte() if err != nil { if i > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } - return nil, fmt.Errorf("failed to read list length byte: %w", err) + return nil, fmt.Errorf("failed to read byte list length byte: %w", err) } if b < 0x80 { if i == 4 && b > 1 { - return nil, errors.New("list length overflows a 32-bit integer") + return nil, errors.New("byte list length overflows a 32-bit integer") } x = x | uint32(b)< int(n) { return rn, errors.New("read more bytes than requested") } - r.buffered = n - uint32(rn) + r.buf = n - uint32(rn) return rn, nil } -func (*byteStreamReceiver) Ready() bool { - return false +func (r *ByteStreamReader) IsComplete() bool { + return r.r.IsComplete() +} + +func (r *ByteStreamReader) Close() error { + c, ok := r.r.(io.Closer) + if ok { + return c.Close() + } + return nil +} + +func NewByteStreamReader(r ByteReadCompleter) *ByteStreamReader { + return &ByteStreamReader{ + r: r, + } } // ReadStreamStatus reads a single byte from `r` and returns: @@ -115,8 +176,8 @@ func ReadStreamStatus(r ByteReader) (bool, error) { } } -// ReadByteStream reads a stream of bytes from `r` and `ch` -func ReadByteStream(r IndexReader, path ...uint32) (ReadyReader, error) { +// ReadByteStream reads a stream of bytes from `r` +func ReadByteStream(r IndexReader, path ...uint32) (ReadCompleter, error) { slog.Debug("reading byte stream status byte") ok, err := ReadStreamStatus(r) if err != nil { @@ -125,9 +186,9 @@ func ReadByteStream(r IndexReader, path ...uint32) (ReadyReader, error) { if !ok { r, err = r.Index(path...) if err != nil { - return nil, fmt.Errorf("failed to get byte stream reader: %w", err) + return nil, fmt.Errorf("failed to index reader: %w", err) } - return &byteStreamReceiver{r, 0}, nil + return NewByteStreamReader(NewPendingByteReader(r)), nil } slog.Debug("reading ready byte stream") buf, err := ReadByteList(r) @@ -135,18 +196,22 @@ func ReadByteStream(r IndexReader, path ...uint32) (ReadyReader, error) { return nil, fmt.Errorf("failed to read bytes: %w", err) } slog.Debug("read ready byte stream", "len", len(buf)) - return &byteReader{bytes.NewReader(buf)}, nil + return NewCompleteReader(bytes.NewReader(buf)), nil } -// ReadStream reads a stream from `r` and `ch` -func ReadStream[T any](ctx context.Context, r ByteReader, ch <-chan []byte, f func(ByteReader) (T, error)) (ReadyReceiver[[]T], error) { - slog.DebugContext(ctx, "reading stream status byte") +// ReadStream reads a stream from `r` +func ReadStream[T any](r IndexReader, f func(IndexReader) (T, error), path ...uint32) (ReceiveCompleter[[]T], error) { + slog.Debug("reading stream status byte") ok, err := ReadStreamStatus(r) if err != nil { return nil, err } if !ok { - return &decodeReceiver[[]T]{&ChanReader{ctx, ch, nil}, func(r ByteReader) ([]T, error) { + r, err = r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to index reader: %w", err) + } + return NewDecodeReceiver(r, func(r IndexReader) ([]T, error) { n, err := ReadUint32(r) if err != nil { return nil, fmt.Errorf("failed to read pending stream chunk length: %w", err) @@ -163,19 +228,19 @@ func ReadStream[T any](ctx context.Context, r ByteReader, ch <-chan []byte, f fu vs[i] = v } return vs, nil - }}, nil + }), nil } - slog.DebugContext(ctx, "reading ready stream") + slog.Debug("reading ready stream") vs, err := ReadList(r, f) if err != nil { return nil, fmt.Errorf("failed to read ready stream: %w", err) } - slog.DebugContext(ctx, "read ready stream", "len", len(vs)) - return &ready[[]T]{vs}, nil + slog.Debug("read ready stream", "len", len(vs)) + return NewCompleteReceiver(vs), nil } -func WriteByteStream(r ReadyReader, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { - if r.Ready() { +func WriteByteStream(r ReadCompleter, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { + if r.IsComplete() { slog.Debug("writing byte stream `stream::ready` status byte") if err := w.WriteByte(1); err != nil { return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) diff --git a/go/tuple.go b/go/tuple.go index edf0f7ff..70ba02aa 100644 --- a/go/tuple.go +++ b/go/tuple.go @@ -10,7 +10,7 @@ type Tuple2[T0, T1 any] struct { V1 T1 } -func ReadTuple2[T0, T1 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error)) (*Tuple2[T0, T1], error, +func ReadTuple2[T0, T1 any](r IndexReader, f0 func(IndexReader) (T0, error), f1 func(IndexReader) (T1, error)) (*Tuple2[T0, T1], error, ) { v0, err := f0(r) if err != nil { @@ -41,7 +41,7 @@ type Tuple3[T0, T1, T2 any] struct { V2 T2 } -func ReadTuple3[T0, T1, T2 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error), f2 func(ByteReader) (T2, error)) (*Tuple3[T0, T1, T2], error, +func ReadTuple3[T0, T1, T2 any](r IndexReader, f0 func(IndexReader) (T0, error), f1 func(IndexReader) (T1, error), f2 func(IndexReader) (T2, error)) (*Tuple3[T0, T1, T2], error, ) { v0, err := f0(r) if err != nil { diff --git a/go/wrpc.go b/go/wrpc.go index 420102e3..d2ae3384 100644 --- a/go/wrpc.go +++ b/go/wrpc.go @@ -1,7 +1,6 @@ package wrpc import ( - "bytes" "context" "io" ) @@ -70,70 +69,89 @@ type ByteReader interface { io.Reader } -type Ready interface { - Ready() bool +type Completer interface { + IsComplete() bool } type Receiver[T any] interface { Receive() (T, error) } -type ReadyReceiver[T any] interface { +type ReceiveCompleter[T any] interface { Receiver[T] - Ready + Completer } -type ReadyReader interface { +type ReadCompleter interface { io.Reader - Ready + Completer } -type ReadyByteReader interface { +type ByteReadCompleter interface { ByteReader - Ready + Completer } -type byteReader struct { - *bytes.Reader +type PendingReceiver[T any] struct { + Receiver[T] } -func (*byteReader) Ready() bool { - return true +func (r *PendingReceiver[T]) Receive() (T, error) { + return r.Receiver.Receive() } -type PendingByteReader struct { - ByteReader -} - -func (*PendingByteReader) Ready() bool { +func (*PendingReceiver[T]) IsComplete() bool { return false } -func NewPendingByteReader(r ByteReader) *PendingByteReader { - return &PendingByteReader{r} +func NewPendingReceiver[T any](rx Receiver[T]) *PendingReceiver[T] { + return &PendingReceiver[T]{rx} } -type ready[T any] struct { - v T +type CompleteReceiver[T any] struct { + v T + ready bool } -func (r *ready[T]) Receive() (T, error) { +func (r *CompleteReceiver[T]) Receive() (T, error) { + defer func() { + *r = CompleteReceiver[T]{} + }() + if !r.ready { + return r.v, io.EOF + } return r.v, nil } -func (*ready[T]) Ready() bool { +func (*CompleteReceiver[T]) IsComplete() bool { return true } -type decodeReceiver[T any] struct { - r ByteReader - decode func(ByteReader) (T, error) +func NewCompleteReceiver[T any](v T) *CompleteReceiver[T] { + return &CompleteReceiver[T]{v, true} } -func (r *decodeReceiver[T]) Receive() (T, error) { +type DecodeReceiver[T any] struct { + r IndexReader + decode func(IndexReader) (T, error) +} + +func (r *DecodeReceiver[T]) Receive() (T, error) { return r.decode(r.r) } -func (*decodeReceiver[T]) Ready() bool { +func (*DecodeReceiver[T]) IsComplete() bool { return false } + +func (r *DecodeReceiver[T]) Close() error { + c, ok := r.r.(io.Closer) + if ok { + return c.Close() + } + return nil +} + +func NewDecodeReceiver[T any](r IndexReader, decode func(IndexReader) (T, error)) *DecodeReceiver[T] { + return &DecodeReceiver[T]{r, decode} +} diff --git a/tests/go/integration.go b/tests/go/integration.go index c47ebc3d..ded7826a 100644 --- a/tests/go/integration.go +++ b/tests/go/integration.go @@ -112,6 +112,13 @@ func (SyncHandler) WithEnum(ctx context.Context) (sync.Foobar, error) { type AsyncHandler struct{} -func (AsyncHandler) WithStream(ctx context.Context) (wrpc.ReadyReader, error) { - return wrpc.ReadyReader(wrpc.NewPendingByteReader(bytes.NewBuffer([]byte("test")))), nil +func (AsyncHandler) WithStreams(ctx context.Context, complete bool) (wrpc.ReadCompleter, wrpc.ReceiveCompleter[[][]string], error) { + slog.DebugContext(ctx, "handling `with-streams`", "complete", complete) + buf := bytes.NewBuffer([]byte("test")) + str := wrpc.NewCompleteReceiver([][]string{{"foo", "bar"}, {"baz"}}) + if complete { + return wrpc.NewCompleteReader(buf), str, nil + } else { + return wrpc.NewPendingByteReader(buf), wrpc.NewPendingReceiver(str), nil + } } diff --git a/tests/go/integration_test.go b/tests/go/integration_test.go index 95f92f6b..c048159a 100644 --- a/tests/go/integration_test.go +++ b/tests/go/integration_test.go @@ -314,13 +314,13 @@ func TestAsync(t *testing.T) { defer cancel() { - slog.DebugContext(ctx, "calling `wrpc-test:integration/async.with-stream`") - v, shutdown, err := async.WithStream(ctx, client) + slog.DebugContext(ctx, "calling `wrpc-test:integration/async.with-streams`") + byteRx, stringListRx, shutdown, err := async.WithStreams(ctx, client, true) if err != nil { - t.Errorf("failed to call `wrpc-test:integration/async.with-stream`: %s", err) + t.Errorf("failed to call `wrpc-test:integration/async.with-streams`: %s", err) return } - b, err := io.ReadAll(v) + b, err := io.ReadAll(byteRx) if err != nil { t.Errorf("failed to read from stream: %s", err) return @@ -329,6 +329,58 @@ func TestAsync(t *testing.T) { t.Errorf("expected: `test`, got: %s", string(b)) return } + ss, err := stringListRx.Receive() + if err != nil { + t.Errorf("failed to receive ready list stream: %s", err) + return + } + expected := [][]string{{"foo", "bar"}, {"baz"}} + if !reflect.DeepEqual(ss, expected) { + t.Errorf("expected: `%#v`, got: %#v", expected, ss) + return + } + ss, err = stringListRx.Receive() + if ss != nil || err != io.EOF { + t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) + return + } + if err := shutdown(); err != nil { + t.Errorf("failed to shutdown: %s", err) + return + } + } + + { + slog.DebugContext(ctx, "calling `wrpc-test:integration/async.with-streams`") + byteRx, stringListRx, shutdown, err := async.WithStreams(ctx, client, false) + if err != nil { + t.Errorf("failed to call `wrpc-test:integration/async.with-streams`: %s", err) + return + } + b, err := io.ReadAll(byteRx) + if err != nil { + t.Errorf("failed to read from stream: %s", err) + return + } + if string(b) != "test" { + t.Errorf("expected: `test`, got: %s", string(b)) + return + } + ss, err := stringListRx.Receive() + if err != nil { + t.Errorf("failed to receive ready list stream: %s", err) + return + } + expected := [][]string{{"foo", "bar"}, {"baz"}} + if !reflect.DeepEqual(ss, expected) { + t.Errorf("expected: `%#v`, got: %#v", expected, ss) + return + } + ss, err = stringListRx.Receive() + if ss != nil || err != io.EOF { + t.Errorf("ready list should have returned (nil, io.EOF), got: (%#v, %v)", ss, err) + return + } if err := shutdown(); err != nil { t.Errorf("failed to shutdown: %s", err) return @@ -336,7 +388,7 @@ func TestAsync(t *testing.T) { } if err = stop(); err != nil { - t.Errorf("failed to stop serving `sync-server` world: %s", err) + t.Errorf("failed to stop serving `async-server` world: %s", err) return } } diff --git a/tests/wit/test.wit b/tests/wit/test.wit index 748ea4b8..39433ae0 100644 --- a/tests/wit/test.wit +++ b/tests/wit/test.wit @@ -36,7 +36,7 @@ interface sync { } interface async { - with-stream: func() -> stream; + with-streams: func(complete: bool) -> (bytes: stream, lists: stream>); } world sync-server {