Skip to content

Commit bb95e3a

Browse files
KAFKA-19882: Cleaning client level metric tags (#20906)
## Summary When working on [KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics), we mistakenly applied the `process-id` tag to all client-level metrics, rather than just the `client-state`, `thread-state`, and `recording-level` metrics as specified in the KIP. This issue came to light while working on KIP-1221, which aimed to add the `application-id` as a tag to the `client-state` metric introduced by KIP-1091. This PR removes these tags from all metrics by default, and adds them to only the `client-state` (application-id + process-id) and the `recording-level` (process-id only) Reviewers: Matthias Sax<[email protected]> Bill Bejeck<[email protected]> ## Tests Unit tests in `ClientMetricsTest.java` and `StreamsMetricsImplTest.java`
1 parent c1ded05 commit bb95e3a

File tree

38 files changed

+292
-233
lines changed

38 files changed

+292
-233
lines changed

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -982,8 +982,6 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
982982
streamsMetrics = new StreamsMetricsImpl(
983983
metrics,
984984
clientId,
985-
processId.toString(),
986-
applicationId,
987985
time
988986
);
989987

@@ -992,8 +990,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
992990
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
993991
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
994992
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state.name());
995-
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
996-
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
993+
ClientMetrics.addClientStateTelemetryMetric(processId.toString(), applicationId, streamsMetrics, (metricsConfig, now) -> state.ordinal());
994+
ClientMetrics.addClientRecordingLevelMetric(processId.toString(), streamsMetrics, calculateMetricsRecordingLevel());
997995
threads = Collections.synchronizedList(new LinkedList<>());
998996
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());
999997

streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,14 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
import java.io.InputStream;
28+
import java.util.Collections;
29+
import java.util.LinkedHashMap;
30+
import java.util.Map;
2831
import java.util.Properties;
2932

33+
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG;
3034
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
35+
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
3136
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
3237

3338
public class ClientMetrics {
@@ -126,21 +131,30 @@ public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
126131
);
127132
}
128133

