Skip to content

Commit 1e95d04

Browse files
authored
KAFKA-19747: Update ClientTelemetryReporter telemetry push error handling (#20661)
When a push telemetry request fails, any exception is treated as fatal, increasing the time interval to `Integer.MAX_VALUE`, effectively turning telemetry off. This PR updates the error handling to check whether the exception is a transient one with expected recovery, and keeps the telemetry interval value the same in those cases since a recovery is expected. Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax <[email protected]>
1 parent 6ef10ec commit 1e95d04

File tree

3 files changed

+172
-3
lines changed

3 files changed

+172
-3
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
files="ClientUtils.java"/>
8989

9090
<suppress checks="ClassDataAbstractionCoupling"
91-
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
91+
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
9292
<suppress checks="ClassDataAbstractionCoupling"
9393
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
9494

clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.KafkaException;
2121
import org.apache.kafka.common.Uuid;
2222
import org.apache.kafka.common.errors.InterruptException;
23+
import org.apache.kafka.common.errors.RetriableException;
2324
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
2425
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
2526
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -528,13 +529,13 @@ public void handleResponse(PushTelemetryResponse response) {
528529
/**
 * Callback for a failed get-telemetry-subscriptions request.
 *
 * Logs the failure at debug level and forwards the outcome of
 * {@code isRetryable(maybeFatalException)} to {@code handleFailedRequest}.
 * This is a diff view: the call under the {@code -} marker is the removed
 * line, the call under the {@code +} marker is its replacement.
 *
 * @param maybeFatalException the failure reported for the request
 */
@Override
529530
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
530531
log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException);
531-
handleFailedRequest(maybeFatalException != null);
532+
handleFailedRequest(isRetryable(maybeFatalException));
532533
}
533534

534535
/**
 * Callback for a failed push-telemetry request.
 *
 * Logs the failure at debug level and forwards the outcome of
 * {@code isRetryable(maybeFatalException)} to {@code handleFailedRequest}.
 * This is a diff view: the call under the {@code -} marker is the removed
 * line, the call under the {@code +} marker is its replacement.
 *
 * @param maybeFatalException the failure reported for the request
 */
@Override
535536
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
536537
log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException);
537-
handleFailedRequest(maybeFatalException != null);
538+
handleFailedRequest(isRetryable(maybeFatalException));
538539
}
539540

540541
@Override
@@ -628,6 +629,12 @@ public void initiateClose() {
628629
}
629630
}
630631

632+
/**
 * Reports whether a failed telemetry request is considered retryable.
 *
 * @param maybeFatalException the failure handed to the request-failure callbacks; may be {@code null}
 * @return {@code true} if the argument is {@code null}, is itself a {@link RetriableException},
 *         or has a direct cause that is a {@link RetriableException}; {@code false} otherwise
 */
private boolean isRetryable(final KafkaException maybeFatalException) {
633+
// A null exception is reported as retryable.
return maybeFatalException == null ||
634+
(maybeFatalException instanceof RetriableException) ||
635+
// Only the immediate cause is inspected; deeper links of the cause chain are not walked.
(maybeFatalException.getCause() != null && maybeFatalException.getCause() instanceof RetriableException);
636+
}
637+
631638
private Optional<Builder<?>> createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
632639
/*
633640
If we've previously retrieved a subscription, it will contain the client instance ID

clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import org.apache.kafka.clients.producer.ProducerConfig;
2222
import org.apache.kafka.common.KafkaException;
2323
import org.apache.kafka.common.Uuid;
24+
import org.apache.kafka.common.errors.AuthorizationException;
25+
import org.apache.kafka.common.errors.DisconnectException;
26+
import org.apache.kafka.common.errors.NetworkException;
27+
import org.apache.kafka.common.errors.TimeoutException;
28+
import org.apache.kafka.common.errors.UnsupportedVersionException;
2429
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
2530
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
2631
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -901,6 +906,163 @@ public void testTelemetryReporterInitiateCloseAlreadyInTerminatedStates() {
901906
.telemetrySender()).state());
902907
}
903908

909+
/**
 * A get-subscriptions failure carrying a {@code TimeoutException} (the "retriable" case per the
 * test name) is expected to return the sender to SUBSCRIPTION_NEEDED, leave the interval at
 * {@code DEFAULT_PUSH_INTERVAL_MS}, and keep telemetry enabled.
 */
@Test
910+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
911+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
912+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
913+
// Put the sender into the in-flight subscription state before reporting the failure.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
914+
915+
KafkaException retriableException = new TimeoutException("Request timed out");
916+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException);
917+
918+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
919+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
920+
assertTrue(telemetrySender.enabled());
921+
}
922+
923+
/**
 * A get-subscriptions failure whose exception is a plain {@code KafkaException} wrapping a
 * {@code DisconnectException} as its direct cause is expected to be handled like a retriable
 * failure: state SUBSCRIPTION_NEEDED, interval {@code DEFAULT_PUSH_INTERVAL_MS}, telemetry enabled.
 */
@Test
924+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() {
925+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
926+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
927+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
928+
929+
// The outer exception is a generic KafkaException; only its cause is the retriable-named type.
KafkaException wrappedException = new KafkaException(new DisconnectException("Connection lost"));
930+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
931+
932+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
933+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
934+
assertTrue(telemetrySender.enabled());
935+
}
936+
937+
/**
 * A get-subscriptions failure carrying an {@code AuthorizationException} (the "fatal" case per
 * the test name) is expected to return the sender to SUBSCRIPTION_NEEDED, raise the interval to
 * {@code Integer.MAX_VALUE}, and disable telemetry.
 */
