Skip to content

Commit 6a7784f

Browse files
committed
Initial Commit
0 parents  commit 6a7784f

10 files changed

+573
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/*

README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
2+
Configuring Flume
3+
------------------
4+
5+
1. **Build or Download the custom Flume Source**
6+
7+
The `flume-sources` directory contains a Maven project with a custom Flume source designed to connect to the specified SSH remote path and ingest the contents of the files there into HDFS.
8+
9+
To build the flume-sources JAR, from the root of the git repository:
10+
11+
$ cd flume-sources
12+
$ mvn package
13+
$ cd ..
14+
15+
16+
This will generate a file called `flume-sources-1.0-SNAPSHOT.jar` in the `target` directory.
17+
18+
2. **Add the JAR to the Flume classpath**
19+
20+
<pre>$ sudo cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-ng/conf/flume-env.sh</pre>
21+
22+
Edit the `flume-env.sh` file and uncomment the `FLUME_CLASSPATH` line, and enter the path to the JAR. If adding multiple paths, separate them with a colon.
23+
24+
3. **Set the Flume agent name to SshAgent in /etc/default/flume-ng-agent**
25+
26+
If you don't see the `/etc/default/flume-ng-agent` file, it likely means that you didn't install the `flume-ng-agent` package. In the file, you should have the following:
27+
28+
<pre>FLUME_AGENT_NAME=SshAgent</pre>
29+
30+
4. **Modify the provided Flume configuration and copy it to /etc/flume-ng/conf**
31+
32+
There is a file called `flume.conf` in the `flume-sources` directory, which needs some minor editing. There are five fields which need to be filled in with values.
33+
34+
<pre>$ sudo cp flume.conf /etc/flume-ng/conf</pre>
35+
36+
37+
Starting the data pipeline
38+
------------------------
39+
40+
1. **Start the Flume agent**
41+
42+
Create the HDFS directory hierarchy for the Flume sink.
43+
44+
<pre>
45+
$ hadoop fs -mkdir /user/flume/ssh
46+
$ hadoop fs -chown -R flume:flume /user/flume/ssh
47+
$ hadoop fs -chmod -R 770 /user/flume
48+
$ sudo /etc/init.d/flume-ng-agent start
49+
</pre>

flume.conf

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
SshAgent.sources = SshSource
2+
SshAgent.channels = MemChannel
3+
SshAgent.sinks = HDFS
4+
5+
SshAgent.sources.SshSource.type = com.cloudera.flume.source.SshSpoolDirectorySource
6+
SshAgent.sources.SshSource.channels = MemChannel
7+
SshAgent.sources.SshSource.userName = [required]
8+
SshAgent.sources.SshSource.userPass = [required]
9+
SshAgent.sources.SshSource.hostName = [required]
10+
SshAgent.sources.SshSource.remotePath = [required]
11+
SshAgent.sources.SshSource.localPersistPath = [required]
12+
13+
SshAgent.sinks.HDFS.channel = MemChannel
14+
SshAgent.sinks.HDFS.type = hdfs
15+
SshAgent.sinks.HDFS.hdfs.path = hdfs://localhost.localdomain:8020/user/flume/ssh/
16+
SshAgent.sinks.HDFS.hdfs.fileType = DataStream
17+
SshAgent.sinks.HDFS.hdfs.writeFormat = Text
18+
SshAgent.sinks.HDFS.hdfs.batchSize = 1000
19+
SshAgent.sinks.HDFS.hdfs.rollSize = 0
20+
SshAgent.sinks.HDFS.hdfs.rollCount = 10000
21+
SshAgent.sinks.HDFS.hdfs.rollInterval = 600
22+
23+
SshAgent.channels.MemChannel.type = memory
24+
SshAgent.channels.MemChannel.capacity = 10000
25+
SshAgent.channels.MemChannel.transactionCapacity = 100

pom.xml

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.cloudera</groupId>
7+
<artifactId>ssh-spool-source</artifactId>
8+
<version>1.0-SNAPSHOT</version>
9+
<packaging>jar</packaging>
10+
11+
<name>ssh-spool-source</name>
12+
<url>http://www.cloudera.com</url>
13+
14+
<properties>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
<flume.version>1.3.0-cdh4.2.0</flume.version>
17+
<hadoop.version>2.0.0-cdh4.2.0</hadoop.version>
18+
</properties>
19+
20+
<build>
21+
<plugins>
22+
<plugin>
23+
<groupId>org.apache.maven.plugins</groupId>
24+
<artifactId>maven-eclipse-plugin</artifactId>
25+
<version>2.9</version>
26+
<configuration>
27+
<buildOutputDirectory>eclipse-classes</buildOutputDirectory>
28+
<downloadSources>true</downloadSources>
29+
<downloadJavadocs>false</downloadJavadocs>
30+
</configuration>
31+
</plugin>
32+
33+
<plugin>
34+
<groupId>org.apache.maven.plugins</groupId>
35+
<artifactId>maven-shade-plugin</artifactId>
36+
<version>1.7.1</version>
37+
<executions>
38+
<execution>
39+
<phase>package</phase>
40+
<goals>
41+
<goal>shade</goal>
42+
</goals>
43+
</execution>
44+
</executions>
45+
<configuration>
46+
<filters>
47+
<filter>
48+
<artifact>*:*</artifact>
49+
<excludes>
50+
<exclude>META-INF/*.SF</exclude>
51+
<exclude>META-INF/*.DSA</exclude>
52+
<exclude>META-INF/*.RSA</exclude>
53+
</excludes>
54+
</filter>
55+
</filters>
56+
</configuration>
57+
</plugin>
58+
</plugins>
59+
60+
<pluginManagement>
61+
<plugins>
62+
<plugin>
63+
<groupId>org.apache.maven.plugins</groupId>
64+
<artifactId>maven-compiler-plugin</artifactId>
65+
<version>2.3.2</version>
66+
<configuration>
67+
<source>1.6</source>
68+
<target>1.6</target>
69+
</configuration>
70+
</plugin>
71+
</plugins>
72+
</pluginManagement>
73+
</build>
74+
75+
<dependencies>
76+
<dependency>
77+
<groupId>junit</groupId>
78+
<artifactId>junit</artifactId>
79+
<version>4.8.2</version>
80+
<scope>test</scope>
81+
</dependency>
82+
83+
<dependency>
84+
<groupId>org.slf4j</groupId>
85+
<artifactId>slf4j-api</artifactId>
86+
<version>1.6.1</version>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>org.slf4j</groupId>
91+
<artifactId>slf4j-jdk14</artifactId>
92+
<version>1.6.1</version>
93+
<scope>compile</scope>
94+
</dependency>
95+
96+
<!-- For the FileUtils API -->
97+
<dependency>
98+
<groupId>commons-io</groupId>
99+
<artifactId>commons-io</artifactId>
100+
<version>1.3.2</version>
101+
</dependency>
102+
103+
<!-- For the SSH API -->
104+
<dependency>
105+
<groupId>net.schmizz</groupId>
106+
<artifactId>sshj</artifactId>
107+
<version>0.9.0</version>
108+
</dependency>
109+
110+
111+
<!-- Hadoop Dependencies -->
112+
<dependency>
113+
<groupId>org.apache.flume</groupId>
114+
<artifactId>flume-ng-core</artifactId>
115+
<version>${flume.version}</version>
116+
<scope>provided</scope>
117+
</dependency>
118+
<dependency>
119+
<groupId>org.apache.flume</groupId>
120+
<artifactId>flume-ng-sdk</artifactId>
121+
<version>${flume.version}</version>
122+
<scope>provided</scope>
123+
</dependency>
124+
<dependency>
125+
<groupId>org.apache.hadoop</groupId>
126+
<artifactId>hadoop-common</artifactId>
127+
<version>${hadoop.version}</version>
128+
<scope>provided</scope>
129+
</dependency>
130+
</dependencies>
131+
132+
<repositories>
133+
<repository>
134+
<id>cloudera</id>
135+
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
136+
<releases>
137+
<enabled>true</enabled>
138+
</releases>
139+
<snapshots>
140+
<enabled>false</enabled>
141+
</snapshots>
142+
</repository>
143+
</repositories>
144+
</project>
Binary file not shown.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.cloudera.flume.source;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
import org.apache.flume.Context;
8+
import org.apache.flume.Event;
9+
import org.apache.flume.event.EventBuilder;
10+
import org.apache.flume.conf.Configurable;
11+
import org.apache.flume.source.AbstractSource;
12+
import org.apache.flume.PollableSource;
13+
import org.apache.flume.EventDeliveryException;
14+
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
import com.cloudera.sshHelper.SshClientJ;
19+
20+
public class SshSpoolDirectorySource extends AbstractSource
21+
implements Configurable, PollableSource
22+
{
23+
private static final Logger logger =
24+
LoggerFactory.getLogger(SshSpoolDirectorySource.class);
25+
26+
private String hostName, userName, userPass;
27+
private String remoteSpoolPath;
28+
private String localPersistPath;
29+
30+
private SshClientJ sshClient ;
31+
private SshSpoolStateManager filesState;
32+
33+
@Override
34+
public void configure(Context context) {
35+
hostName = context.getString( SshSpoolDirectorySourceConstants.HOST_NAME );
36+
userName = context.getString( SshSpoolDirectorySourceConstants.USER_NAME );
37+
userPass = context.getString( SshSpoolDirectorySourceConstants.USER_PASS );
38+
39+
remoteSpoolPath = context.getString( SshSpoolDirectorySourceConstants.REMOTE_DIR_PATH );
40+
localPersistPath = context.getString( SshSpoolDirectorySourceConstants.LOCAL_PERSIST_PATH );
41+
42+
sshClient = new SshClientJ( hostName, userName, userPass );
43+
filesState = new SshSpoolStateManager( localPersistPath );
44+
}
45+
46+
@Override
47+
public void start() { }
48+
49+
@Override
50+
public void stop () {
51+
filesState.saveState();
52+
}
53+
54+
@Override
55+
public Status process() throws EventDeliveryException {
56+
57+
// Get pending files
58+
ArrayList< String > pendingFiles;
59+
try {
60+
ArrayList< String > files = sshClient.getFilesInPath( remoteSpoolPath );
61+
filesState.addProcessingList( files );
62+
pendingFiles = filesState.getPending();
63+
} catch (Exception e) {
64+
logger.error( e.toString() );
65+
return Status.BACKOFF;
66+
}
67+
68+
// Start transaction
69+
for( String file: pendingFiles )
70+
{
71+
try {
72+
73+
filesState.markInProcess( file );
74+
byte[] fileContents = sshClient.getContents( file );
75+
if( fileContents == null ) {
76+
logger.error( "Unable to retrieve contents: " + file );
77+
logger.error( "Marking file in error state: " + file );
78+
filesState.markError( file );
79+
continue;
80+
}
81+
Map< String, String > headers = new HashMap< String, String >();
82+
headers.put( "filename", file );
83+
84+
Event e = EventBuilder.withBody( fileContents, headers );
85+
// Store the Event into this Source's associated Channel(s)
86+
getChannelProcessor().processEvent(e);
87+
88+
filesState.markFinished( file );
89+
logger.info("Successfully parsed: " + file );
90+
91+
} catch (Throwable t) {
92+
// Log exception, handle individual exceptions as needed
93+
logger.error( "While processing: " + file + " - " + t.toString() );
94+
filesState.markError( file );
95+
}
96+
97+
}
98+
99+
return Status.READY;
100+
}
101+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.cloudera.flume.source;
2+
3+
public class SshSpoolDirectorySourceConstants {
4+
public static final String USER_NAME = "userName";
5+
public static final String USER_PASS = "userPass";
6+
public static final String HOST_NAME = "hostName";
7+
8+
public static final String REMOTE_DIR_PATH = "remotePath";
9+
public static final String LOCAL_PERSIST_PATH = "localPersistPath";
10+
}

0 commit comments

Comments
 (0)