Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made the sequence writer extend the default writer #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .project
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,8 @@
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.springframework.ide.eclipse.core.springbuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.springframework.ide.eclipse.core.springnature</nature>
<nature>org.maven.ide.eclipse.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repository>

<repository>
<id>com.springsource.repository.bundles.release</id>
Expand Down Expand Up @@ -158,7 +158,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2-cdh3u3</version>
<version>0.20.2-cdh3u3</version>
</dependency>

</dependencies>
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/com/cloudera/fbus/HDFSDirectoryDestination.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
public class HDFSDirectoryDestination {

private static final String tmpSuffix = ".tmp";
private static final Logger logger = LoggerFactory
protected static final String tmpSuffix = ".tmp";
protected static final Logger logger = LoggerFactory
.getLogger(HDFSDirectoryDestination.class);

private File directory;
private FileSystem fileSystem;
protected File directory;
protected FileSystem fileSystem;

public HDFSDirectoryDestination() throws IOException {
fileSystem = FileSystem.get(new Configuration());
Expand All @@ -38,12 +38,12 @@ public void deliver(File file) throws DeliveryException {
Assert.isTrue(file.canRead(), "File " + file
+ " does not exist or is unreadable");

source = new Path(file.getPath());
destinationTmp = new Path(directory.getPath(), source.getName() + tmpSuffix);
destination = new Path(directory.getPath(), source.getName());

destinationTmp = new Path(directory.getPath(), file.getName() + tmpSuffix);
destination = new Path(directory.getPath(), file.getName());

try {
fileSystem.copyFromLocalFile(false, source, destinationTmp);
copyFromLocalFile(file, destinationTmp);

if (!fileSystem.rename(destinationTmp, destination)) {
if (!fileSystem.delete(destinationTmp, false)) {
Expand Down Expand Up @@ -73,6 +73,12 @@ public void deliver(File file) throws DeliveryException {
}
}

protected void copyFromLocalFile(File file, Path destinationTmp) throws IOException {
Path source;
source = new Path(file.getPath());
fileSystem.copyFromLocalFile(false, source, destinationTmp);
}

public File getDirectory() {
return directory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,82 +24,31 @@
* {@link File} payload to the Hadoop Distributed File System.
*
*/
public class HDFSSequenceFileDirectoryDestination {
public class HDFSSequenceFileDirectoryDestination extends HDFSDirectoryDestination{

private static final String tmpSuffix = ".tmp";
private static final Logger logger = LoggerFactory
.getLogger(HDFSDirectoryDestination.class);

private File directory;
private FileSystem fileSystem;
private CompressionCodec compressionCodec = new SnappyCodec(); //Snappy is the default compression
private Configuration config;
protected CompressionCodec compressionCodec = new SnappyCodec(); //Snappy is the default compression

public HDFSSequenceFileDirectoryDestination() throws IOException {
config = new Configuration();
fileSystem = FileSystem.get(config);
super();
}

public void deliver(File file) throws DeliveryException {
Path source;
Path destinationTmp;
Path destination;

Assert.notNull(file, "File may not be null");
Assert.isTrue(file.canRead(), "File " + file
+ " does not exist or is unreadable");

source = new Path(file.getPath());
destinationTmp = new Path(directory.getPath(), source.getName() + tmpSuffix);
destination = new Path(directory.getPath(), source.getName());

try {

copyFromLocalToSeqFile(file, destinationTmp);

if (!fileSystem.rename(destinationTmp, destination)) {
if (!fileSystem.delete(destinationTmp, false)) {
logger
.warn(
"Failed to clean up temporary file at {} after rename failed. File will be retried.",
destinationTmp);
}

throw new IOException("Can't rename " + destinationTmp + " to "
+ destination);
} else {
if (!file.delete()) {
/*
* Purposefully do not throw an exception here as it would cause retry
* as well. -esammer
*/
logger
.error(
"Unable to delete file {} after moving it to HDFS @ {}. File may be retransfered!",
file, destination);
}
}
} catch (Throwable e) {
throw DeliveryException.newWith(MessageBuilder.withPayload(file).build(),
e);
}
}

private void copyFromLocalToSeqFile(File file, Path destinationTmp) throws IOException, FileNotFoundException {
@Override
protected void copyFromLocalFile(File file, Path destinationTmp) throws IOException {
SequenceFile.Writer writer = null;
BufferedReader reader = null;


try{
writer = SequenceFile.createWriter(fileSystem, config, destinationTmp, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK, compressionCodec);
writer = SequenceFile.createWriter(fileSystem, fileSystem.getConf(), destinationTmp, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK, compressionCodec);
reader = new BufferedReader(new FileReader(file));

String line = null;
Text text = new Text();
while((line = reader.readLine()) != null){
text.set(line);
writer.append(NullWritable.get(), text);
}
}
}catch(Exception e){
throw new IOException(e);
}finally
{
if (writer != null)
Expand All @@ -113,14 +62,6 @@ private void copyFromLocalToSeqFile(File file, Path destinationTmp) throws IOExc
}
}

public File getDirectory() {
return directory;
}

public void setDirectory(File directory) {
this.directory = directory;
}

public CompressionCodec getCompressionCodec()
{
return compressionCodec;
Expand Down