diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 40b6fad5a3aca..e37634fe8ddaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -982,8 +982,6 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
-            processId.toString(),
-            applicationId,
             time
         );
@@ -992,8 +990,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state.name());
-        ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
-        ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
+        ClientMetrics.addClientStateTelemetryMetric(processId.toString(), applicationId, streamsMetrics, (metricsConfig, now) -> state.ordinal());
+        ClientMetrics.addClientRecordingLevelMetric(processId.toString(), streamsMetrics, calculateMetricsRecordingLevel());
         threads = Collections.synchronizedList(new LinkedList<>());
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 21bac269d5a14..597c20457f29c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -25,9 +25,14 @@ import org.slf4j.LoggerFactory;
 import java.io.InputStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 
 public class ClientMetrics {
@@ -126,21 +131,30 @@ public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
         );
     }
 
-    public static void addClientStateTelemetryMetric(final StreamsMetricsImpl streamsMetrics,
+    public static void addClientStateTelemetryMetric(final String processId,
+                                                     final String applicationId,
+                                                     final StreamsMetricsImpl streamsMetrics,
                                                      final Gauge<Integer> stateProvider) {
+        final Map<String, String> additionalTags = new LinkedHashMap<>();
+        additionalTags.put(PROCESS_ID_TAG, processId);
+        additionalTags.put(APPLICATION_ID_TAG, applicationId);
+
         streamsMetrics.addClientLevelMutableMetric(
             CLIENT_STATE,
             STATE_DESCRIPTION,
+            additionalTags,
            RecordingLevel.INFO,
            stateProvider
        );
    }

-    public static void addClientRecordingLevelMetric(final StreamsMetricsImpl streamsMetrics,
+    public static void addClientRecordingLevelMetric(final String processId,
+                                                     final StreamsMetricsImpl streamsMetrics,
                                                      final int recordingLevel) {
        streamsMetrics.addClientLevelImmutableMetric(
            RECORDING_LEVEL,
            RECORDING_LEVEL_DESCRIPTION,
+            Collections.singletonMap(PROCESS_ID_TAG, processId),
            RecordingLevel.INFO,
            recordingLevel
        );
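Usage sketch (illustration only, not part of the patch): how the reworked ClientMetrics helpers are called after this change. The client id, process id, and application id values are made up.

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.internals.metrics.ClientMetrics;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

    public class ClientMetricsUsageSketch {
        public static void main(final String[] args) {
            // StreamsMetricsImpl no longer takes processId/applicationId; client-id is its only built-in client-level tag.
            final StreamsMetricsImpl streamsMetrics =
                new StreamsMetricsImpl(new Metrics(), "example-client", Time.SYSTEM);

            // process-id and application-id are now passed per call and attached as additional tags
            // on just these two client-level metrics.
            ClientMetrics.addClientStateTelemetryMetric(
                "example-process-id", "example-application-id", streamsMetrics, (config, now) -> 2);
            ClientMetrics.addClientRecordingLevelMetric("example-process-id", streamsMetrics, 1);
        }
    }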
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index c56f079b0644d..641fdf9470e21 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -89,8 +89,6 @@ public int hashCode() {
     private final Metrics metrics;
     private final Map<Sensor, Sensor> parentSensors;
     private final String clientId;
-    private final String processId;
-    private final String applicationId;
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -169,14 +167,10 @@ public int hashCode() {
     public StreamsMetricsImpl(final Metrics metrics,
                               final String clientId,
-                              final String processId,
-                              final String applicationId,
                               final Time time) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.metrics = metrics;
         this.clientId = clientId;
-        this.processId = processId;
-        this.applicationId = applicationId;
         version = Version.LATEST;
         rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
@@ -199,7 +193,20 @@ public <T> void addClientLevelImmutableMetric(final String name,
                                                   final String description,
                                                   final RecordingLevel recordingLevel,
                                                   final T value) {
-        final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+        addClientLevelImmutableMetric(name, description, Collections.emptyMap(), recordingLevel, value);
+    }
+
+    public <T> void addClientLevelImmutableMetric(final String name,
+                                                  final String description,
+                                                  final Map<String, String> additionalTags,
+                                                  final RecordingLevel recordingLevel,
+                                                  final T value) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            CLIENT_LEVEL_GROUP,
+            description,
+            clientLevelTagMap(additionalTags)
+        );
         final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
         synchronized (clientLevelMetrics) {
             metrics.addMetric(metricName, metricConfig, new ImmutableMetricValue<>(value));
@@ -211,7 +218,20 @@ public <T> void addClientLevelMutableMetric(final String name,
                                                 final String description,
                                                 final RecordingLevel recordingLevel,
                                                 final Gauge<T> valueProvider) {
-        final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+        addClientLevelMutableMetric(name, description, Collections.emptyMap(), recordingLevel, valueProvider);
+    }
+
+    public <T> void addClientLevelMutableMetric(final String name,
+                                                final String description,
+                                                final Map<String, String> additionalTags,
+                                                final RecordingLevel recordingLevel,
+                                                final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            CLIENT_LEVEL_GROUP,
+            description,
+            clientLevelTagMap(additionalTags)
+        );
         final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
         synchronized (clientLevelMetrics) {
             metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -285,10 +305,12 @@ private String threadSensorPrefix(final String threadId) {
     }
 
     public Map<String, String> clientLevelTagMap() {
-        final Map<String, String> tagMap = new LinkedHashMap<>();
+        return clientLevelTagMap(Collections.emptyMap());
+    }
+
+    public Map<String, String> clientLevelTagMap(final Map<String, String> additionalTags) {
+        final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
         tagMap.put(CLIENT_ID_TAG, clientId);
-        tagMap.put(PROCESS_ID_TAG, processId);
-        tagMap.put(APPLICATION_ID_TAG, applicationId);
         return tagMap;
     }
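Similarly, a small sketch (illustration only, with made-up metric names and values) of the new StreamsMetricsImpl overloads introduced above: the old signatures delegate to the new ones with an empty tag map, and clientLevelTagMap(additionalTags) merges the extra tags with the client-id tag.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

    public class ClientLevelTagSketch {
        public static void main(final String[] args) {
            final StreamsMetricsImpl streamsMetrics =
                new StreamsMetricsImpl(new Metrics(), "example-client", Time.SYSTEM);

            // Old-style call: delegates to the new overload with Collections.emptyMap(),
            // so the metric carries only the client-id tag.
            streamsMetrics.addClientLevelImmutableMetric(
                "example-metric", "example description", RecordingLevel.INFO, "some-value");

            // New overload: additional tags are merged with client-id via clientLevelTagMap(additionalTags).
            streamsMetrics.addClientLevelImmutableMetric(
                "example-tagged-metric", "example description",
                Collections.singletonMap("example-tag", "example-value"),
                RecordingLevel.INFO, "some-value");

            // Tag-map merge order: additional tags first, then client-id (LinkedHashMap preserves insertion order).
            final Map<String, String> tags =
                streamsMetrics.clientLevelTagMap(Collections.singletonMap("example-tag", "example-value"));
            System.out.println(tags); // {example-tag=example-value, client-id=example-client}
        }
    }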
diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 9142835b92c19..94a100d3533c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
@@ -36,7 +37,9 @@ import static org.mockito.Mockito.when;
 
 public class ClientMetricsTest {
+    private static final String APPLICATION_ID = "test-application-id";
     private static final String COMMIT_ID = "test-commit-ID";
+    private static final String PROCESS_ID = "test-process-id";
     private static final String VERSION = "test-version";
     private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
 
@@ -116,11 +119,19 @@ public void shouldAddClientStateTelemetryMetric() {
         final String name = "client-state";
         final String description = "The state of the Kafka Streams client";
         final Gauge<Integer> stateProvider = (config, now) -> State.RUNNING.ordinal();
-        setUpAndVerifyMutableMetric(
-            name,
-            description,
-            stateProvider,
-            () -> ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+
+        final Map<String, String> additionalTags = new LinkedHashMap<>();
+        additionalTags.put("process-id", PROCESS_ID);
+        additionalTags.put("application-id", APPLICATION_ID);
+
+        ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID, APPLICATION_ID, streamsMetrics, stateProvider);
+
+        verify(streamsMetrics).addClientLevelMutableMetric(
+            eq(name),
+            eq(description),
+            eq(additionalTags),
+            eq(RecordingLevel.INFO),
+            eq(stateProvider)
         );
     }
 
@@ -129,11 +140,15 @@ public void shouldAddRecordingLevelMetric() {
         final String name = "recording-level";
         final String description = "The metrics recording level of the Kafka Streams client";
         final int recordingLevel = 1;
-        setUpAndVerifyImmutableMetric(
-            name,
-            description,
-            recordingLevel,
-            () -> ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+
+        ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID, streamsMetrics, recordingLevel);
+
+        verify(streamsMetrics).addClientLevelImmutableMetric(
+            eq(name),
+            eq(description),
+            eq(Collections.singletonMap("process-id", PROCESS_ID)),
+            eq(RecordingLevel.INFO),
+            eq(recordingLevel)
         );
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c2aaf7f7373e7..f6658997b7bf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics();
-    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
+
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time); private final String threadId = Thread.currentThread().getName(); private final Initializer initializer = () -> 0L; private final Aggregator aggregator = (aggKey, value, aggregate) -> aggregate + 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 2d8375939d6c1..96597e635232f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest { private ChangelogReader changeLogReader; private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime()); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime()); private final Map properties = mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index cd10f2a77039a..8f767af93e01b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -106,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 6ec219a8cd917..7c0216cd07f58 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -132,7 +132,7 @@ public void process(final Record record) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", time), time, "clientId", stateRestoreListener, @@ -169,7 +169,7 @@ public List partitionsFor(final String topic) { mockConsumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", time), time, "clientId", stateRestoreListener, @@ -418,7 +418,7 @@ public List partitionsFor(final String 
topic) { consumer, new StateDirectory(config, time, true, false), 0, - new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time), + new StreamsMetricsImpl(new Metrics(), "test-client", time), time, "clientId", stateRestoreListener, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java index 54238d8bd6b61..4ed68ef81509f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -23,6 +23,6 @@ public class MockStreamsMetrics extends StreamsMetricsImpl { public MockStreamsMetrics(final Metrics metrics) { - super(metrics, "test", "processId", "applicationId", new MockTime()); + super(metrics, "test", new MockTime()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index ccd47e60d9f63..565d6dcf8f9e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -279,7 +279,7 @@ public Set partitions() { public void testMetricsWithBuiltInMetricsVersionLatest() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet()); @@ -363,7 +363,7 @@ public void process(final Record record) { public void testTopologyLevelClassCastExceptionDirect() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final ProcessorNode node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet()); @@ -441,7 +441,7 @@ private InternalProcessorContext mockInternalProcessorContext() final InternalProcessorContext internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT)); when(internalProcessorContext.taskId()).thenReturn(TASK_ID); - when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime())); + when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime())); when(internalProcessorContext.topic()).thenReturn(TOPIC); when(internalProcessorContext.partition()).thenReturn(PARTITION); when(internalProcessorContext.offset()).thenReturn(OFFSET); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 9cd4e5fbc63b3..1becc3d240d5d 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -72,7 +72,7 @@ public class RecordQueueTest { private final Metrics metrics = new Metrics(); private final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime()); + new StreamsMetricsImpl(metrics, "mock", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>( StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index d10b2800dc9ae..817ffe42f842a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) { public void shouldExposeProcessMetrics() { final Metrics metrics = new Metrics(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime()); + new StreamsMetricsImpl(metrics, "test-client", new MockTime()); final InternalMockProcessorContext context = new InternalMockProcessorContext<>(streamsMetrics); final SourceNode node = new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index df7f83d05fe62..9680dd1af1df4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -113,7 +113,7 @@ public class StandbyTaskTest { private final MockTime time = new MockTime(); private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time); private File baseDir; private StreamsConfig config; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index a303e827a38c2..616c397d7111b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -987,7 +987,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology); Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig()); - directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", "applicationId", time), new LogContext("test")); + directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test")); 
return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9b8a3dc33357a..16a9ddeddf4e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -2775,7 +2775,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { streamsMetrics, null ); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "processId", "applicationId", time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", time); // The processor topology is missing the topics final ProcessorTopology topology = withSources(emptyList(), mkMap()); @@ -3409,7 +3409,7 @@ private StreamTask createSingleSourceStateless(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, @@ -3446,7 +3446,7 @@ private StreamTask createStatelessTask(final StreamsConfig config) { topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, @@ -3486,7 +3486,7 @@ private StreamTask createStatelessTaskWithAnchoredPunctuation( topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), - new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time), + new StreamsMetricsImpl(metrics, "test", time), stateDirectory, cache, time, @@ -3522,7 +3522,7 @@ private StreamTask createStatelessTaskWithForwardingTopology(final SourceNode tasks) { } final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); @@ -1947,8 +1941,6 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - PROCESS_ID.toString(), - APPLICATION_ID, mockTime ); @@ -2709,7 +2701,7 @@ public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolea when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2770,7 +2762,7 @@ public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHan doThrow(new TimeoutException()).when(taskManager).handleCorruption(corruptedTasks); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2839,7 +2831,7 @@ public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final doNothing().when(taskManager).handleLostAll(); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2905,7 +2897,7 @@ public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveT doNothing().when(consumer).enforceRebalance("Active tasks corrupted"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -2968,7 +2960,7 @@ public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInac when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3203,7 +3195,7 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3262,7 +3254,7 @@ public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, fi when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = new StreamThread( @@ -3663,8 +3655,6 @@ public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabl final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - PROCESS_ID.toString(), - APPLICATION_ID, mockTime ); @@ -3724,8 +3714,6 @@ public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpda final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - PROCESS_ID.toString(), - APPLICATION_ID, mockTime ); @@ -3792,8 +3780,6 @@ public void testStreamsRebalanceDataWithStreamsProtocol() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, - PROCESS_ID.toString(), - APPLICATION_ID, mockTime ); @@ 
-3889,7 +3875,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -3949,7 +3935,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4018,7 +4004,7 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4078,7 +4064,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4138,7 +4124,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4207,7 +4193,7 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { null, mock(TaskManager.class), null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), new TopologyMetadata(internalTopologyBuilder, config), PROCESS_ID, CLIENT_ID, @@ -4303,7 +4289,7 @@ private StreamThread setUpThread(final Properties streamsConfigProps) { "", taskManager, null, - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime), + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime), topologyMetadata, PROCESS_ID, "thread-id", @@ -4355,7 +4341,7 @@ private void setupInternalTopologyWithoutState(final StreamsConfig config) { private Collection createStandbyTask(final StreamsConfig config) { final LogContext logContext = new LogContext("test"); final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( new TopologyMetadata(internalTopologyBuilder, config), config, @@ -4414,7 +4400,7 @@ private StreamThread buildStreamThread(final Consumer consumer, final StreamsConfig config, final TopologyMetadata topologyMetadata) { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), APPLICATION_ID, mockTime); + new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime); return new StreamThread( mockTime, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 7070bb4df18cd..c0dac77f867d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.test.StreamsTestUtils; @@ -48,14 +47,12 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP; @@ -133,14 +130,13 @@ public class StreamsMetricsImplTest { private final String metricNamePrefix = "metric"; private final String group = "group"; private final Map tags = mkMap(mkEntry("tag", "value")); - private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), - mkEntry(PROCESS_ID_TAG, PROCESS_ID), mkEntry(APPLICATION_ID_TAG, APPLICATION_ID)); + private final Map clientLevelTags = mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID)); private final MetricName metricName1 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags); private final MetricName metricName2 = new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags); private final MockTime time = new MockTime(0); - private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, time); + private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) { final StringBuffer message = new StringBuffer(); @@ -254,8 +250,7 @@ public void shouldGetNewThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); 
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -267,8 +262,7 @@ public void shouldGetExistingThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); @@ -280,8 +274,7 @@ public void shouldGetNewTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -298,8 +291,7 @@ public void shouldGetExistingTaskLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.taskLevelSensor( THREAD_ID1, @@ -316,8 +308,7 @@ public void shouldGetNewTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -336,8 +327,7 @@ public void shouldGetExistingTopicLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.topicLevelSensor( THREAD_ID1, @@ -356,8 +346,7 @@ public void shouldGetNewStoreLevelSensorIfNoneExists() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; final ArgumentCaptor sensorKeys = setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -375,8 +364,7 @@ public void shouldGetExistingStoreLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( TASK_ID1, @@ -392,8 +380,7 @@ public void shouldGetExistingStoreLevelSensor() { public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL); @@ -405,8 +392,7 @@ public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -418,8 +404,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -431,8 +416,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); final Thread otherThread = @@ -447,8 +431,7 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws I public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() { final Metrics metrics = mock(Metrics.class); final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -472,8 +455,7 @@ public void shouldAddNewStoreLevelMutableMetric() { 
.thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -506,8 +488,7 @@ public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) .thenReturn(metricName); when(metrics.metric(metricName)).thenReturn(null); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); streamsMetrics.addStoreLevelMutableMetric( TASK_ID1, @@ -557,8 +538,7 @@ public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws @Test public void shouldRemoveStateStoreLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final MetricName metricName1 = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricName metricName2 = @@ -581,8 +561,7 @@ public void shouldGetNewNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -600,8 +579,7 @@ public void shouldGetExistingNodeLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.nodeLevelSensor( THREAD_ID1, @@ -620,8 +598,7 @@ public void shouldGetNewCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.cacheLevelSensor( THREAD_ID1, @@ -640,8 +617,7 @@ public void shouldGetExistingCacheLevelSensor() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final String processorCacheName = "processorNodeName"; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = 
streamsMetrics.cacheLevelSensor( THREAD_ID1, TASK_ID1, @@ -658,8 +634,7 @@ public void shouldGetNewClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetNewSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -671,8 +646,7 @@ public void shouldGetExistingClientLevelSensor() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; setupGetExistingSensorTest(metrics); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); @@ -681,33 +655,110 @@ public void shouldGetExistingClientLevelSensor() { @Test public void shouldAddClientLevelImmutableMetric() { - final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); final String value = "immutable-value"; - final ImmutableMetricValue immutableValue = new ImmutableMetricValue<>(value); - when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) - .thenReturn(metricName1); - doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); - streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value); + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + + streamsMetrics.addClientLevelImmutableMetric( + METRIC_NAME1, + DESCRIPTION1, + recordingLevel, + value + ); + + final MetricName name = metrics.metricName( + METRIC_NAME1, + CLIENT_LEVEL_GROUP, + mkMap( + mkEntry("client-id", CLIENT_ID) + ) + ); + assertThat(metrics.metric(name).metricName().name(), equalTo(METRIC_NAME1)); + assertThat(metrics.metric(name).metricValue(), equalTo(value)); + } + + @Test + public void shouldAddClientLevelImmutableMetricWithAdditionalTags() { + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String value = "immutable-value"; + + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + + streamsMetrics.addClientLevelImmutableMetric( + METRIC_NAME1, + DESCRIPTION1, + Collections.singletonMap("additional-tag", "additional-value"), + recordingLevel, + value + ); + + final MetricName name = metrics.metricName( + METRIC_NAME1, + CLIENT_LEVEL_GROUP, + mkMap( + mkEntry("client-id", CLIENT_ID), + mkEntry("additional-tag", "additional-value") + ) + ); + assertThat(metrics.metric(name).metricName().name(), equalTo(METRIC_NAME1)); + assertThat(metrics.metric(name).metricValue(), equalTo(value)); } @Test public void shouldAddClientLevelMutableMetric() { - final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); - final Gauge valueProvider = 
(config, now) -> "mutable-value"; - when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) - .thenReturn(metricName1); - doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final String value = "mutable-value"; + + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, CLIENT_ID, time); - streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider); + streamsMetrics.addClientLevelMutableMetric( + METRIC_NAME1, + DESCRIPTION1, + recordingLevel, + (c, t) -> value + ); + + final MetricName name = metrics.metricName( + METRIC_NAME1, + CLIENT_LEVEL_GROUP, + mkMap( + mkEntry("client-id", CLIENT_ID) + ) + ); + assertThat(metrics.metric(name).metricName().name(), equalTo(METRIC_NAME1)); + assertThat(metrics.metric(name).metricValue(), equalTo(value)); + } + + @Test + public void shouldAddClientLevelMutableMetricWithAdditionalTags() { + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String value = "mutable-value"; + + final StreamsMetricsImpl streamsMetrics + = new StreamsMetricsImpl(metrics, CLIENT_ID, time); + + streamsMetrics.addClientLevelMutableMetric( + METRIC_NAME1, + DESCRIPTION1, + Collections.singletonMap("additional-tag", "additional-value"), + recordingLevel, + (c, t) -> value + ); + + final MetricName name = metrics.metricName( + METRIC_NAME1, + CLIENT_LEVEL_GROUP, + mkMap( + mkEntry("client-id", CLIENT_ID), + mkEntry("additional-tag", "additional-value") + ) + ); + assertThat(metrics.metric(name).metricName().name(), equalTo(METRIC_NAME1)); + assertThat(metrics.metric(name).metricValue(), equalTo(value)); } @Test @@ -726,8 +777,7 @@ private void setupRemoveSensorsTest(final Metrics metrics, @Test public void shouldRemoveClientLevelMetricsAndSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); final ArgumentCaptor sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0)); @@ -740,8 +790,7 @@ public void shouldRemoveClientLevelMetricsAndSensors() { @Test public void shouldRemoveThreadLevelSensors() { final Metrics metrics = mock(Metrics.class); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); addSensorsOnAllLevels(metrics, streamsMetrics); setupRemoveSensorsTest(metrics, THREAD_ID1); @@ -750,8 +799,7 @@ public void shouldRemoveThreadLevelSensors() { @Test public void testNullMetrics() { - assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", PROCESS_ID, - APPLICATION_ID, time)); + assertThrows(NullPointerException.class, () -> new StreamsMetricsImpl(null, "", time)); } @Test @@ -784,8 +832,7 @@ public void testRemoveSensor() { @Test public void testMultiLevelSensorRemoval() { final Metrics registry = new Metrics(); - final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, THREAD_ID1, 
time); for (final MetricName defaultMetric : registry.metrics().keySet()) { registry.removeMetric(defaultMetric); } @@ -891,8 +938,7 @@ public void testTotalMetricDoesntDecrease() { final MockTime time = new MockTime(1); final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS); final Metrics metrics = new Metrics(config, time); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "", time); final String scope = "scope"; final String entity = "entity"; @@ -926,8 +972,7 @@ public void testTotalMetricDoesntDecrease() { @Test public void shouldAddLatencyRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); shouldAddCustomSensor( streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -942,8 +987,7 @@ public void shouldAddLatencyRateTotalSensor() { @Test public void shouldAddRateTotalSensor() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, time); shouldAddCustomSensor( streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, RecordingLevel.DEBUG), streamsMetrics, @@ -1069,10 +1113,8 @@ public void shouldThrowIfRateTotalSensorIsAddedWithOddTags() { public void shouldGetClientLevelTagMap() { final Map tagMap = streamsMetrics.clientLevelTagMap(); - assertThat(tagMap.size(), equalTo(3)); + assertThat(tagMap.size(), equalTo(1)); assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG), equalTo(CLIENT_ID)); - assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG), equalTo(PROCESS_ID)); - assertThat(tagMap.get(StreamsMetricsImpl.APPLICATION_ID_TAG), equalTo(APPLICATION_ID)); } @Test @@ -1080,8 +1122,7 @@ public void shouldGetStoreLevelTagMap() { final String taskName = "test-task"; final String storeType = "remote-window"; final String storeName = "window-keeper"; - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); final Map tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); @@ -1096,8 +1137,7 @@ public void shouldGetStoreLevelTagMap() { @Test public void shouldGetCacheLevelTagMap() { final StreamsMetricsImpl streamsMetrics = - new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + new StreamsMetricsImpl(metrics, THREAD_ID1, time); final String taskName = "taskName"; final String storeName = "storeName"; @@ -1114,8 +1154,7 @@ public void shouldGetCacheLevelTagMap() { @Test public void shouldGetThreadLevelTagMap() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, THREAD_ID1, time); final Map tagMap = streamsMetrics.threadLevelTagMap(THREAD_ID1); @@ -1248,7 +1287,7 @@ public void shouldAddMinAndMaxMetricsToSensor() { @Test public void shouldReturnMetricsVersionCurrent() { assertThat( - new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID, time).version(), + new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(), equalTo(Version.LATEST) ); } @@ -1306,61 +1345,60 @@ public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { @Test public void shouldAddThreadLevelMutableMetric() { final int measuredValue = 123; + final String name = "foobar"; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelMutableMetric( - "foobar", + name, "test metric", "t1", (c, t) -> measuredValue ); - final MetricName name = metrics.metricName( - "foobar", + final MetricName metricName = metrics.metricName( + name, THREAD_LEVEL_GROUP, mkMap( mkEntry("thread-id", "t1") ) ); - assertThat(metrics.metric(name), notNullValue()); - assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue)); + assertThat(metrics.metric(metricName).metricName().name(), equalTo(name)); + assertThat(metrics.metric(metricName).metricValue(), equalTo(measuredValue)); } @Test public void shouldAddThreadLevelMutableMetricWithAdditionalTags() { final int measuredValue = 123; + final String name = "foobar"; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelMutableMetric( - "foobar", + name, "test metric", "t1", Collections.singletonMap("additional-tag", "additional-value"), (c, t) -> measuredValue ); - final MetricName name = metrics.metricName( - "foobar", + final MetricName metricName = metrics.metricName( + name, THREAD_LEVEL_GROUP, mkMap( mkEntry("thread-id", "t1"), mkEntry("additional-tag", "additional-value") ) ); - assertThat(metrics.metric(name), notNullValue()); - assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue)); + assertThat(metrics.metric(metricName).metricName().name(), equalTo(name)); + assertThat(metrics.metric(metricName).metricValue(), equalTo(measuredValue)); } @Test public void shouldCleanupThreadLevelMutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelMutableMetric( "foobar", "test metric", @@ -1382,8 +1420,7 @@ public void shouldCleanupThreadLevelMutableMetric() { public void shouldAddThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", @@ -1407,8 +1444,7 @@ public void shouldAddThreadLevelImmutableMetric() { public void shouldCleanupThreadLevelImmutableMetric() { final int measuredValue = 123; final StreamsMetricsImpl streamsMetrics - = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID, - time); + = new StreamsMetricsImpl(metrics, THREAD_ID1, time); streamsMetrics.addThreadLevelImmutableMetric( "foobar", "test metric", diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 902b362e1532d..fd5bbef428a46 100644 --- 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 902b362e1532d..fd5bbef428a46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() {
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() {
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public void shouldHandleTombstoneRecords() {
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders() {
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
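Every call site in the segmented-store tests above now repeats the same three-argument expression. A hypothetical test helper like the one below could factor that out; StreamsMetricsTestUtil and mockStreamsMetrics() do not exist in the patch, and MockTime is the clients test utility these tests already use:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

final class StreamsMetricsTestUtil {
    private StreamsMetricsTestUtil() { }

    // Hypothetical helper: the exact expression repeated across the store tests above.
    static StreamsMetricsImpl mockStreamsMetrics() {
        return new StreamsMetricsImpl(new Metrics(), "mock", new MockTime());
    }
}
```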
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 77a5d9e87106e..ede32caba90ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public void shouldRestoreRecordsAndConsistencyVectorSingleTopic(final SegmentedB
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics(final Segment
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public void shouldHandleTombstoneRecords(final SegmentedBytesStore.KeySchema sch
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public void shouldNotThrowWhenRestoringOnMissingHeaders(final SegmentedBytesStor
            dir,
            Serdes.String(),
            Serdes.String(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index e09a8ebe57bd0..02de04a5247bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ private InternalMockProcessorContext mockContext() {
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            streamsConfig,
            () -> collector,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 573cc02a438cd..0480fbc07213f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public void before() {
        when(mockContext.applicationId()).thenReturn("appId");
        when(mockContext.metrics())
            .thenReturn(
-                new StreamsMetricsImpl(new Metrics(), "threadName", "processId", "applicationId", new MockTime())
+                new StreamsMetricsImpl(new Metrics(), "threadName", new MockTime())
            );
        when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
        when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index 27aa52b48c6ff..fa0757ac62f6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
    @BeforeEach
    public void setUp() {
        metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
            new TaskId(0, 0)
        );
    }
}
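The metered-store tests that follow stub the processor context the same way GlobalStateStoreProviderTest does above. A minimal Mockito sketch of that setup, assuming the mocked type is InternalProcessorContext as in those tests (the class and helper names here are illustrative, not from the patch):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

final class MockedContextSketch {
    private MockedContextSketch() { }

    // Illustrative: the mocked context hands out a StreamsMetricsImpl built with the
    // slimmed-down constructor, plus the usual task id stub.
    @SuppressWarnings("unchecked")
    static InternalProcessorContext<Object, Object> contextWithMetrics() {
        final InternalProcessorContext<Object, Object> context = mock(InternalProcessorContext.class);
        when(context.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test", new MockTime()));
        when(context.taskId()).thenReturn(new TaskId(0, 0));
        return context;
    }
}
```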
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 0ecea105badb9..4aba2784971bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -122,7 +122,7 @@ private void setUp() {
        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        when(context.applicationId()).thenReturn(APPLICATION_ID);
        when(context.metrics()).thenReturn(
-            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime)
+            new StreamsMetricsImpl(metrics, "test", mockTime)
        );
        when(context.taskId()).thenReturn(taskId);
        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 4546e6b7c1ffd..f22780636f28b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public void setUp() {
        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
        when(context.applicationId()).thenReturn(APPLICATION_ID);
        when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
        when(context.taskId()).thenReturn(taskId);
        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
        when(innerStore.name()).thenReturn(STORE_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0b6556ac78b4d..9b5d33db96642 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ private void setUp() {
        setUpWithoutContext();
        when(context.applicationId()).thenReturn(APPLICATION_ID);
        when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
        when(context.taskId()).thenReturn(taskId);
        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
        when(inner.name()).thenReturn(STORE_NAME);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3ed1453454e0..6a4871bd3a35b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
    public void setUp() {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
        context = new InternalMockProcessorContext<>(
            TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public void setUp() {
    public void setUpWithoutContextName() {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
        context = new InternalMockProcessorContext<>(
            TestUtils.tempDirectory(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 1602f1a115bb1..56fe45630d17d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
    @BeforeEach
    public void setUp() {
        when(inner.name()).thenReturn(STORE_NAME);
-        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", mockTime));
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
        when(context.applicationId()).thenReturn(APPLICATION_ID);
        when(context.taskId()).thenReturn(TASK_ID);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 40e4e52eafae4..3cf17ff830eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
    @BeforeEach
    public void setUp() {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
        context = new InternalMockProcessorContext<>(
            TestUtils.tempDirectory(),
            Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index f56ac75a0c911..530a25b4e662d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRock
        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
        context = mock(InternalMockProcessorContext.class);
        when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRock
        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
        context = mock(InternalMockProcessorContext.class);
        when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
        final Properties props = StreamsTestUtils.getStreamsConfig();
        context = mock(InternalMockProcessorContext.class);
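In the RocksDBStoreTest hunks above, the recording level is configured on the underlying Metrics registry rather than on the Streams wrapper itself. A small illustrative sketch of that setup, again assuming the patched constructor and using Time.SYSTEM in place of the test fixture's time:

```java
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class RecordingLevelSketch {
    public static void main(final String[] args) {
        // The recording level lives on the Metrics registry, exactly as in the
        // RocksDBStoreTest setup above; the StreamsMetricsImpl wrapper no longer
        // needs process-id or application-id arguments.
        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, "test-application", Time.SYSTEM);
        System.out.println(streamsMetrics.clientLevelTagMap());
    }
}
```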
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 5f5495af4251b..e288c04517e27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
    public void setUp() {
        final Metrics metrics = new Metrics();
        offset = 0;
-        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new MockTime());
        context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
    }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 0acd2a330da22..11f7e6c13cb41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
    @BeforeEach
    public void setUp() {
        metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
            new TaskId(0, 0)
        );
    }
}
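The segment tests above (KeyValueSegmentTest, TimestampedSegmentTest) initialise a RocksDBMetricsRecorder with the slimmed-down metrics object and a TaskId. An illustrative sketch of that wiring; the scope and store names are placeholders, not values taken from the patch:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;

final class MetricsRecorderInitSketch {
    private MetricsRecorderInitSketch() { }

    // Mirrors the segment test setup above: the recorder only needs a
    // StreamsMetricsImpl (now built without process-id/application-id) and a TaskId.
    static RocksDBMetricsRecorder initialisedRecorder() {
        final RocksDBMetricsRecorder recorder =
            new RocksDBMetricsRecorder("rocksdb-state", "segment-store");  // placeholder scope/store names
        recorder.init(
            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
            new TaskId(0, 0)
        );
        return recorder;
    }
}
```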
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 62a37ff8d0aa0..113de5959a4ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public void shouldGetPinnedUsageOfBlockCacheWithSingleCache() throws Exception {
    private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
        final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
        recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ private void runAndVerifySumOfProperties(final String propertyName) throws Excep
    private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
        final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
        recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String proper
    private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime());
        final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
        recorder.init(streamsMetrics, TASK_ID);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index b0ed10c45fe6e..0436811db79ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetric
        assertThrows(
            IllegalStateException.class,
            () -> recorder.init(
-                new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
                TASK_ID1
            )
        );
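The RocksDBMetricsRecorderTest hunk above exercises the recorder's guard against re-initialisation with a different StreamsMetricsImpl instance. A hedged sketch of that scenario, using JUnit 5 assertThrows as the test does; the scope, store name, and task id are placeholders:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;

class RecorderReInitSketch {

    void reInitWithDifferentStreamsMetricsShouldFail() {
        final TaskId taskId = new TaskId(0, 0);
        final RocksDBMetricsRecorder recorder =
            new RocksDBMetricsRecorder("rocksdb-state", "segment-store");  // placeholder names
        recorder.init(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), taskId);

        // Re-initialising with a different StreamsMetricsImpl instance is rejected, as
        // exercised by shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetrics above.
        assertThrows(
            IllegalStateException.class,
            () -> recorder.init(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()), taskId)
        );
    }
}
```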
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index debe7066c2fed..318be5cd86a7a 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -91,7 +91,7 @@ public InternalMockProcessorContext() {
        this(null,
            null,
            null,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            null,
            null,
@@ -108,8 +108,6 @@ public InternalMockProcessorContext(final File stateDir,
            new StreamsMetricsImpl(
                new Metrics(),
                "mock",
-                "processId",
-                "applicationId",
                new MockTime()
            ),
            config,
@@ -142,8 +140,6 @@ public InternalMockProcessorContext(final File stateDir,
            new StreamsMetricsImpl(
                new Metrics(),
                "mock",
-                "processId",
-                "applicationId",
                new MockTime()
            ),
            config,
@@ -161,7 +157,7 @@ public InternalMockProcessorContext(final File stateDir,
            stateDir,
            keySerde,
            valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            config,
            null,
            null,
@@ -181,7 +177,7 @@ public InternalMockProcessorContext(final StateSerdes serdes,
            null,
            serdes.keySerde(),
            serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(metrics, "mock", new MockTime()),
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            () -> collector,
            null,
@@ -198,7 +194,7 @@ public InternalMockProcessorContext(final File stateDir,
            stateDir,
            keySerde,
            valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            () -> collector,
            cache,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 6488dfabfb737..f0e748f35ea0d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -384,8 +384,6 @@ private StreamsMetricsImpl setupMetrics(final StreamsConfig streamsConfig) {
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
            metrics,
            "test-client",
-            "processId",
-            "applicationId",
            mockWallClockTime
        );
        TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 4cd29b5bc640a..5833b5d25cc2a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -252,8 +252,6 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
        this.metrics = new StreamsMetricsImpl(
            new Metrics(metricConfig),
            threadId,
-            "processId",
-            "applicationId",
            Time.SYSTEM
        );
        TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 48fad67f0f4a3..8cea2fe329035 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -269,8 +269,6 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
        metrics = new StreamsMetricsImpl(
            new Metrics(metricConfig),
            threadId,
-            "processId",
-            "applicationId",
            Time.SYSTEM
        );
        TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index cacedfafbf76a..4551cb6ebe979 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -291,8 +291,6 @@ public void close() {
        }
        when(mockInternalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(
            new Metrics(new MetricConfig()),
            Thread.currentThread().getName(),
-            "processId",
-            "applicationId",
            Time.SYSTEM
        ));
        when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));
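Finally, the test-utils changes above all build the per-thread metrics object the same way and then register the dropped-records sensor. An illustrative sketch of that wiring, assuming the patched constructor; the thread and task ids are placeholders:

```java
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;

public class TestUtilsMetricsSketch {
    public static void main(final String[] args) {
        final String threadId = Thread.currentThread().getName();
        final TaskId taskId = new TaskId(0, 0);

        // Mirrors the test-utils wiring above: per-thread metrics plus the
        // dropped-records sensor, without process-id/application-id arguments.
        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(
            new Metrics(new MetricConfig()),
            threadId,
            Time.SYSTEM
        );
        TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
    }
}
```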