Skip to content

Commit

Permalink
Initial stream processing guidelines and examples using Flink and Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
Nguyen Truong Duong committed Aug 21, 2019
0 parents commit 10ab19f
Show file tree
Hide file tree
Showing 35 changed files with 2,397 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
565 changes: 565 additions & 0 deletions README.md

Large diffs are not rendered by default.

Binary file added docs/images/batch_processing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/bigdata_analysis_overview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
749 changes: 749 additions & 0 deletions docs/images/flink/flink_architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_cluster.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_cluster_overview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_ecosystem.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_features.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_job_flows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/flink_job_results.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
546 changes: 546 additions & 0 deletions docs/images/flink/program_dataflow.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/stream_data_flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/submit_job.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/flink/task_managers.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/job_execution.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/job_execution_example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/rdd_features.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/rdd_workflow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/spark_architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/spark_ecosystem.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/spark_features.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/spark/spark_job_flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/streaming_processing.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/stream_processing.pdf
Binary file not shown.
80 changes: 80 additions & 0 deletions src/flink-word-count/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- parent pom -->
<parent>
<groupId>com.seedotech</groupId>
<artifactId>stream-processing</artifactId>
<version>0.1</version>
</parent>

<groupId>com.seedotech</groupId>
<artifactId>flink_word_count</artifactId>
<version>0.1</version>
<packaging>jar</packaging>

<name>Flink Word Count</name>
<url>http://www.seedotech.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.8.1</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.seedotech.StreamingWordCount</mainClass>
</transformer>

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.seedotech;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

@SuppressWarnings("serial")
public class FileWordCount {
final static String DS_FILE_PATH = "file:///home/raycad/tmp/raycad/test1.txt";

public static void main(String[] args) throws Exception {
// Set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStateBackend(new FsStateBackend("file:///home/raycad/tmp/raycad/flink/checkpoints", false));

// Get input data
DataStream<String> source = env.readTextFile(DS_FILE_PATH);

final int windowSize = 100;
final int slideSize = 40;

// Parse the data, group it and aggregate the counts
DataStream<WordWithCount> windowCounts = source
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split(" ")) {
out.collect(new WordWithCount(word, 1));
}
}
})
.keyBy("word")
.countWindow(windowSize, slideSize)
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
// long threadId = Thread.currentThread().getId();
// System.out.println(String.format(">>>> [RAYCAD] - Thread ID = %d - %s: %d", threadId, a.word, a.count + b.count));
return new WordWithCount(a.word, a.count + b.count);
}
});

// Print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

env.execute("File WordCount");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.seedotech;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.io.OutputStream;
import java.io.PrintStream;

/**
* Implements a streaming windowed version of the "WordCount" program.
*
* <p>This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text server (at port 9000)
* using the <i>netcat</i> tool via
* <pre>
* nc -l 9000 on Linux or nc -l -p 9000 on Windows
* </pre>
* and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class StreamingWordCount {
final static int MAX_MEM_STATE_SIZE = 1000000000;
final static String OUTPUT_FILE_PATH = "/home/raycad/tmp/raycad";
// Computation window time in seconds
final static int COMPUTATION_WINDOW_TIME = 1;

public static void main(String[] args) throws Exception {

// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'StreamingWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}

// Get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false));
// env.setStateBackend(new FsStateBackend("file:///home/raycad/tmp/raycad/flink/checkpoints", false));

// Get input data by connecting to the socket
DataStream<String> source = env.socketTextStream(hostname, port, "\n");

// Parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = source
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split(" ")) {
out.collect(new WordWithCount(word, 1));
}
}
})

.keyBy("word")
.timeWindow(Time.seconds(COMPUTATION_WINDOW_TIME))

.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
// long threadId = Thread.currentThread().getId();
// System.out.println(String.format(">>>> [RAYCAD] - Thread ID = %d - %s: %d", threadId, a.word, a.count + b.count));
return new WordWithCount(a.word, a.count + b.count);
}
});

// Configure File sink
StreamingFileSink<WordWithCount> sink = StreamingFileSink
.forRowFormat(new Path(String.format(OUTPUT_FILE_PATH)),
(Encoder<WordWithCount>) (element, stream) -> {
PrintStream out = new PrintStream(stream);
out.println(element.toString());
})
// Determine S3 folder for each element
.withBucketAssigner(new WordWithCountBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();

windowCounts.addSink(sink);

// Print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

env.execute("Streaming Window WordCount");
}

/**
* Output folder assigner
*/
private static class WordWithCountBucketAssigner extends DateTimeBucketAssigner<WordWithCount> {

@Override
public String getBucketId(WordWithCount element, Context context) {
String bucketId = super.getBucketId(element, context);
System.out.println(String.format(">>>> [RAYCAD] - Bucket ID = %s. Data = %s", bucketId, element.toString()));
return bucketId + "/" + element.word;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.seedotech;

// Data type for words with count
public class WordWithCount {
public String word;
public long count;

public WordWithCount() {}

public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return word + ": " + count;
}
}
23 changes: 23 additions & 0 deletions src/flink-word-count/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
21 changes: 21 additions & 0 deletions src/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- parent pom -->
<groupId>com.seedotech</groupId>
<artifactId>stream-processing</artifactId>
<name>Stream Processing</name>
<packaging>pom</packaging>
<version>0.1</version>

<!-- sub modules -->
<modules>
<module>spark-word-count</module>
<module>flink-word-count</module>
</modules>

</project>
39 changes: 39 additions & 0 deletions src/scripts/tcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import socket
import threading
import time

bind_ip = '0.0.0.0'
bind_port = 9000
file_path_name = '/home/raycad/tmp/raycad/test1.txt'
# Message loop times
message_loop_time = 200
# Time in seconds
seep_time = 0.1

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((bind_ip, bind_port))
server.listen(5) # max backlog of connections

print('Listening on {}:{}'.format(bind_ip, bind_port))

def handle_client_connection(client_socket):
try:
with open(file_path_name, 'r') as content_file:
content = content_file.read()

#while True:
for i in range(message_loop_time):
client_socket.sendall(bytes(content + "\n", "utf-8"))
time.sleep(seep_time)

except:
print("Lost connection")

while True:
client_sock, address = server.accept()
print('Accepted connection from {}:{}'.format(address[0], address[1]))
client_handler = threading.Thread(
target=handle_client_connection,
args=(client_sock,) # without comma you'd get a... TypeError: handle_client_connection() argument after * must be a sequence, not _socketobject
)
client_handler.start()
Loading

0 comments on commit 10ab19f

Please sign in to comment.