Skip to content

Commit 1647610

Browse files
KAFKA-19882: Removing process id from default client level tags (#20939)
## Summary As a follow-up to #20906, also removing the process-id tag from 4.1. This is because when working on KIP-1091, we mistakenly applied the process-id tag to all client-level metrics, rather than just the client-state, thread-state, and recording-level metrics as specified in the KIP. ## Tests Unit tests in `ClientMetricsTest.java` and `StreamsMetricsImplTest.java` Reviewers: Matthias Sax <[email protected]>, Bill Bejeck<[email protected]>
1 parent c4e83aa commit 1647610

37 files changed

+273
-167
lines changed

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,6 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
980980
streamsMetrics = new StreamsMetricsImpl(
981981
metrics,
982982
clientId,
983-
processId.toString(),
984983
time
985984
);
986985

@@ -989,8 +988,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
989988
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
990989
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
991990
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state.name());
992-
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
993-
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
991+
ClientMetrics.addClientStateTelemetryMetric(processId.toString(), streamsMetrics, (metricsConfig, now) -> state.ordinal());
992+
ClientMetrics.addClientRecordingLevelMetric(processId.toString(), streamsMetrics, calculateMetricsRecordingLevel());
994993
threads = Collections.synchronizedList(new LinkedList<>());
995994
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());
996995

streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
import java.io.InputStream;
28+
import java.util.Collections;
2829
import java.util.Properties;
2930

3031
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
32+
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
3133
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
3234

3335
public class ClientMetrics {
@@ -126,21 +128,25 @@ public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
126128
);
127129
}
128130

