Skip to content

Commit

Permalink
Use separate logger for PulsarListener exceptions (#1039)
Browse files Browse the repository at this point in the history
This creates a separate logger for DefaultPulsarMessageListenerContainer
that it uses to log exceptions thrown from listener callback methods.

The exceptions are still logged at debug level in order to not change
behavior in a patch release. However, the log category used by the logger
can then be set to debug level but not spam the logs with the other
debug statements in the listener container.

Also, adds exception logging to the batch listener invocation using
the same listener error logger as the record listener invocation.

Resolves #1008
  • Loading branch information
onobc authored Feb 17, 2025
1 parent 147844b commit fe95ac4
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess

private final Condition pausedCondition = this.lockOnPause.newCondition();

private final LogAccessor listenerErrorLogger = new LogAccessor(
"%s-ListenerErrors".formatted(DefaultPulsarMessageListenerContainer.class.getName()));

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties) {
super(pulsarConsumerFactory, pulsarContainerProperties);
Expand Down Expand Up @@ -629,7 +632,7 @@ else if (this.listener != null) {
inRetryMode.compareAndSet(true, false);
}
catch (RuntimeException e) {
DefaultPulsarMessageListenerContainer.this.logger.debug(e,
DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(e,
() -> "Error dispatching the message to the listener.");
if (this.pulsarConsumerErrorHandler != null) {
invokeRecordListenerErrorHandler(inRetryMode, message, e, txn);
Expand All @@ -642,9 +645,9 @@ else if (this.ackMode.equals(AckMode.BATCH)) {
this.nackableMessages.add(message.getMessageId());
}
else {
throw new IllegalStateException("Exception occurred and message %s was not auto-nacked; "
+ "switch to AckMode BATCH or RECORD to enable auto-nacks"
.formatted(message.getMessageId()),
throw new IllegalStateException(
"Exception occurred and message %".formatted(message.getMessageId())
+ "was not auto-nacked; switch to AckMode BATCH or RECORD to enable auto-nacks",
e);
}
}
Expand Down Expand Up @@ -713,6 +716,8 @@ private List<Message<T>> doInvokeBatchListener(Messages<T> messages, List<Messag
return Collections.emptyList();
}
catch (RuntimeException ex) {
DefaultPulsarMessageListenerContainer.this.listenerErrorLogger.debug(ex,
() -> "Error dispatching the messages to the batch listener.");
if (this.pulsarConsumerErrorHandler != null) {
return invokeBatchListenerErrorHandler(inRetryMode, messagesPendingInBatch, messageList, ex, txn);
}
Expand Down

0 comments on commit fe95ac4

Please sign in to comment.