Skip to content

Commit 52c0cfa

Browse files
committed
add builder pattern to consumer and producer
update readme
1 parent 5f57ef6 commit 52c0cfa

File tree

4 files changed

+49
-82
lines changed

4 files changed

+49
-82
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
The MIT License (MIT)
22

33
Copyright (c) 2013 Dustin Norlander
4+
Copyright (c) 2015 Peter Nimmervoll
45

56
Permission is hereby granted, free of charge, to any person obtaining a copy
67
of this software and associated documentation files (the "Software"), to deal

README.md

Lines changed: 7 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
## JavaNSQClient
22

3-
A fast netty-based java client for [NSQ][nsq]
4-
5-
## Notes:
6-
7-
X connection per Consumer and X connections per Producer where X is the nummer of how many nsqds the user
8-
wants to connect.
3+
A fast netty-based Java8 client for [NSQ][nsq]
4+
heavily forked of TrendrrNSQClient
95

106
## TODO:
117
auth
8+
ssl
129
snappy
1310
....
1411

@@ -17,77 +14,26 @@ snappy
1714
Example usage:
1815

1916
```
20-
NSQLookup lookup = new NSQLookupDynMapImpl();
17+
NSQLookup lookup = new NSQLookup();
2118
lookup.addAddr("localhost", 4161);
22-
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", new NSQMessageCallback() {
23-
24-
@Override
25-
public void message(NSQMessage message) {
19+
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", (message) -> {
2620
System.out.println("received: " + message);
2721
//now mark the message as finished.
2822
message.finished();
2923
3024
//or you could requeue it, which indicates a failure and puts it back on the queue.
3125
//message.requeue();
32-
}
33-
@Override
34-
public void error(Exception x) {
35-
//handle errors
36-
log.warn("Caught", x);
37-
}
3826
});
3927
4028
consumer.start();
4129
```
4230

43-
4431
## Producer
4532

4633
Example usage:
4734

4835
```
49-
NSQProducer producer = new NSQProducer().addAddress("localhost", 4150, 1);
50-
producer.start();
51-
for (int i=0; i < 50000; i++) {
52-
producer.produce("speedtest", ("this is a message" + i).getBytes());
53-
}
36+
NSQProducer producer = new NSQProducer().addAddress("localhost", 4150).start();
37+
producer.produce("TestTopic", ("this is a message").getBytes());
5438
```
5539

56-
The producer also has a Batch collector that will collect messages until some threshold is reached (currently maxbytes or maxmessages) then send as a MPUB request. This gives much greater throughput then producing messages one at a time.
57-
58-
```
59-
producer.configureBatch("speedtest",
60-
new BatchCallback() {
61-
@Override
62-
public void batchSuccess(String topic, int num) {
63-
}
64-
@Override
65-
public void batchError(Exception ex, String topic, List<byte[]> messages) {
66-
ex.printStackTrace();
67-
}
68-
},
69-
batchsize,
70-
null, //use default maxbytes
71-
null //use default max seconds
72-
);
73-
74-
producer.start();
75-
for (int i=0; i < iterations; i++) {
76-
producer.produceBatch("speedtest", ("this is a message" + i).getBytes());
77-
}
78-
```
79-
80-
81-
## Dependancies
82-
83-
* [netty][netty]
84-
* [slf4j][slf4j]
85-
* [trendrr-oss][trendrr-oss]
86-
87-
Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser
88-
89-
90-
[nsq]: https://github.com/bitly/nsq
91-
[netty]: http://netty.io/
92-
[slf4j]: http://www.slf4j.org/
93-
[trendrr-oss]: https://github.com/trendrr/java-oss-lib

src/main/java/io/nsq/NSQConsumer.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCal
4949
this.errorCallback = errCallback;
5050
}
5151

52-
public void start() {
52+
public NSQConsumer start() {
5353
if (!started) {
5454
started = true;
5555
//connect once otherwise we might have to wait one lookupPeriod
@@ -62,6 +62,7 @@ public void run() {
6262
}
6363
}, lookupPeriod, lookupPeriod);
6464
}
65+
return this;
6566
}
6667

6768
private Connection createConnection(ServerAddress serverAddress) {
@@ -102,14 +103,18 @@ private void cleanClose() {
102103
}
103104
}
104105

