diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java b/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java index 0038a054854..51dacaa3e51 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java @@ -48,6 +48,7 @@ import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; +import io.grpc.internal.DisconnectError; import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ObjectPool; @@ -514,7 +515,7 @@ private static final class TestTransportListener implements ManagedClientTranspo private final SettableFuture isTerminated = SettableFuture.create(); @Override - public void transportShutdown(Status shutdownStatus) { + public void transportShutdown(Status shutdownStatus, DisconnectError disconnectError) { if (!this.shutdownStatus.set(shutdownStatus)) { throw new IllegalStateException("transportShutdown() already called"); } diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 144ad56eec3..2455cb4c153 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -54,6 +54,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ObjectPool; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -303,7 +304,7 @@ public synchronized void shutdownNow(Status reason) { @Override @GuardedBy("this") void notifyShutdown(Status status) { - clientTransportListener.transportShutdown(status); + clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN); } @Override diff --git a/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java b/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java index d3d73f0e9eb..00d43852063 100644 --- a/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java +++ b/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java @@ -25,6 +25,7 @@ import static io.grpc.binder.internal.BinderTransport.SHUTDOWN_TRANSPORT; import static io.grpc.binder.internal.BinderTransport.WIRE_FORMAT_VERSION; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -56,6 +57,7 @@ import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.DisconnectError; import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; @@ -336,7 +338,7 @@ public void clientIgnoresTransactionFromNonServerUids() throws Exception { sendShutdownTransportTransactionAsUid(client, serverUid); verify(mockClientTransportListener, timeout(TIMEOUT_MS)) - .transportShutdown(statusCaptor.capture()); + .transportShutdown(statusCaptor.capture(), any(DisconnectError.class)); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(statusCaptor.getValue().getDescription()).contains("shutdown"); } diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index eccd8fadc8c..b534ac4d8b8 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -201,8 +201,8 @@ public ListenableFuture getStats() { } /** - * Prevents creating any new streams. Buffered streams are not failed and may still proceed - * when {@link #reprocess} is called. The delayed transport will be terminated when there is no + * Prevents creating any new streams. Buffered streams are not failed and may still proceed + * when {@link #reprocess} is called. The delayed transport will be terminated when there is no * more buffered streams. */ @Override @@ -215,7 +215,7 @@ public final void shutdown(final Status status) { syncContext.executeLater(new Runnable() { @Override public void run() { - listener.transportShutdown(status); + listener.transportShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); } }); if (!hasPendingStreams() && reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/DisconnectError.java b/core/src/main/java/io/grpc/internal/DisconnectError.java new file mode 100644 index 00000000000..771024f106e --- /dev/null +++ b/core/src/main/java/io/grpc/internal/DisconnectError.java @@ -0,0 +1,34 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import javax.annotation.concurrent.Immutable; + +/** + * Represents the reason for a subchannel disconnection. + * Implementations are either the SimpleDisconnectError enum or the GoAwayDisconnectError class for + * dynamic ones. + */ +@Immutable +public interface DisconnectError { + /** + * Returns the string representation suitable for use as an error tag. + * + * @return The formatted error tag string. + */ + String toErrorString(); +} diff --git a/core/src/main/java/io/grpc/internal/GoAwayDisconnectError.java b/core/src/main/java/io/grpc/internal/GoAwayDisconnectError.java new file mode 100644 index 00000000000..20c8c709932 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/GoAwayDisconnectError.java @@ -0,0 +1,64 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + + +import javax.annotation.concurrent.Immutable; + +/** + * Represents a dynamic disconnection due to an HTTP/2 GOAWAY frame. + * This class is immutable and holds the specific error code from the frame. + */ +@Immutable +public final class GoAwayDisconnectError implements DisconnectError { + private static final String ERROR_TAG = "GOAWAY"; + private final GrpcUtil.Http2Error errorCode; + + /** + * Creates a GoAway reason. + * + * @param errorCode The specific, non-null HTTP/2 error code (e.g., "NO_ERROR"). + */ + public GoAwayDisconnectError(GrpcUtil.Http2Error errorCode) { + if (errorCode == null) { + throw new NullPointerException("Http2Error cannot be null for GOAWAY"); + } + this.errorCode = errorCode; + } + + @Override + public String toErrorString() { + return ERROR_TAG + " " + errorCode.name(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GoAwayDisconnectError goAwayDisconnectError = (GoAwayDisconnectError) o; + return errorCode == goAwayDisconnectError.errorCode; + } + + @Override + public int hashCode() { + return errorCode.hashCode(); + } +} diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 649843c5c03..7a48bf642fe 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -326,7 +326,7 @@ public void run() { } /** - * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this + * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise, this * method has no effect. */ void resetConnectBackoff() { @@ -620,7 +620,7 @@ public void transportInUse(boolean inUse) { } @Override - public void transportShutdown(final Status s) { + public void transportShutdown(final Status s, final DisconnectError disconnectError) { channelLogger.log( ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; @@ -639,8 +639,7 @@ public void run() { NameResolver.ATTR_BACKEND_SERVICE), /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), EquivalentAddressGroup.ATTR_LOCALITY_NAME), - /* disconnectError= */ SubchannelMetrics.DisconnectError.UNKNOWN - .getErrorString(null), + /* disconnectError= */ disconnectError.toErrorString(), /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes() .get(GrpcAttributes.ATTR_SECURITY_LEVEL))); } else if (pendingTransport == transport) { diff --git a/core/src/main/java/io/grpc/internal/KeepAliveManager.java b/core/src/main/java/io/grpc/internal/KeepAliveManager.java index d831a096087..0f389436aa3 100644 --- a/core/src/main/java/io/grpc/internal/KeepAliveManager.java +++ b/core/src/main/java/io/grpc/internal/KeepAliveManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.ThreadSafe; /** * Manages keepalive pings. @@ -262,9 +263,34 @@ public interface KeepAlivePinger { * Default client side {@link KeepAlivePinger}. */ public static final class ClientKeepAlivePinger implements KeepAlivePinger { - private final ConnectionClientTransport transport; - public ClientKeepAlivePinger(ConnectionClientTransport transport) { + + /** + * A {@link ClientTransport} that has life-cycle management. + * + */ + @ThreadSafe + public interface TransportWithDisconnectReason extends ClientTransport { + + /** + * Initiates an orderly shutdown of the transport. Existing streams continue, but the + * transport will not own any new streams. New streams will either fail (once + * {@link ManagedClientTransport.Listener#transportShutdown} callback called), or be + * transferred off this transport (in which case they may succeed). This method may only be + * called once. + */ + void shutdown(Status reason, DisconnectError disconnectError); + + /** + * Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls + * should be closed with the provided {@code reason} and {@code disconnectError}. + */ + void shutdownNow(Status reason, DisconnectError disconnectError); + } + + private final TransportWithDisconnectReason transport; + + public ClientKeepAlivePinger(TransportWithDisconnectReason transport) { this.transport = transport; } @@ -277,7 +303,8 @@ public void onSuccess(long roundTripTimeNanos) {} @Override public void onFailure(Status cause) { transport.shutdownNow(Status.UNAVAILABLE.withDescription( - "Keepalive failed. The connection is likely gone")); + "Keepalive failed. The connection is likely gone"), + SimpleDisconnectError.CONNECTION_TIMED_OUT); } }, MoreExecutors.directExecutor()); } @@ -285,7 +312,8 @@ public void onFailure(Status cause) { @Override public void onPingTimeout() { transport.shutdownNow(Status.UNAVAILABLE.withDescription( - "Keepalive failed. The connection is likely gone")); + "Keepalive failed. The connection is likely gone"), + SimpleDisconnectError.CONNECTION_TIMED_OUT); } } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 849e4b8e45c..e9fda4e9ec3 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -2056,7 +2056,7 @@ public String toString() { */ private final class DelayedTransportListener implements ManagedClientTransport.Listener { @Override - public void transportShutdown(Status s) { + public void transportShutdown(Status s, DisconnectError e) { checkState(shutdown.get(), "Channel must have been shut down"); } diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java index 184a4d98955..8350a005409 100644 --- a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java @@ -77,8 +77,9 @@ interface Listener { *

This is called exactly once, and must be called prior to {@link #transportTerminated}. * * @param s the reason for the shutdown. + * @param e the disconnect error. */ - void transportShutdown(Status s); + void transportShutdown(Status s, DisconnectError e); /** * The transport completed shutting down. All resources have been released. All streams have diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 30c9f55e796..71973ed5d64 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -117,7 +117,7 @@ public ClientStream newStream(MethodDescriptor method, this.channelz = Preconditions.checkNotNull(channelz); this.delayedTransport.start(new ManagedClientTransport.Listener() { @Override - public void transportShutdown(Status s) { + public void transportShutdown(Status s, DisconnectError e) { // Don't care } diff --git a/core/src/main/java/io/grpc/internal/SimpleDisconnectError.java b/core/src/main/java/io/grpc/internal/SimpleDisconnectError.java new file mode 100644 index 00000000000..addbfbe10a3 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/SimpleDisconnectError.java @@ -0,0 +1,68 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import javax.annotation.concurrent.Immutable; + +/** + * Represents a fixed, static reason for disconnection. + */ +@Immutable +public enum SimpleDisconnectError implements DisconnectError { + /** + * The subchannel was shut down for various reasons like parent channel shutdown, + * idleness, or load balancing policy changes. + */ + SUBCHANNEL_SHUTDOWN("subchannel shutdown"), + + /** + * Connection was reset (e.g., ECONNRESET, WSAECONNERESET). + */ + CONNECTION_RESET("connection reset"), + + /** + * Connection timed out (e.g., ETIMEDOUT, WSAETIMEDOUT), including closures + * from gRPC keepalives. + */ + CONNECTION_TIMED_OUT("connection timed out"), + + /** + * Connection was aborted (e.g., ECONNABORTED, WSAECONNABORTED). + */ + CONNECTION_ABORTED("connection aborted"), + + /** + * Any socket error not covered by other specific disconnect errors. + */ + SOCKET_ERROR("socket error"), + + /** + * A catch-all for any other unclassified reason. + */ + UNKNOWN("unknown"); + + private final String errorTag; + + SimpleDisconnectError(String errorTag) { + this.errorTag = errorTag; + } + + @Override + public String toErrorString() { + return this.errorTag; + } +} diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java index 8921f13ebe6..4bc2cf47046 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -22,7 +22,6 @@ import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; -import javax.annotation.Nullable; final class SubchannelMetrics { @@ -106,84 +105,4 @@ public void recordDisconnection(String target, String backendService, String loc ImmutableList.of(target), ImmutableList.of(securityLevel, backendService, locality)); } - - /** - * Represents the reason for a subchannel failure. - */ - public enum DisconnectError { - - /** - * Represents an HTTP/2 GOAWAY frame. The specific error code - * (e.g., "NO_ERROR", "PROTOCOL_ERROR") should be handled separately - * as it is a dynamic part of the error. - * See RFC 9113 for error codes: https://www.rfc-editor.org/rfc/rfc9113.html#name-error-codes - */ - GOAWAY("goaway"), - - /** - * The subchannel was shut down for various reasons like parent channel shutdown, - * idleness, or load balancing policy changes. - */ - SUBCHANNEL_SHUTDOWN("subchannel shutdown"), - - /** - * Connection was reset (e.g., ECONNRESET, WSAECONNERESET). - */ - CONNECTION_RESET("connection reset"), - - /** - * Connection timed out (e.g., ETIMEDOUT, WSAETIMEDOUT), including closures - * from gRPC keepalives. - */ - CONNECTION_TIMED_OUT("connection timed out"), - - /** - * Connection was aborted (e.g., ECONNABORTED, WSAECONNABORTED). - */ - CONNECTION_ABORTED("connection aborted"), - - /** - * Any socket error not covered by other specific disconnect errors. - */ - SOCKET_ERROR("socket error"), - - /** - * A catch-all for any other unclassified reason. - */ - UNKNOWN("unknown"); - - private final String errorTag; - - /** - * Private constructor to associate a description with each enum constant. - * - * @param errorTag The detailed explanation of the error. - */ - DisconnectError(String errorTag) { - this.errorTag = errorTag; - } - - /** - * Gets the error string suitable for use as a metric tag. - * - *

If the reason is {@code GOAWAY}, this method requires the specific - * HTTP/2 error code to create the complete tag (e.g., "goaway PROTOCOL_ERROR"). - * For all other reasons, the parameter is ignored.

- * - * @param goawayErrorCode The specific HTTP/2 error code. This is only - * used if the reason is GOAWAY and should not be null in that case. - * @return The formatted error string. - */ - public String getErrorString(@Nullable String goawayErrorCode) { - if (this == GOAWAY) { - if (goawayErrorCode == null || goawayErrorCode.isEmpty()) { - // Return the base tag if the code is missing, or consider throwing an exception - // throw new IllegalArgumentException("goawayErrorCode is required for GOAWAY reason."); - return this.errorTag; - } - return this.errorTag + " " + goawayErrorCode; - } - return this.errorTag; - } - } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 902c2835a92..2587877ba7b 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -175,7 +175,8 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); assertEquals(0, fakeExecutor.runDueTasks()); verify(mockRealTransport).newStream( @@ -187,7 +188,8 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void transportTerminatedThenAssignTransport() { delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); delayedTransport.reprocess(mockPicker); verifyNoMoreInteractions(transportListener); @@ -196,7 +198,8 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void assignTransportThenShutdownThenNewStream() { delayedTransport.reprocess(mockPicker); delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream( method, headers, callOptions, tracers); @@ -210,7 +213,8 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void assignTransportThenShutdownNowThenNewStream() { delayedTransport.reprocess(mockPicker); delayedTransport.shutdownNow(Status.UNAVAILABLE); - verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportShutdown(any(Status.class), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream( method, headers, callOptions, tracers); @@ -241,7 +245,8 @@ public void uncaughtException(Thread t, Throwable e) { delayedTransport.shutdown(SHUTDOWN_STATUS); // Stream is still buffered - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener, times(0)).transportTerminated(); assertEquals(1, delayedTransport.getPendingStreamsCount()); @@ -275,7 +280,8 @@ public void uncaughtException(Thread t, Throwable e) { ClientStream stream = delayedTransport.newStream( method, new Metadata(), CallOptions.DEFAULT, tracers); delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener, times(0)).transportTerminated(); assertEquals(1, delayedTransport.getPendingStreamsCount()); stream.start(streamListener); @@ -288,7 +294,8 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void shutdownThenNewStream() { delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream( method, new Metadata(), CallOptions.DEFAULT, tracers); @@ -303,7 +310,8 @@ public void uncaughtException(Thread t, Throwable e) { method, new Metadata(), CallOptions.DEFAULT, tracers); stream.start(streamListener); delayedTransport.shutdownNow(Status.UNAVAILABLE); - verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportShutdown(any(Status.class), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); verify(streamListener) .closed(statusCaptor.capture(), eq(RpcProgress.REFUSED), any(Metadata.class)); @@ -312,7 +320,8 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void shutdownNowThenNewStream() { delayedTransport.shutdownNow(Status.UNAVAILABLE); - verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportShutdown(any(Status.class), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener).transportTerminated(); ClientStream stream = delayedTransport.newStream( method, new Metadata(), CallOptions.DEFAULT, tracers); @@ -487,7 +496,8 @@ public void uncaughtException(Thread t, Throwable e) { // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); - verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); + verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); verify(transportListener, never()).transportTerminated(); // ... until it's gone picker = mock(SubchannelPicker.class); diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 4ac5fbac362..811344da307 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -233,7 +233,8 @@ public void constructor_eagListWithNull_throws() { // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); // Backoff reset and using first back-off value interval @@ -264,7 +265,8 @@ public void constructor_eagListWithNull_throws() { assertNoCallbackInvoke(); // Here we use a different status from the first failure, and verify that it's passed to // the callback. - transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); + transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); // Second back-off interval @@ -302,7 +304,8 @@ public void constructor_eagListWithNull_throws() { // Close the READY transport, will enter IDLE state. assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(IDLE, internalSubchannel.getState()); assertExactCallbackInvokes("onStateChange:IDLE"); @@ -334,7 +337,8 @@ public void constructor_eagListWithNull_throws() { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory).newClientTransport(eq(addr1), any(), any()); // Let this one fail without success - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Still in CONNECTING assertNull(internalSubchannel.obtainActiveTransport()); assertNoCallbackInvoke(); @@ -350,7 +354,8 @@ public void constructor_eagListWithNull_throws() { assertNull(internalSubchannel.obtainActiveTransport()); // Fail this one too assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // All addresses have failed, but we aren't controlling retries. assertEquals(IDLE, internalSubchannel.getState()); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); @@ -394,7 +399,8 @@ public void constructor_eagListWithNull_throws() { isA(TransportLogger.class)); // Let this one fail without success - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Still in CONNECTING assertNull(internalSubchannel.obtainActiveTransport()); assertNoCallbackInvoke(); @@ -410,7 +416,8 @@ public void constructor_eagListWithNull_throws() { assertNull(internalSubchannel.obtainActiveTransport()); // Fail this one too assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // All addresses have failed. Delayed transport will be in back-off interval. assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); @@ -441,7 +448,8 @@ public void constructor_eagListWithNull_throws() { eq(createClientTransportOptions()), isA(TransportLogger.class)); // Fail this one too - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Forth attempt will start immediately. Keep back-off policy. @@ -455,7 +463,8 @@ public void constructor_eagListWithNull_throws() { isA(TransportLogger.class)); // Fail this one too assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); + transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // All addresses have failed again. Delayed transport will be in back-off interval. assertExactCallbackInvokes("onStateChange:" + RESOURCE_EXHAUSTED_STATE); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); @@ -492,7 +501,8 @@ public void constructor_eagListWithNull_throws() { ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); // Then close it. assertNoCallbackInvoke(); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); assertEquals(IDLE, internalSubchannel.getState()); @@ -508,7 +518,8 @@ public void constructor_eagListWithNull_throws() { eq(createClientTransportOptions()), isA(TransportLogger.class)); // Fail the transport - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Second attempt will start immediately. Still no new back-off policy. @@ -520,7 +531,8 @@ public void constructor_eagListWithNull_throws() { isA(TransportLogger.class)); // Fail this one too assertEquals(CONNECTING, internalSubchannel.getState()); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // All addresses have failed. Enter TRANSIENT_FAILURE. Back-off in effect. assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); @@ -584,7 +596,8 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr1), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connects @@ -606,7 +619,8 @@ public void updateAddresses_eagListWithNull_throws() { verify(transports.peek().transport, never()).shutdownNow(any(Status.class)); // And new addresses chosen when re-connecting - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); assertNull(internalSubchannel.obtainActiveTransport()); @@ -616,13 +630,15 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr2), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verify(mockTransportFactory) .newClientTransport( eq(addr3), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verifyNoMoreInteractions(mockTransportFactory); fakeClock.forwardNanos(10); // Drain retry, but don't care about result @@ -643,7 +659,8 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr1), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connecting @@ -666,7 +683,8 @@ public void updateAddresses_eagListWithNull_throws() { // And new addresses chosen when re-connecting transports.peek().listener.transportReady(); assertExactCallbackInvokes("onStateChange:READY"); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); assertNull(internalSubchannel.obtainActiveTransport()); @@ -676,13 +694,15 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr2), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verify(mockTransportFactory) .newClientTransport( eq(addr3), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verifyNoMoreInteractions(mockTransportFactory); fakeClock.forwardNanos(10); // Drain retry, but don't care about result @@ -721,7 +741,8 @@ public void updateAddresses_eagListWithNull_throws() { // And no other addresses attempted assertEquals(0, fakeClock.numPendingTasks()); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); verifyNoMoreInteractions(mockTransportFactory); @@ -745,7 +766,8 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr1), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connects @@ -769,7 +791,8 @@ public void updateAddresses_eagListWithNull_throws() { verify(transports.peek().transport).shutdown(any(Status.class)); // And new addresses chosen when re-connecting - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertNoCallbackInvoke(); assertEquals(IDLE, internalSubchannel.getState()); @@ -780,13 +803,15 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr3), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verify(mockTransportFactory) .newClientTransport( eq(addr4), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verifyNoMoreInteractions(mockTransportFactory); fakeClock.forwardNanos(10); // Drain retry, but don't care about result @@ -808,7 +833,8 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr1), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connecting @@ -838,13 +864,15 @@ public void updateAddresses_eagListWithNull_throws() { eq(addr3), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verify(mockTransportFactory) .newClientTransport( eq(addr4), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); verifyNoMoreInteractions(mockTransportFactory); fakeClock.forwardNanos(10); // Drain retry, but don't care about result @@ -928,7 +956,8 @@ public void connectIsLazy() { isA(TransportLogger.class)); // Fail this one - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); // Will always reconnect after back-off @@ -944,7 +973,8 @@ public void connectIsLazy() { transports.peek().listener.transportReady(); assertExactCallbackInvokes("onStateChange:READY"); // Then go-away - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); // No scheduled tasks that would ever try to reconnect ... @@ -974,7 +1004,8 @@ public void shutdownWhenReady() throws Exception { internalSubchannel.shutdown(SHUTDOWN_REASON); verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); assertExactCallbackInvokes("onStateChange:SHUTDOWN"); - transportInfo.listener.transportShutdown(SHUTDOWN_REASON); + transportInfo.listener.transportShutdown(SHUTDOWN_REASON, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); assertExactCallbackInvokes("onTerminated"); @@ -997,7 +1028,8 @@ public void shutdownBeforeTransportCreated() throws Exception { // Fail this one MockClientTransportInfo transportInfo = transports.poll(); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); // Entering TRANSIENT_FAILURE, waiting for back-off @@ -1053,7 +1085,8 @@ public void shutdownBeforeTransportReady() throws Exception { // The transport should've been shut down even though it's not the active transport yet. verify(transportInfo.transport).shutdown(same(SHUTDOWN_REASON)); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertNoCallbackInvoke(); transportInfo.listener.transportTerminated(); assertExactCallbackInvokes("onTerminated"); @@ -1069,7 +1102,7 @@ public void shutdownNow() throws Exception { MockClientTransportInfo t1 = transports.poll(); t1.listener.transportReady(); assertExactCallbackInvokes("onStateChange:CONNECTING", "onStateChange:READY"); - t1.listener.transportShutdown(Status.UNAVAILABLE); + t1.listener.transportShutdown(Status.UNAVAILABLE, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); internalSubchannel.obtainActiveTransport(); @@ -1126,7 +1159,7 @@ public void inUseState() { t0.listener.transportInUse(true); assertExactCallbackInvokes("onInUse"); - t0.listener.transportShutdown(Status.UNAVAILABLE); + t0.listener.transportShutdown(Status.UNAVAILABLE, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); assertNull(internalSubchannel.obtainActiveTransport()); @@ -1159,7 +1192,7 @@ public void transportTerminateWithoutExitingInUse() { t0.listener.transportInUse(true); assertExactCallbackInvokes("onInUse"); - t0.listener.transportShutdown(Status.UNAVAILABLE); + t0.listener.transportShutdown(Status.UNAVAILABLE, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:IDLE"); t0.listener.transportTerminated(); assertExactCallbackInvokes("onNotInUse"); @@ -1186,12 +1219,12 @@ public void run() { assertEquals(1, runnableInvokes.get()); MockClientTransportInfo t0 = transports.poll(); - t0.listener.transportShutdown(Status.UNAVAILABLE); + t0.listener.transportShutdown(Status.UNAVAILABLE, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertEquals(2, runnableInvokes.get()); // 2nd address: reconnect immediatly MockClientTransportInfo t1 = transports.poll(); - t1.listener.transportShutdown(Status.UNAVAILABLE); + t1.listener.transportShutdown(Status.UNAVAILABLE, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Addresses exhausted, waiting for back-off. assertEquals(2, runnableInvokes.get()); @@ -1218,7 +1251,8 @@ public void resetConnectBackoff() throws Exception { eq(addr), eq(createClientTransportOptions()), isA(TransportLogger.class)); - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); // Save the reconnectTask @@ -1254,7 +1288,8 @@ public void resetConnectBackoff() throws Exception { // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after // invoking resetConnectBackoff() - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); verify(mockBackoffPolicyProvider, times(2)).get(); fakeClock.forwardNanos(10); @@ -1282,7 +1317,8 @@ public void channelzMembership() throws Exception { MockClientTransportInfo t0 = transports.poll(); t0.listener.transportReady(); assertTrue(channelz.containsClientSocket(t0.transport.getLogId())); - t0.listener.transportShutdown(Status.RESOURCE_EXHAUSTED); + t0.listener.transportShutdown(Status.RESOURCE_EXHAUSTED, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); t0.listener.transportTerminated(); assertFalse(channelz.containsClientSocket(t0.transport.getLogId())); } @@ -1502,7 +1538,8 @@ public void subchannelStateChanges_triggersAttemptFailedMetric() { // b. Fail the transport before it can signal `transportReady()`. transportInfo.listener.transportShutdown( - Status.INTERNAL.withDescription("Simulated connect failure")); + Status.INTERNAL.withDescription("Simulated connect failure"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); fakeClock.runDueTasks(); // Process the failure event // --- Verification --- @@ -1556,7 +1593,8 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { fakeClock.runDueTasks(); // Process the successful connection // --- Action: Transport is shut down --- - transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unknown")); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unknown"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); fakeClock.runDueTasks(); // Process the shutdown // --- Verification --- @@ -1581,7 +1619,7 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { eqMetricInstrumentName("grpc.subchannel.disconnections"), eq(1L), eq(Arrays.asList(AUTHORITY)), - eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "unknown")) + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "subchannel shutdown")) ); inOrder.verify(mockMetricRecorder).addLongUpDownCounter( eqMetricInstrumentName("grpc.subchannel.open_connections"), diff --git a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java index 3cf7bfcedfe..81e3d1b2638 100644 --- a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java +++ b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -104,13 +105,15 @@ public void keepAlivePingDelayedByIncomingData() { @Test public void clientKeepAlivePinger_pingTimeout() { - ConnectionClientTransport transport = mock(ConnectionClientTransport.class); + ClientKeepAlivePinger.TransportWithDisconnectReason transport = + mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class); ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport); pinger.onPingTimeout(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - verify(transport).shutdownNow(statusCaptor.capture()); + verify(transport).shutdownNow(statusCaptor.capture(), + eq(SimpleDisconnectError.CONNECTION_TIMED_OUT)); Status status = statusCaptor.getValue(); assertThat(status.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(status.getDescription()).isEqualTo( @@ -119,7 +122,8 @@ public void clientKeepAlivePinger_pingTimeout() { @Test public void clientKeepAlivePinger_pingFailure() { - ConnectionClientTransport transport = mock(ConnectionClientTransport.class); + ClientKeepAlivePinger.TransportWithDisconnectReason transport = + mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class); ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport); pinger.ping(); ArgumentCaptor pingCallbackCaptor = @@ -130,7 +134,8 @@ public void clientKeepAlivePinger_pingFailure() { pingCallback.onFailure(Status.UNAVAILABLE.withDescription("I must write descriptions")); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - verify(transport).shutdownNow(statusCaptor.capture()); + verify(transport).shutdownNow(statusCaptor.capture(), + eq(SimpleDisconnectError.CONNECTION_TIMED_OUT)); Status status = statusCaptor.getValue(); assertThat(status.getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(status.getDescription()).isEqualTo( diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index efc582703ba..ab4b60efb18 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -798,7 +798,8 @@ public void channelzMembership_subchannel() throws Exception { transportInfo.listener.transportReady(); // terminate transport - transportInfo.listener.transportShutdown(Status.CANCELLED); + transportInfo.listener.transportShutdown(Status.CANCELLED, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); @@ -841,7 +842,8 @@ public void channelzMembership_oob() throws Exception { assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); // terminate transport - transportInfo.listener.transportShutdown(Status.INTERNAL); + transportInfo.listener.transportShutdown(Status.INTERNAL, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); @@ -1002,7 +1004,8 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft } // Killing the remaining real transport will terminate the channel - transportListener.transportShutdown(Status.UNAVAILABLE); + transportListener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertFalse(channel.isTerminated()); verify(executorPool, never()).returnObject(any()); transportListener.transportTerminated(); @@ -1072,7 +1075,8 @@ public void noMoreCallbackAfterLoadBalancerShutdown() { // Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of // the transport states. - transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo1.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo2.listener.transportReady(); verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); @@ -1141,7 +1145,8 @@ public void noMoreCallbackAfterLoadBalancerShutdown_configError() throws Interru // Since subchannels are shutdown, SubchannelStateListeners will only get SHUTDOWN regardless of // the transport states. - transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo1.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo2.listener.transportReady(); verify(stateListener1).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); verify(stateListener2).onSubchannelState(ConnectivityStateInfo.forNonError(SHUTDOWN)); @@ -1277,7 +1282,8 @@ public void callOptionsExecutor() { verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers)); - transportListener.transportShutdown(Status.UNAVAILABLE); + transportListener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportListener.transportTerminated(); // Clean up as much as possible to allow the channel to terminate. @@ -1429,7 +1435,8 @@ public void firstResolvedServerFailedToConnect() throws Exception { MockClientTransportInfo badTransportInfo = transports.poll(); // Which failed to connect - badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE); + badTransportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); inOrder.verifyNoMoreInteractions(); // The channel then try the second address (goodAddress) @@ -1579,7 +1586,8 @@ public void allServersFailedToConnect() throws Exception { .newClientTransport( same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class)); MockClientTransportInfo transportInfo1 = transports.poll(); - transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo1.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Connecting to server2, which will fail too verify(mockTransportFactory) @@ -1587,7 +1595,8 @@ public void allServersFailedToConnect() throws Exception { same(addr2), any(ClientTransportOptions.class), any(ChannelLogger.class)); MockClientTransportInfo transportInfo2 = transports.poll(); Status server2Error = Status.UNAVAILABLE.withDescription("Server2 failed to connect"); - transportInfo2.listener.transportShutdown(server2Error); + transportInfo2.listener.transportShutdown(server2Error, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // ... which makes the subchannel enter TRANSIENT_FAILURE. The last error Status is propagated // to LoadBalancer. @@ -1697,9 +1706,11 @@ public void run() { verify(transportInfo2.transport).shutdown(same(ManagedChannelImpl.SHUTDOWN_STATUS)); // Cleanup - transportInfo1.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo1.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo1.listener.transportTerminated(); - transportInfo2.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo2.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo2.listener.transportTerminated(); timer.forwardTime(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); } @@ -1739,8 +1750,10 @@ public void subchannelsWhenChannelShutdownNow() { verify(ti1.transport).shutdownNow(any(Status.class)); verify(ti2.transport).shutdownNow(any(Status.class)); - ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); - ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); + ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); ti1.listener.transportTerminated(); assertFalse(channel.isTerminated()); @@ -1829,7 +1842,8 @@ public void oobchannels() { ArgumentMatchers.any()); // The transport goes away - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); // A new call will trigger a new transport @@ -1848,7 +1862,8 @@ public void oobchannels() { // This transport fails Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); assertEquals(0, balancerRpcExecutor.numPendingTasks()); - transportInfo.listener.transportShutdown(transportError); + transportInfo.listener.transportShutdown(transportError, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); assertTrue(balancerRpcExecutor.runDueTasks() > 0); // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending @@ -2102,8 +2117,10 @@ public void oobChannelsWhenChannelShutdownNow() { verify(ti1.transport).shutdownNow(any(Status.class)); verify(ti2.transport).shutdownNow(any(Status.class)); - ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); - ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now")); + ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); + ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); ti1.listener.transportTerminated(); assertFalse(channel.isTerminated()); @@ -2331,7 +2348,8 @@ private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) { // Transport closed when connecting assertEquals(expectedRefreshCount, resolver.refreshCalled); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // When channel enters idle, new resolver is created but not started. if (!isIdle) { expectedRefreshCount++; @@ -2346,7 +2364,8 @@ private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) { // Transport closed when ready assertEquals(expectedRefreshCount, resolver.refreshCalled); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // When channel enters idle, new resolver is created but not started. if (!isIdle) { expectedRefreshCount++; @@ -3917,7 +3936,8 @@ public double nextDouble() { verify(mockLoadBalancer).shutdown(); // simulating the shutdown of load balancer triggers the shutdown of subchannel shutdownSafely(helper, subchannel); - transportInfo.listener.transportShutdown(Status.INTERNAL); + transportInfo.listener.transportShutdown(Status.INTERNAL, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); // simulating transport terminated assertTrue( "channel.isTerminated() is expected to be true but was false", @@ -4022,7 +4042,8 @@ public void hedgingScheduledThenChannelShutdown_hedgeShouldStillHappen_newCallSh // simulating the shutdown of load balancer triggers the shutdown of subchannel shutdownSafely(helper, subchannel); // simulating transport shutdown & terminated - transportInfo.listener.transportShutdown(Status.INTERNAL); + transportInfo.listener.transportShutdown(Status.INTERNAL, + SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportInfo.listener.transportTerminated(); assertTrue( "channel.isTerminated() is expected to be true but was false", @@ -4792,7 +4813,7 @@ public void transportTerminated(Attributes transportAttrs) { assertEquals(1, readyCallbackCalled.get()); assertEquals(0, terminationCallbackCalled.get()); - transportListener.transportShutdown(Status.OK); + transportListener.transportShutdown(Status.OK, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); transportListener.transportTerminated(); assertEquals(1, terminationCallbackCalled.get()); diff --git a/core/src/test/java/io/grpc/internal/ManagedClientTransportTest.java b/core/src/test/java/io/grpc/internal/ManagedClientTransportTest.java index 0af88a62728..5ddea08131b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedClientTransportTest.java @@ -32,7 +32,7 @@ public class ManagedClientTransportTest { public void testListener() { ManagedClientTransport.Listener listener = new ManagedClientTransport.Listener() { @Override - public void transportShutdown(Status s) {} + public void transportShutdown(Status s, DisconnectError e) {} @Override public void transportTerminated() {} @@ -45,7 +45,7 @@ public void transportInUse(boolean inUse) {} }; // Test that the listener methods do not throw. - listener.transportShutdown(Status.OK); + listener.transportShutdown(Status.OK, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); listener.transportTerminated(); listener.transportReady(); listener.transportInUse(true); diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index c60174b2faf..5d6b88a1392 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -326,7 +326,8 @@ public void serverNotListening() throws Exception { runIfNotNull(client.start(mockClientTransportListener)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture()); + inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture(), + any(DisconnectError.class)); assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); inOrder.verify(mockClientTransportListener).transportTerminated(); verify(mockClientTransportListener, never()).transportReady(); @@ -342,7 +343,8 @@ public void clientStartStop() throws Exception { Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); - inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason)); + inOrder.verify(mockClientTransportListener).transportShutdown(same(shutdownReason), + any(DisconnectError.class)); inOrder.verify(mockClientTransportListener).transportTerminated(); verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); } @@ -358,7 +360,8 @@ public void clientStartAndStopOnceConnected() throws Exception { = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); client.shutdown(Status.UNAVAILABLE); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); - inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class)); + inOrder.verify(mockClientTransportListener).transportShutdown(any(Status.class), + any(DisconnectError.class)); inOrder.verify(mockClientTransportListener).transportTerminated(); assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); server.shutdown(); @@ -454,7 +457,8 @@ public void openStreamPreventsTermination() throws Exception { serverTransport.shutdown(); serverTransport = null; - verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class), + any(DisconnectError.class)); assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // A new server should be able to start listening, since the current server has given up @@ -504,7 +508,8 @@ public void shutdownNowKillsClientStream() throws Exception { client.shutdownNow(status); client = null; - verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class), + any(DisconnectError.class)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); @@ -543,7 +548,8 @@ public void shutdownNowKillsServerStream() throws Exception { serverTransport.shutdownNow(shutdownStatus); serverTransport = null; - verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class), + any(DisconnectError.class)); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false); assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)); @@ -591,7 +597,8 @@ public void ping_duringShutdown() throws Exception { ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); stream.start(clientStreamListener); client.shutdown(Status.UNAVAILABLE); - verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class), + any(DisconnectError.class)); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { client.ping(mockPingCallback, MoreExecutors.directExecutor()); @@ -635,7 +642,8 @@ public void newStream_duringShutdown() throws Exception { ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); stream.start(clientStreamListener); client.shutdown(Status.UNAVAILABLE); - verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class), + any(DisconnectError.class)); ClientStream stream2 = client.newStream( methodDescriptor, new Metadata(), callOptions, tracers); diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index 465df8b2cc9..99eb88737aa 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -34,6 +34,7 @@ import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import java.net.InetSocketAddress; @@ -229,7 +230,7 @@ private void startGoAway(Status status) { startedGoAway = true; } - listener.transportShutdown(status); + listener.transportShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); synchronized (lock) { goAway = true; diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java index 03c31f93329..3a79cc0b6a8 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java @@ -34,6 +34,7 @@ import io.grpc.Status; import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; import io.grpc.internal.ClientStreamListener; +import io.grpc.internal.DisconnectError; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.TransportTracer; @@ -128,7 +129,8 @@ public void shutdownTransport() throws Exception { BidirectionalStream.Callback callback2 = callbackCaptor.getValue(); // Shut down the transport. transportShutdown should be called immediately. transport.shutdown(); - verify(clientTransportListener).transportShutdown(any(Status.class)); + verify(clientTransportListener).transportShutdown(any(Status.class), + any(DisconnectError.class)); // Have two live streams. Transport has not been terminated. verify(clientTransportListener, times(0)).transportTerminated(); diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index e31696eb631..a92f10fd5c5 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -58,6 +58,7 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import java.io.ByteArrayInputStream; @@ -327,7 +328,7 @@ private synchronized void notifyShutdown(Status s) { return; } shutdown = true; - clientTransportListener.transportShutdown(s); + clientTransportListener.transportShutdown(s, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); } private synchronized void notifyTerminated() { diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java index b4e53d5568c..01e7bc3ed12 100644 --- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java +++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java @@ -19,6 +19,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Attributes; import io.grpc.Status; +import io.grpc.internal.DisconnectError; import io.grpc.internal.ManagedClientTransport; /** Maintainer of transport lifecycle status. */ @@ -55,18 +56,18 @@ public void notifyReady() { * Marks transport as shutdown, but does not set the error status. This must eventually be * followed by a call to notifyShutdown. */ - public void notifyGracefulShutdown(Status s) { + public void notifyGracefulShutdown(Status s, DisconnectError disconnectError) { if (transportShutdown) { return; } transportShutdown = true; - listener.transportShutdown(s); + listener.transportShutdown(s, disconnectError); } /** Returns {@code true} if was the first shutdown. */ @CanIgnoreReturnValue - public boolean notifyShutdown(Status s) { - notifyGracefulShutdown(s); + public boolean notifyShutdown(Status s, DisconnectError disconnectError) { + notifyGracefulShutdown(s, disconnectError); if (shutdownStatus != null) { return false; } @@ -82,12 +83,12 @@ public void notifyInUse(boolean inUse) { listener.transportInUse(inUse); } - public void notifyTerminated(Status s) { + public void notifyTerminated(Status s, DisconnectError disconnectError) { if (transportTerminated) { return; } transportTerminated = true; - notifyShutdown(s); + notifyShutdown(s, disconnectError); listener.transportTerminated(); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index d6bb3790433..8ebf89842ad 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -34,11 +34,14 @@ import io.grpc.StatusException; import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientTransport.PingCallback; +import io.grpc.internal.DisconnectError; +import io.grpc.internal.GoAwayDisconnectError; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.grpc.internal.InUseStateAggregator; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; @@ -478,7 +481,8 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce logger.fine("Network channel being closed by the application."); if (ctx.channel().isActive()) { // Ignore notification that the socket was closed lifecycleManager.notifyShutdown( - Status.UNAVAILABLE.withDescription("Transport closed for unknown reason")); + Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"), + SimpleDisconnectError.UNKNOWN); } super.close(ctx, promise); } @@ -491,7 +495,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); - lifecycleManager.notifyShutdown(status); + lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); final Status streamStatus; if (channelInactiveReason != null) { streamStatus = channelInactiveReason; @@ -512,7 +516,7 @@ public boolean visit(Http2Stream stream) throws Http2Exception { } }); } finally { - lifecycleManager.notifyTerminated(status); + lifecycleManager.notifyTerminated(status, SimpleDisconnectError.UNKNOWN); } } finally { // Close any open streams @@ -560,7 +564,8 @@ InternalChannelz.Security getSecurityInfo() { protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) { logger.log(Level.FINE, "Caught a connection error", cause); - lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause)); + lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause), + SimpleDisconnectError.SOCKET_ERROR); // Parent class will shut down the Channel super.onConnectionError(ctx, outbound, cause, http2Ex); } @@ -667,7 +672,7 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) if (!connection().goAwaySent()) { logger.fine("Stream IDs have been exhausted for this connection. " + "Initiating graceful shutdown of the connection."); - lifecycleManager.notifyShutdown(e.getStatus()); + lifecycleManager.notifyShutdown(e.getStatus(), SimpleDisconnectError.UNKNOWN); close(ctx(), ctx().newPromise()); } return; @@ -893,7 +898,7 @@ public void operationComplete(ChannelFuture future) throws Exception { private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg, ChannelPromise promise) throws Exception { - lifecycleManager.notifyShutdown(msg.getStatus()); + lifecycleManager.notifyShutdown(msg.getStatus(), SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Explicitly flush to create any buffered streams before sending GOAWAY. // TODO(ejona): determine if the need to flush is a bug in Netty flush(ctx); @@ -929,13 +934,15 @@ public boolean visit(Http2Stream stream) throws Http2Exception { private void goingAway(long errorCode, byte[] debugData) { Status finalStatus = statusFromH2Error( Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData); - lifecycleManager.notifyGracefulShutdown(finalStatus); + DisconnectError disconnectError = new GoAwayDisconnectError( + GrpcUtil.Http2Error.forCode(errorCode)); + lifecycleManager.notifyGracefulShutdown(finalStatus, disconnectError); abruptGoAwayStatus = statusFromH2Error( Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData); // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it // fails streams due to HPACK failures (e.g., header list too large). To be more conservative, // we assume any sent streams may be related to the GOAWAY. This should rarely impact users - // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if + // since the main time servers should use abrupt GOAWAYs if there is a protocol error, and if // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to // UNAVAILABLE. https://github.com/netty/netty/issues/10670 final Status abruptGoAwayStatusConservative = statusFromH2Error( @@ -950,7 +957,7 @@ private void goingAway(long errorCode, byte[] debugData) { // This can cause reentrancy, but should be minor since it is normal to handle writes in // response to a read. Also, the call stack is rather shallow at this point clientWriteQueue.drainNow(); - if (lifecycleManager.notifyShutdown(finalStatus)) { + if (lifecycleManager.notifyShutdown(finalStatus, disconnectError)) { // This is for the only RPCs that are actually covered by the GOAWAY error code. All other // RPCs were not observed by the remote and so should be UNAVAILABLE. channelInactiveReason = statusFromH2Error( diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index e03989e9906..af5ce5f5f4b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -37,11 +37,13 @@ import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.DisconnectError; import io.grpc.internal.FailingClientStream; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; @@ -68,7 +70,8 @@ /** * A Netty-based {@link ConnectionClientTransport} implementation. */ -class NettyClientTransport implements ConnectionClientTransport { +class NettyClientTransport implements ConnectionClientTransport, + ClientKeepAlivePinger.TransportWithDisconnectReason { private final InternalLogId logId; private final Map, ?> channelOptions; @@ -231,8 +234,8 @@ public Runnable start(Listener transportListener) { EventLoop eventLoop = group.next(); if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) { keepAliveManager = new KeepAliveManager( - new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos, - keepAliveWithoutCalls); + new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, + keepAliveTimeoutNanos, keepAliveWithoutCalls); } handler = NettyClientHandler.newHandler( @@ -291,7 +294,8 @@ public void run() { // could use GlobalEventExecutor (which is what regFuture would use for notifying // listeners in this case), but avoiding on-demand thread creation in an error case seems // a good idea and is probably clearer threading. - lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull); + lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull, + SimpleDisconnectError.UNKNOWN); } }; } @@ -323,7 +327,8 @@ public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { // Need to notify of this failure, because NettyClientHandler may not have been added to // the pipeline before the error occurred. - lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause())); + lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()), + SimpleDisconnectError.UNKNOWN); } } }); @@ -345,6 +350,11 @@ public void operationComplete(ChannelFuture future) throws Exception { @Override public void shutdown(Status reason) { + shutdown(reason, SimpleDisconnectError.UNKNOWN); + } + + @Override + public void shutdown(Status reason, DisconnectError disconnectError) { // start() could have failed if (channel == null) { return; @@ -357,12 +367,17 @@ public void shutdown(Status reason) { @Override public void shutdownNow(final Status reason) { + shutdownNow(reason, SimpleDisconnectError.UNKNOWN); + } + + @Override + public void shutdownNow(final Status reason, DisconnectError disconnectError) { // Notifying of termination is automatically done when the channel closes. if (channel != null && channel.isOpen()) { handler.getWriteQueue().enqueue(new Runnable() { @Override public void run() { - lifecycleManager.notifyShutdown(reason); + lifecycleManager.notifyShutdown(reason, disconnectError); channel.write(new ForcefulCloseCommand(reason)); } }, true); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index c5289296ed0..53598727efd 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -67,6 +67,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import io.grpc.internal.TransportTracer; @@ -764,7 +765,7 @@ public void exhaustedStreamsShouldFail() throws Exception { public void nonExistentStream() throws Exception { Status status = Status.INTERNAL.withDescription("zz"); - lifecycleManager.notifyShutdown(status); + lifecycleManager.notifyShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); // Stream creation can race with the transport shutting down, with the create command already // enqueued. ChannelFuture future1 = createStream(); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 55abe29e93a..177fa1e9760 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -63,6 +63,7 @@ import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; +import io.grpc.internal.DisconnectError; import io.grpc.internal.FakeClock; import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; @@ -1462,7 +1463,7 @@ public FakeClientTransportListener(SettableFuture connected) { } @Override - public void transportShutdown(Status s) {} + public void transportShutdown(Status s, DisconnectError e) {} @Override public void transportTerminated() {} diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java index b1c89e22f93..b779dfbe980 100644 --- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java @@ -26,6 +26,7 @@ import io.grpc.Status; import io.grpc.internal.AbstractTransportTest; import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.DisconnectError; import io.grpc.internal.FakeClock; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; @@ -127,7 +128,7 @@ public void channelHasUnresolvedHostname() throws Exception { .setChannelLogger(logger), logger); Runnable runnable = transport.start(new ManagedClientTransport.Listener() { @Override - public void transportShutdown(Status s) { + public void transportShutdown(Status s, DisconnectError e) { future.set(s); } diff --git a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java index 638fe960a32..5142bf44545 100644 --- a/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java +++ b/netty/src/test/java/io/grpc/netty/ProtocolNegotiatorsTest.java @@ -55,6 +55,7 @@ import io.grpc.TlsChannelCredentials; import io.grpc.TlsServerCredentials; import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.DisconnectError; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; @@ -408,7 +409,7 @@ private Object expectHandshake( } else { ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(clientTransportListener, timeout(TIMEOUT_SECONDS * 1000)) - .transportShutdown(captor.capture()); + .transportShutdown(captor.capture(), any(DisconnectError.class)); result = captor.getValue(); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index e76799845c7..8777b013a6c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -48,6 +48,8 @@ import io.grpc.internal.CertificateUtils; import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.DisconnectError; +import io.grpc.internal.GoAwayDisconnectError; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; @@ -56,6 +58,7 @@ import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; import io.grpc.internal.NoopSslSession; import io.grpc.internal.SerializingExecutor; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler; @@ -129,7 +132,7 @@ * A okhttp-based {@link ConnectionClientTransport} implementation. */ class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler, - OutboundFlowController.Transport { + OutboundFlowController.Transport, ClientKeepAlivePinger.TransportWithDisconnectReason { private static final Map ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK = @@ -990,13 +993,31 @@ public void shutdown(Status reason) { } goAwayStatus = reason; - listener.transportShutdown(goAwayStatus); + listener.transportShutdown(goAwayStatus, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); + stopIfNecessary(); + } + } + + @Override + public void shutdown(Status reason, DisconnectError disconnectError) { + synchronized (lock) { + if (goAwayStatus != null) { + return; + } + + goAwayStatus = reason; + listener.transportShutdown(goAwayStatus, disconnectError); stopIfNecessary(); } } @Override public void shutdownNow(Status reason) { + shutdownNow(reason, SimpleDisconnectError.UNKNOWN); + } + + @Override + public void shutdownNow(Status reason, DisconnectError disconnectError) { shutdown(reason); synchronized (lock) { Iterator> it = streams.entrySet().iterator(); @@ -1086,7 +1107,13 @@ private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status stat synchronized (lock) { if (goAwayStatus == null) { goAwayStatus = status; - listener.transportShutdown(status); + GrpcUtil.Http2Error http2Error; + if (errorCode == null) { + http2Error = GrpcUtil.Http2Error.NO_ERROR; + } else { + http2Error = GrpcUtil.Http2Error.forCode(errorCode.httpCode); + } + listener.transportShutdown(status, new GoAwayDisconnectError(http2Error)); } if (errorCode != null && !goAwaySent) { // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 99f430be009..41ad7607292 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -71,9 +71,12 @@ import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; +import io.grpc.internal.DisconnectError; import io.grpc.internal.FakeClock; +import io.grpc.internal.GoAwayDisconnectError; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.SimpleDisconnectError; import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; import io.grpc.okhttp.OkHttpFrameLogger.Direction; import io.grpc.okhttp.internal.Protocol; @@ -280,7 +283,8 @@ public void testTransportExecutorWithTooFewThreads() throws Exception { null); clientTransport.start(transportListener); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture()); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(), + eq(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR))); Status capturedStatus = statusCaptor.getValue(); assertEquals("Timed out waiting for second handshake thread. " + "The transport executor pool may have run out of threads", @@ -481,7 +485,8 @@ public void nextFrameThrowIoException() throws Exception { assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage()); assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode()); assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage()); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -507,7 +512,8 @@ public void nextFrameThrowsError() throws Exception { assertEquals(0, activeStreamCount()); assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage()); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -523,7 +529,8 @@ public void nextFrameReturnFalse() throws Exception { frameReader.nextFrameAtEndOfStream(); listener.waitUntilStreamClosed(); assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -565,7 +572,8 @@ public void receivedHeadersForInvalidStreamShouldKillConnection() throws Excepti HeadersMode.HTTP_20_HEADERS); verify(frameWriter, timeout(TIME_OUT_MS)) .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -577,7 +585,8 @@ public void receivedDataForInvalidStreamShouldKillConnection() throws Exception 1000, 1000); verify(frameWriter, timeout(TIME_OUT_MS)) .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -1173,7 +1182,8 @@ public void stopNormally() throws Exception { clientTransport.shutdown(SHUTDOWN_REASON); assertEquals(2, activeStreamCount()); - verify(transportListener).transportShutdown(same(SHUTDOWN_REASON)); + verify(transportListener).transportShutdown(same(SHUTDOWN_REASON), + eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN)); stream1.cancel(Status.CANCELLED); stream2.cancel(Status.CANCELLED); @@ -1207,7 +1217,8 @@ public void receiveGoAway() throws Exception { frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY); // Transport should be in STOPPING state. - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, never()).transportTerminated(); // Stream 2 should be closed. @@ -1277,7 +1288,8 @@ public void streamIdExhausted() throws Exception { // Should only have the first message delivered. assertEquals(message, listener.messages.get(0)); verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL)); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -1588,7 +1600,8 @@ public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception (int) buffer.size()); verify(frameWriter, timeout(TIME_OUT_MS)) .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -1608,7 +1621,8 @@ public void receiveWindowUpdateForUnknownStream() throws Exception { frameHandler().windowUpdate(5, 73); verify(frameWriter, timeout(TIME_OUT_MS)) .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); - verify(transportListener).transportShutdown(isA(Status.class)); + verify(transportListener).transportShutdown(isA(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -1821,11 +1835,16 @@ public void unreachableServer() throws Exception { ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); - Status status = captor.getValue(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(DisconnectError.class); + + verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(), + errorCaptor.capture()); + Status status = statusCaptor.getValue(); + DisconnectError error = errorCaptor.getValue(); assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); assertTrue(status.getCause().toString(), status.getCause() instanceof IOException); + assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error); MockStreamListener streamListener = new MockStreamListener(); clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers) @@ -1852,10 +1871,14 @@ public void customSocketFactory() throws Exception { ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); - Status status = captor.getValue(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(DisconnectError.class); + verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(), + errorCaptor.capture()); + Status status = statusCaptor.getValue(); + DisconnectError error = errorCaptor.getValue(); assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); + assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error); assertSame(exception, status.getCause()); } @@ -1903,7 +1926,8 @@ public void proxy_200() throws Exception { }); sock.getOutputStream().flush(); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class), + any(DisconnectError.class)); while (sock.getInputStream().read() != -1) {} verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); sock.close(); @@ -1943,17 +1967,21 @@ public void proxy_500() throws Exception { assertEquals(-1, sock.getInputStream().read()); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); - Status error = captor.getValue(); - assertTrue("Status didn't contain error code: " + captor.getValue(), - error.getDescription().contains("500")); - assertTrue("Status didn't contain error description: " + captor.getValue(), - error.getDescription().contains("OH NO")); - assertTrue("Status didn't contain error text: " + captor.getValue(), - error.getDescription().contains(errorText)); - assertEquals("Not UNAVAILABLE: " + captor.getValue(), - Status.UNAVAILABLE.getCode(), error.getCode()); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(DisconnectError.class); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(), + errorCaptor.capture()); + Status status = statusCaptor.getValue(); + DisconnectError error = errorCaptor.getValue(); + assertTrue("Status didn't contain error code: " + statusCaptor.getValue(), + status.getDescription().contains("500")); + assertTrue("Status didn't contain error description: " + statusCaptor.getValue(), + status.getDescription().contains("OH NO")); + assertTrue("Status didn't contain error text: " + statusCaptor.getValue(), + status.getDescription().contains(errorText)); + assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(), + Status.UNAVAILABLE.getCode(), status.getCode()); + assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error); sock.close(); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); } @@ -1980,13 +2008,17 @@ public void proxy_immediateServerClose() throws Exception { serverSocket.close(); sock.close(); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); - Status error = captor.getValue(); - assertTrue("Status didn't contain proxy: " + captor.getValue(), - error.getDescription().contains("proxy")); - assertEquals("Not UNAVAILABLE: " + captor.getValue(), - Status.UNAVAILABLE.getCode(), error.getCode()); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(DisconnectError.class); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(), + errorCaptor.capture()); + Status status = statusCaptor.getValue(); + DisconnectError error = errorCaptor.getValue(); + assertTrue("Status didn't contain proxy: " + statusCaptor.getValue(), + status.getDescription().contains("proxy")); + assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(), + Status.UNAVAILABLE.getCode(), status.getCode()); + assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); } @@ -2017,7 +2049,8 @@ public void proxy_serverHangs() throws Exception { assertEquals("Host: theservice:80", reader.readLine()); while (!"".equals(reader.readLine())) {} - verify(transportListener, timeout(200)).transportShutdown(any(Status.class)); + verify(transportListener, timeout(200)).transportShutdown(any(Status.class), + any(DisconnectError.class)); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); sock.close(); }