Skip to content

Commit 3f41101

Browse files
authored
more flow control (#471)
flow control example better document
1 parent 6ec24f9 commit 3f41101

File tree

2 files changed

+26
-5
lines changed

2 files changed

+26
-5
lines changed

src/examples/java/io/nats/examples/jetstream/NatsJsPushSubFlowControl.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,23 @@ public static void main(String[] args) {
6565
// This is configured so the subscriber ends up being considered slow
6666
JetStreamSubscription sub = js.subscribe(subject, pso);
6767
nc.flush(Duration.ofSeconds(5));
68-
sub.setPendingLimits(Consumer.DEFAULT_MAX_MESSAGES, 1024);
68+
69+
// ------------------------------------------------------------------------------------------
70+
// Flow Control limit is set using pending limits. You can mix and match pending messages
71+
// with total bytes or rely on one or the other.
72+
// ------------------------------------------------------------------------------------------
73+
// IMPORTANT!!!! THESE VALUES ARE EXAMPLE ONLY SO THE FLOW CONTROL MESSAGES
74+
// SHOWS UP EASILY IN THIS DEMO. IN REAL SYSTEMS YOU WILL TYPICALLY HAVE MANY MORE
75+
// MESSAGES AND MANY MORE BYTES AS YOUR LIMITS. FOR EXAMPLE THE DEFAULT VALUES
76+
// AS SET IN io.nats.client.Consumer ARE
77+
// public static final long DEFAULT_MAX_MESSAGES = 64 * 1024;
78+
// public static final long DEFAULT_MAX_BYTES = 64 * 1024 * 1024;
79+
// ------------------------------------------------------------------------------------------
80+
// The example first version sets pending to 500 message with 1K total bytes
81+
// The example second version (commented out) sets pending to 1 message with 500K total bytes
82+
// ------------------------------------------------------------------------------------------
83+
sub.setPendingLimits(500, 1024);
84+
// sub.setPendingLimits(1, 500000);
6985

7086
// publish more message data than the subscriber will handle
7187
byte[] data = new byte[1024];

src/test/java/io/nats/client/impl/JetStreamPushTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313

1414
package io.nats.client.impl;
1515

16-
import io.nats.client.*;
16+
import io.nats.client.JetStream;
17+
import io.nats.client.JetStreamSubscription;
18+
import io.nats.client.Message;
19+
import io.nats.client.PushSubscribeOptions;
1720
import io.nats.client.api.ConsumerConfiguration;
1821
import io.nats.client.api.ConsumerInfo;
1922
import org.junit.jupiter.api.Test;
@@ -294,8 +297,9 @@ public void testHeartbeat() throws Exception {
294297
});
295298
}
296299

297-
@Test
298-
public void testFlowControl() throws Exception {
300+
@ParameterizedTest
301+
@ValueSource(strings = {"500,1024", "1,500000"})
302+
public void testFlowControl(String pendingLimits) throws Exception {
299303
runInJsServer(nc -> {
300304
// Create our JetStream context to receive JetStream messages.
301305
JetStream js = nc.jetStream();
@@ -312,7 +316,8 @@ public void testFlowControl() throws Exception {
312316
// This is configured so the subscriber ends up being considered slow
313317
JetStreamSubscription sub = js.subscribe(SUBJECT, pso);
314318
nc.flush(Duration.ofSeconds(5));
315-
sub.setPendingLimits(Consumer.DEFAULT_MAX_MESSAGES, 1024);
319+
String[] split = pendingLimits.split(",");
320+
sub.setPendingLimits(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
316321

317322
// publish more message data than the subscriber will handle
318323
byte[] data = new byte[1024];

0 commit comments

Comments
 (0)