From a06568cdce08da442f68879be9e73cf2e7e29158 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 7 Jul 2025 11:47:08 +0530 Subject: [PATCH 01/10] otel: subchannel metrics --- .../io/grpc/internal/InternalSubchannel.java | 35 +++++- .../io/grpc/internal/ManagedChannelImpl.java | 13 ++- .../grpc/internal/OtelMetricsAttributes.java | 69 ++++++++++++ .../io/grpc/internal/SubchannelMetrics.java | 105 ++++++++++++++++++ .../grpc/internal/InternalSubchannelTest.java | 9 +- .../internal/OpenTelemetryConstants.java | 6 + 6 files changed, 233 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java create mode 100644 core/src/main/java/io/grpc/internal/SubchannelMetrics.java diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index a27e46eaf60..35f8719bbba 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -48,6 +48,7 @@ import io.grpc.LoadBalancer; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -160,6 +161,11 @@ protected void handleNotInUse() { private Status shutdownReason; private volatile Attributes connectedAddressAttributes; + private final SubchannelMetrics subchannelMetrics; + private final String target; + private final String backendService; + private final String locality; + private final String securityLevel; InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, @@ -168,7 +174,9 @@ protected void handleNotInUse() { Supplier stopwatchSupplier, SynchronizationContext syncContext, Callback callback, InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, InternalLogId logId, - ChannelLogger channelLogger, List transportFilters) { + ChannelLogger channelLogger, List transportFilters, + String target, String backendService, String locality, String securityLevel, + MetricRecorder metricRecorder) { List addressGroups = args.getAddresses(); Preconditions.checkNotNull(addressGroups, "addressGroups"); Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); @@ -192,6 +200,11 @@ protected void handleNotInUse() { this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.transportFilters = transportFilters; this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY); + this.target = target; + this.backendService = backendService; + this.locality = locality; + this.securityLevel = securityLevel; + this.subchannelMetrics = new SubchannelMetrics(metricRecorder); } ChannelLogger getChannelLogger() { @@ -579,6 +592,8 @@ public Attributes filterTransport(Attributes attributes) { @Override public void transportReady() { channelLogger.log(ChannelLogLevel.INFO, "READY"); + subchannelMetrics.recordConnectionAttemptSucceeded( + buildLabelSet(null, extractSecurityLevel())); syncContext.execute(new Runnable() { @Override public void run() { @@ -608,6 +623,7 @@ public void transportShutdown(final Status s) { channelLogger.log( ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; + subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet("Peer Pressure", null)); syncContext.execute(new Runnable() { @Override public void run() { @@ -648,6 +664,8 @@ public void transportTerminated() { for (ClientTransportFilter filter : transportFilters) { filter.transportTerminated(transport.getAttributes()); } + subchannelMetrics.recordDisconnection(buildLabelSet("Peer Pressure", + null)); syncContext.execute(new Runnable() { @Override public void run() { @@ -658,6 +676,10 @@ public void run() { } }); } + + private String extractSecurityLevel() { + return "Hold the door!"; + } } // All methods are called in syncContext @@ -817,6 +839,17 @@ private String printShortStatus(Status status) { return buffer.toString(); } + private OtelMetricsAttributes buildLabelSet(String disconnectError, String secLevel) { + return new OtelMetricsAttributes( + target, + backendService, + locality, + disconnectError, + secLevel != null ? secLevel : securityLevel + ); + } + + @VisibleForTesting static final class TransportLogger extends ChannelLogger { // Changed just after construction to break a cyclic dependency. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 16b8adbd347..aaaaf65de05 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1464,7 +1464,12 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { subchannelTracer, subchannelLogId, subchannelLogger, - transportFilters); + transportFilters, + target, + "", + "", + "", + lbHelper.getMetricRecorder()); oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() .setDescription("Child Subchannel created") .setSeverity(ChannelTrace.Event.Severity.CT_INFO) @@ -1895,7 +1900,11 @@ void onNotInUse(InternalSubchannel is) { subchannelTracer, subchannelLogId, subchannelLogger, - transportFilters); + transportFilters, target, + "", + "", + "", + lbHelper.getMetricRecorder()); channelTracer.reportEvent(new ChannelTrace.Event.Builder() .setDescription("Child Subchannel started") diff --git a/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java b/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java new file mode 100644 index 00000000000..494182256e0 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java @@ -0,0 +1,69 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + + +import io.grpc.Attributes; + +class OtelMetricsAttributes { + final String target; + final String backendService; + final String locality; + final String disconnectError; + final String securityLevel; + + public OtelMetricsAttributes(String target, String backendService, String locality, + String disconnectError, String securityLevel) { + this.target = target; + this.backendService = backendService; + this.locality = locality; + this.disconnectError = disconnectError; + this.securityLevel = securityLevel; + } + + public Attributes toOtelMetricsAttributes() { + Attributes attributes = + Attributes.EMPTY; + + if (target != null) { + attributes.toBuilder() + .set(Attributes.Key.create("grpc.target"), target) + .build(); + } + if (backendService != null) { + attributes.toBuilder() + .set(Attributes.Key.create("grpc.lb.backend_service"), backendService) + .build(); + } + if (locality != null) { + attributes.toBuilder() + .set(Attributes.Key.create("grpc.lb.locality"), locality) + .build(); + } + if (disconnectError != null) { + attributes.toBuilder() + .set(Attributes.Key.create("grpc.disconnect_error"), disconnectError) + .build(); + } + if (securityLevel != null) { + attributes.toBuilder() + .set(Attributes.Key.create("grpc.security_level"), securityLevel) + .build(); + } + return attributes; + } +} \ No newline at end of file diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java new file mode 100644 index 00000000000..94c4d2a3883 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -0,0 +1,105 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; + +public final class SubchannelMetrics { + + private static final LongCounterMetricInstrument disconnections; + private static final LongCounterMetricInstrument connectionAttemptsSucceeded; + private static final LongCounterMetricInstrument connectionAttemptsFailed; + private static final LongCounterMetricInstrument openConnections; + private final MetricRecorder metricRecorder; + + public SubchannelMetrics(MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + } + + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + disconnections = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.disconnections1", + "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected", + "{disconnection}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"), + false + ); + + connectionAttemptsSucceeded = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.connection_attempts_succeeded", + "EXPERIMENTAL. Number of successful connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + + connectionAttemptsFailed = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.connection_attempts_failed", + "EXPERIMENTAL. Number of failed connection attempts", + "{attempt}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + + openConnections = metricInstrumentRegistry.registerLongCounter( + "grpc.subchannel.open_connections", + "EXPERIMENTAL. Number of open connections.", + "{connection}", + Lists.newArrayList("grpc.target"), + Lists.newArrayList("grpc.security_level", "grpc.lb.backend_service", "grpc.lb.locality"), + false + ); + } + + public void recordConnectionAttemptSucceeded(OtelMetricsAttributes labelSet) { + metricRecorder + .addLongCounter(connectionAttemptsSucceeded, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality)); + metricRecorder + .addLongCounter(openConnections, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); + } + + public void recordConnectionAttemptFailed(OtelMetricsAttributes labelSet) { + metricRecorder + .addLongCounter(connectionAttemptsFailed, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality)); + } + + public void recordDisconnection(OtelMetricsAttributes labelSet) { + metricRecorder + .addLongCounter(disconnections, 1, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.backendService, labelSet.locality, labelSet.disconnectError)); + metricRecorder + .addLongCounter(openConnections, -11, + ImmutableList.of(labelSet.target), + ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); + } +} diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index bed722f5f3a..49918262102 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -48,6 +48,7 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.LoadBalancer; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.InternalSubchannel.CallTracingTransport; @@ -1446,7 +1447,13 @@ private void createInternalSubchannel(boolean reconnectDisabled, subchannelTracer, logId, new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), - Collections.emptyList()); + Collections.emptyList(), + "", + "", + "", + "", + new MetricRecorder() {} + ); } private void assertNoCallbackInvoke() { diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 5214804d369..ef21903c8e7 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -36,6 +36,12 @@ public final class OpenTelemetryConstants { public static final AttributeKey BACKEND_SERVICE_KEY = AttributeKey.stringKey("grpc.lb.backend_service"); + public static final AttributeKey DISCONNECT_ERROR_KEY = + AttributeKey.stringKey("grpc.disconnect_error"); + + public static final AttributeKey SECURITY_LEVEL_KEY = + AttributeKey.stringKey("grpc.security_level"); + public static final List LATENCY_BUCKETS = ImmutableList.of( 0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d, From daf901b1d557edaef6e06ec238b722e6971d4f3c Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 7 Jul 2025 13:24:49 +0530 Subject: [PATCH 02/10] otel: add LongUpDownCounterMetricInstrument --- .../LongUpDownCounterMetricInstrument.java | 32 +++++++++++++++ .../io/grpc/MetricInstrumentRegistry.java | 41 +++++++++++++++++++ api/src/main/java/io/grpc/MetricRecorder.java | 25 ++++++++++- api/src/main/java/io/grpc/MetricSink.java | 18 +++++++- .../io/grpc/internal/SubchannelMetrics.java | 9 ++-- .../OpenTelemetryMetricSink.java | 18 ++++++++ 6 files changed, 136 insertions(+), 7 deletions(-) create mode 100644 api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java diff --git a/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java b/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java new file mode 100644 index 00000000000..07e099cde5d --- /dev/null +++ b/api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.util.List; + +/** + * Represents a long-valued up down counter metric instrument. + */ +@Internal +public final class LongUpDownCounterMetricInstrument extends PartialMetricInstrument { + public LongUpDownCounterMetricInstrument(int index, String name, String description, String unit, + List requiredLabelKeys, + List optionalLabelKeys, + boolean enableByDefault) { + super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault); + } +} \ No newline at end of file diff --git a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java index 1b33ed17a71..ce0f8f1b5cb 100644 --- a/api/src/main/java/io/grpc/MetricInstrumentRegistry.java +++ b/api/src/main/java/io/grpc/MetricInstrumentRegistry.java @@ -144,6 +144,47 @@ public LongCounterMetricInstrument registerLongCounter(String name, } } + /** + * Registers a new Long Up Down Counter metric instrument. + * + * @param name the name of the metric + * @param description a description of the metric + * @param unit the unit of measurement for the metric + * @param requiredLabelKeys a list of required label keys + * @param optionalLabelKeys a list of optional label keys + * @param enableByDefault whether the metric should be enabled by default + * @return the newly created LongUpDownCounterMetricInstrument + * @throws IllegalStateException if a metric with the same name already exists + */ + public LongUpDownCounterMetricInstrument registerLongUpDownCounter(String name, + String description, + String unit, + List requiredLabelKeys, + List optionalLabelKeys, + boolean enableByDefault) { + checkArgument(!Strings.isNullOrEmpty(name), "missing metric name"); + checkNotNull(description, "description"); + checkNotNull(unit, "unit"); + checkNotNull(requiredLabelKeys, "requiredLabelKeys"); + checkNotNull(optionalLabelKeys, "optionalLabelKeys"); + synchronized (lock) { + if (registeredMetricNames.contains(name)) { + throw new IllegalStateException("Metric with name " + name + " already exists"); + } + int index = nextAvailableMetricIndex; + if (index + 1 == metricInstruments.length) { + resizeMetricInstruments(); + } + LongUpDownCounterMetricInstrument instrument = new LongUpDownCounterMetricInstrument( + index, name, description, unit, requiredLabelKeys, optionalLabelKeys, + enableByDefault); + metricInstruments[index] = instrument; + registeredMetricNames.add(name); + nextAvailableMetricIndex += 1; + return instrument; + } + } + /** * Registers a new Double Histogram metric instrument. * diff --git a/api/src/main/java/io/grpc/MetricRecorder.java b/api/src/main/java/io/grpc/MetricRecorder.java index d418dcbf590..897c28011cd 100644 --- a/api/src/main/java/io/grpc/MetricRecorder.java +++ b/api/src/main/java/io/grpc/MetricRecorder.java @@ -50,7 +50,7 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do * Adds a value for a long valued counter metric instrument. * * @param metricInstrument The counter metric instrument to add the value against. - * @param value The value to add. + * @param value The value to add. MUST be non-negative. * @param requiredLabelValues A list of required label values for the metric. * @param optionalLabelValues A list of additional, optional label values for the metric. */ @@ -66,6 +66,29 @@ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long v metricInstrument.getOptionalLabelKeys().size()); } + /** + * Adds a value for a long valued up down counter metric instrument. + * + * @param metricInstrument The counter metric instrument to add the value against. + * @param value The value to add. May be positive, negative or zero. + * @param requiredLabelValues A list of required label values for the metric. + * @param optionalLabelValues A list of additional, optional label values for the metric. + */ + default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, + long value, + List requiredLabelValues, + List optionalLabelValues) { + checkArgument(requiredLabelValues != null + && requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(), + "Incorrect number of required labels provided. Expected: %s", + metricInstrument.getRequiredLabelKeys().size()); + checkArgument(optionalLabelValues != null + && optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(), + "Incorrect number of optional labels provided. Expected: %s", + metricInstrument.getOptionalLabelKeys().size()); + } + + /** * Records a value for a double-precision histogram metric instrument. * diff --git a/api/src/main/java/io/grpc/MetricSink.java b/api/src/main/java/io/grpc/MetricSink.java index 0f56b1acb73..ce5d3822520 100644 --- a/api/src/main/java/io/grpc/MetricSink.java +++ b/api/src/main/java/io/grpc/MetricSink.java @@ -65,12 +65,26 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do * Adds a value for a long valued counter metric associated with specified metric instrument. * * @param metricInstrument The counter metric instrument identifies metric measure to add. - * @param value The value to record. + * @param value The value to record. MUST be non-negative. * @param requiredLabelValues A list of required label values for the metric. * @param optionalLabelValues A list of additional, optional label values for the metric. */ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value, - List requiredLabelValues, List optionalLabelValues) { + List requiredLabelValues, List optionalLabelValues) { + } + + /** + * Adds a value for a long valued up down counter metric associated with specified metric + * instrument. + * + * @param metricInstrument The counter metric instrument identifies metric measure to add. + * @param value The value to record. May be positive, negative or zero. + * @param requiredLabelValues A list of required label values for the metric. + * @param optionalLabelValues A list of additional, optional label values for the metric. + */ + default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { } /** diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java index 94c4d2a3883..36f47affba1 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; @@ -27,7 +28,7 @@ public final class SubchannelMetrics { private static final LongCounterMetricInstrument disconnections; private static final LongCounterMetricInstrument connectionAttemptsSucceeded; private static final LongCounterMetricInstrument connectionAttemptsFailed; - private static final LongCounterMetricInstrument openConnections; + private static final LongUpDownCounterMetricInstrument openConnections; private final MetricRecorder metricRecorder; public SubchannelMetrics(MetricRecorder metricRecorder) { @@ -64,7 +65,7 @@ public SubchannelMetrics(MetricRecorder metricRecorder) { false ); - openConnections = metricInstrumentRegistry.registerLongCounter( + openConnections = metricInstrumentRegistry.registerLongUpDownCounter( "grpc.subchannel.open_connections", "EXPERIMENTAL. Number of open connections.", "{connection}", @@ -80,7 +81,7 @@ public void recordConnectionAttemptSucceeded(OtelMetricsAttributes labelSet) { ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.backendService, labelSet.locality)); metricRecorder - .addLongCounter(openConnections, 1, + .addLongUpDownCounter(openConnections, 1, ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); } @@ -98,7 +99,7 @@ public void recordDisconnection(OtelMetricsAttributes labelSet) { ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.backendService, labelSet.locality, labelSet.disconnectError)); metricRecorder - .addLongCounter(openConnections, -11, + .addLongUpDownCounter(openConnections, -1, ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java index 8f612804436..8fae0b911e9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricSink.java @@ -27,6 +27,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrument; import io.grpc.MetricSink; import io.opentelemetry.api.common.Attributes; @@ -36,6 +37,7 @@ import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.LongUpDownCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.api.metrics.ObservableMeasurement; @@ -117,6 +119,22 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va counter.add(value, attributes); } + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + MeasuresData instrumentData = measures.get(metricInstrument.getIndex()); + if (instrumentData == null) { + // Disabled metric + return; + } + Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(), + metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues, + instrumentData.getOptionalLabelsBitSet()); + LongUpDownCounter counter = (LongUpDownCounter) instrumentData.getMeasure(); + counter.add(value, attributes); + } + @Override public void recordDoubleHistogram(DoubleHistogramMetricInstrument metricInstrument, double value, List requiredLabelValues, List optionalLabelValues) { From 735665bcdf49dc8875c9d6e6fc8119003474d70b Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 7 Jul 2025 13:25:02 +0530 Subject: [PATCH 03/10] otel: add LongUpDownCounterMetricInstrument --- .../io/grpc/internal/MetricRecorderImpl.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java b/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java index 452b1c5df07..ded9d5ce589 100644 --- a/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java +++ b/core/src/main/java/io/grpc/internal/MetricRecorderImpl.java @@ -26,6 +26,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; @@ -82,7 +83,7 @@ public void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, dou * Records a long counter value. * * @param metricInstrument the {@link LongCounterMetricInstrument} to record. - * @param value the value to record. + * @param value the value to record. Must be non-negative. * @param requiredLabelValues the required label values for the metric. * @param optionalLabelValues the optional label values for the metric. */ @@ -103,6 +104,32 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va } } + /** + * Adds a long up down counter value. + * + * @param metricInstrument the {@link io.grpc.LongUpDownCounterMetricInstrument} to record. + * @param value the value to record. May be positive, negative or zero. + * @param requiredLabelValues the required label values for the metric. + * @param optionalLabelValues the optional label values for the metric. + */ + @Override + public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value, + List requiredLabelValues, + List optionalLabelValues) { + MetricRecorder.super.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, + optionalLabelValues); + for (MetricSink sink : metricSinks) { + int measuresSize = sink.getMeasuresSize(); + if (measuresSize <= metricInstrument.getIndex()) { + // Measures may need updating in two cases: + // 1. When the sink is initially created with an empty list of measures. + // 2. When new metric instruments are registered, requiring the sink to accommodate them. + sink.updateMeasures(registry.getMetricInstruments()); + } + sink.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues); + } + } + /** * Records a double histogram value. * From a664b2fd92c32e29596f43108c1b09bf63eb7248 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Wed, 9 Jul 2025 14:38:13 +0530 Subject: [PATCH 04/10] otel: plumb optional labels other than disconnect_error --- api/src/main/java/io/grpc/LoadBalancer.java | 6 ++ .../io/grpc/internal/InternalSubchannel.java | 55 +++++++++++++------ .../io/grpc/internal/ManagedChannelImpl.java | 8 +-- .../internal/PickFirstLeafLoadBalancer.java | 2 +- .../grpc/internal/InternalSubchannelTest.java | 3 - .../io/grpc/xds/ClusterImplLoadBalancer.java | 2 +- .../grpc/xds/ClusterResolverLoadBalancer.java | 4 +- .../io/grpc/xds/WrrLocalityLoadBalancer.java | 2 +- .../main/java/io/grpc/xds/XdsAttributes.java | 7 --- .../grpc/xds/ClusterImplLoadBalancerTest.java | 2 +- .../grpc/xds/WrrLocalityLoadBalancerTest.java | 2 +- 11 files changed, 53 insertions(+), 40 deletions(-) diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index d2fd8409e01..433bddc372e 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -135,6 +135,12 @@ public abstract class LoadBalancer { public static final Attributes.Key IS_PETIOLE_POLICY = Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY"); + /** + * The name of the locality that this EquivalentAddressGroup is in. + */ + public static final Attributes.Key ATTR_LOCALITY_NAME = + Attributes.Key.create("io.grpc.lb.locality"); + /** * A picker that always returns an erring pick. * diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 35f8719bbba..5ae082cbb68 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -49,7 +49,9 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MetricRecorder; +import io.grpc.NameResolver; import io.grpc.Status; +import io.grpc.SecurityLevel; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.SocketAddress; @@ -163,9 +165,6 @@ protected void handleNotInUse() { private volatile Attributes connectedAddressAttributes; private final SubchannelMetrics subchannelMetrics; private final String target; - private final String backendService; - private final String locality; - private final String securityLevel; InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, @@ -175,7 +174,7 @@ protected void handleNotInUse() { Callback callback, InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, InternalLogId logId, ChannelLogger channelLogger, List transportFilters, - String target, String backendService, String locality, String securityLevel, + String target, MetricRecorder metricRecorder) { List addressGroups = args.getAddresses(); Preconditions.checkNotNull(addressGroups, "addressGroups"); @@ -201,9 +200,6 @@ protected void handleNotInUse() { this.transportFilters = transportFilters; this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY); this.target = target; - this.backendService = backendService; - this.locality = locality; - this.securityLevel = securityLevel; this.subchannelMetrics = new SubchannelMetrics(metricRecorder); } @@ -592,8 +588,13 @@ public Attributes filterTransport(Attributes attributes) { @Override public void transportReady() { channelLogger.log(ChannelLogLevel.INFO, "READY"); - subchannelMetrics.recordConnectionAttemptSucceeded( - buildLabelSet(null, extractSecurityLevel())); + subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet( + addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), + addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + null, + extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) + )); syncContext.execute(new Runnable() { @Override public void run() { @@ -623,7 +624,11 @@ public void transportShutdown(final Status s) { channelLogger.log( ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; - subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet("Peer Pressure", null)); + subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( + addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), + addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + null, null + )); syncContext.execute(new Runnable() { @Override public void run() { @@ -664,8 +669,13 @@ public void transportTerminated() { for (ClientTransportFilter filter : transportFilters) { filter.transportTerminated(transport.getAttributes()); } - subchannelMetrics.recordDisconnection(buildLabelSet("Peer Pressure", - null)); + subchannelMetrics.recordDisconnection(buildLabelSet( + addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), + addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + "Peer Pressure", + extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) + )); syncContext.execute(new Runnable() { @Override public void run() { @@ -677,8 +687,20 @@ public void run() { }); } - private String extractSecurityLevel() { - return "Hold the door!"; + private String extractSecurityLevel(SecurityLevel securityLevel) { + if (securityLevel == null) { + return "none"; + } + switch (securityLevel) { + case NONE: + return "none"; + case INTEGRITY: + return "integrity_only"; + case PRIVACY_AND_INTEGRITY: + return "privacy_and_integrity"; + default: + throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel); + } } } @@ -839,13 +861,14 @@ private String printShortStatus(Status status) { return buffer.toString(); } - private OtelMetricsAttributes buildLabelSet(String disconnectError, String secLevel) { + private OtelMetricsAttributes buildLabelSet(String backendService, String locality, + String disconnectError, String securityLevel) { return new OtelMetricsAttributes( target, backendService, locality, disconnectError, - secLevel != null ? secLevel : securityLevel + securityLevel ); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index aaaaf65de05..78c5181502f 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -415,7 +415,7 @@ void exitIdleMode() { LbHelperImpl lbHelper = new LbHelperImpl(); lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and - // may throw. We don't want to confuse our state, even if we will enter panic mode. + // may throw. We don't want to confuse our state, even if we enter panic mode. this.lbHelper = lbHelper; channelStateManager.gotoState(CONNECTING); @@ -1466,9 +1466,6 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { subchannelLogger, transportFilters, target, - "", - "", - "", lbHelper.getMetricRecorder()); oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() .setDescription("Child Subchannel created") @@ -1901,9 +1898,6 @@ void onNotInUse(InternalSubchannel is) { subchannelLogId, subchannelLogger, transportFilters, target, - "", - "", - "", lbHelper.getMetricRecorder()); channelTracer.reportEvent(new ChannelTrace.Event.Builder() diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index bbc144ea775..96e42ac10a4 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -92,7 +92,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { return Status.FAILED_PRECONDITION.withDescription("Already shut down"); } - // Cache whether or not this is a petiole policy, which is based off of an address attribute + // Check weather or not this is a petiole policy, which is based off of an address attribute Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY); this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy; diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 49918262102..48bb0f97453 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -1449,9 +1449,6 @@ private void createInternalSubchannel(boolean reconnectDisabled, new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), Collections.emptyList(), "", - "", - "", - "", new MetricRecorder() {} ); } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 034cdee0815..6643a73b23a 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -305,7 +305,7 @@ private List withAdditionalAttributes( private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) { Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY); - String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME); + String localityName = addressAttributes.get(LoadBalancer.ATTR_LOCALITY_NAME); // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain // attributes with its locality, including endpoints in LOGICAL_DNS clusters. diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 080760303bf..8cee0483cbb 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -428,7 +428,7 @@ public void run() { Attributes attr = endpoint.eag().getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(LoadBalancer.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityLbInfo.localityWeight()) .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) @@ -679,7 +679,7 @@ public Status onResult2(final ResolutionResult resolutionResult) { String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); Attributes attr = eag.getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(XdsAttributes.ATTR_LOCALITY_NAME, localityName) + .set(LoadBalancer.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) .build(); eag = new EquivalentAddressGroup(eag.getAddresses(), attr); diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index ab1abb1da15..c10aecc5107 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -74,7 +74,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { Map localityWeights = new HashMap<>(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) { Attributes eagAttrs = eag.getAttributes(); - String locality = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_NAME); + String locality = eagAttrs.get(LoadBalancer.ATTR_LOCALITY_NAME); Integer localityWeight = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_WEIGHT); if (locality == null) { diff --git a/xds/src/main/java/io/grpc/xds/XdsAttributes.java b/xds/src/main/java/io/grpc/xds/XdsAttributes.java index 4a64fdb1453..2e165201e5f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/XdsAttributes.java @@ -81,13 +81,6 @@ final class XdsAttributes { static final Attributes.Key ATTR_LOCALITY = Attributes.Key.create("io.grpc.xds.XdsAttributes.locality"); - /** - * The name of the locality that this EquivalentAddressGroup is in. - */ - @EquivalentAddressGroup.Attr - static final Attributes.Key ATTR_LOCALITY_NAME = - Attributes.Key.create("io.grpc.xds.XdsAttributes.localityName"); - /** * Endpoint weight for load balancing purposes. */ diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 7df0630b779..eef8087cee2 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -1017,7 +1017,7 @@ public String toString() { Attributes.Builder attributes = Attributes.newBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) // Unique but arbitrary string - .set(XdsAttributes.ATTR_LOCALITY_NAME, locality.toString()); + .set(LoadBalancer.ATTR_LOCALITY_NAME, locality.toString()); if (authorityHostname != null) { attributes.set(XdsAttributes.ATTR_ADDRESS_NAME, authorityHostname); } diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index b6a5d8dbf73..89528161225 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -254,7 +254,7 @@ public String toString() { } Attributes.Builder attrBuilder = Attributes.newBuilder() - .set(XdsAttributes.ATTR_LOCALITY_NAME, locality); + .set(LoadBalancer.ATTR_LOCALITY_NAME, locality); if (localityWeight != null) { attrBuilder.set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight); } From 2ce087eebb7142b303f82b96e723cf246cc820a1 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Wed, 9 Jul 2025 15:16:27 +0530 Subject: [PATCH 05/10] otel: fixes NPE --- .../io/grpc/internal/InternalSubchannel.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 5ae082cbb68..3e8c6ff4241 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -589,8 +589,10 @@ public Attributes filterTransport(Attributes attributes) { public void transportReady() { channelLogger.log(ChannelLogLevel.INFO, "READY"); subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet( - addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), - addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), null, extractSecurityLevel( addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) @@ -625,10 +627,10 @@ public void transportShutdown(final Status s) { ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( - addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), - addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), null, null - )); + )); syncContext.execute(new Runnable() { @Override public void run() { @@ -670,8 +672,10 @@ public void transportTerminated() { filter.transportTerminated(transport.getAttributes()); } subchannelMetrics.recordDisconnection(buildLabelSet( - addressIndex.getCurrentEagAttributes().get(NameResolver.ATTR_BACKEND_SERVICE), - addressIndex.getCurrentEagAttributes().get(LoadBalancer.ATTR_LOCALITY_NAME), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), "Peer Pressure", extractSecurityLevel( addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) @@ -702,6 +706,11 @@ private String extractSecurityLevel(SecurityLevel securityLevel) { throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel); } } + + private String getAttributeOrDefault(Attributes attributes, Attributes.Key key) { + String value = attributes.get(key); + return value == null ? "" : value; + } } // All methods are called in syncContext From 2293e7fada544e6789778905bbdbd903664f6f1c Mon Sep 17 00:00:00 2001 From: AgraVator Date: Wed, 9 Jul 2025 15:31:59 +0530 Subject: [PATCH 06/10] otel: formatting fixes --- .../main/java/io/grpc/internal/InternalSubchannel.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 3e8c6ff4241..61bdb088f0b 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -50,8 +50,8 @@ import io.grpc.MethodDescriptor; import io.grpc.MetricRecorder; import io.grpc.NameResolver; -import io.grpc.Status; import io.grpc.SecurityLevel; +import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.SocketAddress; @@ -627,8 +627,10 @@ public void transportShutdown(final Status s) { ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( - getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault(addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), null, null )); syncContext.execute(new Runnable() { From 623c0f91c560f829192271ec26517bf23a09d627 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 15 Jul 2025 15:29:24 +0530 Subject: [PATCH 07/10] add test case --- .../io/grpc/internal/InternalSubchannel.java | 18 +-- .../io/grpc/internal/SubchannelMetrics.java | 2 +- .../grpc/internal/InternalSubchannelTest.java | 108 +++++++++++++++++- 3 files changed, 117 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 61bdb088f0b..58cf8dadfde 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -633,6 +633,15 @@ public void transportShutdown(final Status s) { addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), null, null )); + subchannelMetrics.recordDisconnection(buildLabelSet( + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), + "Peer Pressure", + extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) + )); syncContext.execute(new Runnable() { @Override public void run() { @@ -673,15 +682,6 @@ public void transportTerminated() { for (ClientTransportFilter filter : transportFilters) { filter.transportTerminated(transport.getAttributes()); } - subchannelMetrics.recordDisconnection(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - "Peer Pressure", - extractSecurityLevel( - addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) - )); syncContext.execute(new Runnable() { @Override public void run() { diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java index 36f47affba1..8588157d4ad 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -39,7 +39,7 @@ public SubchannelMetrics(MetricRecorder metricRecorder) { MetricInstrumentRegistry metricInstrumentRegistry = MetricInstrumentRegistry.getDefaultRegistry(); disconnections = metricInstrumentRegistry.registerLongCounter( - "grpc.subchannel.disconnections1", + "grpc.subchannel.disconnections", "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected", "{disconnection}", Lists.newArrayList("grpc.target"), diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 48bb0f97453..d86fac5d01d 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -29,10 +29,13 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -48,7 +51,10 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.LoadBalancer; +import io.grpc.MetricInstrument; import io.grpc.MetricRecorder; +import io.grpc.NameResolver; +import io.grpc.SecurityLevel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.InternalSubchannel.CallTracingTransport; @@ -69,6 +75,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -82,6 +89,9 @@ public class InternalSubchannelTest { public final MockitoRule mocks = MockitoJUnit.rule(); private static final String AUTHORITY = "fakeauthority"; + private static final String BACKEND_SERVICE = "ice-cream-factory-service"; + private static final String LOCALITY = "mars-olympus-mons-datacenter"; + private static final SecurityLevel SECURITY_LEVEL = SecurityLevel.PRIVACY_AND_INTEGRITY; private static final String USER_AGENT = "mosaic"; private static final ConnectivityStateInfo UNAVAILABLE_STATE = ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE); @@ -109,6 +119,12 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; @Mock private ClientTransportFactory mockTransportFactory; + @Mock private BackoffPolicy mockBackoffPolicy; + private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class, + delegatesTo(new MetricRecorderImpl())); + + private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1); + private final LinkedList callbackInvokes = new LinkedList<>(); private final InternalSubchannel.Callback mockInternalSubchannelCallback = new InternalSubchannel.Callback() { @@ -1449,8 +1465,90 @@ private void createInternalSubchannel(boolean reconnectDisabled, new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), Collections.emptyList(), "", - new MetricRecorder() {} + new MetricRecorder() { + } + ); + } + + @Test + public void subchannelStateChanges_triggersMetrics_disconnectionOnly() { + // 1. Mock the backoff policy + when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); + when(mockBackoffPolicy.nextBackoffNanos()).thenReturn(RECONNECT_BACKOFF_DELAY_NANOS); + + // 2. Setup Subchannel with attributes + SocketAddress addr = mock(SocketAddress.class); + Attributes eagAttributes = Attributes.newBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) + .set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) + .build(); + List addressGroups = + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes)); + createInternalSubchannel(new EquivalentAddressGroup(addr)); + InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); + ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, + fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, + mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, + CallTracer.getDefaultFactory().create(), subchannelTracer, logId, + new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), + Collections.emptyList(), AUTHORITY, mockMetricRecorder + ); + + // --- Action --- + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + transportInfo.listener.transportReady(); + fakeClock.runDueTasks(); + + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + fakeClock.runDueTasks(); + + // --- Verification --- + InOrder inOrder = inOrder(mockMetricRecorder); + + // Verify successful connection metrics + inOrder.verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) + ); + inOrder.verify(mockMetricRecorder).addLongUpDownCounter( + eqMetricInstrumentName("grpc.subchannel.open_connections"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY)) + ); + + inOrder.verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) ); + + // Verify disconnection and automatic failure metrics + inOrder.verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.disconnections"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "Peer Pressure")) + ); + inOrder.verify(mockMetricRecorder).addLongUpDownCounter( + eqMetricInstrumentName("grpc.subchannel.open_connections"), + eq(-1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY)) + ); + + inOrder.verifyNoMoreInteractions(); } private void assertNoCallbackInvoke() { @@ -1463,5 +1561,13 @@ private void assertExactCallbackInvokes(String ... expectedInvokes) { callbackInvokes.clear(); } + static class MetricRecorderImpl implements MetricRecorder { + } + + @SuppressWarnings("TypeParameterUnusedInFormals") + private T eqMetricInstrumentName(String name) { + return argThat(instrument -> instrument.getName().equals(name)); + } + private static class FakeSocketAddress extends SocketAddress {} } From c7135618ae305cb89808b2728c026ee88b9d96eb Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 22 Jul 2025 23:25:11 +0530 Subject: [PATCH 08/10] logs the metrics inside sync context --- .../grpc/internal/DelayedClientTransport.java | 4 +- .../io/grpc/internal/InternalSubchannel.java | 52 ++++++------- .../grpc/internal/InternalSubchannelTest.java | 76 ++++++++++++++----- 3 files changed, 87 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index eccd8fadc8c..f2eccf0af4c 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -201,8 +201,8 @@ public ListenableFuture getStats() { } /** - * Prevents creating any new streams. Buffered streams are not failed and may still proceed - * when {@link #reprocess} is called. The delayed transport will be terminated when there is no + * Prevents creating any new streams. Buffered streams are not failed and may still proceed + * when {@link #reprocess} is called. The delayed transport will be terminated when there is no * more buffered streams. */ @Override diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 58cf8dadfde..bced2aaf097 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -588,15 +588,6 @@ public Attributes filterTransport(Attributes attributes) { @Override public void transportReady() { channelLogger.log(ChannelLogLevel.INFO, "READY"); - subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - null, - extractSecurityLevel( - addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) - )); syncContext.execute(new Runnable() { @Override public void run() { @@ -611,6 +602,15 @@ public void run() { pendingTransport = null; connectedAddressAttributes = addressIndex.getCurrentEagAttributes(); gotoNonErrorState(READY); + subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet( + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), + null, + extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) + )); } } }); @@ -626,22 +626,6 @@ public void transportShutdown(final Status s) { channelLogger.log( ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s)); shutdownInitiated = true; - subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - null, null - )); - subchannelMetrics.recordDisconnection(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - "Peer Pressure", - extractSecurityLevel( - addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) - )); syncContext.execute(new Runnable() { @Override public void run() { @@ -652,11 +636,27 @@ public void run() { activeTransport = null; addressIndex.reset(); gotoNonErrorState(IDLE); + subchannelMetrics.recordDisconnection(buildLabelSet( + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), + "Peer Pressure", + extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) + )); } else if (pendingTransport == transport) { + subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), + getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), + null, null + )); Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); addressIndex.increment(); - // Continue reconnect if there are still addresses to try. + // Continue reconnecting with remaining addresses. if (!addressIndex.isValid()) { pendingTransport = null; addressIndex.reset(); diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index d86fac5d01d..c2bacd488b4 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -123,8 +123,6 @@ public void uncaughtException(Thread t, Throwable e) { private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class, delegatesTo(new MetricRecorderImpl())); - private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1); - private final LinkedList callbackInvokes = new LinkedList<>(); private final InternalSubchannel.Callback mockInternalSubchannelCallback = new InternalSubchannel.Callback() { @@ -1471,10 +1469,60 @@ private void createInternalSubchannel(boolean reconnectDisabled, } @Test - public void subchannelStateChanges_triggersMetrics_disconnectionOnly() { - // 1. Mock the backoff policy + public void subchannelStateChanges_triggersAttemptFailedMetric() { + // 1. Setup: Standard subchannel initialization + when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); + SocketAddress addr = mock(SocketAddress.class); + Attributes eagAttributes = Attributes.newBuilder() + .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) + .set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) + .build(); + List addressGroups = + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes)); + InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); + ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, + fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, + mockTransportFactory, fakeClock.getScheduledExecutorService(), + fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, + CallTracer.getDefaultFactory().create(), subchannelTracer, logId, + new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()), + Collections.emptyList(), AUTHORITY, mockMetricRecorder + ); + + // --- Action: Simulate the "connecting to failed" transition --- + // a. Initiate the connection attempt. The subchannel is now CONNECTING. + internalSubchannel.obtainActiveTransport(); + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull("A connection attempt should have been made", transportInfo); + + // b. Fail the transport before it can signal `transportReady()`. + transportInfo.listener.transportShutdown( + Status.INTERNAL.withDescription("Simulated connect failure")); + fakeClock.runDueTasks(); // Process the failure event + + // --- Verification --- + // a. Verify that the "connection_attempts_failed" metric was recorded exactly once. + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"), + eq(1L), + eq(Arrays.asList(AUTHORITY)), + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) + ); + + // b. Verify no other metrics were recorded. This confirms it wasn't incorrectly + // logged as a success, disconnection, or open connection. + verifyNoMoreInteractions(mockMetricRecorder); + } + + @Test + public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { + // 1. Mock the backoff policy (needed for subchannel creation) when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); - when(mockBackoffPolicy.nextBackoffNanos()).thenReturn(RECONNECT_BACKOFF_DELAY_NANOS); // 2. Setup Subchannel with attributes SocketAddress addr = mock(SocketAddress.class); @@ -1500,15 +1548,16 @@ public void subchannelStateChanges_triggersMetrics_disconnectionOnly() { Collections.emptyList(), AUTHORITY, mockMetricRecorder ); - // --- Action --- + // --- Action: Successful connection --- internalSubchannel.obtainActiveTransport(); MockClientTransportInfo transportInfo = transports.poll(); assertNotNull(transportInfo); transportInfo.listener.transportReady(); - fakeClock.runDueTasks(); + fakeClock.runDueTasks(); // Process the successful connection - transportInfo.listener.transportShutdown(Status.UNAVAILABLE); - fakeClock.runDueTasks(); + // --- Action: Transport is shut down by the "peer" --- + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("Peer Pressure")); + fakeClock.runDueTasks(); // Process the shutdown // --- Verification --- InOrder inOrder = inOrder(mockMetricRecorder); @@ -1527,14 +1576,7 @@ public void subchannelStateChanges_triggersMetrics_disconnectionOnly() { eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY)) ); - inOrder.verify(mockMetricRecorder).addLongCounter( - eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"), - eq(1L), - eq(Arrays.asList(AUTHORITY)), - eq(Arrays.asList(BACKEND_SERVICE, LOCALITY)) - ); - - // Verify disconnection and automatic failure metrics + // Verify disconnection metrics inOrder.verify(mockMetricRecorder).addLongCounter( eqMetricInstrumentName("grpc.subchannel.disconnections"), eq(1L), From ca8e9ede7e0a384e2a7794b5a3b0d6de2478885a Mon Sep 17 00:00:00 2001 From: AgraVator Date: Wed, 23 Jul 2025 10:27:10 +0530 Subject: [PATCH 09/10] reframe comments --- .../main/java/io/grpc/internal/DelayedClientTransport.java | 4 ++-- core/src/main/java/io/grpc/internal/InternalSubchannel.java | 2 +- .../main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index f2eccf0af4c..eccd8fadc8c 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -201,8 +201,8 @@ public ListenableFuture getStats() { } /** - * Prevents creating any new streams. Buffered streams are not failed and may still proceed - * when {@link #reprocess} is called. The delayed transport will be terminated when there is no + * Prevents creating any new streams. Buffered streams are not failed and may still proceed + * when {@link #reprocess} is called. The delayed transport will be terminated when there is no * more buffered streams. */ @Override diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index bced2aaf097..fdb6fc2b617 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -656,7 +656,7 @@ public void run() { Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); addressIndex.increment(); - // Continue reconnecting with remaining addresses. + // Continue to reconnect if there are still addresses to try. if (!addressIndex.isValid()) { pendingTransport = null; addressIndex.reset(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 96e42ac10a4..ebe329ca591 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -92,7 +92,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { return Status.FAILED_PRECONDITION.withDescription("Already shut down"); } - // Check weather or not this is a petiole policy, which is based off of an address attribute + // Check whether this is a petiole policy, which is based off of an address attribute Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY); this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy; From b45a907ab8a1b7777eae82334d9c7af6a7035721 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 29 Jul 2025 14:43:16 +0530 Subject: [PATCH 10/10] suggested changes --- .../java/io/grpc/EquivalentAddressGroup.java | 5 ++ api/src/main/java/io/grpc/LoadBalancer.java | 6 -- .../io/grpc/internal/InternalSubchannel.java | 63 ++++++------- .../io/grpc/internal/MetricsAttributes.java | 78 ++++++++++++++++ .../grpc/internal/OtelMetricsAttributes.java | 69 -------------- .../io/grpc/internal/SubchannelMetrics.java | 89 ++++++++++++++++++- .../grpc/internal/InternalSubchannelTest.java | 10 +-- .../grpc/internal/MetricRecorderImplTest.java | 33 ++++++- .../io/grpc/xds/ClusterImplLoadBalancer.java | 2 +- .../grpc/xds/ClusterResolverLoadBalancer.java | 4 +- .../io/grpc/xds/WrrLocalityLoadBalancer.java | 2 +- .../grpc/xds/ClusterImplLoadBalancerTest.java | 2 +- .../grpc/xds/WrrLocalityLoadBalancerTest.java | 2 +- 13 files changed, 237 insertions(+), 128 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/MetricsAttributes.java delete mode 100644 core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java diff --git a/api/src/main/java/io/grpc/EquivalentAddressGroup.java b/api/src/main/java/io/grpc/EquivalentAddressGroup.java index 4b3db006684..038ae91689c 100644 --- a/api/src/main/java/io/grpc/EquivalentAddressGroup.java +++ b/api/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -50,6 +50,11 @@ public final class EquivalentAddressGroup { @ExperimentalApi("https://github.com/grpc/grpc-java/issues/6138") public static final Attributes.Key ATTR_AUTHORITY_OVERRIDE = Attributes.Key.create("io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE"); + /** + * The name of the locality that this EquivalentAddressGroup is in. + */ + public static final Attributes.Key ATTR_LOCALITY_NAME = + Attributes.Key.create("io.grpc.lb.locality"); private final List addrs; private final Attributes attrs; diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 433bddc372e..d2fd8409e01 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -135,12 +135,6 @@ public abstract class LoadBalancer { public static final Attributes.Key IS_PETIOLE_POLICY = Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY"); - /** - * The name of the locality that this EquivalentAddressGroup is in. - */ - public static final Attributes.Key ATTR_LOCALITY_NAME = - Attributes.Key.create("io.grpc.lb.locality"); - /** * A picker that always returns an erring pick. * diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index fdb6fc2b617..701bf886f95 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -602,15 +602,15 @@ public void run() { pendingTransport = null; connectedAddressAttributes = addressIndex.getCurrentEagAttributes(); gotoNonErrorState(READY); - subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - null, - extractSecurityLevel( - addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) - )); + subchannelMetrics.recordConnectionAttemptSucceeded(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .securityLevel(extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))) + .build()); } } }); @@ -636,23 +636,24 @@ public void run() { activeTransport = null; addressIndex.reset(); gotoNonErrorState(IDLE); - subchannelMetrics.recordDisconnection(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - "Peer Pressure", - extractSecurityLevel( - addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL)) - )); + subchannelMetrics.recordDisconnection(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .disconnectError(SubchannelMetrics.DisconnectError.UNKNOWN.getErrorString(null)) + .securityLevel(extractSecurityLevel( + addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))) + .build()); } else if (pendingTransport == transport) { - subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet( - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE), - getAttributeOrDefault( - addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME), - null, null - )); + subchannelMetrics.recordConnectionAttemptFailed(MetricsAttributes.newBuilder(target) + .backendService(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE)) + .locality(getAttributeOrDefault( + addressIndex.getCurrentEagAttributes(), + EquivalentAddressGroup.ATTR_LOCALITY_NAME)) + .build()); Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); addressIndex.increment(); @@ -872,18 +873,6 @@ private String printShortStatus(Status status) { return buffer.toString(); } - private OtelMetricsAttributes buildLabelSet(String backendService, String locality, - String disconnectError, String securityLevel) { - return new OtelMetricsAttributes( - target, - backendService, - locality, - disconnectError, - securityLevel - ); - } - - @VisibleForTesting static final class TransportLogger extends ChannelLogger { // Changed just after construction to break a cyclic dependency. diff --git a/core/src/main/java/io/grpc/internal/MetricsAttributes.java b/core/src/main/java/io/grpc/internal/MetricsAttributes.java new file mode 100644 index 00000000000..774ec5251e0 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/MetricsAttributes.java @@ -0,0 +1,78 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +class MetricsAttributes { + final String target; + final String backendService; + final String locality; + final String disconnectError; + final String securityLevel; + + // Constructor is private, only the Builder can call it + private MetricsAttributes(Builder builder) { + this.target = builder.target; + this.backendService = builder.backendService; + this.locality = builder.locality; + this.disconnectError = builder.disconnectError; + this.securityLevel = builder.securityLevel; + } + + // Public static method to get a new builder instance + public static Builder newBuilder(String target) { + return new Builder(target); + } + + public static class Builder { + // Required parameter + private final String target; + + // Optional parameters - initialized to default values + private String backendService = null; + private String locality = null; + private String disconnectError = null; + private String securityLevel = null; + + public Builder(String target) { + this.target = target; + } + + public Builder backendService(String val) { + this.backendService = val; + return this; + } + + public Builder locality(String val) { + this.locality = val; + return this; + } + + public Builder disconnectError(String val) { + this.disconnectError = val; + return this; + } + + public Builder securityLevel(String val) { + this.securityLevel = val; + return this; + } + + public MetricsAttributes build() { + return new MetricsAttributes(this); + } + } +} \ No newline at end of file diff --git a/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java b/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java deleted file mode 100644 index 494182256e0..00000000000 --- a/core/src/main/java/io/grpc/internal/OtelMetricsAttributes.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2025 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - - -import io.grpc.Attributes; - -class OtelMetricsAttributes { - final String target; - final String backendService; - final String locality; - final String disconnectError; - final String securityLevel; - - public OtelMetricsAttributes(String target, String backendService, String locality, - String disconnectError, String securityLevel) { - this.target = target; - this.backendService = backendService; - this.locality = locality; - this.disconnectError = disconnectError; - this.securityLevel = securityLevel; - } - - public Attributes toOtelMetricsAttributes() { - Attributes attributes = - Attributes.EMPTY; - - if (target != null) { - attributes.toBuilder() - .set(Attributes.Key.create("grpc.target"), target) - .build(); - } - if (backendService != null) { - attributes.toBuilder() - .set(Attributes.Key.create("grpc.lb.backend_service"), backendService) - .build(); - } - if (locality != null) { - attributes.toBuilder() - .set(Attributes.Key.create("grpc.lb.locality"), locality) - .build(); - } - if (disconnectError != null) { - attributes.toBuilder() - .set(Attributes.Key.create("grpc.disconnect_error"), disconnectError) - .build(); - } - if (securityLevel != null) { - attributes.toBuilder() - .set(Attributes.Key.create("grpc.security_level"), securityLevel) - .build(); - } - return attributes; - } -} \ No newline at end of file diff --git a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java index 8588157d4ad..2112e7bc2c4 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelMetrics.java +++ b/core/src/main/java/io/grpc/internal/SubchannelMetrics.java @@ -22,8 +22,9 @@ import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; +import javax.annotation.Nullable; -public final class SubchannelMetrics { +final class SubchannelMetrics { private static final LongCounterMetricInstrument disconnections; private static final LongCounterMetricInstrument connectionAttemptsSucceeded; @@ -75,7 +76,7 @@ public SubchannelMetrics(MetricRecorder metricRecorder) { ); } - public void recordConnectionAttemptSucceeded(OtelMetricsAttributes labelSet) { + public void recordConnectionAttemptSucceeded(MetricsAttributes labelSet) { metricRecorder .addLongCounter(connectionAttemptsSucceeded, 1, ImmutableList.of(labelSet.target), @@ -86,14 +87,14 @@ public void recordConnectionAttemptSucceeded(OtelMetricsAttributes labelSet) { ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); } - public void recordConnectionAttemptFailed(OtelMetricsAttributes labelSet) { + public void recordConnectionAttemptFailed(MetricsAttributes labelSet) { metricRecorder .addLongCounter(connectionAttemptsFailed, 1, ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.backendService, labelSet.locality)); } - public void recordDisconnection(OtelMetricsAttributes labelSet) { + public void recordDisconnection(MetricsAttributes labelSet) { metricRecorder .addLongCounter(disconnections, 1, ImmutableList.of(labelSet.target), @@ -103,4 +104,84 @@ public void recordDisconnection(OtelMetricsAttributes labelSet) { ImmutableList.of(labelSet.target), ImmutableList.of(labelSet.securityLevel, labelSet.backendService, labelSet.locality)); } + + /** + * Represents the reason for a subchannel failure. + */ + public enum DisconnectError { + + /** + * Represents an HTTP/2 GOAWAY frame. The specific error code + * (e.g., "NO_ERROR", "PROTOCOL_ERROR") should be handled separately + * as it is a dynamic part of the error. + * See RFC 9113 for error codes: https://www.rfc-editor.org/rfc/rfc9113.html#name-error-codes + */ + GOAWAY("goaway"), + + /** + * The subchannel was shut down for various reasons like parent channel shutdown, + * idleness, or load balancing policy changes. + */ + SUBCHANNEL_SHUTDOWN("subchannel shutdown"), + + /** + * Connection was reset (e.g., ECONNRESET, WSAECONNERESET). + */ + CONNECTION_RESET("connection reset"), + + /** + * Connection timed out (e.g., ETIMEDOUT, WSAETIMEDOUT), including closures + * from gRPC keepalives. + */ + CONNECTION_TIMED_OUT("connection timed out"), + + /** + * Connection was aborted (e.g., ECONNABORTED, WSAECONNABORTED). + */ + CONNECTION_ABORTED("connection aborted"), + + /** + * Any socket error not covered by other specific disconnect errors. + */ + SOCKET_ERROR("socket error"), + + /** + * A catch-all for any other unclassified reason. + */ + UNKNOWN("unknown"); + + private final String errorTag; + + /** + * Private constructor to associate a description with each enum constant. + * + * @param errorTag The detailed explanation of the error. + */ + DisconnectError(String errorTag) { + this.errorTag = errorTag; + } + + /** + * Gets the error string suitable for use as a metric tag. + * + *

If the reason is {@code GOAWAY}, this method requires the specific + * HTTP/2 error code to create the complete tag (e.g., "goaway PROTOCOL_ERROR"). + * For all other reasons, the parameter is ignored.

+ * + * @param goawayErrorCode The specific HTTP/2 error code. This is only + * used if the reason is GOAWAY and should not be null in that case. + * @return The formatted error string. + */ + public String getErrorString(@Nullable String goawayErrorCode) { + if (this == GOAWAY) { + if (goawayErrorCode == null || goawayErrorCode.isEmpty()) { + // Return the base tag if the code is missing, or consider throwing an exception + // throw new IllegalArgumentException("goawayErrorCode is required for GOAWAY reason."); + return this.errorTag; + } + return this.errorTag + " " + goawayErrorCode; + } + return this.errorTag; + } + } } diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index c2bacd488b4..4ac5fbac362 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -1475,7 +1475,7 @@ public void subchannelStateChanges_triggersAttemptFailedMetric() { SocketAddress addr = mock(SocketAddress.class); Attributes eagAttributes = Attributes.newBuilder() .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) - .set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY) .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) .build(); List addressGroups = @@ -1528,7 +1528,7 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { SocketAddress addr = mock(SocketAddress.class); Attributes eagAttributes = Attributes.newBuilder() .set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE) - .set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY) .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL) .build(); List addressGroups = @@ -1555,8 +1555,8 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { transportInfo.listener.transportReady(); fakeClock.runDueTasks(); // Process the successful connection - // --- Action: Transport is shut down by the "peer" --- - transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("Peer Pressure")); + // --- Action: Transport is shut down --- + transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("unknown")); fakeClock.runDueTasks(); // Process the shutdown // --- Verification --- @@ -1581,7 +1581,7 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() { eqMetricInstrumentName("grpc.subchannel.disconnections"), eq(1L), eq(Arrays.asList(AUTHORITY)), - eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "Peer Pressure")) + eq(Arrays.asList(BACKEND_SERVICE, LOCALITY, "unknown")) ); inOrder.verify(mockMetricRecorder).addLongUpDownCounter( eqMetricInstrumentName("grpc.subchannel.open_connections"), diff --git a/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java b/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java index 08f34a267f9..33bf9bb41e2 100644 --- a/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java +++ b/core/src/test/java/io/grpc/internal/MetricRecorderImplTest.java @@ -32,6 +32,7 @@ import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; import io.grpc.LongHistogramMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricInstrumentRegistryAccessor; import io.grpc.MetricRecorder; @@ -79,6 +80,9 @@ public class MetricRecorderImplTest { private final LongGaugeMetricInstrument longGaugeInstrument = registry.registerLongGauge("gauge0", DESCRIPTION, UNIT, REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED); + private final LongUpDownCounterMetricInstrument longUpDownCounterInstrument = + registry.registerLongUpDownCounter("upDownCounter0", DESCRIPTION, UNIT, + REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED); private MetricRecorder recorder; @Before @@ -88,7 +92,7 @@ public void setUp() { @Test public void addCounter() { - when(mockSink.getMeasuresSize()).thenReturn(4); + when(mockSink.getMeasuresSize()).thenReturn(6); recorder.addDoubleCounter(doubleCounterInstrument, 1.0, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES); @@ -100,6 +104,12 @@ public void addCounter() { verify(mockSink, times(2)).addLongCounter(eq(longCounterInstrument), eq(1L), eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, -10, REQUIRED_LABEL_VALUES, + OPTIONAL_LABEL_VALUES); + verify(mockSink, times(2)) + .addLongUpDownCounter(eq(longUpDownCounterInstrument), eq(-10L), + eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); + verify(mockSink, never()).updateMeasures(registry.getMetricInstruments()); } @@ -190,6 +200,13 @@ public void newRegisteredMetricUpdateMeasures() { verify(mockSink, times(2)) .registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)); registration.close(); + + // Long UpDown Counter + recorder.addLongUpDownCounter(longUpDownCounterInstrument, -10, REQUIRED_LABEL_VALUES, + OPTIONAL_LABEL_VALUES); + verify(mockSink, times(12)).updateMeasures(anyList()); + verify(mockSink, times(2)).addLongUpDownCounter(eq(longUpDownCounterInstrument), eq(-10L), + eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES)); } @Test(expected = IllegalArgumentException.class) @@ -208,6 +225,13 @@ public void addLongCounterMismatchedRequiredLabelValues() { OPTIONAL_LABEL_VALUES); } + @Test(expected = IllegalArgumentException.class) + public void addLongUpDownCounterMismatchedRequiredLabelValues() { + when(mockSink.getMeasuresSize()).thenReturn(6); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, 1, ImmutableList.of(), + OPTIONAL_LABEL_VALUES); + } + @Test(expected = IllegalArgumentException.class) public void recordDoubleHistogramMismatchedRequiredLabelValues() { when(mockSink.getMeasuresSize()).thenReturn(4); @@ -260,6 +284,13 @@ public void addLongCounterMismatchedOptionalLabelValues() { ImmutableList.of()); } + @Test(expected = IllegalArgumentException.class) + public void addLongUpDownCounterMismatchedOptionalLabelValues() { + when(mockSink.getMeasuresSize()).thenReturn(6); + recorder.addLongUpDownCounter(longUpDownCounterInstrument, 1, REQUIRED_LABEL_VALUES, + ImmutableList.of()); + } + @Test(expected = IllegalArgumentException.class) public void recordDoubleHistogramMismatchedOptionalLabelValues() { when(mockSink.getMeasuresSize()).thenReturn(4); diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 6643a73b23a..fba66e2e8d7 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -305,7 +305,7 @@ private List withAdditionalAttributes( private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) { Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY); - String localityName = addressAttributes.get(LoadBalancer.ATTR_LOCALITY_NAME); + String localityName = addressAttributes.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME); // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain // attributes with its locality, including endpoints in LOGICAL_DNS clusters. diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 8cee0483cbb..7a1e9a36603 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -428,7 +428,7 @@ public void run() { Attributes attr = endpoint.eag().getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) - .set(LoadBalancer.ATTR_LOCALITY_NAME, localityName) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityLbInfo.localityWeight()) .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) @@ -679,7 +679,7 @@ public Status onResult2(final ResolutionResult resolutionResult) { String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); Attributes attr = eag.getAttributes().toBuilder() .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(LoadBalancer.ATTR_LOCALITY_NAME, localityName) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) .build(); eag = new EquivalentAddressGroup(eag.getAddresses(), attr); diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index c10aecc5107..1a12412f923 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -74,7 +74,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { Map localityWeights = new HashMap<>(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) { Attributes eagAttrs = eag.getAttributes(); - String locality = eagAttrs.get(LoadBalancer.ATTR_LOCALITY_NAME); + String locality = eagAttrs.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME); Integer localityWeight = eagAttrs.get(XdsAttributes.ATTR_LOCALITY_WEIGHT); if (locality == null) { diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index eef8087cee2..c5e3f80f170 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -1017,7 +1017,7 @@ public String toString() { Attributes.Builder attributes = Attributes.newBuilder() .set(XdsAttributes.ATTR_LOCALITY, locality) // Unique but arbitrary string - .set(LoadBalancer.ATTR_LOCALITY_NAME, locality.toString()); + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, locality.toString()); if (authorityHostname != null) { attributes.set(XdsAttributes.ATTR_ADDRESS_NAME, authorityHostname); } diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index 89528161225..584c32738c5 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -254,7 +254,7 @@ public String toString() { } Attributes.Builder attrBuilder = Attributes.newBuilder() - .set(LoadBalancer.ATTR_LOCALITY_NAME, locality); + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, locality); if (localityWeight != null) { attrBuilder.set(XdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight); }