Skip to content

Commit 487c051

Browse files
committed
WIP
1 parent 1415f1a commit 487c051

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

core/src/main/java/io/grpc/internal/GrpcUtil.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ static <T> boolean iterableContains(Iterable<T> iterable, T item) {
888888
* byte is then represented by the 3-character string "%XY", where "XY" is the two-digit,
889889
* uppercase, hexadecimal representation of the byte value.
890890
* </ul>
891-
*
891+
*
892892
* <p>This section does not use URLEscapers from Guava Net as its not Android-friendly thus core
893893
* can't depend on it.
894894
*/
@@ -972,7 +972,16 @@ public static boolean getFlag(String envVarName, boolean enableByDefault) {
972972
}
973973
}
974974

975-
975+
/**
976+
* Marker to be used in {@link Status#withCause(Throwable)} to signal that stream should be closed
977+
* by sending headers.
978+
*/
979+
public static final Throwable CLOSE_WITH_HEADERS = new Throwable("CLOSE_WITH_HEADERS") {
980+
@Override
981+
public synchronized Throwable fillInStackTrace() {
982+
return this;
983+
}
984+
};
976985

977986
private GrpcUtil() {}
978987
}

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,22 +327,24 @@ private void messagesAvailableInternal(final MessageProducer producer) {
327327
return;
328328
}
329329

330+
InputStream message = null;
330331
try {
331-
InputStream message;
332332
while ((message = producer.next()) != null) {
333-
ReqT parsedMessage;
334-
try (InputStream ignored = message) {
335-
parsedMessage = call.method.parseRequest(message);
333+
ReqT parsed;
334+
try {
335+
parsed = call.method.parseRequest(message);
336336
} catch (StatusRuntimeException e) {
337337
GrpcUtil.closeQuietly(message);
338338
GrpcUtil.closeQuietly(producer);
339339
call.cancelled = true;
340-
call.close(e.getStatus(), new Metadata());
340+
call.handleInternalError(InternalStatus.asRuntimeExceptionWithoutStacktrace(
341+
e.getStatus().withCause(GrpcUtil.CLOSE_WITH_HEADERS), null));
341342
return;
342343
}
343-
listener.onMessage(parsedMessage);
344+
listener.onMessage(parsed);
344345
}
345346
} catch (Throwable t) {
347+
GrpcUtil.closeQuietly(message);
346348
GrpcUtil.closeQuietly(producer);
347349
Throwables.throwIfUnchecked(t);
348350
throw new RuntimeException(t);

netty/src/main/java/io/grpc/netty/NettyServerStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.Metadata;
2424
import io.grpc.Status;
2525
import io.grpc.internal.AbstractServerStream;
26+
import io.grpc.internal.GrpcUtil;
2627
import io.grpc.internal.StatsTraceContext;
2728
import io.grpc.internal.TransportTracer;
2829
import io.grpc.internal.WritableBuffer;
@@ -130,7 +131,11 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
130131
@Override
131132
public void cancel(Status status) {
132133
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
133-
writeQueue.enqueue(CancelServerStreamCommand.withReset(transportState(), status), true);
134+
CancelServerStreamCommand cmd =
135+
status.getCause() == GrpcUtil.CLOSE_WITH_HEADERS
136+
? CancelServerStreamCommand.withReason(transportState(), status)
137+
: CancelServerStreamCommand.withReset(transportState(), status);
138+
writeQueue.enqueue(cmd, true);
134139
}
135140
}
136141
}

0 commit comments

Comments
 (0)