@Test
938+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
939+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
940+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
941+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
942+
943+
KafkaException fatalException = new AuthorizationException("Not authorized for telemetry");
944+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException);
945+
946+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
947+
// Integer.MAX_VALUE as the interval is the "telemetry effectively off" outcome.
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
948+
assertFalse(telemetrySender.enabled());
949+
}
950+
951+
/**
 * A get-subscriptions failure whose exception is a plain {@code KafkaException} wrapping an
 * {@code UnsupportedVersionException} is expected to be handled as fatal: state
 * SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE}, telemetry disabled.
 */
@Test
952+
public void testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
953+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
954+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
955+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
956+
957+
// Neither the wrapper nor its direct cause is expected to count as retriable here.
KafkaException wrappedException = new KafkaException("Version check failed",
958+
new UnsupportedVersionException("Broker doesn't support telemetry"));
959+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
960+
961+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
962+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
963+
assertFalse(telemetrySender.enabled());
964+
}
965+
966+
/**
 * A push failure carrying a {@code NetworkException} (the "retriable" case per the test name)
 * is expected to return the sender to SUBSCRIPTION_NEEDED, leave the interval at
 * {@code DEFAULT_PUSH_INTERVAL_MS}, and keep telemetry enabled.
 */
@Test
967+
public void testHandleFailedPushTelemetryRequestWithRetriableException() {
968+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
969+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
970+
// Step the sender through the states leading to an in-flight push.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
971+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
972+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
973+
974+
KafkaException networkException = new NetworkException("Network failure");
975+
telemetrySender.handleFailedPushTelemetryRequest(networkException);
976+
977+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
978+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
979+
assertTrue(telemetrySender.enabled());
980+
}
981+
982+
/**
 * A push failure carrying an {@code AuthorizationException} (the "fatal" case per the test
 * name) is expected to return the sender to SUBSCRIPTION_NEEDED, raise the interval to
 * {@code Integer.MAX_VALUE}, and disable telemetry.
 */
@Test
983+
public void testHandleFailedPushTelemetryRequestWithFatalException() {
984+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
985+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
986+
// Step the sender through the states leading to an in-flight push.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
987+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
988+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
989+
990+
KafkaException authException = new AuthorizationException("Not authorized to push telemetry");
991+
telemetrySender.handleFailedPushTelemetryRequest(authException);
992+
993+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
994+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
995+
assertFalse(telemetrySender.enabled());
996+
}
997+
998+
/**
 * A get-subscriptions failure carrying a {@code TimeoutException} whose cause is a
 * {@code DisconnectException} is expected to be handled like a retriable failure: state
 * SUBSCRIPTION_NEEDED, interval {@code DEFAULT_PUSH_INTERVAL_MS}, telemetry enabled.
 */
@Test
999+
public void testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
1000+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
1001+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
1002+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1003+
1004+
// Both the outer exception and its cause are of the types the other tests use as retriable.
KafkaException chainedException = new TimeoutException("Outer timeout",
1005+
new DisconnectException("Inner disconnect"));
1006+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException);
1007+
1008+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
1009+
assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, telemetrySender.intervalMs());
1010+
assertTrue(telemetrySender.enabled());
1011+
}
1012+
1013+
/**
 * A get-subscriptions failure carrying a bare {@code KafkaException} with no cause is expected
 * to be handled as fatal: state SUBSCRIPTION_NEEDED, interval {@code Integer.MAX_VALUE},
 * telemetry disabled.
 */
@Test
1014+
public void testHandleFailedRequestWithGenericKafkaException() {
1015+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
1016+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
1017+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1018+
1019+
// Message-only constructor: the exception has no cause to inspect.
KafkaException genericException = new KafkaException("Unknown error");
1020+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException);
1021+
1022+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
1023+
assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
1024+
assertFalse(telemetrySender.enabled());
1025+
}
1026+
1027+
/**
 * A push failure reported while the sender is in TERMINATING_PUSH_NEEDED is expected to leave
 * that state unchanged and telemetry enabled. The interval is not asserted by this test.
 */
@Test
1028+
public void testHandleFailedRequestDuringTermination() {
1029+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
1030+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
1031+
// Step the sender into the terminating state before reporting the failure.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1032+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
1033+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
1034+
1035+
KafkaException exception = new TimeoutException("Timeout");
1036+
telemetrySender.handleFailedPushTelemetryRequest(exception);
1037+
1038+
assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, telemetrySender.state());
1039+
assertTrue(telemetrySender.enabled());
1040+
}
1041+
1042+
/**
 * Reports three get-subscriptions failures in a row on the same sender. After the
 * {@code TimeoutException} and the {@code DisconnectException} the sender is expected to be back
 * in SUBSCRIPTION_NEEDED with telemetry still enabled; after the
 * {@code UnsupportedVersionException} it is expected to be in SUBSCRIPTION_NEEDED with telemetry
 * disabled. The interval is not asserted by this test.
 */
@Test
1043+
public void testSequentialFailuresWithDifferentExceptionTypes() {
1044+
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
1045+
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
1046+
1047+
// First failure: expected to leave telemetry enabled.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1048+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
1049+
new TimeoutException("Timeout 1"));
1050+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
1051+
assertTrue(telemetrySender.enabled());
1052+
1053+
// Second failure: the sender can re-enter SUBSCRIPTION_IN_PROGRESS and stays enabled.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1054+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
1055+
new DisconnectException("Disconnect"));
1056+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
1057+
assertTrue(telemetrySender.enabled());
1058+
1059+
// Third failure: expected to disable telemetry.
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
1060+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
1061+
new UnsupportedVersionException("Version not supported"));
1062+
assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
1063+
assertFalse(telemetrySender.enabled());
1064+
}
1065+
9041066
@AfterEach
9051067
public void tearDown() {
9061068
clientTelemetryReporter.close();

0 commit comments

Comments
 (0)