@@ -140,7 +140,8 @@ public void onError(Throwable throwable) {
140140 Objects .requireNonNull (throwable , "'throwable' is required." );
141141
142142 if (isRetryPending .get () && retryPolicy .calculateRetryDelay (throwable , retryAttempts .get ()) != null ) {
143- logger .warning ("Retry is already pending. Ignoring transient error." , throwable );
143+ logger .warning ("namespace[{}] entityPath[{}]: Retry is already pending. Ignoring transient error." ,
144+ fullyQualifiedNamespace , entityPath , throwable );
144145 return ;
145146 }
146147
@@ -182,22 +183,24 @@ public void onError(Throwable throwable) {
182183 return ;
183184 }
184185
185- logger .info ("Retry #{}. Transient error occurred. Retrying after {} ms." , attempts ,
186- retryInterval .toMillis (), throwable );
186+ logger .info ("namespace[{}] entityPath[{}]: Retry #{}. Transient error occurred. Retrying after {} ms." ,
187+ fullyQualifiedNamespace , entityPath , attempts , retryInterval .toMillis (), throwable );
187188
188189 retrySubscription = Mono .delay (retryInterval ).subscribe (i -> {
189190 if (isDisposed ()) {
190- logger .info ("Retry #{}. Not requesting from upstream. Processor is disposed." , attempts );
191+ logger .info ("namespace[{}] entityPath[{}]: Retry #{}. Not requesting from upstream. Processor is disposed." ,
192+ fullyQualifiedNamespace , entityPath , attempts );
191193 } else {
192- logger .info ("Retry #{}. Requesting from upstream." , attempts );
194+ logger .info ("namespace[{}] entityPath[{}]: Retry #{}. Requesting from upstream." ,
195+ fullyQualifiedNamespace , entityPath , attempts );
193196
194197 requestUpstream ();
195198 isRetryPending .set (false );
196199 }
197200 });
198201 } else {
199- logger .warning ("entityPath[{}] Retry #{}. Retry attempts exhausted or exception was not retriable." ,
200- entityPath , attempts , throwable );
202+ logger .warning ("namespace[{}] entityPath[{}]: Retry #{}. Retry attempts exhausted or exception was not retriable." ,
203+ fullyQualifiedNamespace , entityPath , attempts , throwable );
201204
202205 lastError = throwable ;
203206 isDisposed .set (true );
@@ -206,8 +209,8 @@ public void onError(Throwable throwable) {
206209 synchronized (lock ) {
207210 final ConcurrentLinkedDeque <ChannelSubscriber <T >> currentSubscribers = subscribers ;
208211 subscribers = new ConcurrentLinkedDeque <>();
209- logger .info ("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} "
210- + "subscribers." , fullyQualifiedNamespace , entityPath , currentSubscribers .size ());
212+ logger .info ("namespace[{}] entityPath[{}]: Error in AMQP channel processor. Notifying {} subscribers." ,
213+ fullyQualifiedNamespace , entityPath , currentSubscribers .size ());
211214
212215 currentSubscribers .forEach (subscriber -> subscriber .onError (throwable ));
213216 }
@@ -254,8 +257,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
254257 }
255258
256259 subscribers .add (subscriber );
257- logger .verbose ("Added a subscriber {} to AMQP channel processor. Total "
258- + "subscribers = {}" , subscriber , subscribers .size ());
260+ logger .verbose ("namespace[{}] entityPath[{}]: Added a subscriber {} to AMQP channel processor. Total "
261+ + "subscribers = {}" , fullyQualifiedNamespace , entityPath , subscriber , subscribers .size ());
259262
260263 if (!isRetryPending .get ()) {
261264 requestUpstream ();
@@ -350,7 +353,11 @@ private void close(T channel) {
350353 }
351354
352355 /**
353- * Represents a subscriber, waiting for an AMQP connection.
356+ * Represents the decorator-subscriber wrapping a downstream subscriber to AmqpChannelProcessor.
357+ * These are the subscribers waiting to receive a channel that is yet to be available in the AmqpChannelProcessor.
358+ * The AmqpChannelProcessor tracks a list of such waiting subscribers; once the processor receives
359+ * a result (channel, error or disposal) from it's upstream, each decorated-subscriber will be notified,
360+ * which removes itself from the tracking list, then propagates the notification to the wrapped subscriber.
354361 */
355362 private static final class ChannelSubscriber <T > extends Operators .MonoSubscriber <T , T > {
356363 private final AmqpChannelProcessor <T > processor ;
@@ -362,15 +369,16 @@ private ChannelSubscriber(CoreSubscriber<? super T> actual, AmqpChannelProcessor
362369
363370 @ Override
364371 public void cancel () {
365- super .cancel ();
366372 processor .subscribers .remove (this );
373+ super .cancel ();
367374 }
368375
369376 @ Override
370377 public void onComplete () {
371378 if (!isCancelled ()) {
372- actual . onComplete ();
379+ // first untrack before calling into external code.
373380 processor .subscribers .remove (this );
381+ actual .onComplete ();
374382 }
375383 }
376384
@@ -384,8 +392,8 @@ public void onNext(T channel) {
384392 @ Override
385393 public void onError (Throwable throwable ) {
386394 if (!isCancelled ()) {
387- actual .onError (throwable );
388395 processor .subscribers .remove (this );
396+ actual .onError (throwable );
389397 } else {
390398 Operators .onErrorDropped (throwable , currentContext ());
391399 }
0 commit comments