2121import java .io .IOException ;
2222import java .io .InputStream ;
2323import java .security .NoSuchAlgorithmException ;
24+ import java .security .Provider ;
25+ import java .security .Security ;
2426import java .time .Duration ;
2527import java .util .ArrayList ;
2628import java .util .Arrays ;
3739import java .util .concurrent .atomic .AtomicLong ;
3840
3941/**
40- * A utility class for measuring NATS performance, similar to the version in go and node.
41- * The various tradeoffs to make this code act/work like the other versions, including the
42- * previous java version, make it a bit "crufty" for an example. See autobench for
43- * an example with minimal boilerplate.
42+ * A utility class for measuring NATS performance, similar to the version in go
43+ * and node. The various tradeoffs to make this code act/work like the other
44+ * versions, including the previous java version, make it a bit
45+ * "crufty" for an example. See autobench for an example with minimal
46+ * boilerplate.
4447 */
4548public class NatsBench {
4649 final BlockingQueue <Throwable > errorQueue = new LinkedBlockingQueue <Throwable >();
@@ -57,23 +60,24 @@ public class NatsBench {
5760 private final AtomicLong received = new AtomicLong ();
5861 private boolean csv = false ;
5962 private boolean stats = false ;
63+ private boolean conscrypt = false ;
6064
6165 private Thread shutdownHook ;
6266 private final AtomicBoolean shutdown = new AtomicBoolean (false );
6367
6468 private boolean secure = false ;
6569 private Benchmark bench ;
6670
67- static final String usageString =
68- " \n Usage: java NatsBench [-s server] [-tls] [-np num] [-ns num] [-n num] [-ms size] "
69- + "[-csv file] <subject> \n \n Options: \n "
70- + " -s <urls > The nats server URLs (comma-separated), use tls:// or opentls:// to require tls \n "
71- + " -np Number of concurrent publishers (1 )\n "
72- + " -ns Number of concurrent subscribers (0 )\n "
73- + " -n Number of messages to publish (100,000 )\n "
74- + " -ms Size of the message (128 )\n "
75- + " -csv Print results to stdout as csv (false)\n "
76- + " -stats Track and print out internal statistics (false)\n " ;
71+ static final String usageString = " \n Usage: java NatsBench [-s server] [-tls] [-np num] [-ns num] [-n num] [-ms size] "
72+ + "[-csv file] <subject> \n \n Options: \n "
73+ + " -s <urls> The nats server URLs (comma-separated), use tls:// or opentls:// to require tls \n "
74+ + " -np <int > Number of concurrent publishers (1) \n "
75+ + " -ns <int> Number of concurrent subscribers (0 )\n "
76+ + " -n <int> Number of messages to publish (100,000 )\n "
77+ + " -ms <int> Size of the message (128 )\n "
78+ + " -csv Print results to stdout as csv (false )\n "
79+ + " -tls Set the secure flag on the SSL context to true (false)\n "
80+ + " -stats Track and print out internal statistics (false)\n " ;
7781
7882 public NatsBench (String [] args ) throws Exception {
7983 if (args == null || args .length < 1 ) {
@@ -85,18 +89,12 @@ public NatsBench(String[] args) throws Exception {
8589
8690 public NatsBench (Properties properties ) throws NoSuchAlgorithmException {
8791 urls = properties .getProperty ("bench.nats.servers" , urls );
88- secure = Boolean .parseBoolean (
89- properties .getProperty ("bench.nats.secure" , Boolean .toString (secure )));
90- numMsgs = Integer .parseInt (
91- properties .getProperty ("bench.nats.msg.count" , Integer .toString (numMsgs )));
92- size = Integer
93- .parseInt (properties .getProperty ("bench.nats.msg.size" , Integer .toString (numSubs )));
94- numPubs = Integer
95- .parseInt (properties .getProperty ("bench.nats.pubs" , Integer .toString (numPubs )));
96- numSubs = Integer
97- .parseInt (properties .getProperty ("bench.nats.subs" , Integer .toString (numSubs )));
98- csv = Boolean .parseBoolean (
99- properties .getProperty ("bench.nats.csv" , Boolean .toString (csv )));
92+ secure = Boolean .parseBoolean (properties .getProperty ("bench.nats.secure" , Boolean .toString (secure )));
93+ numMsgs = Integer .parseInt (properties .getProperty ("bench.nats.msg.count" , Integer .toString (numMsgs )));
94+ size = Integer .parseInt (properties .getProperty ("bench.nats.msg.size" , Integer .toString (numSubs )));
95+ numPubs = Integer .parseInt (properties .getProperty ("bench.nats.pubs" , Integer .toString (numPubs )));
96+ numSubs = Integer .parseInt (properties .getProperty ("bench.nats.subs" , Integer .toString (numSubs )));
97+ csv = Boolean .parseBoolean (properties .getProperty ("bench.nats.csv" , Boolean .toString (csv )));
10098 subject = properties .getProperty ("bench.nats.subject" , NUID .nextGlobal ());
10199 }
102100
@@ -106,7 +104,7 @@ Options prepareOptions(boolean secure) throws NoSuchAlgorithmException {
106104 builder .noReconnect ();
107105 builder .connectionName ("NatsBench" );
108106 builder .servers (servers );
109- builder .errorListener (new ErrorListener (){
107+ builder .errorListener (new ErrorListener () {
110108 @ Override
111109 public void errorOccurred (Connection conn , String error ) {
112110 System .out .printf ("An error occurred %s\n " , error );
@@ -128,6 +126,30 @@ public void slowConsumerDetected(Connection conn, Consumer consumer) {
128126 builder .turnOnAdvancedStats ();
129127 }
130128
129+ /**
130+ * The conscrypt flag is provided for testing with the conscrypt jar. Using it
131+ * through reflection is deprecated but allows the library to ship without a
132+ * dependency. Using conscrypt should only require the jar plus the flag. For
133+ * example, to run after building locally and using the test cert files: java
134+ * -cp
135+ * ./build/libs/jnats-2.5.1-SNAPSHOT-examples.jar:./build/libs/jnats-2.5.1-SNAPSHOT-fat.jar:<path
136+ * to conscrypt.jar> \ -Djavax.net.ssl.keyStore=src/test/resources/keystore.jks
137+ * -Djavax.net.ssl.keyStorePassword=password \
138+ * -Djavax.net.ssl.trustStore=src/test/resources/cacerts
139+ * -Djavax.net.ssl.trustStorePassword=password \
140+ * io.nats.examples.autobench.NatsAutoBench tls://localhost:4443 med conscrypt
141+ */
142+ if (conscrypt ) {
143+ try {
144+ Provider provider = null ;
145+ provider = (Provider ) Class .forName ("org.conscrypt.OpenSSLProvider" ).newInstance ();
146+ Security .insertProviderAt (provider , 1 );
147+ } catch (Exception e ) {
148+ e .printStackTrace ();
149+ System .exit (-1 );
150+ }
151+ }
152+
131153 if (secure ) {
132154 builder .secure ();
133155 }
@@ -157,9 +179,12 @@ public void run() {
157179
158180 class SyncSubWorker extends Worker {
159181 final Phaser subReady ;
182+ private AtomicLong start ;
183+
160184 SyncSubWorker (Future <Boolean > starter , Phaser subReady , Phaser finisher , int numMsgs , int size , boolean secure ) {
161185 super (starter , finisher , numMsgs , size , secure );
162186 this .subReady = subReady ;
187+ this .start = new AtomicLong ();
163188 }
164189
165190 @ Override
@@ -180,16 +205,22 @@ public void run() {
180205 Duration timeout = Duration .ofMillis (1000 );
181206
182207 int receivedCount = 0 ;
183- long start = System .nanoTime ();
184208 while (receivedCount < numMsgs ) {
185209 if (sub .nextMessage (timeout ) != null ) {
210+ if (receivedCount == 0 ) {
211+ start .set (System .nanoTime ());
212+ }
186213 received .incrementAndGet ();
187214 receivedCount ++;
188215 }
189216 }
190217 long end = System .nanoTime ();
191218
192- bench .addSubSample (new Sample (numMsgs , size , start , end , nc .getStatistics ()));
219+ if (start .get () > 0 ) {
220+ bench .addSubSample (new Sample (numMsgs , size , start .get (), end , nc .getStatistics ()));
221+ } else {
222+ throw new Exception ("start time was never set" );
223+ }
193224
194225 if (stats ) {
195226 System .out .println (nc .getStatistics ());
@@ -269,11 +300,13 @@ public void start() throws Exception {
269300 System .out .println ("Use ctrl-C to cancel." );
270301 System .out .println ();
271302
272- if (this .numPubs > 0 ) {
303+ if (this .numPubs > 0 && this . numSubs > 0 ) {
273304 runTest ("Pub Only" , this .numPubs , 0 );
274305 runTest ("Pub/Sub" , this .numPubs , this .numSubs );
306+ } else if (this .numPubs > 0 ) {
307+ runTest ("Pub Only" , this .numPubs , 0 );
275308 } else {
276- runTest ("Sub" , this . numPubs , this .numSubs );
309+ runTest ("Sub Only " , 0 , this .numSubs );
277310 }
278311
279312 System .out .println ();
@@ -394,7 +427,6 @@ private void parseArgs(String[] args) {
394427 String arg = it .next ();
395428 switch (arg ) {
396429 case "-s" :
397- case "--server" :
398430 if (!it .hasNext ()) {
399431 usage ();
400432 }
@@ -403,9 +435,6 @@ private void parseArgs(String[] args) {
403435 it .remove ();
404436 continue ;
405437 case "-tls" :
406- if (!it .hasNext ()) {
407- usage ();
408- }
409438 it .remove ();
410439 secure = true ;
411440 continue ;
@@ -443,19 +472,17 @@ private void parseArgs(String[] args) {
443472 it .remove ();
444473 continue ;
445474 case "-csv" :
446- if (it .hasNext ()) {
447- usage ();
448- }
449475 it .remove ();
450476 csv = true ;
451477 continue ;
452478 case "-stats" :
453- if (it .hasNext ()) {
454- usage ();
455- }
456479 it .remove ();
457480 stats = true ;
458481 continue ;
482+ case "-conscrypt" :
483+ it .remove ();
484+ conscrypt = true ;
485+ continue ;
459486 default :
460487 System .err .printf ("Unexpected token: '%s'\n " , arg );
461488 usage ();
0 commit comments