Skip to content

Commit f85e064

Browse files
author
Benjamin Black
committed
Initial commit
0 parents  commit f85e064

14 files changed

+751
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea
2+
kafka-websocket.iml
3+
target

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# kafka-websocket
2+
3+
kafka-websocket is a simple websocket server interface to the kafka distributed message broker. It supports clients
4+
subscribing to topics, including multiple topics at once, and sending messages to topics. Messages may be either text
5+
or binary, the format for each is described below.
6+
7+
A client may produce and consume messages on the same connection.
8+
9+
## Consuming from topics
10+
11+
Clients subscribe to topics by specifying them in the path used when connecting to kafka-websocket. The path is:
12+
13+
/v1/topics/{topics}
14+
15+
where topics is a comma-separated list of topic names. If no topics are given, the client will not receive messages.
16+
The format of messages sent to clients is determined by the subprotocol negotiated: kafka-text or kafka-binary. If no
17+
subprotocol is specified, kafka-text is used.
18+
19+
## Producing to topics
20+
21+
Clients publish to topics by connecting to /v1/topics/ and sending either text or binary messages that include a topic
22+
and a message. A client need not subscribe to a topic to publish to it.
23+
24+
## Binary messages
25+
26+
Binary messages are formatted as:
27+
28+
[null terminated topic name string][message]
29+
30+
This will likely change in the near future.
31+
32+
## Text messages
33+
34+
Text messages are JSON objects with two attributes: topic and message.
35+
36+
{ "topic": "my_topic", "message": "my amazing message" }
37+
38+
## Configuration
39+
40+
See property files in conf/

conf/consumer.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
group.id=kafka-websocket
2+
zookeeper.connect=localhost:2181

conf/log4j.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# output messages into a rolling log file as well as stdout
2+
log4j.rootLogger=INFO,stdout
3+
4+
# stdout
5+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
6+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
7+
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
8+
9+
log4j.logger.us.b3k.kafka.ws=DEBUG

conf/producer.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
metadata.broker.list=localhost:9092
2+
request.required.acks=1
3+
producer.type=async

conf/server.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ws.port=8080

