Skip to content

Commit 322e488

Browse files
committed
Revert "core: Report marshaller error for uncompressed size too large back to the client 2 (grpc#12477)"
This reverts commit 14087f8. It caused the same failures as the previous attempt that was reverted in 7eab160. cl/831837315 ``` java.lang.AssertionError: Failed executing read operation at io.grpc.internal.CompositeReadableBuffer.execute(CompositeReadableBuffer.java:328) at io.grpc.internal.CompositeReadableBuffer.executeNoThrow(CompositeReadableBuffer.java:336) at io.grpc.internal.CompositeReadableBuffer.readBytes(CompositeReadableBuffer.java:151) at io.grpc.internal.ReadableBuffers$BufferInputStream.read(ReadableBuffers.java:377) at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:205) at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:133) at io.grpc.MethodDescriptor.parseRequest(MethodDescriptor.java:307) ```
1 parent 28a6130 commit 322e488

File tree

6 files changed

+7
-129
lines changed

6 files changed

+7
-129
lines changed

core/src/main/java/io/grpc/internal/CloseWithHeadersMarker.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -279,17 +279,6 @@ private void handleInternalError(Throwable internalError) {
279279
serverCallTracer.reportCallEnded(false); // error so always false
280280
}
281281

282-
/**
283-
* Close the {@link ServerStream} because parsing request message failed.
284-
* Similar to {@link #handleInternalError(Throwable)}.
285-
*/
286-
private void handleParseError(StatusRuntimeException parseError) {
287-
cancelled = true;
288-
log.log(Level.WARNING, "Cancelling the stream because of parse error", parseError);
289-
stream.cancel(parseError.getStatus().withCause(new CloseWithHeadersMarker()));
290-
serverCallTracer.reportCallEnded(false); // error so always false
291-
}
292-
293282
/**
294283
* All of these callbacks are assumed to called on an application thread, and the caller is
295284
* responsible for handling thrown exceptions.
@@ -338,23 +327,18 @@ private void messagesAvailableInternal(final MessageProducer producer) {
338327
return;
339328
}
340329

341-
InputStream message = null;
330+
InputStream message;
342331
try {
343332
while ((message = producer.next()) != null) {
344-
ReqT parsed;
345333
try {
346-
parsed = call.method.parseRequest(message);
347-
} catch (StatusRuntimeException e) {
334+
listener.onMessage(call.method.parseRequest(message));
335+
} catch (Throwable t) {
348336
GrpcUtil.closeQuietly(message);
349-
GrpcUtil.closeQuietly(producer);
350-
call.handleParseError(e);
351-
return;
337+
throw t;
352338
}
353339
message.close();
354-
listener.onMessage(parsed);
355340
}
356341
} catch (Throwable t) {
357-
GrpcUtil.closeQuietly(message);
358342
GrpcUtil.closeQuietly(producer);
359343
Throwables.throwIfUnchecked(t);
360344
throw new RuntimeException(t);

core/src/test/java/io/grpc/internal/ServerCallImplTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@
4848
import io.grpc.SecurityLevel;
4949
import io.grpc.ServerCall;
5050
import io.grpc.Status;
51-
import io.grpc.StatusRuntimeException;
5251
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
5352
import io.perfmark.PerfMark;
5453
import java.io.ByteArrayInputStream;
55-
import java.io.IOException;
5654
import java.io.InputStream;
5755
import java.io.InputStreamReader;
5856
import org.junit.Before;
@@ -71,8 +69,6 @@ public class ServerCallImplTest {
7169

7270
@Mock private ServerStream stream;
7371
@Mock private ServerCall.Listener<Long> callListener;
74-
@Mock private StreamListener.MessageProducer messageProducer;
75-
@Mock private InputStream message;
7672

7773
private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
7874
private ServerCallImpl<Long, Long> call;
@@ -497,44 +493,6 @@ public void streamListener_unexpectedRuntimeException() {
497493
assertThat(e).hasMessageThat().isEqualTo("unexpected exception");
498494
}
499495

500-
@Test
501-
public void streamListener_statusRuntimeException() throws IOException {
502-
MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder()
503-
.setType(MethodType.UNARY)
504-
.setFullMethodName("service/method")
505-
.setRequestMarshaller(new LongMarshaller() {
506-
@Override
507-
public Long parse(InputStream stream) {
508-
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED
509-
.withDescription("Decompressed gRPC message exceeds maximum size"));
510-
}
511-
})
512-
.setResponseMarshaller(new LongMarshaller())
513-
.build();
514-
515-
call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context,
516-
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
517-
serverCallTracer, PerfMark.createTag());
518-
519-
ServerStreamListenerImpl<Long> streamListener =
520-
new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context);
521-
522-
when(messageProducer.next()).thenReturn(message, (InputStream) null);
523-
streamListener.messagesAvailable(messageProducer);
524-
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
525-
verify(stream).cancel(statusCaptor.capture());
526-
Status status = statusCaptor.getValue();
527-
assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode());
528-
assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription());
529-
530-
streamListener.halfClosed();
531-
verify(callListener, never()).onHalfClose();
532-
533-
when(messageProducer.next()).thenReturn(message, (InputStream) null);
534-
streamListener.messagesAvailable(messageProducer);
535-
verify(callListener, never()).onMessage(any());
536-
}
537-
538496
private static class LongMarshaller implements Marshaller<Long> {
539497
@Override
540498
public InputStream stream(Long value) {

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ private void assertPayload(Payload expected, Payload actual) {
20242024
}
20252025
}
20262026

2027-
protected static void assertCodeEquals(Status.Code expected, Status actual) {
2027+
private static void assertCodeEquals(Status.Code expected, Status actual) {
20282028
assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected);
20292029
}
20302030

interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.grpc.testing.integration;
1818

1919
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertThrows;
2120
import static org.junit.Assert.assertTrue;
2221

2322
import com.google.protobuf.ByteString;
@@ -38,8 +37,6 @@
3837
import io.grpc.ServerCall.Listener;
3938
import io.grpc.ServerCallHandler;
4039
import io.grpc.ServerInterceptor;
41-
import io.grpc.Status.Code;
42-
import io.grpc.StatusRuntimeException;
4340
import io.grpc.internal.GrpcUtil;
4441
import io.grpc.netty.InternalNettyChannelBuilder;
4542
import io.grpc.netty.InternalNettyServerBuilder;
@@ -56,9 +53,7 @@
5653
import java.io.OutputStream;
5754
import org.junit.Before;
5855
import org.junit.BeforeClass;
59-
import org.junit.Rule;
6056
import org.junit.Test;
61-
import org.junit.rules.TestName;
6257
import org.junit.runner.RunWith;
6358
import org.junit.runners.JUnit4;
6459

@@ -89,16 +84,10 @@ public static void registerCompressors() {
8984
compressors.register(Codec.Identity.NONE);
9085
}
9186

92-
@Rule
93-
public final TestName currentTest = new TestName();
94-
9587
@Override
9688
protected ServerBuilder<?> getServerBuilder() {
9789
NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
98-
.maxInboundMessageSize(
99-
DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName())
100-
? 1000
101-
: AbstractInteropTest.MAX_MESSAGE_SIZE)
90+
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
10291
.compressorRegistry(compressors)
10392
.decompressorRegistry(decompressors)
10493
.intercept(new ServerInterceptor() {
@@ -137,22 +126,6 @@ public void compresses() {
137126
assertTrue(FZIPPER.anyWritten);
138127
}
139128

140-
private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME =
141-
"decompressedMessageTooLong";
142-
143-
@Test
144-
public void decompressedMessageTooLong() {
145-
assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName());
146-
final SimpleRequest bigRequest = SimpleRequest.newBuilder()
147-
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000])))
148-
.build();
149-
StatusRuntimeException e = assertThrows(StatusRuntimeException.class,
150-
() -> blockingStub.withCompression("gzip").unaryCall(bigRequest));
151-
assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus());
152-
assertEquals("Decompressed gRPC message exceeds maximum size 1000",
153-
e.getStatus().getDescription());
154-
}
155-
156129
@Override
157130
protected NettyChannelBuilder createChannelBuilder() {
158131
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())

netty/src/main/java/io/grpc/netty/NettyServerStream.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.grpc.Metadata;
2424
import io.grpc.Status;
2525
import io.grpc.internal.AbstractServerStream;
26-
import io.grpc.internal.CloseWithHeadersMarker;
2726
import io.grpc.internal.StatsTraceContext;
2827
import io.grpc.internal.TransportTracer;
2928
import io.grpc.internal.WritableBuffer;
@@ -131,11 +130,7 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
131130
@Override
132131
public void cancel(Status status) {
133132
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
134-
CancelServerStreamCommand cmd =
135-
status.getCause() instanceof CloseWithHeadersMarker
136-
? CancelServerStreamCommand.withReason(transportState(), status)
137-
: CancelServerStreamCommand.withReset(transportState(), status);
138-
writeQueue.enqueue(cmd, true);
133+
writeQueue.enqueue(CancelServerStreamCommand.withReset(transportState(), status), true);
139134
}
140135
}
141136
}

0 commit comments

Comments
 (0)