This is a sample Spark Streaming application written in Scala, the purpose of which is to take a stream of logs from Flume, parse the raw logs, create a Spark dataframe, and write the data to Parquet in HDFS.
A similar Java version has also been implemented which has additional features like restoring from checkpoint after failover, write-ahead-log, persisting in serialized format in both memory and disk etc.
The following walkthrough is meant to get the user up and running with an example in localmode; however, there are a few minor changes—particularly with the Flume set up—that allows this to run over a network and on an entire Spark cluster.
Below steps are only for using the Scala version. To use Java version, conventional Flume setup should be sufficient.
- Download the latest release of Flume.
- Download jar for spark-streaming-flume-sink.
- Download the scala-lang jar.
- Download the common-lang jar.
Because we're going to create a custom Flume configuration for Spark Streaming, we need to make sure the necessary jars are in the classpath. Flume has a convenient way of doing this using the plugins.d
directory structure.
- Create the following directory setup within your Flume location, add the jars from above:
apache-flume-1.6.0-bin/
plugins.d/
spark/
lib/
libext/
commons-lang3-3.3.2.jar
scala-library-2.10.5.jar
spark-assembly-1.5.2-hadoop2.6.0-amzn-2.jar
spark-streaming-flume-assembly_2.10-1.6.1.jar
spark-streaming-flume-sink_2.10-1.6.1.jar
- Configure the Flume agent (
conf/logdata.conf
):
# name the components of agent
agent.sources = terminal
agent.sinks = logger spark
agent.channels = memory1 memory2
# describe source
agent.sources.terminal.type = exec
agent.sources.terminal.command = tail -f /home/hadoop/generator/logs/access.log
# describe logger sink (in production, pipe raw logs to HDFS)
agent.sinks.logger.type = logger
# describe spark sink
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = localhost
agent.sinks.spark.port = 9988
agent.sinks.spark.channel = memory1
# channel buffers events in memory (used with logger sink)
agent.channels.memory1.type = memory
agent.channels.memory1.capacity = 10000
agent.channels.memory1.transactionCapacity = 1000
# channel buffers events in memory (used with spark sink)
agent.channels.memory2.type = memory
agent.channels.memory2.capacity = 10000
agent.channels.memory2.transactionCapacity = 1000
# tie source and sinks with respective channels
agent.sources.terminal.channels = memory1 memory2
agent.sinks.logger.channel = memory1
agent.sinks.spark.channel = memory2
- Start Flume agent:
./bin/flume-ng agent --conf conf --conf-file conf/logdata.conf --name agent -Dflume.root.logger=INFO,console
- Clone the repo:
[email protected]:looker/spark_log_data.git
- Open
/src/main/resources/application.conf
and set your HDFS output location. - Compile into uber jar:
sbt assembly
- Submit application to Spark:
./bin/spark-submit --master local[2] --class logDataWebinar /spark_log_data/target/scala-2.10/Log\ Data\ Webinar-assembly-1.0.jar localhost 9988 60
The java version is available under Java/LogDataWebinar
directory.
- Open
config.properties
andlog4j2.xml
underJava/LogDataWebinar/resources
directory and set necessary properties. - Compile into uber jar:
mvn package -DskipTests
An executable Webinar-1.0.0-SNAPSHOT.jar
will be created under Java/LogDataWebinar/target
directory.
- Submit application to Spark:
./bin/spark-submit --master local[2] --class com.looker.logDataWebinar.LogDataWebinar /spark_log_data/target/scala-2.10/Log\ Data\ Webinar-1.0.0-SNAPSHOT.jar localhost 9988
We're going to use the Hive Metastore to interface with our Parquet files by creating an external table.
- Fire up Hive command-line client:
hive
- Create database:
create database if not exists logdata;
- Create table:
drop table if exists logdata.event;
create external table logdata.event (
ip_address string
, identifier string
, user_id string
, created_at timestamp
, method string
, uri string
, protocol string
, status string
, size string
, referer string
, agent string
, user_meta_info string)
stored as parquet
location 'hdfs://YOUR-HDFS-ENDPOINT:PORT/YOUR/PATH/loglines.parquet';
msck repair table logdata.event; /* recover partitions */
- Start Thrift Server:
sudo -u spark HADOOP_USER_NAME=hadoop HIVE_SERVER2_THRIFT_PORT=10001 /usr/lib/spark/sbin/start-thriftserver.sh
- Use Beeline to interface with external tables:
./bin/beeline --color=yes -u 'jdbc:hive2://localhost:10001/logdata' -n hadoop
- Issue SQL:
select count(*) from logdata.event;
- Stop Thrift Server:
sudo -u spark /usr/lib/spark/sbin/stop-thriftserver.sh