Skip to content

Commit cc66c06

Browse files
authored
Merge pull request #1412 from nats-io/write-queue-info
Expose connection outgoingPendingMessageCount and outgoingPendingBytes
2 parents c29bb17 + 5013efc commit cc66c06

File tree

4 files changed

+43
-2
lines changed

4 files changed

+43
-2
lines changed

src/main/java/io/nats/client/Connection.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,4 +785,22 @@ enum Status {
785785
*/
786786
@NonNull
787787
ObjectStoreManagement objectStoreManagement(ObjectStoreOptions options) throws IOException;
788+
789+
/**
790+
* Get the number of messages in the outgoing queue for this connection.
791+
* This value is volatile in the sense that it changes often and may be adjusted by more than one message.
792+
* It changes every time a message is published (put in the outgoing queue)
793+
* and every time a message is removed from the queue to be written over the socket
794+
* @return the number of messages in the outgoing queue
795+
*/
796+
long outgoingPendingMessageCount();
797+
798+
/**
799+
* Get the number of bytes based to be written calculated from the messages in the outgoing queue for this connection.
800+
* This value is volatile in the sense that it changes often and may be adjusted by more than one message's bytes.
801+
* It changes every time a message is published (put in the outgoing queue)
802+
* and every time a message is removed from the queue to be written over the socket
803+
* @return the number of messages in the outgoing queue
804+
*/
805+
long outgoingPendingBytes();
788806
}

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ void drain() {
133133

134134
boolean isDrained() {
135135
// poison pill is not included in the length count, or the size
136-
return this.running.get() == DRAINING && this.length() == 0;
136+
return this.running.get() == DRAINING && this.length.get() == 0;
137137
}
138138

139139
boolean push(NatsMessage msg) {
@@ -326,7 +326,6 @@ NatsMessage popNow() throws InterruptedException {
326326
return pop(null);
327327
}
328328

329-
// Just for testing
330329
long length() {
331330
return this.length.get();
332331
}

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2544,4 +2544,20 @@ private void ensureNotClosing() throws IOException {
25442544
throw new IOException("A JetStream context can't be established during close.");
25452545
}
25462546
}
2547+
2548+
/**
2549+
* {@inheritDoc}
2550+
*/
2551+
@Override
2552+
public long outgoingPendingMessageCount() {
2553+
return writer.outgoingPendingMessageCount();
2554+
}
2555+
2556+
/**
2557+
* {@inheritDoc}
2558+
*/
2559+
@Override
2560+
public long outgoingPendingBytes() {
2561+
return writer.outgoingPendingBytes();
2562+
}
25472563
}

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,12 @@ void flushBuffer() {
254254
writerLock.unlock();
255255
}
256256
}
257+
258+
long outgoingPendingMessageCount() {
259+
return outgoing.length();
260+
}
261+
262+
long outgoingPendingBytes() {
263+
return outgoing.sizeInBytes();
264+
}
257265
}

0 commit comments

Comments
 (0)