#Integrating NODEJS and Kafka
###Table of Contents
- Integrating NODEJS and Kafka
- Installing KAFKA Single Node - Quick Start.
- Installing NodeJS on Centos 6.6.
  - Installing `nodejs` and `npm` on centos is very simple.
  - Installing `gcc-c++` and `make`.
  - Later on we will need `kafka-node`, lets install that as well.
  - Lets do a test.
- Lets make some simple changes to the existing script to handle JSON.
- NodeJS Kafka Producer - Using `kafka-node`
  - Step 1 - Copy the below script in a file called `producer_nodejs.js`.
  - Step 2 - Start the `kafka` cluster as we already did in Installation of Kafka, assuming the topic `test`.
  - Step 3 - Start the consumer service as in the below command.
  - Step 4 - Execute the below command. This will send the `This is the First Message I am sending` message to the Kafka consumer.
  - Step 5 - Check on the consumer; you will see the message sent from `nodejs`.
- Sending JSON to NodeJS to Kafka.
##Installing KAFKA Single Node - Quick Start.
Below is the list of things required to integrate NodeJS and Kafka. This is a simple HOWTO to get started. Download the Kafka `tgz` file and extract it.
[kafka-admin@kafka Downloads]$ ls
jdk-7u75-linux-x64.rpm kafka_2.9.2-0.8.2.0.tgz
[kafka-admin@kafka Downloads]$ sudo rpm -ivh jdk-7u75-linux-x64.rpm
...
[kafka-admin@kafka Downloads]$ sudo tar -xzf kafka_2.9.2-0.8.2.0.tgz -C /opt
[kafka-admin@kafka Downloads]$ cd /opt
[kafka-admin@kafka opt]$ sudo ln -s kafka_2.9.2-0.8.2.0 kafka
[kafka-admin@kafka opt]$ ls
kafka kafka_2.9.2-0.8.2.0
[kafka-admin@kafka opt]$ sudo chown -R kafka-admin:kafka-admin kafka
[kafka-admin@kafka opt]$ cd kafka
[kafka-admin@kafka kafka]$ ls
bin config libs LICENSE logs NOTICE
[kafka-admin@kafka kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties
This will start a ZooKeeper instance on localhost, port 2181. This configuration can be changed in the `config/zookeeper.properties` file. NOTE : If you want to run ZooKeeper on a separate machine, make sure you change `zookeeper.connect` in `config/server.properties` so that the Kafka server points to the correct ZooKeeper. By default it points to `localhost:2181`.
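For example, if ZooKeeper runs on a separate host, the relevant line in `config/server.properties` might look like the fragment below (the hostname is a placeholder; adjust it to your setup):

```properties
# config/server.properties (assumed hostname, not from the original setup)
zookeeper.connect=zookeeper-host:2181
```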
[kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server.properties
NOTE : If you want to start multiple brokers, make multiple copies of the `server.properties` file and change the below information.

- `broker.id` is the unique identifier for the broker.
- `port` is the port this broker is going to listen on.
- `log.dir` is where the broker writes its log.

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
Now that our server has started, lets assume we start only one broker. To create a topic, execute the below command; this will create a topic with a single partition.
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
To check the topics currently available, execute the below command.
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
[kafka-admin@kafka kafka]$
We see that currently we have only one topic. Now we are all set to send and receive messages. Open a new terminal and fire up the Kafka console producer as below, then start typing some messages; each `\n` (carriage return) marks the end of a message.
[kafka-admin@kafka kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is a message2
Open a new terminal and start the consumer.
The option `--from-beginning` will give all the messages from the beginning. You will see the 2 messages we typed above, `This is a message` and `This is a message2`.
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is a message2
Our single-node Kafka cluster is ready.
##Installing nodejs and npm on centos is very simple.
[nginx-admin@nginx ~]$ sudo su
[nginx-admin@nginx ~]# curl -sL https://rpm.nodesource.com/setup | bash -
[nginx-admin@nginx ~]# yum install -y nodejs
##Installing gcc-c++ and make.
[nginx-admin@nginx ~]$ sudo yum install gcc-c++ make
[sudo] password for nginx-admin:
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Loading mirror speeds from cached hostfile
* base: mirrors.123host.vn
* epel: ftp.cuhk.edu.hk
* extras: centos-hn.viettelidc.com.vn
* updates: mirrors.vonline.vn
Package 1:make-3.81-20.el6.x86_64 already installed and latest version
Resolving Dependencies
...
Complete!
##Later on we will need kafka-node, lets install that as well.
[nginx-admin@nginx ~]$ sudo npm install kafka-node
[sudo] password for nginx-admin:
> [email protected] install /home/nginx-admin/node_modules/kafka-node/node_modules/snappy
> node-gyp rebuild
gyp WARN EACCES user "root" does not have permission to access the dev dir "/root/.node-gyp/0.10.36"
gyp WARN EACCES attempting to reinstall using temporary dev dir "/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/.node-gyp"
make: Entering directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-sinksource.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-stubs-internal.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy.o
AR(target) Release/obj.target/deps/snappy/snappy.a
COPY Release/snappy.a
CXX(target) Release/obj.target/binding/src/binding.o
SOLINK_MODULE(target) Release/obj.target/binding.node
SOLINK_MODULE(target) Release/obj.target/binding.node: Finished
COPY Release/binding.node
make: Leaving directory `/home/nginx-admin/node_modules/kafka-node/node_modules/snappy/build'
[email protected] node_modules/kafka-node
├── [email protected]
├── [email protected]
├── [email protected]
├── [email protected]
├── [email protected]
├── [email protected] ([email protected])
├── [email protected] ([email protected], [email protected])
├── [email protected] ([email protected], [email protected])
├── [email protected] ([email protected])
└── [email protected] ([email protected], [email protected])
##Lets do a test.
Create a script called `example.js` with the below code.
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello World\n');
}).listen(1337, '127.0.0.1');
console.log('Server running at http://127.0.0.1:1337/');
Lets start the server in a terminal.
[nginx-admin@nginx nodejs]$ node example.js
Server running at http://127.0.0.1:1337/
Hit the URL from a browser and we can see `Hello World`.
So we are all set.
NodeJS is ready.
##Lets make some simple changes to the existing script to handle JSON.
Here is a simple script to handle JSON data.
// Getting some 'http' power
var http = require('http');
// Where we are expecting the request to arrive:
// http://localhost:8125/upload
var request = {
    hostname: 'localhost',
    port: 8125,
    path: '/upload',
    method: 'GET'
};
// Lets create a server to wait for requests.
http.createServer(function(request, response)
{
    // Respond with a JSON content type.
    response.writeHead(200, {"Content-Type": "application/json"});
    // request.on waits for data to arrive.
    request.on('data', function (chunk)
    {
        // CHUNK is what we receive from the client.
        // For our request we are assuming it is going to be JSON data.
        // We print it here on the console.
        console.log(chunk.toString('utf8'))
    });
    // End of request.
    response.end();
    // Listen on port 8125.
}).listen(8125);
Lets save it as `node_recv_json.js` and fire up the script.
[nginx-admin@nginx nodejs]$ node node_recv_json.js
On a new terminal, send a request to our script. Our script is listening on port 8125.
[nginx-admin@nginx nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
You will see the message received on the script terminal.
[nginx-admin@nginx nodejs]$ node node_recv_json.js
{"username":"xyz","password":"xyz"}
Now we are all set to do some R&D.
##NodeJS Kafka Producer - Using kafka-node
Now that we have Kafka and NodeJS ready, lets send some data to our Kafka cluster. Below is a basic producer script. Here are the server details:

- `nodejs` is the NodeJS server
- `kafka` is the Kafka server (single node)
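The producer script below connects to `kafka:2181`, which assumes the hostname `kafka` resolves from the NodeJS server. One simple way to arrange that is an `/etc/hosts` entry like the fragment below (the IP address is a placeholder for your Kafka server's address):

```
# /etc/hosts on the nodejs server (placeholder IP, adjust to your network)
192.168.1.10   kafka
```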
###Step 1 - Copy the below script in a file called `producer_nodejs.js`.
/*
Basic producer to send data to kafka from nodejs.
More Information Here : https://www.npmjs.com/package/kafka-node
*/
// Using kafka-node - really nice library
// create a producer and connect to a Zookeeper to send the payloads.
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client('kafka:2181'),
producer = new Producer(client);
/*
Creating a payload, which takes below information
'topic' --> this is the topic we have created in kafka. (test)
'messages' --> data which needs to be sent to kafka. (JSON in our case)
'partition' --> which partition should we send the request to. (default)
example command to create a topic in kafka:
[kafka@kafka kafka]$ bin/kafka-topics.sh \
--create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
If there are multiple partition, then we optimize the code here,
so that we send request to different partitions.
*/
payloads = [
{ topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
];
// producer 'on' ready to send payload to kafka.
producer.on('ready', function(){
producer.send(payloads, function(err, data){
console.log(data)
});
});
producer.on('error', function(err){});
###Step 2 - Start the kafka cluster as we already did in Installation of Kafka, assuming the topic `test`.
###Step 3 - Start the consumer service as in the below command.
[kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
###Step 4 - Execute the below command. This will send the `This is the First Message I am sending` message to the Kafka consumer.
[nodejs-admin@nodejs nodejs]$ node producer_nodejs.js
###Step 5 - Check on the consumer; you will see the message sent from `nodejs`.
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message here
This is the First Message I am sending
##Sending JSON to NodeJS to Kafka.
What are we trying to achieve?

- Send `json` from a browser/`curl` to `nodejs`.
- `nodejs` will redirect the `json` data to `kafka`.
- Further processing is done on `kafka`.
- We can then see the `json` arrive in `kafka`, using the `kafka-console-consumer.sh` script.
###Step 1 - Create a script called `json_nodejs_kafka.js` with the below script.
/*
Getting some 'http' power
*/
var http=require('http');
/*
Setting where we are expecting the request to arrive.
http://localhost:8125/upload
*/
var request = {
hostname: 'localhost',
port: 8125,
path: '/upload',
method: 'GET'
};
/*
Lets create a server to wait for request.
*/
http.createServer(function(request, response)
{
/*
Respond with a JSON content type.
*/
response.writeHead(200, {"Content-Type": "application/json"});
/*
request.on waiting for data to arrive.
*/
request.on('data', function (chunk)
{
/*
CHUNK is what we receive from the client.
For our request we are assuming it is going to be JSON data.
We print it here on the console.
*/
console.log(chunk.toString('utf8'))
/*
Using kafka-node - really nice library
create a producer and connect to a Zookeeper to send the payloads.
*/
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client('kafka:2181'),
producer = new Producer(client);
/*
Creating a payload, which takes below information
'topic' --> this is the topic we have created in kafka.
'messages' --> data which needs to be sent to kafka. (JSON in our case)
'partition' --> which partition should we send the request to.
If there are multiple partition, then we optimize the code here,
so that we send request to different partitions.
*/
payloads = [
{ topic: 'test', messages: chunk.toString('utf8'), partition: 0 },
];
/*
producer 'on' ready to send payload to kafka.
*/
producer.on('ready', function(){
producer.send(payloads, function(err, data){
console.log(data)
});
});
/*
if we have some error.
*/
producer.on('error', function(err){})
});
/*
end of request
*/
response.end();
/*
Listen on port 8125
*/
}).listen(8125);
###Step 2 - Start the above script on the `nodejs` server.
[nodejs-admin@nodejs nodejs]$ vim json_nodejs_kafka.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
###Step 3 - Execute the `curl` command to send the JSON to `nodejs`.
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
###Step 4 - Output on nodejs console
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }
`{"username":"xyz","password":"xyz"}` is the request from the `curl` command.
`{ test: { '0': 29 } }` is the response from the kafka cluster confirming that it has received the `json`.
###Step 5 - Output on the `kafka` consumer side.
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}
{"username":"xyz","password":"xyz"}
This is the data received from the `nodejs` server.
###References
- http://kafka.apache.org/documentation.html
- http://nodejs.org/api/http.html
- https://www.npmjs.com/package/kafka-node
- https://cwiki.apache.org/confluence/display/KAFKA/Index