1414package io .nats .client .impl ;
1515
1616import io .nats .client .Options ;
17- import io .nats .client .support .ByteArrayBuilder ;
1817
1918import java .io .IOException ;
2019import java .nio .BufferOverflowException ;
20+ import java .time .Duration ;
21+ import java .util .Arrays ;
2122import java .util .concurrent .CancellationException ;
2223import java .util .concurrent .CompletableFuture ;
2324import java .util .concurrent .ExecutionException ;
2425import java .util .concurrent .Future ;
2526import java .util .concurrent .atomic .AtomicBoolean ;
2627import java .util .concurrent .locks .ReentrantLock ;
2728
28- public class NatsConnectionWriter implements Runnable {
29+ import static io .nats .client .support .NatsConstants .OP_PING_BYTES ;
30+ import static io .nats .client .support .NatsConstants .OP_PONG_BYTES ;
2931
30- private static final int TOTAL_SLEEP = 40 ;
31- private static final int EACH_SLEEP = 4 ;
32- private static final int MAX_BEFORE_FLUSH = 10 ;
32+ class NatsConnectionWriter implements Runnable {
3333
3434 private final NatsConnection connection ;
3535
@@ -40,14 +40,11 @@ public class NatsConnectionWriter implements Runnable {
4040 private final AtomicBoolean reconnectMode ;
4141 private final ReentrantLock startStopLock ;
4242
43- private final ByteArrayBuilder regularSendBuffer ;
44- private final ByteArrayBuilder reconnectSendBuffer ;
45- private final int discardMessageCountThreshold ;
46- private final long reconnectBufferSize ;
43+ private byte [] sendBuffer ;
4744
48- private final ReentrantLock buffersAccessLock ;
49- private long regularQueuedMessageCount ;
50- private long reconnectQueuedMessageCount ;
45+ private final MessageQueue outgoing ;
46+ private final MessageQueue reconnectOutgoing ;
47+ private final long reconnectBufferSize ;
5148
5249 NatsConnectionWriter (NatsConnection connection ) {
5350 this .connection = connection ;
@@ -60,16 +57,15 @@ public class NatsConnectionWriter implements Runnable {
6057
6158 Options options = connection .getOptions ();
6259 int bufSize = options .getBufferSize ();
63- this .regularSendBuffer = new ByteArrayBuilder (bufSize );
64- this .reconnectSendBuffer = new ByteArrayBuilder (bufSize );
60+ this .sendBuffer = new byte [bufSize ];
6561
66- discardMessageCountThreshold = options . isDiscardMessagesWhenOutgoingQueueFull ()
67- ? options .getMaxMessagesInOutgoingQueue () : Integer . MAX_VALUE ;
68- reconnectBufferSize = options .getReconnectBufferSize ( );
62+ outgoing = new MessageQueue ( true ,
63+ options .getMaxMessagesInOutgoingQueue (),
64+ options .isDiscardMessagesWhenOutgoingQueueFull () );
6965
70- buffersAccessLock = new ReentrantLock ();
71- regularQueuedMessageCount = 0 ;
72- reconnectQueuedMessageCount = 0 ;
66+ // The reconnect buffer contains internal messages, and we will keep it unlimited in size
67+ reconnectOutgoing = new MessageQueue ( true , 0 ) ;
68+ reconnectBufferSize = options . getReconnectBufferSize () ;
7369 }
7470
7571 // Should only be called if the current thread has exited.
@@ -80,6 +76,8 @@ void start(Future<DataPort> dataPortFuture) {
8076 try {
8177 this .dataPortFuture = dataPortFuture ;
8278 this .running .set (true );
79+ this .outgoing .resume ();
80+ this .reconnectOutgoing .resume ();
8381 this .stopped = connection .getExecutor ().submit (this , Boolean .TRUE );
8482 } finally {
8583 this .startStopLock .unlock ();
@@ -91,107 +89,131 @@ void start(Future<DataPort> dataPortFuture) {
9189 // method does.
9290 Future <Boolean > stop () {
9391 this .running .set (false );
92+ this .startStopLock .lock ();
93+ try {
94+ this .outgoing .pause ();
95+ this .reconnectOutgoing .pause ();
96+ // Clear old ping/pong requests
97+ this .outgoing .filter ((msg ) ->
98+ Arrays .equals (OP_PING_BYTES , msg .getProtocolBytes ())
99+ || Arrays .equals (OP_PONG_BYTES , msg .getProtocolBytes ()));
100+
101+ } finally {
102+ this .startStopLock .unlock ();
103+ }
104+
94105 return this .stopped ;
95106 }
96107
108+ synchronized void sendMessageBatch (NatsMessage msg , DataPort dataPort , NatsStatistics stats )
109+ throws IOException {
110+
111+ int sendPosition = 0 ;
112+
113+ while (msg != null ) {
114+ long size = msg .getSizeInBytes ();
115+
116+ if (sendPosition + size > sendBuffer .length ) {
117+ if (sendPosition == 0 ) { // have to resize
118+ this .sendBuffer = new byte [(int )Math .max (sendBuffer .length + size , sendBuffer .length * 2L )];
119+ } else { // else send and go to next message
120+ dataPort .write (sendBuffer , sendPosition );
121+ connection .getNatsStatistics ().registerWrite (sendPosition );
122+ sendPosition = 0 ;
123+ msg = msg .next ;
124+
125+ if (msg == null ) {
126+ break ;
127+ }
128+ }
129+ }
130+
131+ byte [] bytes = msg .getProtocolBytes ();
132+ System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
133+ sendPosition += bytes .length ;
134+
135+ sendBuffer [sendPosition ++] = '\r' ;
136+ sendBuffer [sendPosition ++] = '\n' ;
137+
138+ if (!msg .isProtocol ()) {
139+ bytes = msg .getSerializedHeader ();
140+ if (bytes != null && bytes .length > 0 ) {
141+ System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
142+ sendPosition += bytes .length ;
143+ }
144+
145+ bytes = msg .getData (); // guaranteed to not be null
146+ if (bytes .length > 0 ) {
147+ System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
148+ sendPosition += bytes .length ;
149+ }
150+
151+ sendBuffer [sendPosition ++] = '\r' ;
152+ sendBuffer [sendPosition ++] = '\n' ;
153+ }
154+
155+ stats .incrementOutMsgs ();
156+ stats .incrementOutBytes (size );
157+
158+ msg = msg .next ;
159+ }
160+
161+ dataPort .write (sendBuffer , sendPosition );
162+ connection .getNatsStatistics ().registerWrite (sendPosition );
163+ }
164+
97165 @ Override
98166 public void run () {
167+ Duration waitForMessage = Duration .ofMinutes (2 ); // This can be long since no one is sending
168+ Duration reconnectWait = Duration .ofMillis (1 ); // This should be short, since we are trying to get the reconnect through
169+
99170 try {
100- dataPort = dataPortFuture .get (); // Will wait for the future to complete
101- // --------------------------------------------------------------------------------
102- // NOTE
103- // --------------------------------------------------------------------------------
104- // regularQueuedMessageCount/reconnectQueuedMessageCount are volatile variables
105- // that are read in this method outside of the buffersAccessLock.lock() block.
106- // They are written to inside the _queue method, inside of a lock.
107- // Since we are reading, if we happen to miss a write, we don't care, as the loop
108- // will just check (read) those variables again soon.
109- // --------------------------------------------------------------------------------
110- int waits = 0 ;
111- while (running .get ()) {
112- boolean rmode = reconnectMode .get ();
113- long mcount = rmode ? reconnectQueuedMessageCount : regularQueuedMessageCount ;
114-
115- while (waits < TOTAL_SLEEP && mcount < MAX_BEFORE_FLUSH ) {
116- try { //noinspection BusyWait
117- Thread .sleep (EACH_SLEEP );
118- } catch (Exception ignore ) { /* don't care */ }
119- waits += EACH_SLEEP ;
171+ dataPort = this .dataPortFuture .get (); // Will wait for the future to complete
172+ NatsStatistics stats = this .connection .getNatsStatistics ();
173+ int maxAccumulate = Options .MAX_MESSAGES_IN_NETWORK_BUFFER ;
174+
175+ while (this .running .get ()) {
176+ NatsMessage msg = null ;
177+
178+ if (this .reconnectMode .get ()) {
179+ msg = this .reconnectOutgoing .accumulate (this .sendBuffer .length , maxAccumulate , reconnectWait );
180+ } else {
181+ msg = this .outgoing .accumulate (this .sendBuffer .length , maxAccumulate , waitForMessage );
120182 }
121183
122- if (mcount > 0 ) {
123- buffersAccessLock .lock ();
124- try {
125- ByteArrayBuilder bab = rmode ? reconnectSendBuffer : regularSendBuffer ;
126- int byteCount = bab .length ();
127- dataPort .write (bab .internalArray (), byteCount );
128- bab .clear ();
129- connection .getNatsStatistics ().registerWrite (byteCount );
130- if (rmode ) {
131- reconnectQueuedMessageCount = 0 ;
132- }
133- else {
134- regularQueuedMessageCount = 0 ;
135- }
136- } finally {
137- buffersAccessLock .unlock ();
138- }
184+ if (msg == null ) { // Make sure we are still running
185+ continue ;
139186 }
187+
188+ sendMessageBatch (msg , dataPort , stats );
140189 }
141190 } catch (IOException | BufferOverflowException io ) {
142- connection .handleCommunicationIssue (io );
191+ this . connection .handleCommunicationIssue (io );
143192 } catch (CancellationException | ExecutionException | InterruptedException ex ) {
144193 // Exit
145194 } finally {
146- running .set (false );
195+ this . running .set (false );
147196 }
148197 }
149198
150- void setReconnectMode (boolean reconnectMode ) {
151- this . reconnectMode .set (reconnectMode );
199+ void setReconnectMode (boolean tf ) {
200+ reconnectMode .set (tf );
152201 }
153202
154203 boolean canQueueDuringReconnect (NatsMessage msg ) {
155204 // don't over fill the send buffer while waiting to reconnect
156- return (reconnectBufferSize < 0 || (regularSendBuffer . length () + msg .getSizeInBytes ()) < reconnectBufferSize );
205+ return (reconnectBufferSize < 0 || (outgoing . sizeInBytes () + msg .getSizeInBytes ()) < reconnectBufferSize );
157206 }
158207
159208 boolean queue (NatsMessage msg ) {
160- if (regularQueuedMessageCount >= discardMessageCountThreshold ) {
161- return false ;
162- }
163- _queue (msg , regularSendBuffer );
164- return true ;
209+ return this .outgoing .push (msg );
165210 }
166211
167212 void queueInternalMessage (NatsMessage msg ) {
168- if (reconnectMode .get ()) {
169- _queue (msg , reconnectSendBuffer );
213+ if (this . reconnectMode .get ()) {
214+ this . reconnectOutgoing . push (msg );
170215 } else {
171- _queue (msg , regularSendBuffer );
172- }
173- }
174-
175- void _queue (NatsMessage msg , ByteArrayBuilder bab ) {
176-
177- buffersAccessLock .lock ();
178- try {
179- long startSize = bab .length ();
180- msg .appendSerialized (bab );
181- long added = bab .length () - startSize ;
182-
183- // it's safe check for object equality
184- if (bab == regularSendBuffer ) {
185- regularQueuedMessageCount ++;
186- }
187- else {
188- reconnectQueuedMessageCount ++;
189- }
190-
191- connection .getNatsStatistics ().incrementOutMsgsAndBytes (added );
192- }
193- finally {
194- buffersAccessLock .unlock ();
216+ this .outgoing .push (msg , true );
195217 }
196218 }
197219
0 commit comments