diff --git a/README.md b/README.md
index b8b1c55..9a07b7e 100644
--- a/README.md
+++ b/README.md
@@ -4,10 +4,16 @@ kafka-s3-consumer
 
 Store batched Kafka messages in S3.
 
 Build
+-----------------
 
 mvn package
 
 Run
+-----------------
 
 java -jar kafka-s3-consumer-1.0.jar
+# In order to start again after a period of inactivity (logs are already stored in S3,
+# but there is a gap between the last uploaded log and the first record still in Kafka):
+
+java -jar kafka-s3-consumer-1.0.jar kafka-s3-consumer.properties clean
\ No newline at end of file
diff --git a/kafka-s3-consumer.properties b/kafka-s3-consumer.properties
new file mode 100644
index 0000000..78078ad
--- /dev/null
+++ b/kafka-s3-consumer.properties
@@ -0,0 +1,21 @@
+# Kafka settings
+kafka.host=127.0.0.1
+kafka.port=9092
+kafka.brokerid=0
+kafka.topics=topic:1
+# maximum size of a single message in the queue
+kafka.maxmessagesize=4096
+
+# S3 settings
+s3.accesskey=$AWS_ACCESS_KEY_ID
+s3.secretkey=$AWS_SECRET_ACCESS_KEY
+
+# Consumer settings
+s3.bucket=persuasionapi-test
+s3.prefix=s3-consumer
+
+# maximum object size in bytes before a chunk is uploaded to S3 (roughly 10 MB)
+s3.maxobjectsize=10506200
+
+# gzip-compress the files that are sent to S3
+s3.compression=true
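
For reference, a minimal sketch of how the $VAR / ${VAR} placeholders in the properties file above are expected to be expanded from the shell environment before the properties are loaded. It uses the same regex that resolveEnvVars() in App.java (further down in this patch) applies; the class name and the sample line are illustrative only.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical, self-contained example of the $VAR / ${VAR} expansion.
    public class EnvVarExpansionExample {
        public static void main(String[] args) {
            String line = "s3.accesskey=$AWS_ACCESS_KEY_ID";  // sample line from the properties file
            Pattern p = Pattern.compile("\\$\\{(\\w+)\\}|\\$(\\w+)");
            Matcher m = p.matcher(line);
            StringBuffer sb = new StringBuffer();
            while (m.find()) {
                // group(1) matches ${NAME}, group(2) matches $NAME
                String name = m.group(1) == null ? m.group(2) : m.group(1);
                String value = System.getenv(name);
                // quoteReplacement guards against '$' or '\' characters in the value
                m.appendReplacement(sb, value == null ? "" : Matcher.quoteReplacement(value));
            }
            m.appendTail(sb);
            System.out.println(sb);  // prints the line with the credential substituted in
        }
    }
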
diff --git a/pom.xml b/pom.xml
index 978a0b2..87e770b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
     <dependency>
       <groupId>kafka</groupId>
       <artifactId>kafka</artifactId>
-      <version>0.7.1</version>
+      <version>0.7.2</version>
     </dependency>
diff --git a/repo/kafka/kafka/0.7.2/kafka-0.7.2.jar b/repo/kafka/kafka/0.7.2/kafka-0.7.2.jar
new file mode 100644
index 0000000..894b516
Binary files /dev/null and b/repo/kafka/kafka/0.7.2/kafka-0.7.2.jar differ
diff --git a/src/main/java/kafka/s3/consumer/App.java b/src/main/java/kafka/s3/consumer/App.java
index 95b46dc..fb35428 100644
--- a/src/main/java/kafka/s3/consumer/App.java
+++ b/src/main/java/kafka/s3/consumer/App.java
@@ -1,241 +1,350 @@
 package kafka.s3.consumer;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import kafka.api.FetchRequest;
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.MessageSet;
+import kafka.message.MessageAndOffset;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+import java.nio.MappedByteBuffer;
+import java.nio.charset.Charset;
 import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.zip.GZIPOutputStream;
 
-import kafka.api.FetchRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.MessageSet;
-import kafka.message.MessageAndOffset;
+public class App {
+    static Logger logger = Logger.getLogger(App.class);
 
-import org.apache.log4j.Logger;
+    static Configuration conf;
+    private static ExecutorService pool;
+    private static boolean cleanStart = false;
 
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+    /*
+    mvn exec:java -Dexec.mainClass="kafka.s3.consumer.App" -Dexec.args="app.properties"
+     */
+    public static void main(String[] args) throws IOException, java.lang.InterruptedException {
-public class App
-{
-  static Logger logger = Logger.getLogger(App.class);
+        conf = loadConfiguration(args);
-  static Configuration conf;
-  private static ExecutorService pool;
+        Map<String, Integer> topics = conf.getTopicsAndPartitions();
-  /*
-  mvn exec:java -Dexec.mainClass="kafka.s3.consumer.App" -Dexec.args="app.properties"
-   */
-  public static void main( String[] args ) throws IOException, java.lang.InterruptedException {
+        List<Worker> workers = new LinkedList<Worker>();
-    conf = loadConfiguration(args);
+        for (String topic : topics.keySet()) {
+            for (int partition = 0; partition < topics.get(topic); partition++) {
+                workers.add(new Worker(topic, partition));
+            }
+        }
-    Map<String,Integer> topics = conf.getTopicsAndPartitions();
+        pool = Executors.newFixedThreadPool(workers.size());
-    List workers = new LinkedList();
+        Runtime.getRuntime().addShutdownHook(new GracefulWorkerShutdown(pool));
-    for (String topic: topics.keySet()) {
-      for (int partition=0; partition<topics.get(topic); partition++) {
+                Iterator<MessageAndOffset> messages = new MessageStream(topic, partition, offset);
+                while (messages.hasNext()) {
+                    MessageAndOffset messageAndOffset = messages.next();
+                    this.sink.append(messageAndOffset);
+                }
+            } catch (IOException e) {
+                closeAndNullifySink("exception encountered");
+                throw new RuntimeException(e);
+            }
+            finally {
+                closeAndNullifySink("thread run() completed");
+            }
+        }
+
+        private void closeAndNullifySink(String logline) {
+            if (this.sink != null) {
+                try {
+                    logger.debug("Writing out current buffer to s3 before termination: " + logline);
+                    this.sink.exportCurrentChunkToS3();
+                    this.sink = null;
+                }
+                catch(IOException ignored) {}
+            }
+        }
+
+        @Override
+        protected void finalize() throws Throwable {
+            try{
+                if (this.sink != null) { this.sink.exportCurrentChunkToS3(); }
+            } catch(Throwable t) {
+                throw t;
+            } finally {
+                super.finalize();
+            }
+        }
+    }
-      try {
-        S3Sink sink = new S3Sink(topic,partition);
-        long offset = sink.getMaxCommittedOffset();
-        Iterator<MessageAndOffset> messages = new MessageStream(topic,partition,offset);
-        while (messages.hasNext()) {
-          MessageAndOffset messageAndOffset = messages.next();
-          sink.append(messageAndOffset);
+    private static Configuration loadConfiguration(String[] args) {
+        Properties props = new Properties();
+
+        try {
+            if(args.length > 0) {
+                props.load(new ByteArrayInputStream(resolveEnvVars(args[0]).getBytes()));
+                if (args.length >= 2) {
+                    if (args[1].equals("clean")) {
+                        cleanStart = true;
+                    }
+                }
+            } else {
+                System.err.println("Usage: java -jar kafka-s3-consumer-1.0.jar <properties-file> [clean]");
+                System.exit(1);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+        return new PropertyConfiguration(props);
     }
-  }
-
-  private static Configuration loadConfiguration(String[] args) {
-    Properties props = new Properties();
-
-    try {
-      if (args == null || args.length != 1) {
-        props.load(App.class.getResourceAsStream("/app.properties"));
-      } else {
-        props.load(new FileInputStream(new File(args[0])));
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+
+    /* Environment variable substitution for properties files from http://stackoverflow.com/a/9725352/370800
+     * Returns input string with environment variable references expanded, e.g. $SOME_VAR or ${SOME_VAR}
+     */
+    private static String resolveEnvVars(String filename) throws IOException {
+        String input;
+        FileInputStream stream = new FileInputStream(new File(filename));
+        try {
+            FileChannel fc = stream.getChannel();
+            MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+            /* Decode the mapped file with the platform default charset. */
+            input = Charset.defaultCharset().decode(bb).toString();
+        }
+        finally {
+            stream.close();
+        }
+
+        if (null == input) {
+            return null;
+        }
+        // match ${ENV_VAR_NAME} or $ENV_VAR_NAME
+        Pattern p = Pattern.compile("\\$\\{(\\w+)\\}|\\$(\\w+)");
+        Matcher m = p.matcher(input); // get a matcher object
+        StringBuffer sb = new StringBuffer();
+        while (m.find()) {
+            String envVarName = null == m.group(1) ? m.group(2) : m.group(1);
+            String envVarValue = System.getenv(envVarName);
+            m.appendReplacement(sb, null == envVarValue ? "" : envVarValue);
+        }
+        m.appendTail(sb);
+        return sb.toString();
     }
-    return new PropertyConfiguration(props);
-  }
-  private static class S3Sink {
-    private String topic;
-    private int partition;
+    private static class S3Sink {
-    private String bucket;
-    private AmazonS3Client awsClient;
+        private String topic;
+        private int partition;
+        private boolean compression;
-    long startOffset;
-    long endOffset;
-    int bytesWritten;
+        private String bucket;
+        private AmazonS3Client awsClient;
-    File tmpFile;
-    OutputStream tmpOutputStream;
-    WritableByteChannel tmpChannel;
+        long startOffset;
+        long endOffset;
+        int bytesWritten;
-    public S3Sink(String topic, int partition) throws FileNotFoundException, IOException {
-      this.topic = topic;
-      this.partition = partition;
+        File tmpFile;
+        OutputStream tmpOutputStream;
+        WritableByteChannel tmpChannel;
-      bucket = conf.getS3Bucket();
-      awsClient = new AmazonS3Client(new BasicAWSCredentials(conf.getS3AccessKey(), conf.getS3SecretKey()));
+        public S3Sink(String topic, int partition, boolean compression) throws FileNotFoundException, IOException {
+            this.topic = topic;
+            this.partition = partition;
+            this.compression = compression;
-      startOffset = endOffset = fetchLastCommittedOffset();
-      bytesWritten = 0;
+            bucket = conf.getS3Bucket();
+            awsClient = new AmazonS3Client(new BasicAWSCredentials(conf.getS3AccessKey(), conf.getS3SecretKey()));
-      tmpFile = File.createTempFile("s3sink", null);
-      logger.debug("Created tmpFile: " + tmpFile);
-      tmpOutputStream = new FileOutputStream(tmpFile);
-      tmpChannel = Channels.newChannel(tmpOutputStream);
-    }
+            startOffset = endOffset = fetchLastCommittedOffset();
+            bytesWritten = 0;
-    public void append(MessageAndOffset messageAndOffset) throws IOException {
-
-      int messageSize = messageAndOffset.message().payload().remaining();
-      logger.debug("Appending message with size: " + messageSize);
-
-      if (bytesWritten + messageSize + 1 > conf.getS3MaxObjectSize()) {
Size is: " + bytesWritten); - String key = getKeyPrefix() + startOffset + "_" + endOffset; - awsClient.putObject(bucket, key, tmpFile); - tmpChannel.close(); - tmpOutputStream.close(); - tmpFile.delete(); - tmpFile = File.createTempFile("s3sink", null); - logger.debug("Created tmpFile: " + tmpFile); - tmpOutputStream = new FileOutputStream(tmpFile); - tmpChannel = Channels.newChannel(tmpOutputStream); - startOffset = endOffset; - bytesWritten = 0; - } - - tmpChannel.write(messageAndOffset.message().payload()); - tmpOutputStream.write('\n'); - bytesWritten += messageSize + 1; - - endOffset = messageAndOffset.offset(); - } + tmpFile = File.createTempFile("s3sink", null); + logger.debug("Created tmpFile: " + tmpFile); - public long getMaxCommittedOffset() { - return startOffset; - } + logger.debug("Compression: " + compression); + if (compression) { + tmpOutputStream = new GZIPOutputStream(new FileOutputStream(tmpFile)); + } else { + tmpOutputStream = new FileOutputStream(tmpFile); + } + tmpChannel = Channels.newChannel(tmpOutputStream); + } - public long fetchLastCommittedOffset() { - logger.debug("Getting max offset for " + topic + ":" + partition); - String prefix = getKeyPrefix(); - logger.debug("Listing keys for bucket/prefix " + bucket + "/" + prefix); - List objectSummaries = awsClient.listObjects(new ListObjectsRequest().withBucketName(bucket).withDelimiter("/").withPrefix(prefix)).getObjectSummaries(); - logger.debug("Received result " + objectSummaries); - - long maxOffset = 0; - - for (S3ObjectSummary objectSummary : objectSummaries) { - logger.debug(objectSummary.getKey()); - String[] offsets = objectSummary.getKey().substring(prefix.length()).split("_"); - long endOffset = Long.valueOf(offsets[1]); - if (endOffset > maxOffset) - maxOffset = endOffset; - } - return maxOffset; - } + public void exportCurrentChunkToS3() throws IOException { + logger.debug("Uploading chunk to S3. 
Size is: " + bytesWritten); + String key = getKeyPrefix() + startOffset + "_" + endOffset; + awsClient.putObject(bucket, key, tmpFile); + tmpChannel.close(); + tmpOutputStream.close(); + tmpFile.delete(); + tmpFile = File.createTempFile("s3sink", null); + logger.debug("Created tmpFile: " + tmpFile); + if (compression) { + tmpOutputStream = new GZIPOutputStream(new FileOutputStream(tmpFile)); + } else { + tmpOutputStream = new FileOutputStream(tmpFile); + } + tmpChannel = Channels.newChannel(tmpOutputStream); + startOffset = endOffset; + bytesWritten = 0; + } - private String getKeyPrefix() { - return conf.getS3Prefix() + "/" + topic + "/" + conf.getKafkaBrokerId() + "_" + partition + "_"; - } - } + public void append(MessageAndOffset messageAndOffset) throws IOException { - private static class MessageStream implements Iterator { + int messageSize = messageAndOffset.message().payload().remaining(); + logger.debug("Appending message with size: " + messageSize); - private SimpleConsumer consumer; - private Iterator messageSetIterator; + if (bytesWritten + messageSize + 1 > conf.getS3MaxObjectSize()) { + exportCurrentChunkToS3(); + } - private String topic; - private int partition; - private long offset; + tmpChannel.write(messageAndOffset.message().payload()); + tmpOutputStream.write('\n'); + bytesWritten += messageSize + 1; - public MessageStream(String topic, int partition, long offset) { - logger.debug("Message stream created: " + topic + ":" + partition + "/" + offset); - this.topic = topic; - this.partition = partition; - this.offset = offset; - consumer = new SimpleConsumer(conf.getKafkaHost(), conf.getKafkaPort(), 5000, 4*1024); - logger.debug("Created kafka consumer: " + consumer); - } + endOffset = messageAndOffset.offset(); + } - @Override - public boolean hasNext() { - return true; - } + public long getMaxCommittedOffset() { + return startOffset; + } - @Override - public MessageAndOffset next() { - if (messageSetIterator == null || !messageSetIterator.hasNext()) { - logger.debug("Fetching message from offset: " + offset); - FetchRequest fetchRequest = new FetchRequest(topic, partition, offset, conf.getKafkaMaxMessageSize()); - MessageSet messageSet = consumer.fetch(fetchRequest); - while (!messageSet.iterator().hasNext()) { - logger.debug("No messages returned. 
Sleeping for 10s."); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - messageSet = consumer.fetch(fetchRequest); - } - messageSetIterator = messageSet.iterator(); - } - MessageAndOffset message = messageSetIterator.next(); - offset = message.offset(); - return message; + public long fetchLastCommittedOffset() { + logger.debug("Getting max offset for " + topic + ":" + partition); + String prefix = getKeyPrefix(); + logger.debug("Listing keys for bucket/prefix " + bucket + "/" + prefix); + List objectSummaries = awsClient.listObjects(new ListObjectsRequest().withBucketName(bucket).withDelimiter("/").withPrefix(prefix)).getObjectSummaries(); + logger.debug("Received result " + objectSummaries); + + long maxOffset = 0; + + for (S3ObjectSummary objectSummary : objectSummaries) { + logger.debug(objectSummary.getKey()); + String[] offsets = objectSummary.getKey().substring(prefix.length()).split("_"); + long endOffset = Long.valueOf(offsets[1]); + if (endOffset > maxOffset) + maxOffset = endOffset; + } + /** + * Fix to start in the beginning after rotate, to avoid errors + */ + + + return maxOffset; + } + + private String getKeyPrefix() { + return conf.getS3Prefix() + "/" + topic + "/" + conf.getKafkaBrokerId() + "_" + partition + "_"; + } } - @Override - public void remove() { - throw new UnsupportedOperationException("Method remove is not supported by this iterator."); + private static class MessageStream implements Iterator { + + private SimpleConsumer consumer; + private Iterator messageSetIterator; + + private String topic; + private int partition; + private long offset; + + public MessageStream(String topic, int partition, long offset) { + logger.debug("Message stream created: " + topic + ":" + partition + "/" + offset); + this.topic = topic; + this.partition = partition; + this.offset = offset; + consumer = new SimpleConsumer(conf.getKafkaHost(), conf.getKafkaPort(), 5000, 4 * 1024); + logger.debug("Created kafka consumer: " + consumer); + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public MessageAndOffset next() { + try { + if (offset == 0 || cleanStart) { + offset = consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime(), 1)[0]; + logger.info("Offset re-configured to :" + offset); + cleanStart = false; + } + + if (messageSetIterator == null || !messageSetIterator.hasNext()) { + logger.debug("Fetching message from offset: " + offset); + FetchRequest fetchRequest = new FetchRequest(topic, partition, offset, conf.getKafkaMaxMessageSize()); + MessageSet messageSet = consumer.fetch(fetchRequest); + while (!messageSet.iterator().hasNext()) { + logger.debug("No messages returned. 
Sleeping for 10s."); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + messageSet = consumer.fetch(fetchRequest); + } + messageSetIterator = messageSet.iterator(); + } + MessageAndOffset message = messageSetIterator.next(); + offset = message.offset(); + return message; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Method remove is not supported by this iterator."); + } } - } } + diff --git a/src/main/java/kafka/s3/consumer/Configuration.java b/src/main/java/kafka/s3/consumer/Configuration.java index e526f1e..87c5b59 100644 --- a/src/main/java/kafka/s3/consumer/Configuration.java +++ b/src/main/java/kafka/s3/consumer/Configuration.java @@ -16,4 +16,5 @@ public interface Configuration { public int getS3MaxObjectSize(); public int getKafkaMaxMessageSize(); + public boolean isCompressed(); } diff --git a/src/main/java/kafka/s3/consumer/PropertyConfiguration.java b/src/main/java/kafka/s3/consumer/PropertyConfiguration.java index 62c7459..38db0ce 100644 --- a/src/main/java/kafka/s3/consumer/PropertyConfiguration.java +++ b/src/main/java/kafka/s3/consumer/PropertyConfiguration.java @@ -6,119 +6,126 @@ public class PropertyConfiguration implements Configuration { - private final Properties props; - - private static final String PROP_S3_ACCESS_KEY = "s3.accesskey"; - private static final String PROP_S3_SECRET_KEY = "s3.secretkey"; - private static final String PROP_S3_BUCKET = "s3.bucket"; - private static final String PROP_S3_PREFIX = "s3.prefix"; - private static final String PROP_KAFKA_HOST = "kafka.host"; - private static final String PROP_KAFKA_PORT = "kafka.port"; - private static final String PROP_KAFKA_BROKER_ID = "kafka.brokerid"; - private static final String PROP_S3_MAX_OBJECT_SIZE = "s3.maxobjectsize"; - private static final String PROP_KAFKA_MAX_MESSAGE_SIZE = "kafka.maxmessagesize"; - private static final String PROP_KAFKA_TOPICS = "kafka.topics"; - - public PropertyConfiguration(Properties props) { - this.props = props; - } - - @Override - public String getS3AccessKey() { - String s3AccessKey = props.getProperty(PROP_S3_ACCESS_KEY); - if (s3AccessKey == null || s3AccessKey.isEmpty()) { - throw new RuntimeException("Invalid property " + PROP_S3_ACCESS_KEY); + private final Properties props; + + private static final String PROP_S3_ACCESS_KEY = "s3.accesskey"; + private static final String PROP_S3_SECRET_KEY = "s3.secretkey"; + private static final String PROP_S3_BUCKET = "s3.bucket"; + private static final String PROP_S3_PREFIX = "s3.prefix"; + private static final String PROP_KAFKA_HOST = "kafka.host"; + private static final String PROP_KAFKA_PORT = "kafka.port"; + private static final String PROP_KAFKA_BROKER_ID = "kafka.brokerid"; + private static final String PROP_S3_MAX_OBJECT_SIZE = "s3.maxobjectsize"; + private static final String PROP_KAFKA_MAX_MESSAGE_SIZE = "kafka.maxmessagesize"; + private static final String PROP_KAFKA_TOPICS = "kafka.topics"; + private static final String PROP_S3_COMPRESSION = "s3.compression"; + + public PropertyConfiguration(Properties props) { + this.props = props; } - return s3AccessKey; - } - - @Override - public String getS3SecretKey() { - String s3SecretKey = props.getProperty(PROP_S3_SECRET_KEY); - if (s3SecretKey == null || s3SecretKey.isEmpty()) { - throw new RuntimeException("Invalid property " + PROP_S3_SECRET_KEY); + + @Override + public String getS3AccessKey() { + String 
+        String s3AccessKey = props.getProperty(PROP_S3_ACCESS_KEY);
+        if (s3AccessKey == null || s3AccessKey.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_S3_ACCESS_KEY);
+        }
+        return s3AccessKey;
     }
-    return s3SecretKey;
-  }
-
-  @Override
-  public String getS3Bucket() {
-    String s3Bucket = props.getProperty(PROP_S3_BUCKET);
-    if (s3Bucket == null || s3Bucket.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_S3_BUCKET);
+
+    @Override
+    public String getS3SecretKey() {
+        String s3SecretKey = props.getProperty(PROP_S3_SECRET_KEY);
+        if (s3SecretKey == null || s3SecretKey.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_S3_SECRET_KEY);
+        }
+        return s3SecretKey;
     }
-    return s3Bucket;
-  }
-
-  @Override
-  public String getS3Prefix() {
-    String s3Prefix = props.getProperty(PROP_S3_PREFIX);
-    if (s3Prefix == null || s3Prefix.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_S3_PREFIX);
+
+    @Override
+    public String getS3Bucket() {
+        String s3Bucket = props.getProperty(PROP_S3_BUCKET);
+        if (s3Bucket == null || s3Bucket.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_S3_BUCKET);
+        }
+        return s3Bucket;
     }
-    return s3Prefix.replaceAll("/$", "");
-  }
-
-  @Override
-  public String getKafkaHost() {
-    String kafkaHost = props.getProperty(PROP_KAFKA_HOST);
-    if (kafkaHost == null || kafkaHost.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_KAFKA_HOST);
+
+    @Override
+    public String getS3Prefix() {
+        String s3Prefix = props.getProperty(PROP_S3_PREFIX);
+        if (s3Prefix == null || s3Prefix.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_S3_PREFIX);
+        }
+        return s3Prefix.replaceAll("/$", "");
     }
-    return kafkaHost;
-  }
-
-  @Override
-  public int getKafkaPort() {
-    String kafkaPort = props.getProperty(PROP_KAFKA_PORT);
-    if (kafkaPort == null || kafkaPort.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_KAFKA_PORT);
+
+    @Override
+    public String getKafkaHost() {
+        String kafkaHost = props.getProperty(PROP_KAFKA_HOST);
+        if (kafkaHost == null || kafkaHost.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_KAFKA_HOST);
+        }
+        return kafkaHost;
     }
-    return Integer.valueOf(kafkaPort);
-  }
-
-  @Override
-  public int getKafkaBrokerId() {
-    String kafkaBrokerId = props.getProperty(PROP_KAFKA_BROKER_ID);
-    if (kafkaBrokerId == null || kafkaBrokerId.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_KAFKA_BROKER_ID);
+
+    @Override
+    public int getKafkaPort() {
+        String kafkaPort = props.getProperty(PROP_KAFKA_PORT);
+        if (kafkaPort == null || kafkaPort.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_KAFKA_PORT);
+        }
+        return Integer.valueOf(kafkaPort);
     }
-    return Integer.valueOf(kafkaBrokerId);
-  }
-
-  @Override
-  public Map<String,Integer> getTopicsAndPartitions() {
-    HashMap<String,Integer> result = new HashMap<String,Integer>();
-    String kafkaTopics = props.getProperty(PROP_KAFKA_TOPICS);
-    if (kafkaTopics == null || kafkaTopics.isEmpty()) {
-      throw new RuntimeException("Invalid property " + PROP_KAFKA_TOPICS);
+
+    @Override
+    public int getKafkaBrokerId() {
+        String kafkaBrokerId = props.getProperty(PROP_KAFKA_BROKER_ID);
+        if (kafkaBrokerId == null || kafkaBrokerId.isEmpty()) {
+            throw new RuntimeException("Invalid property " + PROP_KAFKA_BROKER_ID);
+        }
+        return Integer.valueOf(kafkaBrokerId);
     }
-    for (String topics: kafkaTopics.split(",")) {
-      String[] topicPart = topics.split(":");
-      if (result.containsKey(topicPart[0]))
topic " + topicPart[0]); - result.put(topicPart[0], Integer.valueOf(topicPart[1])); + + @Override + public Map getTopicsAndPartitions() { + HashMap result = new HashMap(); + String kafkaTopics = props.getProperty(PROP_KAFKA_TOPICS); + if (kafkaTopics == null || kafkaTopics.isEmpty()) { + throw new RuntimeException("Invalid property " + PROP_KAFKA_TOPICS); + } + for (String topics : kafkaTopics.split(",")) { + String[] topicPart = topics.split(":"); + if (result.containsKey(topicPart[0])) + throw new RuntimeException("Duplicate topic " + topicPart[0]); + result.put(topicPart[0], Integer.valueOf(topicPart[1])); + } + return result; } - return result; - } - - @Override - public int getS3MaxObjectSize() { - String maxBatchObjectSize = props.getProperty(PROP_S3_MAX_OBJECT_SIZE); - if (maxBatchObjectSize == null || maxBatchObjectSize.isEmpty()) { - return 256; + + @Override + public int getS3MaxObjectSize() { + String maxBatchObjectSize = props.getProperty(PROP_S3_MAX_OBJECT_SIZE); + if (maxBatchObjectSize == null || maxBatchObjectSize.isEmpty()) { + return 256; + } + return Integer.valueOf(maxBatchObjectSize); + } - return Integer.valueOf(maxBatchObjectSize); - } + @Override + public int getKafkaMaxMessageSize() { + String maxMessageSize = props.getProperty(PROP_KAFKA_MAX_MESSAGE_SIZE); + if (maxMessageSize == null || maxMessageSize.isEmpty()) { + return 256; + } + return Integer.valueOf(maxMessageSize); - @Override - public int getKafkaMaxMessageSize() { - String maxMessageSize = props.getProperty(PROP_KAFKA_MAX_MESSAGE_SIZE); - if (maxMessageSize == null || maxMessageSize.isEmpty()) { - return 256; } - return Integer.valueOf(maxMessageSize); - } + @Override + public boolean isCompressed(){ + String compress = props.getProperty(PROP_S3_COMPRESSION); + return !(compress == null || compress.isEmpty()) && Boolean.parseBoolean(compress); + } }