129-
public static void addClientStateTelemetryMetric(final StreamsMetricsImpl streamsMetrics,
131+
public static void addClientStateTelemetryMetric(final String processId,
132+
final StreamsMetricsImpl streamsMetrics,
130133
final Gauge<Integer> stateProvider) {
131134
streamsMetrics.addClientLevelMutableMetric(
132135
CLIENT_STATE,
133136
STATE_DESCRIPTION,
137+
Collections.singletonMap(PROCESS_ID_TAG, processId),
134138
RecordingLevel.INFO,
135139
stateProvider
136140
);
137141
}
138142

139-
public static void addClientRecordingLevelMetric(final StreamsMetricsImpl streamsMetrics,
143+
public static void addClientRecordingLevelMetric(final String processId,
144+
final StreamsMetricsImpl streamsMetrics,
140145
final int recordingLevel) {
141146
streamsMetrics.addClientLevelImmutableMetric(
142147
RECORDING_LEVEL,
143148
RECORDING_LEVEL_DESCRIPTION,
149+
Collections.singletonMap(PROCESS_ID_TAG, processId),
144150
RecordingLevel.INFO,
145151
recordingLevel
146152
);

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public int hashCode() {
8989
private final Metrics metrics;
9090
private final Map<Sensor, Sensor> parentSensors;
9191
private final String clientId;
92-
private final String processId;
9392

9493
private final Version version;
9594
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -167,12 +166,10 @@ public int hashCode() {
167166

168167
public StreamsMetricsImpl(final Metrics metrics,
169168
final String clientId,
170-
final String processId,
171169
final Time time) {
172170
Objects.requireNonNull(metrics, "Metrics cannot be null");
173171
this.metrics = metrics;
174172
this.clientId = clientId;
175-
this.processId = processId;
176173
version = Version.LATEST;
177174
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
178175

@@ -195,7 +192,20 @@ public <T> void addClientLevelImmutableMetric(final String name,
195192
final String description,
196193
final RecordingLevel recordingLevel,
197194
final T value) {
198-
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
195+
addClientLevelImmutableMetric(name, description, Collections.emptyMap(), recordingLevel, value);
196+
}
197+
198+
public <T> void addClientLevelImmutableMetric(final String name,
199+
final String description,
200+
final Map<String, String> additionalTags,
201+
final RecordingLevel recordingLevel,
202+
final T value) {
203+
final MetricName metricName = metrics.metricName(
204+
name,
205+
CLIENT_LEVEL_GROUP,
206+
description,
207+
clientLevelTagMap(additionalTags)
208+
);
199209
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
200210
synchronized (clientLevelMetrics) {
201211
metrics.addMetric(metricName, metricConfig, new ImmutableMetricValue<>(value));
@@ -207,7 +217,20 @@ public <T> void addClientLevelMutableMetric(final String name,
207217
final String description,
208218
final RecordingLevel recordingLevel,
209219
final Gauge<T> valueProvider) {
210-
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
220+
addClientLevelMutableMetric(name, description, Collections.emptyMap(), recordingLevel, valueProvider);
221+
}
222+
223+
public <T> void addClientLevelMutableMetric(final String name,
224+
final String description,
225+
final Map<String, String> additionalTags,
226+
final RecordingLevel recordingLevel,
227+
final Gauge<T> valueProvider) {
228+
final MetricName metricName = metrics.metricName(
229+
name,
230+
CLIENT_LEVEL_GROUP,
231+
description,
232+
clientLevelTagMap(additionalTags)
233+
);
211234
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
212235
synchronized (clientLevelMetrics) {
213236
metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -281,9 +304,12 @@ private String threadSensorPrefix(final String threadId) {
281304
}
282305

283306
public Map<String, String> clientLevelTagMap() {
284-
final Map<String, String> tagMap = new LinkedHashMap<>();
307+
return clientLevelTagMap(Collections.emptyMap());
308+
}
309+
310+
public Map<String, String> clientLevelTagMap(final Map<String, String> additionalTags) {
311+
final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
285312
tagMap.put(CLIENT_ID_TAG, clientId);
286-
tagMap.put(PROCESS_ID_TAG, processId);
287313
return tagMap;
288314
}
289315

streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
public class ClientMetricsTest {
3939
private static final String COMMIT_ID = "test-commit-ID";
40+
private static final String PROCESS_ID = "test-process-id";
4041
private static final String VERSION = "test-version";
4142

4243
private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
@@ -116,11 +117,15 @@ public void shouldAddClientStateTelemetryMetric() {
116117
final String name = "client-state";
117118
final String description = "The state of the Kafka Streams client";
118119
final Gauge<Integer> stateProvider = (config, now) -> State.RUNNING.ordinal();
119-
setUpAndVerifyMutableMetric(
120-
name,
121-
description,
122-
stateProvider,
123-
() -> ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
120+
121+
ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID, streamsMetrics, stateProvider);
122+
123+
verify(streamsMetrics).addClientLevelMutableMetric(
124+
eq(name),
125+
eq(description),
126+
eq(Collections.singletonMap("process-id", PROCESS_ID)),
127+
eq(RecordingLevel.INFO),
128+
eq(stateProvider)
124129
);
125130
}
126131

@@ -129,11 +134,15 @@ public void shouldAddRecordingLevelMetric() {
129134
final String name = "recording-level";
130135
final String description = "The metrics recording level of the Kafka Streams client";
131136
final int recordingLevel = 1;
132-
setUpAndVerifyImmutableMetric(
133-
name,
134-
description,
135-
recordingLevel,
136-
() -> ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
137+
138+
ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID, streamsMetrics, recordingLevel);
139+
140+
verify(streamsMetrics).addClientLevelImmutableMetric(
141+
eq(name),
142+
eq(description),
143+
eq(Collections.singletonMap("process-id", PROCESS_ID)),
144+
eq(RecordingLevel.INFO),
145+
eq(recordingLevel)
137146
);
138147
}
139148

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
8080

8181
private final MockTime time = new MockTime();
8282
private final Metrics metrics = new Metrics();
83-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", time);
83+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time);
8484
private final String threadId = Thread.currentThread().getName();
8585
private final Initializer<Long> initializer = () -> 0L;
8686
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;

streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
7272
private ChangelogReader changeLogReader;
7373

7474
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
75-
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
75+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
7676
private final Map<String, Object> properties = mkMap(
7777
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
7878
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
106106

107107
// need an auto-tick timer to work for draining with timeout
108108
private final Time time = new MockTime(1L);
109-
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
109+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", time);
110110
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
111111
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
112112
private final TopologyMetadata topologyMetadata = unnamedTopology().build();

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public void process(final Record<Object, Object> record) {
132132
mockConsumer,
133133
new StateDirectory(config, time, true, false),
134134
0,
135-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
135+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
136136
time,
137137
"clientId",
138138
stateRestoreListener,
@@ -171,7 +171,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
171171
mockConsumer,
172172
new StateDirectory(config, time, true, false),
173173
0,
174-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
174+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
175175
time,
176176
"clientId",
177177
stateRestoreListener,
@@ -420,7 +420,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
420420
consumer,
421421
new StateDirectory(config, time, true, false),
422422
0,
423-
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
423+
new StreamsMetricsImpl(new Metrics(), "test-client", time),
424424
time,
425425
"clientId",
426426
stateRestoreListener,

streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@
2323
public class MockStreamsMetrics extends StreamsMetricsImpl {
2424

2525
public MockStreamsMetrics(final Metrics metrics) {
26-
super(metrics, "test", "processId", new MockTime());
26+
super(metrics, "test", new MockTime());
2727
}
2828
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public Set<TopicPartition> partitions() {
219219
public void testMetricsWithBuiltInMetricsVersionLatest() {
220220
final Metrics metrics = new Metrics();
221221
final StreamsMetricsImpl streamsMetrics =
222-
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
222+
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
223223
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
224224
final ProcessorNode<Object, Object, Object, Object> node =
225225
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
@@ -303,7 +303,7 @@ public void process(final Record<Object, Object> record) {
303303
public void testTopologyLevelClassCastExceptionDirect() {
304304
final Metrics metrics = new Metrics();
305305
final StreamsMetricsImpl streamsMetrics =
306-
new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
306+
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
307307
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
308308
final ProcessorNode<Object, Object, Object, Object> node =
309309
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
@@ -323,7 +323,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
323323
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
324324

325325
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
326-
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
326+
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
327327
when(internalProcessorContext.topic()).thenReturn(TOPIC);
328328
when(internalProcessorContext.partition()).thenReturn(PARTITION);
329329
when(internalProcessorContext.offset()).thenReturn(OFFSET);

0 commit comments

Comments
 (0)