Skip to content

Commit 38fe9f3

Browse files
committed
feat(rpcstream): proxy rpc calls over a rpc
Signed-off-by: Christian Stewart <[email protected]>
1 parent edbd58b commit 38fe9f3

16 files changed

+1416
-13
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ Can use any Stream multiplexer: defaults to [libp2p-mplex] over a WebSocket.
1313

1414
[libp2p-mplex]: https://github.com/libp2p/js-libp2p-mplex
1515

16+
[rpcstream] supports sub-streams for per-component sub-services.
17+
18+
[rpcstream]: ./rpcstream
19+
1620
# Usage
1721

1822
Starting with the [protobuf-project] repository on the "starpc" branch.

echo/echo_srpc.pb.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "starpc",
3-
"version": "0.4.0",
3+
"version": "0.4.1",
44
"description": "Streaming protobuf RPC service protocol over any two-way channel.",
55
"license": "MIT",
66
"author": {
@@ -22,6 +22,7 @@
2222
"Makefile",
2323
"dist/echo",
2424
"dist/srpc",
25+
"dist/rpcstream",
2526
"e2e",
2627
"echo",
2728
"go.mod",
@@ -39,7 +40,7 @@
3940
"deps": "depcheck",
4041
"codegen": "npm run gen",
4142
"ci": "npm run build && npm run lint:js && npm run lint:go",
42-
"format": "prettier --write './{srpc,echo,e2e,integration}/**/(*.ts|*.tsx|*.html|*.css)'",
43+
"format": "prettier --write './{srpc,echo,e2e,integration,rpcstream}/**/(*.ts|*.tsx|*.html|*.css)'",
4344
"gen": "make genproto",
4445
"test": "npm run test:js && npm run test:go",
4546
"test:go": "make test",
@@ -48,7 +49,7 @@
4849
"integration": "npm run test:integration",
4950
"lint": "npm run lint:go && npm run lint:js",
5051
"lint:go": "make lint",
51-
"lint:js": "eslint -c .eslintrc.js --ext .ts ./{srpc,echo}/**/*.ts",
52+
"lint:js": "eslint -c .eslintrc.js --ext .ts ./{srpc,echo,rpcstream}/**/*.ts",
5253
"prepare": "patch-package",
5354
"precommit": "npm run format"
5455
},

rpcstream/README.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# RPC Stream
2+
3+
This package implements running a RPC service on top of another.
4+
5+
The "host" service has a signature like:
6+
7+
```protobuf
8+
syntax = "proto3";
9+
package mypackage;
10+
11+
import "github.com/aperturerobotics/starpc/rpcstream/rpcstream.proto";
12+
13+
// HostService proxies RPC calls to a target Mux.
14+
service HostService {
15+
// MyRpc opens a stream to proxy a RPC call.
16+
rpc MyRpc(stream .rpcstream.RpcStreamPacket) returns (stream .rpcstream.RpcStreamPacket);
17+
}
18+
```
19+
20+
`NewRpcStreamOpenStream(componentID, hostService.MyRpc)` will construct a new
21+
`OpenStreamFunc` which starts a RPC call to `MyRpc` and forwards the starpc
22+
packets over the two-way stream.
23+
24+
The component ID can be used to determine which Mux the client should access.
25+

