Skip to content

Commit 4fec5bb

Browse files
authored
examples: grpc-level proxy
1 parent 4e3ee44 commit 4fec5bb

File tree

2 files changed

+240
-0
lines changed

2 files changed

+240
-0
lines changed

examples/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ createStartScripts('io.grpc.examples.deadline.DeadlineClient')
8989
createStartScripts('io.grpc.examples.deadline.DeadlineServer')
9090
createStartScripts('io.grpc.examples.errordetails.ErrorDetailsExample')
9191
createStartScripts('io.grpc.examples.experimental.CompressingHelloWorldClient')
92+
createStartScripts('io.grpc.examples.grpcproxy.GrpcProxy')
9293
createStartScripts('io.grpc.examples.healthservice.HealthServiceClient')
9394
createStartScripts('io.grpc.examples.healthservice.HealthServiceServer')
9495
createStartScripts('io.grpc.examples.hedging.HedgingHelloWorldClient')
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Copyright 2023, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.examples.grpcproxy;
18+
19+
import com.google.common.io.ByteStreams;
20+
import io.grpc.CallOptions;
21+
import io.grpc.Channel;
22+
import io.grpc.ClientCall;
23+
import io.grpc.Grpc;
24+
import io.grpc.HandlerRegistry;
25+
import io.grpc.InsecureChannelCredentials;
26+
import io.grpc.InsecureServerCredentials;
27+
import io.grpc.ManagedChannel;
28+
import io.grpc.Metadata;
29+
import io.grpc.MethodDescriptor;
30+
import io.grpc.Server;
31+
import io.grpc.ServerCall;
32+
import io.grpc.ServerCallHandler;
33+
import io.grpc.ServerMethodDefinition;
34+
import io.grpc.Status;
35+
import java.io.ByteArrayInputStream;
36+
import java.io.IOException;
37+
import java.io.InputStream;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.logging.Logger;
40+
41+
/**
42+
* A grpc-level proxy. GrpcProxy itself can be used unmodified to proxy any service for both unary
43+
* and streaming. It doesn't care what type of messages are being used. The Registry class causes it
44+
* to be called for any inbound RPC, and uses plain bytes for messages which avoids marshalling
45+
* messages and the need for Protobuf schema information.
46+
*
47+
* <p>Route guide has unary and streaming RPCs which makes it a nice showcase. To test with route
48+
* guide, run each in a separate terminal window:
49+
* <pre>{@code
50+
* ./build/install/examples/bin/route-guide-server
51+
* ./build/install/examples/bin/grpc-proxy
52+
* ./build/install/examples/bin/route-guide-client localhost:8981
53+
* }<pre>
54+
*
55+
* <p>You can verify the proxy is being used by shutting down the proxy and seeing the client fail.
56+
*/
57+
public final class GrpcProxy<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
58+
private static final Logger logger = Logger.getLogger(GrpcProxy.class.getName());
59+
60+
private final Channel channel;
61+
62+
public GrpcProxy(Channel channel) {
63+
this.channel = channel;
64+
}
65+
66+
@Override
67+
public ServerCall.Listener<ReqT> startCall(
68+
ServerCall<ReqT, RespT> serverCall, Metadata headers) {
69+
ClientCall<ReqT, RespT> clientCall
70+
= channel.newCall(serverCall.getMethodDescriptor(), CallOptions.DEFAULT);
71+
CallProxy<ReqT, RespT> proxy = new CallProxy<ReqT, RespT>(serverCall, clientCall);
72+
clientCall.start(proxy.clientCallListener, headers);
73+
serverCall.request(1);
74+
clientCall.request(1);
75+
return proxy.serverCallListener;
76+
}
77+
78+
private static class CallProxy<ReqT, RespT> {
79+
final RequestProxy serverCallListener;
80+
final ResponseProxy clientCallListener;
81+
82+
public CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall) {
83+
serverCallListener = new RequestProxy(clientCall);
84+
clientCallListener = new ResponseProxy(serverCall);
85+
}
86+
87+
private class RequestProxy extends ServerCall.Listener<ReqT> {
88+
private final ClientCall<ReqT, ?> clientCall;
89+
// Hold 'this' lock when accessing
90+
private boolean needToRequest;
91+
92+
public RequestProxy(ClientCall<ReqT, ?> clientCall) {
93+
this.clientCall = clientCall;
94+
}
95+
96+
@Override public void onCancel() {
97+
clientCall.cancel("Server cancelled", null);
98+
}
99+
100+
@Override public void onHalfClose() {
101+
clientCall.halfClose();
102+
}
103+
104+
@Override public void onMessage(ReqT message) {
105+
clientCall.sendMessage(message);
106+
synchronized (this) {
107+
if (clientCall.isReady()) {
108+
clientCallListener.serverCall.request(1);
109+
} else {
110+
// The outgoing call is not ready for more requests. Stop requesting additional data and
111+
// wait for it to catch up.
112+
needToRequest = true;
113+
}
114+
}
115+
}
116+
117+
@Override public void onReady() {
118+
clientCallListener.onServerReady();
119+
}
120+
121+
// Called from ResponseProxy, which is a different thread than the ServerCall.Listener
122+
// callbacks.
123+
synchronized void onClientReady() {
124+
if (needToRequest) {
125+
clientCallListener.serverCall.request(1);
126+
needToRequest = false;
127+
}
128+
}
129+
}
130+
131+
private class ResponseProxy extends ClientCall.Listener<RespT> {
132+
private final ServerCall<?, RespT> serverCall;
133+
// Hold 'this' lock when accessing
134+
private boolean needToRequest;
135+
136+
public ResponseProxy(ServerCall<?, RespT> serverCall) {
137+
this.serverCall = serverCall;
138+
}
139+
140+
@Override public void onClose(Status status, Metadata trailers) {
141+
serverCall.close(status, trailers);
142+
}
143+
144+
@Override public void onHeaders(Metadata headers) {
145+
serverCall.sendHeaders(headers);
146+
}
147+
148+
@Override public void onMessage(RespT message) {
149+
serverCall.sendMessage(message);
150+
synchronized (this) {
151+
if (serverCall.isReady()) {
152+
serverCallListener.clientCall.request(1);
153+
} else {
154+
// The incoming call is not ready for more responses. Stop requesting additional data
155+
// and wait for it to catch up.
156+
needToRequest = true;
157+
}
158+
}
159+
}
160+
161+
@Override public void onReady() {
162+
serverCallListener.onClientReady();
163+
}
164+
165+
// Called from RequestProxy, which is a different thread than the ClientCall.Listener
166+
// callbacks.
167+
synchronized void onServerReady() {
168+
if (needToRequest) {
169+
serverCallListener.clientCall.request(1);
170+
needToRequest = false;
171+
}
172+
}
173+
}
174+
}
175+
176+
private static class ByteMarshaller implements MethodDescriptor.Marshaller<byte[]> {
177+
@Override public byte[] parse(InputStream stream) {
178+
try {
179+
return ByteStreams.toByteArray(stream);
180+
} catch (IOException ex) {
181+
throw new RuntimeException();
182+
}
183+
}
184+
185+
@Override public InputStream stream(byte[] value) {
186+
return new ByteArrayInputStream(value);
187+
}
188+
};
189+
190+
public static class Registry extends HandlerRegistry {
191+
private final MethodDescriptor.Marshaller<byte[]> byteMarshaller = new ByteMarshaller();
192+
private final ServerCallHandler<byte[], byte[]> handler;
193+
194+
public Registry(ServerCallHandler<byte[], byte[]> handler) {
195+
this.handler = handler;
196+
}
197+
198+
@Override
199+
public ServerMethodDefinition<?,?> lookupMethod(String methodName, String authority) {
200+
MethodDescriptor<byte[], byte[]> methodDescriptor
201+
= MethodDescriptor.newBuilder(byteMarshaller, byteMarshaller)
202+
.setFullMethodName(methodName)
203+
.setType(MethodDescriptor.MethodType.UNKNOWN)
204+
.build();
205+
return ServerMethodDefinition.create(methodDescriptor, handler);
206+
}
207+
}
208+
209+
public static void main(String[] args) throws IOException, InterruptedException {
210+
String target = "localhost:8980";
211+
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
212+
.build();
213+
logger.info("Proxy will connect to " + target);
214+
GrpcProxy<byte[], byte[]> proxy = new GrpcProxy<byte[], byte[]>(channel);
215+
int port = 8981;
216+
Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
217+
.fallbackHandlerRegistry(new Registry(proxy))
218+
.build()
219+
.start();
220+
logger.info("Proxy started, listening on " + port);
221+
Runtime.getRuntime().addShutdownHook(new Thread() {
222+
@Override
223+
public void run() {
224+
server.shutdown();
225+
try {
226+
server.awaitTermination(10, TimeUnit.SECONDS);
227+
} catch (InterruptedException ex) {
228+
Thread.currentThread().interrupt();
229+
}
230+
server.shutdownNow();
231+
channel.shutdownNow();
232+
}
233+
});
234+
server.awaitTermination();
235+
if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
236+
System.out.println("Channel didn't shut down promptly");
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)