
Kafka Integration

There are two approaches to integrating Kafka with Spark Streaming:

  1. the old approach, using Receivers and Kafka’s high-level consumer API
  2. a new experimental approach (introduced in Spark 1.3) that does not use Receivers

The two approaches have different programming models, performance characteristics, and semantics guarantees.

Receiver-based approach

This approach uses a Receiver to receive the data; the Receiver is implemented using the Kafka high-level consumer API. The data received from Kafka through a Receiver is stored in Spark executors, and the data is then processed.

Under the default configuration, this approach can lose data under failures. To ensure zero data loss, you have to additionally enable Write Ahead Logs in Spark Streaming: the WAL synchronously saves all the received Kafka data to write ahead logs on a distributed file system, so all data can be recovered on failure.
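A minimal sketch of this approach in Scala (the topic name, consumer group, and checkpoint directory below are placeholders, not values taken from the repo's examples):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverBasedSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("ReceiverBasedSketch")
      // enable the Write Ahead Log so received data survives failures
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(2))
    // the WAL (and driver recovery) needs a checkpoint directory,
    // ideally on a fault-tolerant file system such as HDFS
    ssc.checkpoint("checkpoint")

    // createStream(ssc, zkQuorum, consumerGroup, Map(topic -> numReceiverThreads))
    val lines = KafkaUtils
      .createStream(ssc, "localhost:2181", "test-group", Map("test" -> 1))
      .map(_._2) // the stream yields (key, message) pairs; keep only the message

    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running this requires the spark-streaming-kafka artifact on the classpath (e.g. spark-streaming-kafka_2.10 for Spark 1.3).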

Examples for this approach are:

  • KafkaSingleTopic
  • KafkaMultiTopics

These are the steps required to set up a Kafka environment to test the examples:

  • Start zookeeper & Kafka Broker
    • bin/zookeeper-server-start.sh config/zookeeper.properties
    • bin/kafka-server-start.sh config/server.properties
  • Create topic(s):
    • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <TOPIC_NAME> (for the multi-topics example, create two topics with different names)
  • Start producer
    • you can use any producer; one of the simplest, which comes with the Kafka installation and is useful for testing purposes, is the console producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <TOPIC_NAME>. Once started, it will send every line you enter in the console as a message.

Kafka examples require the following input parameters:

  • KafkaSingleTopic:
    • <spark_master>: local[*], local[3] or spark://host:port
    • <topic_name>: name of the Kafka topic
    • <zookeeper_host:port>: host:port of the ZooKeeper server (e.g. localhost:2181)
  • KafkaMultiTopics:
    • <spark_master>: local[*], local[3] or spark://host:port
    • <topic_names>: a comma-separated string of topic names
    • <zookeeper_host:port>: host:port of the ZooKeeper server (e.g. localhost:2181)
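As an illustration of how the multi-topics parameters might be consumed (not the repo's actual code, and assuming a StreamingContext ssc already exists), the comma-separated <topic_names> string can be turned into the topic map expected by KafkaUtils.createStream:

```scala
// e.g. args: Array("local[*]", "topicA,topicB", "localhost:2181")
val Array(master, topicNames, zkQuorum) = args

// one receiver thread per topic; tune the thread count as needed
val topicMap: Map[String, Int] = topicNames.split(",").map(t => (t.trim, 1)).toMap

val messages = KafkaUtils.createStream(ssc, zkQuorum, "example-group", topicMap).map(_._2)
```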

Level of Parallelism in Data Receiving

Receiving data over the network (from Kafka, Flume, sockets, etc.) requires the data to be deserialized and stored in Spark.

If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data.

Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers on two workers, thus allowing data to be received in parallel and increasing overall throughput. These multiple DStreams can then be unioned together to create a single DStream (see the Kafka multiple-topics example and the sketch below).
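A sketch of this multiple-receiver pattern (assuming a StreamingContext ssc and ZooKeeper at localhost:2181; the topic names are illustrative):

```scala
// one input DStream (and hence one receiver) per topic
val topics = Seq("topicA", "topicB")
val perTopicStreams = topics.map { topic =>
  KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map(topic -> 1)).map(_._2)
}

// union the receivers' outputs into a single DStream for further processing
val unified = ssc.union(perTopicStreams)
unified.count().print()
```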

Direct approach

This is a new receiver-less “direct” approach introduced in Spark 1.3. It is an experimental feature in Spark 1.3 and is only available in the Scala and Java APIs.

This approach has been introduced to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, it periodically queries Kafka for the latest offsets in each topic+partition and accordingly defines the offset ranges to process in each batch.

When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to reading files from a file system).
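A minimal sketch of the direct approach (assuming a StreamingContext ssc, a broker at localhost:9092, and a topic named test):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// the direct stream talks to the Kafka brokers, not to ZooKeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("test")

// one RDD partition per Kafka partition; offsets are tracked by Spark Streaming
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.map(_._2).count().print()
```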

This approach has the following advantages over the receiver-based approach:

  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, and they will all read data from Kafka in parallel.

  • Efficiency: Achieving zero data loss with the first approach required the data to be stored in a Write Ahead Log; this second approach eliminates that problem, as there is no receiver and hence no need for Write Ahead Logs.

  • Exactly-once semantics: The first approach uses Kafka’s high-level API to store consumed offsets in ZooKeeper. While that approach (in combination with Write Ahead Logs) can ensure zero data loss, there is a small chance that some records may get consumed twice under some failures. This second approach uses the simple Kafka API, which does not use ZooKeeper; offsets are tracked only by Spark Streaming within its checkpoints.

One disadvantage of this approach is that it does not update offsets in ZooKeeper; hence ZooKeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed in each batch and update ZooKeeper yourself, as in the sketch below.
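A sketch of reading the per-batch offset ranges from the direct stream created above (updating ZooKeeper or another store is left as a comment):

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  // the cast is only valid on RDDs produced directly by the direct stream
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    // ...update ZooKeeper (or another store) here so monitoring tools see progress
  }
}
```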