129-
public static void addClientStateTelemetryMetric(final StreamsMetricsImpl streamsMetrics,
134+
public static void addClientStateTelemetryMetric(final String processId,
135+
final String applicationId,
136+
final StreamsMetricsImpl streamsMetrics,
130137
final Gauge<Integer> stateProvider) {
138+
final Map<String, String> additionalTags = new LinkedHashMap<>();
139+
additionalTags.put(PROCESS_ID_TAG, processId);
140+
additionalTags.put(APPLICATION_ID_TAG, applicationId);
141+
131142
streamsMetrics.addClientLevelMutableMetric(
132143
CLIENT_STATE,
133144
STATE_DESCRIPTION,
145+
additionalTags,
134146
RecordingLevel.INFO,
135147
stateProvider
136148
);
137149
}
138150

139-
public static void addClientRecordingLevelMetric(final StreamsMetricsImpl streamsMetrics,
151+
public static void addClientRecordingLevelMetric(final String processId,
152+
final StreamsMetricsImpl streamsMetrics,
140153
final int recordingLevel) {
141154
streamsMetrics.addClientLevelImmutableMetric(
142155
RECORDING_LEVEL,
143156
RECORDING_LEVEL_DESCRIPTION,
157+
Collections.singletonMap(PROCESS_ID_TAG, processId),
144158
RecordingLevel.INFO,
145159
recordingLevel
146160
);

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ public int hashCode() {
8989
private final Metrics metrics;
9090
private final Map<Sensor, Sensor> parentSensors;
9191
private final String clientId;
92-
private final String processId;
93-
private final String applicationId;
9492

9593
private final Version version;
9694
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -169,14 +167,10 @@ public int hashCode() {
169167

170168
public StreamsMetricsImpl(final Metrics metrics,
171169
final String clientId,
172-
final String processId,
173-
final String applicationId,
174170
final Time time) {
175171
Objects.requireNonNull(metrics, "Metrics cannot be null");
176172
this.metrics = metrics;
177173
this.clientId = clientId;
178-
this.processId = processId;
179-
this.applicationId = applicationId;
180174
version = Version.LATEST;
181175
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
182176

@@ -199,7 +193,20 @@ public <T> void addClientLevelImmutableMetric(final String name,
199193
final String description,
200194
final RecordingLevel recordingLevel,
201195
final T value) {
202-
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
196+
addClientLevelImmutableMetric(name, description, Collections.emptyMap(), recordingLevel, value);
197+
}
198+
199+
public <T> void addClientLevelImmutableMetric(final String name,
200+
final String description,
201+
final Map<String, String> additionalTags,
202+
final RecordingLevel recordingLevel,
203+
final T value) {
204+
final MetricName metricName = metrics.metricName(
205+
name,
206+
CLIENT_LEVEL_GROUP,
207+
description,
208+
clientLevelTagMap(additionalTags)
209+
);
203210
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
204211
synchronized (clientLevelMetrics) {
205212
metrics.addMetric(metricName, metricConfig, new ImmutableMetricValue<>(value));
@@ -211,7 +218,20 @@ public <T> void addClientLevelMutableMetric(final String name,
211218
final String description,
212219
final RecordingLevel recordingLevel,
213220
final Gauge<T> valueProvider) {
214-
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
221+
addClientLevelMutableMetric(name, description, Collections.emptyMap(), recordingLevel, valueProvider);
222+
}
223+
224+
public <T> void addClientLevelMutableMetric(final String name,
225+
final String description,
226+
final Map<String, String> additionalTags,
227+
final RecordingLevel recordingLevel,
228+
final Gauge<T> valueProvider) {
229+
final MetricName metricName = metrics.metricName(
230+
name,
231+
CLIENT_LEVEL_GROUP,
232+
description,
233+
clientLevelTagMap(additionalTags)
234+
);
215235
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
216236
synchronized (clientLevelMetrics) {
217237
metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -285,10 +305,12 @@ private String threadSensorPrefix(final String threadId) {
285305
}
286306

287307
public Map<String, String> clientLevelTagMap() {
288-
final Map<String, String> tagMap = new LinkedHashMap<>();
308+
return clientLevelTagMap(Collections.emptyMap());
309+
}
310+
311+
public Map<String, String> clientLevelTagMap(final Map<String, String> additionalTags) {
312+
final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
289313
tagMap.put(CLIENT_ID_TAG, clientId);
290-
tagMap.put(PROCESS_ID_TAG, processId);
291-
tagMap.put(APPLICATION_ID_TAG, applicationId);
292314
return tagMap;
293315
}
294316

streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.jupiter.api.Test;
2626

2727
import java.util.Collections;
28+
import java.util.LinkedHashMap;
2829
import java.util.Map;
2930

3031
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
@@ -36,7 +37,9 @@
3637
import static org.mockito.Mockito.when;
3738

3839
public class ClientMetricsTest {
40+
private static final String APPLICATION_ID = "test-application-id";
3941
private static final String COMMIT_ID = "test-commit-ID";
42+
private static final String PROCESS_ID = "test-process-id";
4043
private static final String VERSION = "test-version";
4144

4245
private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
@@ -116,11 +119,19 @@ public void shouldAddClientStateTelemetryMetric() {
116119
final String name = "client-state";
117120
final String description = "The state of the Kafka Streams client";
118121
final Gauge<Integer> stateProvider = (config, now) -> State.RUNNING.ordinal();
119-
setUpAndVerifyMutableMetric(
120-
name,
121-
description,
122-
stateProvider,
123-
() -> ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
122+
123+
final Map<String, String> additionalTags = new LinkedHashMap<>();
124+
additionalTags.put("process-id", PROCESS_ID);
125+
additionalTags.put("application-id", APPLICATION_ID);
126+
127+
ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID, APPLICATION_ID, streamsMetrics, stateProvider);
128+
129+
verify(streamsMetrics).addClientLevelMutableMetric(
130+
eq(name),
131+
eq(description),
132+
eq(additionalTags),
133+
eq(RecordingLevel.INFO),
134+
eq(stateProvider)
124135
);
125136
}
126137

@@ -129,11 +140,15 @@ public void shouldAddRecordingLevelMetric() {
129140
final String name = "recording-level";
130141
final String description = "The metrics recording level of the Kafka Streams client";
131142
final int recordingLevel = 1;
132-
setUpAndVerifyImmutableMetric(
133-
name,
134-
description,
135-
recordingLevel,
136-
() -> ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
143+
144+
ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID, streamsMetrics, recordingLevel);
145+
146+
verify(streamsMetrics).addClientLevelImmutableMetric(
147+
eq(name),
148+
eq(description),
149+
eq(Collections.singletonMap("process-id", PROCESS_ID)),
150+
eq(RecordingLevel.INFO),
151+
eq(recordingLevel)
137152
);
138153
}
139154

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
8080

8181
private final MockTime time = new MockTime();
8282
private final Metrics metrics = new Metrics();
83-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
83+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time);
8484
private final String threadId = Thread.currentThread().getName();
8585
private final Initializer<Long> initializer = () -> 0L;
8686
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;

streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
7272
private ChangelogReader changeLogReader;
7373

7474
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
75-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime());
75+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
7676
private final Map<String, Object> properties = mkMap(
7777
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
7878
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
106106

107107
// need an auto-tick timer to work for draining with timeout
108108
private final Time time = new MockTime(1L);
109-
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time);
109+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", time);
110110
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
111111
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
112112
private final TopologyMetadata topologyMetadata = unnamedTopology().build();

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void process(final Record<Object, Object> record) {
132132
mockConsumer,
133133
new StateDirectory(config, time, true, false),
134134
0,
135-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
135+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
136136
time,
137137
"clientId",
138138
stateRestoreListener,
@@ -169,7 +169,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
169169
mockConsumer,
170170
new StateDirectory(config, time, true, false),
171171
0,
172-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
172+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
173173
time,
174174
"clientId",
175175
stateRestoreListener,
@@ -418,7 +418,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
418418
consumer,
419419
new StateDirectory(config, time, true, false),
420420
0,
421-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
421+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
422422
time,
423423
"clientId",
424424
stateRestoreListener,

streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@
2323
public class MockStreamsMetrics extends StreamsMetricsImpl {
2424

2525
public MockStreamsMetrics(final Metrics metrics) {
26-
super(metrics, "test", "processId", "applicationId", new MockTime());
26+
super(metrics, "test", new MockTime());
2727
}
2828
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public Set<TopicPartition> partitions() {
279279
public void testMetricsWithBuiltInMetricsVersionLatest() {
280280
final Metrics metrics = new Metrics();
281281
final StreamsMetricsImpl streamsMetrics =
282-
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
282+
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
283283
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
284284
final ProcessorNode<Object, Object, Object, Object> node =
285285
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
@@ -363,7 +363,7 @@ public void process(final Record<Object, Object> record) {
363363
public void testTopologyLevelClassCastExceptionDirect() {
364364
final Metrics metrics = new Metrics();
365365
final StreamsMetricsImpl streamsMetrics =
366-
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
366+
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
367367
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
368368
final ProcessorNode<Object, Object, Object, Object> node =
369369
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
@@ -441,7 +441,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
441441
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
442442

443443
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
444-
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()));
444+
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
445445
when(internalProcessorContext.topic()).thenReturn(TOPIC);
446446
when(internalProcessorContext.partition()).thenReturn(PARTITION);
447447
when(internalProcessorContext.offset()).thenReturn(OFFSET);

0 commit comments

Comments
 (0)