pom.xml

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>us.b3k.kafka</groupId>
8+
<artifactId>kafka-websocket</artifactId>
9+
<version>0.8.1-SNAPSHOT</version>
10+
11+
<properties>
12+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
14+
<jetty.version>9.1.3.v20140225</jetty.version>
15+
</properties>
16+
17+
<repositories>
18+
<repository>
19+
<id>Sonatype-public</id>
20+
<name>SnakeYAML repository</name>
21+
<url>http://oss.sonatype.org/content/groups/public/</url>
22+
</repository>
23+
24+
<repository>
25+
<id>oss.sonatype.org</id>
26+
<name>OSS Sonatype Staging</name>
27+
<url>https://oss.sonatype.org/content/groups/staging</url>
28+
</repository>
29+
30+
<repository>
31+
<id>sonatype-nexus-snapshots</id>
32+
<name>Sonatype Nexus Snapshots</name>
33+
<url>http://oss.sonatype.org/content/repositories/snapshots</url>
34+
</repository>
35+
</repositories>
36+
37+
<dependencies>
38+
<dependency>
39+
<groupId>org.eclipse.jetty</groupId>
40+
<artifactId>jetty-server</artifactId>
41+
<version>${jetty.version}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.eclipse.jetty.websocket</groupId>
45+
<artifactId>javax-websocket-server-impl</artifactId>
46+
<version>${jetty.version}</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>javax.websocket</groupId>
50+
<artifactId>javax.websocket-api</artifactId>
51+
<version>1.0</version>
52+
</dependency>
53+
54+
<dependency>
55+
<groupId>org.apache.kafka</groupId>
56+
<artifactId>kafka_2.9.2</artifactId>
57+
<version>0.8.0</version>
58+
<exclusions>
59+
<exclusion>
60+
<groupId>com.sun.jmx</groupId>
61+
<artifactId>jmxri</artifactId>
62+
</exclusion>
63+
<exclusion>
64+
<groupId>com.sun.jdmk</groupId>
65+
<artifactId>jmxtools</artifactId>
66+
</exclusion>
67+
<exclusion>
68+
<groupId>javax.jms</groupId>
69+
<artifactId>jms</artifactId>
70+
</exclusion>
71+
</exclusions>
72+
</dependency>
73+
74+
<dependency>
75+
<groupId>com.google.code.gson</groupId>
76+
<artifactId>gson</artifactId>
77+
<version>2.2.4</version>
78+
</dependency>
79+
</dependencies>
80+
81+
<build>
82+
<plugins>
83+
<plugin>
84+
<groupId>org.apache.maven.plugins</groupId>
85+
<artifactId>maven-compiler-plugin</artifactId>
86+
<version>2.3.2</version>
87+
<configuration>
88+
<source>1.7</source>
89+
<target>1.7</target>
90+
</configuration>
91+
</plugin>
92+
<plugin>
93+
<groupId>org.apache.maven.plugins</groupId>
94+
<artifactId>maven-shade-plugin</artifactId>
95+
<version>1.6</version>
96+
<configuration>
97+
<createDependencyReducedPom>false</createDependencyReducedPom>
98+
<filters>
99+
<filter>
100+
<artifact>*:*</artifact>
101+
<excludes>
102+
<exclude>META-INF/*.SF</exclude>
103+
<exclude>META-INF/*.DSA</exclude>
104+
<exclude>META-INF/*.RSA</exclude>
105+
</excludes>
106+
</filter>
107+
</filters>
108+
</configuration>
109+
<executions>
110+
<execution>
111+
<phase>package</phase>
112+
<goals>
113+
<goal>shade</goal>
114+
</goals>
115+
<configuration>
116+
<shadedArtifactAttached>true</shadedArtifactAttached>
117+
<transformers>
118+
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
119+
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
120+
<mainClass>us.b3k.kafka.ws.KafkaWebsocketMain</mainClass>
121+
</transformer>
122+
</transformers>
123+
</configuration>
124+
</execution>
125+
</executions>
126+
</plugin>
127+
128+
</plugins>
129+
</build>
130+
131+
</project>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package us.b3k.kafka.ws;
2+
3+
import org.apache.log4j.Logger;
4+
import us.b3k.kafka.ws.consumer.KafkaConsumer;
5+
import us.b3k.kafka.ws.messages.BinaryMessage;
6+
import us.b3k.kafka.ws.messages.BinaryMessage.*;
7+
import us.b3k.kafka.ws.messages.TextMessage;
8+
import us.b3k.kafka.ws.messages.TextMessage.*;
9+
import us.b3k.kafka.ws.producer.KafkaProducer;
10+
11+
import javax.websocket.Session;
12+
import javax.websocket.OnOpen;
13+
import javax.websocket.OnMessage;
14+
import javax.websocket.OnClose;
15+
import javax.websocket.OnError;
16+
import javax.websocket.CloseReason;
17+
import javax.websocket.server.ServerEndpoint;
18+
import javax.websocket.server.ServerEndpointConfig;
19+
20+
import java.io.IOException;
21+
import java.io.UnsupportedEncodingException;
22+
import java.text.MessageFormat;
23+
import java.util.Properties;
24+
25+
@ServerEndpoint(
26+
value = "/v1/topics/{topics}",
27+
subprotocols = {"kafka-text", "kafka-binary"},
28+
decoders = {BinaryMessageDecoder.class, TextMessageDecoder.class},
29+
encoders = {BinaryMessageEncoder.class, TextMessageEncoder.class},
30+
configurator = KafkaWebsocketEndpoint.Configurator.class
31+
)
32+
public class KafkaWebsocketEndpoint {
33+
private static Logger LOG = Logger.getLogger(KafkaWebsocketEndpoint.class);
34+
35+
private Properties configProps;
36+
private KafkaConsumer consumer = null;
37+
private KafkaProducer producer = null;
38+
39+
@OnOpen
40+
@SuppressWarnings("unchecked")
41+
public void onOpen(final Session session) {
42+
String topics = session.getPathParameters().get("topics");
43+
LOG.debug("Opening new session...");
44+
if (!topics.isEmpty()) {
45+
LOG.debug(" topics are " + topics);
46+
consumer = new KafkaConsumer(Configurator.getConsumerProps(), session);
47+
consumer.start();
48+
}
49+
50+
producer = new KafkaProducer(Configurator.getProducerProps());
51+
producer.start();
52+
}
53+
54+
@OnClose
55+
public void onClose(final Session session) {
56+
if (consumer != null) {
57+
consumer.stop();
58+
}
59+
60+
if (producer != null) {
61+
producer.stop();
62+
}
63+
}
64+
65+
@OnMessage
66+
public void onMessage(final BinaryMessage message, final Session session) {
67+
LOG.debug("Received binary message: topic - " + message.getTopic() + "; message - " + message.getMessage());
68+
producer.send(message.getTopic(), message.getMessage());
69+
}
70+
71+
@OnMessage
72+
public void onMessage(final TextMessage message, final Session session) {
73+
try {
74+
LOG.debug("Received text message: topic - " + message.getTopic() + "; message - " + message.getMessage());
75+
producer.send(message.getTopic(), message.getMessage().getBytes("UTF-8"));
76+
} catch (UnsupportedEncodingException e) {
77+
closeSession(session, new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, e.getMessage()));
78+
}
79+
}
80+
81+
private void closeSession(Session session, CloseReason reason) {
82+
try {
83+
session.close(reason);
84+
} catch (IOException e) {
85+
e.printStackTrace();
86+
}
87+
}
88+
89+
public static class Configurator extends ServerEndpointConfig.Configurator
90+
{
91+
private static Properties consumerProps;
92+
private static Properties producerProps;
93+
94+
public static void setKafkaProps(Properties consumerProps, Properties producerProps) {
95+
Configurator.consumerProps = consumerProps;
96+
Configurator.producerProps = producerProps;
97+
}
98+
99+
public static Properties getConsumerProps() { return Configurator.consumerProps; }
100+
public static Properties getProducerProps() { return Configurator.producerProps; }
101+
102+
@Override
103+
public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException
104+
{
105+
T endpoint = super.getEndpointInstance(endpointClass);
106+
107+
if (endpoint instanceof KafkaWebsocketEndpoint) { return endpoint; }
108+
else
109+
{
110+
throw new InstantiationException(
111+
MessageFormat.format("Expected instanceof \"{0}\". Got instanceof \"{1}\".",
112+
KafkaWebsocketEndpoint.class, endpoint.getClass()));
113+
}
114+
}
115+
}
116+
}
117+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package us.b3k.kafka.ws;
2+
3+
import java.io.FileInputStream;
4+
import java.io.InputStream;
5+
import java.util.Properties;
6+
import org.apache.log4j.PropertyConfigurator;
7+
8+
public class KafkaWebsocketMain {
9+
private static final String LOG4J_PROPS_PATH = "conf/log4j.properties";
10+
private static final String SERVER_PROPS_PATH = "conf/server.properties";
11+
private static final String CONSUMER_PROPS_PATH = "conf/consumer.properties";
12+
private static final String PRODUCER_PROPS_PATH = "conf/producer.properties";
13+
14+
private static Properties loadPropsFromFile(String filename) {
15+
try {
16+
Properties props = new Properties();
17+
props.load(new FileInputStream(filename));
18+
return props;
19+
} catch (java.io.IOException e) {
20+
e.printStackTrace();
21+
System.exit(-1);
22+
}
23+
return null;
24+
}
25+
26+
public static void main(String[] args) {
27+
PropertyConfigurator.configure(LOG4J_PROPS_PATH);
28+
Properties wsProps = loadPropsFromFile(SERVER_PROPS_PATH);
29+
Properties consumerProps = loadPropsFromFile(CONSUMER_PROPS_PATH);
30+
Properties producerProps = loadPropsFromFile(PRODUCER_PROPS_PATH);
31+
32+
KafkaWebsocketServer server = new KafkaWebsocketServer(wsProps, consumerProps, producerProps);
33+
server.run();
34+
}
35+
}

0 commit comments

Comments
 (0)