-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStockStreamingController.java
55 lines (47 loc) · 1.96 KB
/
StockStreamingController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.javatechie.controller;
import com.google.protobuf.util.JsonFormat;
import com.javatechie.grpc.StockRequest;
import com.javatechie.grpc.StockResponse;
import com.javatechie.grpc.StockTradingServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@RequestMapping("/stocks")
public class StockStreamingController {
@GrpcClient("stockService")
private StockTradingServiceGrpc.StockTradingServiceStub stockServiceStub;
private final ExecutorService executor = Executors.newCachedThreadPool();
@GetMapping(value = "/subscribe/{symbol}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribeStockPrice(@PathVariable String symbol) {
SseEmitter emitter = new SseEmitter();
executor.execute(() -> {
StockRequest request = StockRequest.newBuilder().setStockSymbol(symbol).build();
stockServiceStub.subscribeStockPrice(request, new StreamObserver<>() {
@Override
public void onNext(StockResponse response) {
try {
String jsonResponse = JsonFormat.printer().print(response);
emitter.send(jsonResponse);
} catch (IOException e) {
emitter.completeWithError(e);
}
}
@Override
public void onError(Throwable t) {
emitter.completeWithError(t);
}
@Override
public void onCompleted() {
emitter.complete();
}
});
});
return emitter;
}
}