105-
public void setMessagesPerBatch(int messagesPerBatch) {
106-
this.messagesPerBatch = messagesPerBatch;
106+
public NSQConsumer setMessagesPerBatch(int messagesPerBatch) {
107+
if (!started) {
108+
this.messagesPerBatch = messagesPerBatch;
109+
}
110+
return this;
107111
}
108112

109-
public void setLookupPeriod(long periodMillis) {
113+
public NSQConsumer setLookupPeriod(long periodMillis) {
110114
if (!started) {
111115
this.lookupPeriod = periodMillis;
112116
}
117+
return this;
113118
}
114119

115120

@@ -140,10 +145,11 @@ private void connect() {
140145
*
141146
* @param executor
142147
*/
143-
public void setExecutor(ExecutorService executor) {
148+
public NSQConsumer setExecutor(ExecutorService executor) {
144149
if (!started) {
145150
this.executor = executor;
146151
}
152+
return this;
147153
}
148154

149155
private Set<ServerAddress> lookupAddresses() {

src/main/java/io/nsq/NSQProducer.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ public class NSQProducer {
3232
*/
3333
private int connectionRetries = 5;
3434

35-
public void start() {
35+
public NSQProducer start() {
3636
if (!started) {
3737
createPool();
3838
started = true;
3939
}
40+
return this;
4041
}
4142

4243
private void createPool() {
@@ -70,6 +71,9 @@ protected Connection getConnection() throws NoConnectionsException {
7071
* produce multiple messages.
7172
*/
7273
public void produceMulti(String topic, List<byte[]> messages) throws TimeoutException, NSQException {
74+
if (!started) {
75+
throw new IllegalStateException("Producer must be started before producing messages!");
76+
}
7377
if (messages == null || messages.isEmpty()) {
7478
return;
7579
}
@@ -81,24 +85,30 @@ public void produceMulti(String topic, List<byte[]> messages) throws TimeoutExce
8185
}
8286

8387
Connection c = this.getConnection();
84-
85-
NSQCommand command = NSQCommand.instance("MPUB " + topic);
86-
command.setData(messages);
88+
try {
89+
NSQCommand command = NSQCommand.instance("MPUB " + topic);
90+
command.setData(messages);
8791

8892

89-
NSQFrame frame = c.commandAndWait(command);
90-
if (frame instanceof ErrorFrame) {
91-
String err = ((ErrorFrame) frame).getErrorMessage();
92-
if (err.startsWith("E_BAD_TOPIC")) {
93-
throw new BadTopicException(err);
94-
}
95-
if (err.startsWith("E_BAD_MESSAGE")) {
96-
throw new BadMessageException(err);
93+
NSQFrame frame = c.commandAndWait(command);
94+
if (frame instanceof ErrorFrame) {
95+
String err = ((ErrorFrame) frame).getErrorMessage();
96+
if (err.startsWith("E_BAD_TOPIC")) {
97+
throw new BadTopicException(err);
98+
}
99+
if (err.startsWith("E_BAD_MESSAGE")) {
100+
throw new BadMessageException(err);
101+
}
97102
}
103+
} finally {
104+
pool.returnObject(c.getServerAddress(), c);
98105
}
99106
}
100107

101108
public void produce(String topic, byte[] message) throws NSQException, TimeoutException {
109+
if (!started) {
110+
throw new IllegalStateException("Producer must be started before producing messages!");
111+
}
102112
Connection c = getConnection();
103113
try {
104114
NSQCommand command = NSQCommand.instance("PUB " + topic, message);
@@ -117,18 +127,21 @@ public void produce(String topic, byte[] message) throws NSQException, TimeoutEx
117127
}
118128
}
119129

120-
public void addAddress(String host, int port) {
130+
public NSQProducer addAddress(String host, int port) {
121131
addresses.add(new ServerAddress(host, port));
132+
return this;
122133
}
123134

124-
public void removeAddress(String host, int port) {
135+
public NSQProducer removeAddress(String host, int port) {
125136
addresses.remove(new ServerAddress(host, port));
137+
return this;
126138
}
127139

128-
public void setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
140+
public NSQProducer setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
129141
if (!started) {
130142
this.poolConfig = poolConfig;
131143
}
144+
return this;
132145
}
133146

134147
/**
@@ -138,10 +151,11 @@ public void setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
138151
*
139152
* @param executor
140153
*/
141-
public void setExecutor(ExecutorService executor) {
154+
public NSQProducer setExecutor(ExecutorService executor) {
142155
if (!started) {
143156
this.executor = executor;
144157
}
158+
return this;
145159
}
146160

147161
protected ExecutorService getExecutor() {

0 commit comments

Comments
 (0)