rpcstream/rpcstream.go

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package rpcstream
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/aperturerobotics/starpc/srpc"
8+
)
9+
10+
// RpcStream implements a RPC call stream over a RPC call. Used to implement
11+
// sub-components which have a different set of services & calls available.
12+
type RpcStream interface {
13+
srpc.Stream
14+
Send(*Packet) error
15+
Recv() (*Packet, error)
16+
}
17+
18+
// RpcStreamGetter returns the Mux for the component ID from the remote.
19+
type RpcStreamGetter func(ctx context.Context, componentID string) (srpc.Mux, error)
20+
21+
// RpcStreamCaller is a function which starts the RpcStream call.
22+
type RpcStreamCaller func(ctx context.Context) (RpcStream, error)
23+
24+
// NewRpcStreamOpenStream constructs an OpenStream function with a RpcStream.
25+
func NewRpcStreamOpenStream(componentID string, rpcCaller RpcStreamCaller) srpc.OpenStreamFunc {
26+
return func(ctx context.Context, msgHandler srpc.PacketHandler) (srpc.Writer, error) {
27+
// open the rpc stream
28+
rpcStream, err := rpcCaller(ctx)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
// write the component id
34+
err = rpcStream.Send(&Packet{
35+
Body: &Packet_Init{
36+
Init: &RpcStreamInit{
37+
ComponentId: componentID,
38+
},
39+
},
40+
})
41+
if err != nil {
42+
_ = rpcStream.Close()
43+
return nil, err
44+
}
45+
46+
// initialize the rpc
47+
rw := NewRpcStreamReadWriter(rpcStream, msgHandler)
48+
49+
// start the read pump
50+
go func() {
51+
err := rw.ReadPump()
52+
if err != nil {
53+
_ = rw.Close()
54+
}
55+
}()
56+
57+
// return the writer
58+
return rw, nil
59+
}
60+
}
61+
62+
// HandleRpcStream handles an incoming RPC stream (remote is the initiator).
63+
func HandleRpcStream(stream RpcStream, getter RpcStreamGetter) error {
64+
// Read the "init" packet.
65+
initPkt, err := stream.Recv()
66+
if err != nil {
67+
return err
68+
}
69+
initInner, ok := initPkt.GetBody().(*Packet_Init)
70+
if !ok || initInner.Init == nil {
71+
return errors.New("expected init packet")
72+
}
73+
componentID := initInner.Init.GetComponentId()
74+
if componentID == "" {
75+
return errors.New("invalid init packet: empty component id")
76+
}
77+
78+
// lookup the server for this component id
79+
ctx := stream.Context()
80+
mux, err := getter(ctx, componentID)
81+
if err != nil {
82+
return err
83+
}
84+
if mux == nil {
85+
return errors.New("no server for that component")
86+
}
87+
88+
// handle the rpc
89+
serverRPC := srpc.NewServerRPC(ctx, mux)
90+
prw := NewRpcStreamReadWriter(stream, serverRPC.HandlePacket)
91+
serverRPC.SetWriter(prw)
92+
err = prw.ReadPump()
93+
_ = prw.Close()
94+
return err
95+
}
96+
97+
// RpcStreamReadWriter reads and writes packets from a RpcStream.
98+
type RpcStreamReadWriter struct {
99+
// stream is the RpcStream
100+
stream RpcStream
101+
// cb is the callback
102+
cb srpc.PacketHandler
103+
}
104+
105+
// NewRpcStreamReadWriter constructs a new read/writer.
106+
func NewRpcStreamReadWriter(stream RpcStream, cb srpc.PacketHandler) *RpcStreamReadWriter {
107+
return &RpcStreamReadWriter{stream: stream, cb: cb}
108+
}
109+
110+
// WritePacket writes a packet to the writer.
111+
func (r *RpcStreamReadWriter) WritePacket(p *srpc.Packet) error {
112+
data, err := p.MarshalVT()
113+
if err != nil {
114+
return err
115+
}
116+
return r.stream.Send(&Packet{
117+
Body: &Packet_Data{
118+
Data: data,
119+
},
120+
})
121+
}
122+
123+
// ReadPump executes the read pump in a goroutine.
124+
func (r *RpcStreamReadWriter) ReadPump() error {
125+
for {
126+
rpcStreamPkt, err := r.stream.Recv()
127+
if err != nil {
128+
return err
129+
}
130+
dataPkt, ok := rpcStreamPkt.GetBody().(*Packet_Data)
131+
if !ok {
132+
return errors.New("expected data packet")
133+
}
134+
pkt := &srpc.Packet{}
135+
if err := pkt.UnmarshalVT(dataPkt.Data); err != nil {
136+
return err
137+
}
138+
if err := r.cb(pkt); err != nil {
139+
return err
140+
}
141+
}
142+
}
143+
144+
// Close closes the packet rw.
145+
func (r *RpcStreamReadWriter) Close() error {
146+
return r.stream.Close()
147+
}
148+
149+
// _ is a type assertion
150+
var _ srpc.Writer = (*RpcStreamReadWriter)(nil)

0 commit comments

Comments
 (0)