|
34 | 34 | * @since 3.3.0
|
35 | 35 | */
|
36 | 36 | public class RecoveryAwareChannelN extends ChannelN {
|
37 |
| - private long maxSeenDeliveryTag = 0; |
38 |
| - private long activeDeliveryTagOffset = 0; |
| 37 | + private volatile long maxSeenDeliveryTag = 0; |
| 38 | + private volatile long activeDeliveryTagOffset = 0; |
39 | 39 |
|
40 | 40 | /**
|
41 | 41 | * Construct a new channel on the given connection with the given
|
@@ -83,21 +83,19 @@ private AMQImpl.Basic.Deliver offsetDeliveryTag(AMQImpl.Basic.Deliver method) {
|
83 | 83 |
|
84 | 84 | @Override
|
85 | 85 | public void basicAck(long deliveryTag, boolean multiple) throws IOException {
|
86 |
| - // FIXME no check if deliveryTag = 0 (ack all) |
87 | 86 | long realTag = deliveryTag - activeDeliveryTagOffset;
|
88 |
| - // 0 tag means ack all |
89 |
| - if (realTag >= 0) { |
| 87 | + // 0 tag means ack all when multiple is set |
| 88 | + if (realTag > 0 || (multiple && realTag == 0)) { |
90 | 89 | transmit(new Basic.Ack(realTag, multiple));
|
91 | 90 | metricsCollector.basicAck(this, deliveryTag, multiple);
|
92 | 91 | }
|
93 | 92 | }
|
94 | 93 |
|
95 | 94 | @Override
|
96 | 95 | public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
|
97 |
| - // FIXME no check if deliveryTag = 0 (nack all) |
98 | 96 | long realTag = deliveryTag - activeDeliveryTagOffset;
|
99 |
| - // 0 tag means nack all |
100 |
| - if (realTag >= 0) { |
| 97 | + // 0 tag means nack all when multiple is set |
| 98 | + if (realTag > 0 || (multiple && realTag == 0)) { |
101 | 99 | transmit(new Basic.Nack(realTag, multiple, requeue));
|
102 | 100 | metricsCollector.basicNack(this, deliveryTag);
|
103 | 101 | }
|
|
0 commit comments