@@ -982,8 +982,6 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
processId.toString(),
applicationId,
time
);

@@ -992,8 +990,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state.name());
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
ClientMetrics.addClientStateTelemetryMetric(processId.toString(), applicationId, streamsMetrics, (metricsConfig, now) -> state.ordinal());
ClientMetrics.addClientRecordingLevelMetric(processId.toString(), streamsMetrics, calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());

@@ -25,9 +25,14 @@
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;

public class ClientMetrics {
@@ -126,21 +131,30 @@ public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
);
}

public static void addClientStateTelemetryMetric(final StreamsMetricsImpl streamsMetrics,
public static void addClientStateTelemetryMetric(final String processId,
final String applicationId,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> stateProvider) {
final Map<String, String> additionalTags = new LinkedHashMap<>();
additionalTags.put(PROCESS_ID_TAG, processId);
additionalTags.put(APPLICATION_ID_TAG, applicationId);

streamsMetrics.addClientLevelMutableMetric(
CLIENT_STATE,
STATE_DESCRIPTION,
additionalTags,
RecordingLevel.INFO,
stateProvider
);
}

public static void addClientRecordingLevelMetric(final StreamsMetricsImpl streamsMetrics,
public static void addClientRecordingLevelMetric(final String processId,
final StreamsMetricsImpl streamsMetrics,
final int recordingLevel) {
streamsMetrics.addClientLevelImmutableMetric(
RECORDING_LEVEL,
RECORDING_LEVEL_DESCRIPTION,
Collections.singletonMap(PROCESS_ID_TAG, processId),
RecordingLevel.INFO,
recordingLevel
);
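
To make the net effect of these signature changes concrete, here is a minimal, self-contained sketch (not taken from this PR; the class name and all IDs are placeholders) of how the reworked `ClientMetrics` helpers are invoked against the three-argument `StreamsMetricsImpl` and which tags end up on the resulting client-level metrics:

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class ClientLevelTagsSketch {
    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();
        // processId and applicationId are no longer constructor arguments.
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, "demo-client", Time.SYSTEM);

        // Instead they are supplied per metric and surface as additional tags.
        ClientMetrics.addClientStateTelemetryMetric(
            "demo-process-id", "demo-application-id", streamsMetrics,
            (metricsConfig, now) -> 2);                       // placeholder state ordinal
        ClientMetrics.addClientRecordingLevelMetric(
            "demo-process-id", streamsMetrics, RecordingLevel.INFO.id);

        for (final MetricName name : metrics.metrics().keySet()) {
            // client-state carries process-id, application-id, and client-id tags;
            // recording-level carries process-id and client-id tags.
            System.out.println(name.name() + " -> " + name.tags());
        }
    }
}
```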
@@ -89,8 +89,6 @@ public int hashCode() {
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
private final String processId;
private final String applicationId;

private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -169,14 +167,10 @@ public int hashCode() {

public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
final String processId,
final String applicationId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
this.processId = processId;
this.applicationId = applicationId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);

@@ -199,7 +193,20 @@ public <T> void addClientLevelImmutableMetric(final String name,
final String description,
final RecordingLevel recordingLevel,
final T value) {
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
addClientLevelImmutableMetric(name, description, Collections.emptyMap(), recordingLevel, value);
}

public <T> void addClientLevelImmutableMetric(final String name,
final String description,
final Map<String, String> additionalTags,
final RecordingLevel recordingLevel,
final T value) {
final MetricName metricName = metrics.metricName(
name,
CLIENT_LEVEL_GROUP,
description,
clientLevelTagMap(additionalTags)
);
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, new ImmutableMetricValue<>(value));
@@ -211,7 +218,20 @@ public <T> void addClientLevelMutableMetric(final String name,
final String description,
final RecordingLevel recordingLevel,
final Gauge<T> valueProvider) {
final MetricName metricName = metrics.metricName(name, CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
addClientLevelMutableMetric(name, description, Collections.emptyMap(), recordingLevel, valueProvider);
}

public <T> void addClientLevelMutableMetric(final String name,
final String description,
final Map<String, String> additionalTags,
final RecordingLevel recordingLevel,
final Gauge<T> valueProvider) {
final MetricName metricName = metrics.metricName(
name,
CLIENT_LEVEL_GROUP,
description,
clientLevelTagMap(additionalTags)
);
final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -285,10 +305,12 @@ private String threadSensorPrefix(final String threadId) {
}

public Map<String, String> clientLevelTagMap() {
final Map<String, String> tagMap = new LinkedHashMap<>();
return clientLevelTagMap(Collections.emptyMap());
}

public Map<String, String> clientLevelTagMap(final Map<String, String> additionalTags) {
final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
tagMap.put(CLIENT_ID_TAG, clientId);
tagMap.put(PROCESS_ID_TAG, processId);
tagMap.put(APPLICATION_ID_TAG, applicationId);
return tagMap;
}
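
A small sketch (again with a hypothetical class name and placeholder values) of the new `clientLevelTagMap(additionalTags)` overload: the caller's tags are copied first and the `client-id` tag is appended afterwards, while the zero-argument variant now delegates with an empty map:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class ClientLevelTagMapSketch {
    public static void main(final String[] args) {
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(new Metrics(), "demo-client", Time.SYSTEM);

        // Zero-argument variant: only the client-id tag remains at the base level.
        final Map<String, String> base = streamsMetrics.clientLevelTagMap();
        // Overload: additional tags come first, client-id is appended last.
        final Map<String, String> tagged =
            streamsMetrics.clientLevelTagMap(
                Collections.singletonMap("process-id", "demo-process-id"));

        System.out.println(base);   // {client-id=demo-client}
        System.out.println(tagged); // {process-id=demo-process-id, client-id=demo-client}
    }
}
```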

@@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
@@ -36,7 +37,9 @@
import static org.mockito.Mockito.when;

public class ClientMetricsTest {
private static final String APPLICATION_ID = "test-application-id";
private static final String COMMIT_ID = "test-commit-ID";
private static final String PROCESS_ID = "test-process-id";
private static final String VERSION = "test-version";

private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
@@ -116,11 +119,19 @@ public void shouldAddClientStateTelemetryMetric() {
final String name = "client-state";
final String description = "The state of the Kafka Streams client";
final Gauge<Integer> stateProvider = (config, now) -> State.RUNNING.ordinal();
setUpAndVerifyMutableMetric(
name,
description,
stateProvider,
() -> ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)

final Map<String, String> additionalTags = new LinkedHashMap<>();
additionalTags.put("process-id", PROCESS_ID);
additionalTags.put("application-id", APPLICATION_ID);

ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID, APPLICATION_ID, streamsMetrics, stateProvider);

verify(streamsMetrics).addClientLevelMutableMetric(
eq(name),
eq(description),
eq(additionalTags),
eq(RecordingLevel.INFO),
eq(stateProvider)
);
}

@@ -129,11 +140,15 @@ public void shouldAddRecordingLevelMetric() {
final String name = "recording-level";
final String description = "The metrics recording level of the Kafka Streams client";
final int recordingLevel = 1;
setUpAndVerifyImmutableMetric(
name,
description,
recordingLevel,
() -> ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)

ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID, streamsMetrics, recordingLevel);

verify(streamsMetrics).addClientLevelImmutableMetric(
eq(name),
eq(description),
eq(Collections.singletonMap("process-id", PROCESS_ID)),
eq(RecordingLevel.INFO),
eq(recordingLevel)
);
}

@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;

private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new MockTime());
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {

// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", "", time);
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", time);
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
@@ -132,7 +132,7 @@ public void process(final Record<Object, Object> record) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -169,7 +169,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -418,7 +418,7 @@ public List<PartitionInfo> partitionsFor(final String topic) {
consumer,
new StateDirectory(config, time, true, false),
0,
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", time),
new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -23,6 +23,6 @@
public class MockStreamsMetrics extends StreamsMetricsImpl {

public MockStreamsMetrics(final Metrics metrics) {
super(metrics, "test", "processId", "applicationId", new MockTime());
super(metrics, "test", new MockTime());
}
}
@@ -279,7 +279,7 @@ public Set<TopicPartition> partitions() {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(), Collections.emptySet());
@@ -363,7 +363,7 @@ public void process(final Record<Object, Object> record) {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
@@ -441,7 +441,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));

when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()));
when(internalProcessorContext.metrics()).thenReturn(new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
@@ -72,7 +72,7 @@ public class RecordQueueTest {

private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime());
new StreamsMetricsImpl(metrics, "mock", new MockTime());

final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
@@ -97,7 +97,7 @@ public String deserialize(final String topic, final byte[] data) {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<String, String> context = new InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
@@ -113,7 +113,7 @@ public class StandbyTaskTest {

private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName, time);

private File baseDir;
private StreamsConfig config;
@@ -987,7 +987,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());

directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", "applicationId", time), new LogContext("test"));
directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test"));

return store;
}