Skip to content

Commit f88b650

Browse files
committed
Initial commit
0 parents  commit f88b650

16 files changed

+1366
-0
lines changed

.gitignore

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
target/
2+
!.mvn/wrapper/maven-wrapper.jar
3+
!**/src/main/**/target/
4+
!**/src/test/**/target/
5+
6+
### IntelliJ IDEA ###
7+
.idea
8+
*.iws
9+
*.iml
10+
*.ipr
11+
12+
### Eclipse ###
13+
.apt_generated
14+
.classpath
15+
.factorypath
16+
.project
17+
.settings
18+
.springBeans
19+
.sts4-cache
20+
21+
### NetBeans ###
22+
/nbproject/private/
23+
/nbbuild/
24+
/dist/
25+
/nbdist/
26+
/.nb-gradle/
27+
build/
28+
!**/src/main/**/build/
29+
!**/src/test/**/build/
30+
31+
### VS Code ###
32+
.vscode/
33+
34+
### Mac OS ###
35+
.DS_Store

LICENSE

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2023 Maxim Fenixov <Feniksovich>
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
![header](https://i.imgur.com/9tjhHQG.png)
2+
3+
**Redis Extended PubSub** (`RxPubSub` for short) is the ultimate tool for utilizing the [Redis Pub/Sub](https://redis.io/docs/interact/pubsub/) feature in Java applications. It offers elegant methods for publishing and receiving messages, implementing callbacks and more!
4+
5+
# Getting Started
6+
**Maven**
7+
```xml
8+
<dependency>
9+
<groupId>io.github.feniksovich</groupId>
10+
<artifactId>rxpubsub</artifactId>
11+
<version>1.0.0</version>
12+
</dependency>
13+
```
14+
**Gradle**
15+
```gradle
16+
dependencies {
17+
implementation 'io.github.feniksovich:rxpubsub:1.0.0'
18+
}
19+
```
20+
21+
# Quick Reference
22+
## Pub/Sub Message
23+
### Constructing Message
24+
Pub/Sub messages are represented by its own Java classes now. Every Java class that represents your Pub/Sub message must extend `PubSubMessage` abstract class.
25+
**For example:**
26+
27+
```java
28+
public class MyPubSubMessage extends PubSubMessage {
29+
30+
private String payload;
31+
32+
public MyPubSubMessage(String payload) {
33+
this.payload = payload;
34+
}
35+
36+
public String getPayload() {
37+
return payload;
38+
}
39+
40+
public MyPubSubMessage setPayload(String payload) {
41+
this.payload = payload;
42+
return this;
43+
}
44+
45+
// etc...
46+
}
47+
```
48+
49+
### Registering Message
50+
To receive and publish a message class must be registered with `MessagingService#registerMessageClass(PubSubMessage)`.
51+
It's required in order to generate internal classes identifiers to deserialize messages properly when received.
52+
53+
## Pub/Sub Channel
54+
### Constructing Channel
55+
Pub/Sub channels now consist of two identifiers called `namespace` & `name` and construct with `PubSubChannel#from(String, String)` method:
56+
57+
```java
58+
public class MyApp {
59+
public static final PubSubChannel channel = PubSubChannel.from("app", "requests");
60+
}
61+
```
62+
Internally in Redis these channels are represented as a string in the `namespace:name` format.
63+
64+
### Registering Channel
65+
There are two channels directions exist:
66+
- **Incoming** – you will receive messages on this channel, registers with `MessagingService#registerIncomingChannel(PubSubChannel)`.
67+
- **Outgoing** – you will publish messages on this channel, registers with `MessagingService#registerOutgoingChannel(PubSubChannel)`.
68+
69+
A channel that is receiving (incoming) and publishing (outgoing) messages is called a **duplex** channel and registers with `MessagingService#registerDuplexChannel(PubSubChannel)`.
70+
This method is actually calls two methods described above.
71+
72+
## Messaging Service
73+
`MessagingService` is a library entrypoint that provides all required methods to register components and publish messages. It uses own `MessagingServiceConfig` to construct that actually encapsulates authentication credentials and other settings such as queries execution timeout.
74+
75+
```java
76+
import io.github.feniksovich.rxpubsub.MessagingService;
77+
import io.github.feniksovich.rxpubsub.misc.MessagingServiceConfig;
78+
79+
public class MyApp {
80+
81+
public static final PubSubChannel channel = PubSubChannel.from("app", "requests");
82+
83+
public void runExample() {
84+
MessagingServiceConfig config = new MessagingServiceConfig(
85+
"localhost", 6379, "default", "password", 2000
86+
);
87+
MessagingService messagingService = new MessagingService(config);
88+
}
89+
}
90+
```
91+
92+
## Reception Listeners
93+
There are two types of listeners exist.
94+
95+
### Simple Listener
96+
Just listens for the pub/sub message reception and runs something actions.
97+
98+
```java
99+
import io.github.feniksovich.rxpubsub.MessagingService;
100+
import io.github.feniksovich.rxpubsub.misc.MessagingServiceConfig;
101+
102+
public class MyApp {
103+
104+
public static final PubSubChannel channel = PubSubChannel.from("app", "notifications");
105+
106+
public void runExample() {
107+
MessagingServiceConfig config = new MessagingServiceConfig(
108+
"localhost", 6379, "default", "password", 2000
109+
);
110+
111+
MessagingService messagingService = new MessagingService(config)
112+
.registerIncomingChannel(channel)
113+
.registerMessageClass(MyPubSubMessage.class);
114+
115+
messagingService.getEventBus().registerReceiptListener(MyPubSubMessage.class, this::onMessage);
116+
}
117+
118+
private void onMessage(MyPubSubMessage message) {
119+
System.out.println("Received message: " + message);
120+
}
121+
}
122+
```
123+
124+
### Responding Listener
125+
Like a simple one listens for pub/sub message reception, runs something actions and **returns a new `PubSubMessage` as a result and publishes it**.
126+
It should be used when the received message describes some kind of request and is sent in a special way (see "Publish and wait for response" section below).
127+
128+
```java
129+
import io.github.feniksovich.rxpubsub.MessagingService;
130+
import io.github.feniksovich.rxpubsub.misc.MessagingServiceConfig;
131+
import io.github.feniksovich.rxpubsub.model.PubSubMessage;
132+
133+
public class MyApp {
134+
135+
public static final PubSubChannel channel = PubSubChannel.from("app", "math");
136+
137+
public void runExample() {
138+
MessagingServiceConfig config = new MessagingServiceConfig(
139+
"localhost", 6379, "default", "password", 2000
140+
);
141+
142+
MessagingService messagingService = new MessagingService(config)
143+
.registerDuplexChannel(channel) // register as duplex to respond
144+
.registerMessageClass(MyPubSubRequest.class)
145+
.registerMessageClass(MyPubSubCallback.class);
146+
147+
messagingService.getEventBus().registerRespondingListener(MyPubSubMessage.class, channel, this::onRequest);
148+
}
149+
150+
private PubSubMessage onRequest(MyPubSubRequest request) {
151+
int result = request.getA() + request.getB();
152+
return new MyPubSubCallback().setResult(result);
153+
}
154+
}
155+
```
156+
157+
## Publishing Messages
158+
159+
### Simple Publish
160+
We can publish pub/sub message in a channel as usual.
161+
162+
```java
163+
import io.github.feniksovich.rxpubsub.MessagingService;
164+
import io.github.feniksovich.rxpubsub.misc.MessagingServiceConfig;
165+
166+
public class MyApp {
167+
168+
public static final PubSubChannel channel = PubSubChannel.from("app", "messages");
169+
170+
public void runExample() {
171+
MessagingServiceConfig config = new MessagingServiceConfig(
172+
"localhost", 6379, "default", "password", 2000
173+
);
174+
175+
MessagingService messagingService = new MessagingService(config)
176+
.registerOutgoingChannel(channel)
177+
.registerMessageClass(MyPubSubMessage.class);
178+
179+
MyPubSubMessage message = new MyPubSubMessage().setPayload("Hello!");
180+
messagingService.publishMessage(channel, message);
181+
}
182+
}
183+
```
184+
185+
### Publish & Wait for Response
186+
Let's imagine we need to get information from another application via Pub/Sub messages. In this case we can publish a message and wait for a response to it!
187+
188+
```java
189+
import io.github.feniksovich.rxpubsub.MessagingService;
190+
import io.github.feniksovich.rxpubsub.model.CallbackHandler;
191+
import io.github.feniksovich.rxpubsub.misc.MessagingServiceConfig;
192+
193+
import java.util.concurrent.TimeUnit;
194+
195+
public class MyApp {
196+
197+
public static final PubSubChannel channel = PubSubChannel.from("app", "math");
198+
199+
public void runExample() {
200+
MessagingServiceConfig config = new MessagingServiceConfig(
201+
"localhost", 6379, "default", "password", 2000, 2000
202+
);
203+
204+
MessagingService messagingService = new MessagingService(config)
205+
.registerDuplexChannel(channel) // register as duplex to receive response
206+
.registerMessageClass(MyPubSubRequest.class)
207+
.registerMessageClass(MyPubSubCallback.class);
208+
209+
MyPubSubMessage message = new MyPubSubRequest().setA(2).setB(2);
210+
messagingService.publishMessage(channel, message, new CallbackHandler<>(MyPubSubCallback.class)
211+
.handle(message -> System.out.println(message.getResult()))
212+
.handleError(throwable -> System.out.println("Publish error occurred: " + throwable.getMessage()))
213+
.setTimeout(2, TimeUnit.SECONDS)
214+
.handleTimeout(() -> System.out.println("Timeout!"))
215+
);
216+
}
217+
}
218+
```
219+
> **Note**
220+
> A message is identified as a response by the presence of the reserved field `@rxps_response_to` in the string representation of the message that contains source message ID.
221+
> The best way to handle such requests and send response on it is use **Responding Listeners**. If it's not possible (for example, if you need asynchronous calculations to respond), you can manually specify the ID of the message to which you are responding using the `PubSubMessage#setResponseTo(String)` method.
222+
223+
# Advanced
224+
## Message Signature
225+
`RxPubSub` injects their own fields and values in the pub/sub JSON representation during message serialization or publishing. These fields always has `@rxps` prefix:
226+
- `@rxps_message_id` – includes during message object serialization and provides a unique ID (UUID v4) of the message that generates automatically on message object creation.
227+
- `@rxps_class_id` – includes before message publishing and provides the class ID to use to deserialize JSON message.
228+
- `@rxps_respond_to` *(optional)* – includes during message object serialization, if present, and provides source message ID that this message responds to. It assigns automatically if message publishes by **Responding Listener**.
229+
230+
```
231+
{
232+
"@rxps_message_id": "60b14a18-b251-4ba5-9926-3fc7b50bd928",
233+
"@rxps_class_id": "MyPubSubMessage",
234+
"@rxps_respond_to": "5bfcbdb6-05ce-4242-9e50-65a63f5c74ad",
235+
// your custom fields
236+
}
237+
```
238+
239+
## Class ID
240+
Class ID (value of `@rxps_class_id` field) is the [simple name](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Class.html#getSimpleName()) of the class by default. It's also allowed override class ID by providing it with `@OverridePubSubClassId` annotation in the message class.
241+
242+
```java
243+
import annotations.io.github.feniksovich.rxpubsub.OverridePubSubClassId;
244+
import io.github.feniksovich.rxpubsub.model.PubSubMessage;
245+
246+
@OverridePubSubClassId("CustomClassIdOfMyPubSubMessage")
247+
public class MyPubSubMessage extends PubSubMessage {
248+
// fields, getters & setters, etc...
249+
}
250+
```
251+
Keep in mind that a class represents the same pub/sub message must have the same identifier across all your applications that use `RxPubSub`!
252+
253+
# Third-party Libraries
254+
- [Lettuce](https://github.com/lettuce-io/lettuce-core) – Java Redis client implementation with stateful connections.
255+
- [Gson](https://github.com/google/gson) – uses to serialize/deserialize message to operate with native Redis Pub/Sub.
256+
- [Guava](https://github.com/google/guava) – preconditions is so neat!
257+
258+
# License
259+
This package includes software licensed under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0).
260+
<br>RxPubSub released under the [MIT License](LICENSE), enjoy!

0 commit comments

Comments
 (0)