diff --git a/.project b/.project index ec866b2..4f48a53 100644 --- a/.project +++ b/.project @@ -15,14 +15,8 @@ - - org.springframework.ide.eclipse.core.springbuilder - - - - org.springframework.ide.eclipse.core.springnature org.maven.ide.eclipse.maven2Nature org.eclipse.jdt.core.javanature diff --git a/pom.xml b/pom.xml index 5524e7e..316a644 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ false - + com.springsource.repository.bundles.release @@ -158,7 +158,7 @@ org.apache.hadoop hadoop-core - 0.20.2-cdh3u3 + 0.20.2-cdh3u3 diff --git a/src/main/java/com/cloudera/fbus/HDFSDirectoryDestination.java b/src/main/java/com/cloudera/fbus/HDFSDirectoryDestination.java index 450783c..c23a1dd 100644 --- a/src/main/java/com/cloudera/fbus/HDFSDirectoryDestination.java +++ b/src/main/java/com/cloudera/fbus/HDFSDirectoryDestination.java @@ -18,12 +18,12 @@ */ public class HDFSDirectoryDestination { - private static final String tmpSuffix = ".tmp"; - private static final Logger logger = LoggerFactory + protected static final String tmpSuffix = ".tmp"; + protected static final Logger logger = LoggerFactory .getLogger(HDFSDirectoryDestination.class); - private File directory; - private FileSystem fileSystem; + protected File directory; + protected FileSystem fileSystem; public HDFSDirectoryDestination() throws IOException { fileSystem = FileSystem.get(new Configuration()); @@ -38,12 +38,12 @@ public void deliver(File file) throws DeliveryException { Assert.isTrue(file.canRead(), "File " + file + " does not exist or is unreadable"); - source = new Path(file.getPath()); - destinationTmp = new Path(directory.getPath(), source.getName() + tmpSuffix); - destination = new Path(directory.getPath(), source.getName()); + + destinationTmp = new Path(directory.getPath(), file.getName() + tmpSuffix); + destination = new Path(directory.getPath(), file.getName()); try { - fileSystem.copyFromLocalFile(false, source, destinationTmp); + copyFromLocalFile(file, destinationTmp); if (!fileSystem.rename(destinationTmp, destination)) { if (!fileSystem.delete(destinationTmp, false)) { @@ -73,6 +73,12 @@ public void deliver(File file) throws DeliveryException { } } + protected void copyFromLocalFile(File file, Path destinationTmp) throws IOException { + Path source; + source = new Path(file.getPath()); + fileSystem.copyFromLocalFile(false, source, destinationTmp); + } + public File getDirectory() { return directory; } diff --git a/src/main/java/com/cloudera/fbus/HDFSSequenceFileDirectoryDestination.java b/src/main/java/com/cloudera/fbus/HDFSSequenceFileDirectoryDestination.java index 886a4dc..f83b69a 100644 --- a/src/main/java/com/cloudera/fbus/HDFSSequenceFileDirectoryDestination.java +++ b/src/main/java/com/cloudera/fbus/HDFSSequenceFileDirectoryDestination.java @@ -24,74 +24,21 @@ * {@link File} payload to the Hadoop Distributed File System. * */ -public class HDFSSequenceFileDirectoryDestination { +public class HDFSSequenceFileDirectoryDestination extends HDFSDirectoryDestination{ - private static final String tmpSuffix = ".tmp"; - private static final Logger logger = LoggerFactory - .getLogger(HDFSDirectoryDestination.class); - - private File directory; - private FileSystem fileSystem; - private CompressionCodec compressionCodec = new SnappyCodec(); //Snappy is the default compression - private Configuration config; + protected CompressionCodec compressionCodec = new SnappyCodec(); //Snappy is the default compression public HDFSSequenceFileDirectoryDestination() throws IOException { - config = new Configuration(); - fileSystem = FileSystem.get(config); + super(); } - public void deliver(File file) throws DeliveryException { - Path source; - Path destinationTmp; - Path destination; - - Assert.notNull(file, "File may not be null"); - Assert.isTrue(file.canRead(), "File " + file - + " does not exist or is unreadable"); - - source = new Path(file.getPath()); - destinationTmp = new Path(directory.getPath(), source.getName() + tmpSuffix); - destination = new Path(directory.getPath(), source.getName()); - - try { - - copyFromLocalToSeqFile(file, destinationTmp); - - if (!fileSystem.rename(destinationTmp, destination)) { - if (!fileSystem.delete(destinationTmp, false)) { - logger - .warn( - "Failed to clean up temporary file at {} after rename failed. File will be retried.", - destinationTmp); - } - - throw new IOException("Can't rename " + destinationTmp + " to " - + destination); - } else { - if (!file.delete()) { - /* - * Purposefully do not throw an exception here as it would cause retry - * as well. -esammer - */ - logger - .error( - "Unable to delete file {} after moving it to HDFS @ {}. File may be retransfered!", - file, destination); - } - } - } catch (Throwable e) { - throw DeliveryException.newWith(MessageBuilder.withPayload(file).build(), - e); - } - } - - private void copyFromLocalToSeqFile(File file, Path destinationTmp) throws IOException, FileNotFoundException { + @Override + protected void copyFromLocalFile(File file, Path destinationTmp) throws IOException { SequenceFile.Writer writer = null; BufferedReader reader = null; - try{ - writer = SequenceFile.createWriter(fileSystem, config, destinationTmp, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK, compressionCodec); + writer = SequenceFile.createWriter(fileSystem, fileSystem.getConf(), destinationTmp, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK, compressionCodec); reader = new BufferedReader(new FileReader(file)); String line = null; @@ -99,7 +46,9 @@ private void copyFromLocalToSeqFile(File file, Path destinationTmp) throws IOExc while((line = reader.readLine()) != null){ text.set(line); writer.append(NullWritable.get(), text); - } + } + }catch(Exception e){ + throw new IOException(e); }finally { if (writer != null) @@ -113,14 +62,6 @@ private void copyFromLocalToSeqFile(File file, Path destinationTmp) throws IOExc } } - public File getDirectory() { - return directory; - } - - public void setDirectory(File directory) { - this.directory = directory; - } - public CompressionCodec getCompressionCodec() { return compressionCodec;