Skip to content

Commit

Permalink
lstarch: changes from code-review
Browse files (browse the repository at this point in the history)
  • Loading branch information
LeStarch authored and r4space committed May 21, 2015
1 parent 58583f1 commit 618cec7
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 172 deletions.
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ the License.
</scm>
<url>https://github.com/LeStarch/kafka-benchmarking/</url>
<developers>
<developer>
<id>r4space</id>
<name>J Wyngaard</name>
<email></email>
<organization>JPL</organization>
<timezone>-8</timezone>
<roles>
<role>committer</role>
</roles>
</developer>
<developer>
<id>lestarch</id>
<name>M Starch</name>
Expand Down Expand Up @@ -91,11 +101,6 @@ the License.
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
<!--archive>
<manifest>
<mainClass>org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient</mainClass>
</manifest>
</archive-->
</configuration>
<executions>
<execution>
Expand Down
2 changes: 1 addition & 1 deletion src/main/bin/publisher.sh → src/main/bin/producer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ then
else
JAVA=${JAVA_HOME}/bin/java
fi
${JAVA} -cp '../lib/*' -Djava.security.policy=../etc/security.policy -DPROPERTY_FILE=../etc/benchmark.properties org.dia.benchmark.kafka.BandwidthAggregator org.dia.benchmark.kafka.consumer.BandwidthPublisher
${JAVA} -cp '../lib/*' -Djava.security.policy=../etc/security.policy -DPROPERTY_FILE=../etc/benchmark.properties org.dia.benchmark.kafka.BandwidthAggregator org.dia.benchmark.kafka.producer.BandwidthProducer
52 changes: 0 additions & 52 deletions src/main/java/org/dia/benchmark/kafka/Aggregator.java

This file was deleted.

35 changes: 19 additions & 16 deletions src/main/java/org/dia/benchmark/kafka/BandwidthAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ Licensed to the Apache Software Foundation (ASF) under one or more contributor

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Properties;
import java.util.PropertyPermission;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.dia.benchmark.kafka.Aggregator;
/**
* This consumer measures bandwidth as it consumes messages.
* This aggregator (consumer,producer) measures bandwidth as it handles messages.
*
* @author starchmd
*/
Expand All @@ -38,24 +40,24 @@ public abstract class BandwidthAggregator implements Runnable,Aggregator {

@Override
public void start() {
log.log(Level.INFO, String.format("Starting instance of type %s",this.getClass().getName()));
log.log(Level.INFO, String.format("\nStarting instance of type %s",this.getClass().getName()));
new Thread(this).start();
}

@Override
public synchronized long stop() {
log.log(Level.INFO, String.format("Stoping instance of type %s",this.getClass().getName()));
log.log(Level.INFO, String.format("\nStopping instance of type %s",this.getClass().getName()));
stop = true;
return count;
}
@Override
public synchronized long count() {
log.log(Level.FINER, String.format("Counting instance of type %s",this.getClass().getName()));
log.log(Level.FINER, String.format("\nCounting instance of type %s",this.getClass().getName()));
return count;
}
@Override
public void run() {
log.log(Level.INFO, String.format("Running thread %s of type %s",Thread.currentThread().getName(), this.getClass().getName()));
log.log(Level.INFO, String.format("\nRunning thread %s of type %s",Thread.currentThread().getName(), this.getClass().getName()));
count = 0;
//Set termination flag safely
boolean terminate = false;
Expand All @@ -66,7 +68,7 @@ public void run() {
long ret = act();
synchronized (this) {
terminate = stop;
count += ret;
// count += ret;
}
}
}
Expand All @@ -81,22 +83,23 @@ public void run() {
*/
public static void main(String[] args) {
Configuration config = null;
try {
config = new Configuration(Configuration.getProperties());
try {
config = new Configuration(Configuration.getProperties());

Constructor<?> ctor = Class.forName(args[0]).getConstructor();
Aggregator agg = (Aggregator)ctor.newInstance(new Object[] {});
agg.setup(config);
Monitor m = new Monitor(agg);
agg.start();
m.run();
} catch (IOException e) {
System.err.println("Error properties file does not exist."+e);
System.err.println("\nError properties file does not exist."+e);
e.printStackTrace();
} catch (IllegalAccessException e) {
System.err.println("Illegal access exception in Configuration.java(this)"+e);
System.err.println("\nIllegal access exception in Configuration.java(this)"+e);
e.printStackTrace();
} catch (Exception e) {
System.err.println("Exception occured upon execution: "+e);
System.err.println("\nException occured upon execution: "+e);
e.printStackTrace();
}
}
Expand All @@ -117,21 +120,21 @@ public Monitor(Aggregator agg) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
log.log(Level.INFO,String.format("Final message count: %d",aggor.stop()));
} catch (Exception e) {log.log(Level.INFO, "Exception caught: "+e);}
log.log(Level.INFO,String.format("\nFinal message count: %d",aggor.stop()));
} catch (Exception e) {log.log(Level.INFO, "\nException caught: "+e);}
}
});
}
@Override
public void run() {
while(true) {
try {
log.log(Level.INFO,String.format("Current message count: %d",aggor.count()));
log.log(Level.INFO,String.format("\nCurrent message count: %d",aggor.count()));
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.log(Level.WARNING,"Exception caught while monitoring: "+e);
log.log(Level.WARNING,"\nException caught while monitoring: "+e);
e.printStackTrace();
}
}
Expand Down
61 changes: 30 additions & 31 deletions src/main/java/org/dia/benchmark/kafka/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public class Configuration implements Serializable {
//Number of producers
public int PRODUCER_COUNT = 1;

public String USE_MONITOR = "true";
//Number of topics
public int TOPIC_COUNT = 1;
public int TOPIC_INDEX = 0;
public int THREADS_PER_TOPIC = 1;
public int NUM_MESSAGES = 1000;
//Kafka config
public String GROUP_ID = "CosumerGroup";
public String GROUP_ID = "ConsumerGroup";

//Zookeeper properties
public String ZOOKEEPER_CONNECT = "localhost:2181";
Expand All @@ -64,36 +64,36 @@ public class Configuration implements Serializable {

//Node configurations
public String[] CONSUMER_NODES = {"localhost"};
public String[] PRODUCER_NODES = {/*"localhost"*/};
public String[] BROKER_NODES = {"localhost:9092","localhost:9093"};
public String[] PRODUCER_NODES = {"localhost"};
private String[] BROKER_NODES = {"localhost:9092","localhost:9093"};

//ProducerConfig
public String BOOTSTRAP_SERVERS_CONFIG = StringUtils.join(BROKER_NODES,",");
public int BATCH_SIZE_CONFIG = 0; //Batch size in bytes, default 16384
public String ACKS_CONFIG = "0"; // 0 = none, 1 = write locally no waiting, all = full sync
public int TIMEOUT_CONFIG = 30000; // Wait for ACK time in ms
public long BUFFER_MEMORY_CONFIG = 33554432; //Total memory in bytes the producer can use to buffer records
public String COMPRESSION_TYPE_CONFIG = "none"; //gzip, snappy also valid for batch compression
public int RETRIES_CONFIG = 0; //Can affect ordering if >0
public String VALUE_SERIALIZER_CLASS_CONFIG = "org.apache.kafka.common.serialization.ByteArraySerializer";
public String KEY_SERIALIZER_CLASS_CONFIG = "org.apache.kafka.common.serialization.StringSerializer";
public Boolean BLOCK_ON_BUFFER_FULL_CONFIG = true; //Setting to false will cause full memory to throw an error rather than block records
public int RECEIVE_BUFFER_CONFIG = 32768; //Size of TCP receive buffer to use when receiving
public int SEND_BUFFER_CONFIG = 131072; //Size of TCP send buffer to use when sending
public int MAX_REQUEST_SIZE_CONFIG = 1048576; //Max size of a request and record size.
public long LINGER_MS_CONFIG = 0; //Delay in ms, to impose a spreading out of arriving records
public String CLIENT_ID_CONFIG = "RequestSourceName"; //Unused now, string passed to server when making requests, for debugging
public long RECONNECT_BACKOFF_MS_CONFIG = 10; //ms delay before attempting to reconnect to a given host
public long RETRY_BACKOFF_MS_CONFIG = 100; //ms delay before retrying a produce request to a given topic partition
public long METRICS_SAMPLE_WINDOW_MS_CONFIG = 30000; //ms metric collection window before overwrite
public int METRICS_NUM_SAMPLES_CONFIG = 2; //Number of samples maintained to compute metrics
public long METADATA_MAX_AGE_CONFIG = 300000; //ms delay between forced metadata refreshes to discover new leaders/brokers/partitions
public long METADATA_FETCH_TIMEOUT_CONFIG = 60000; //ms Delay prior to throwing an exception when no metadata is received
public String BOOTSTRAP_SERVERS_CONFIG = StringUtils.join(BROKER_NODES,",");
public int BATCH_SIZE_CONFIG = 0; //Batch size in bytes, default 16384
public String ACKS_CONFIG = "0"; // 0 = none, 1 = write locally no waiting, all = full sync
public int TIMEOUT_CONFIG = 30000; // Wait for ACK time in ms
public long BUFFER_MEMORY_CONFIG = 33554432; //Total memory in bytes the producer can use to buffer records
public String COMPRESSION_TYPE_CONFIG = "none"; //gzip, snappy also valid for batch compression
public int RETRIES_CONFIG = 0; //Can affect ordering if >0
public String VALUE_SERIALIZER_CLASS_CONFIG = "org.apache.kafka.common.serialization.ByteArraySerializer";
public String KEY_SERIALIZER_CLASS_CONFIG = "org.apache.kafka.common.serialization.ByteArraySerializer";
public Boolean BLOCK_ON_BUFFER_FULL_CONFIG = true; //Setting to false will cause full memory to throw an error rather than block records
public int RECEIVE_BUFFER_CONFIG = 32768; //Size of TCP receive buffer to use when receiving
public int SEND_BUFFER_CONFIG = 131072; //Size of TCP send buffer to use when sending
public int MAX_REQUEST_SIZE_CONFIG = 1048576; //Max size of a request and record size.
public long LINGER_MS_CONFIG = 0; //Delay in ms, to impose a spreading out of arriving records
public String CLIENT_ID_CONFIG = "RequestSourceName"; //Unused now, string passed to server when making requests, for debugging
public long RECONNECT_BACKOFF_MS_CONFIG = 10; //ms delay before attempting to reconnect to a given host
public long RETRY_BACKOFF_MS_CONFIG = 100; //ms delay before retrying a produce request to a given topic partition
public long METRICS_SAMPLE_WINDOW_MS_CONFIG = 30000; //ms metric collection window before overwrite
public int METRICS_NUM_SAMPLES_CONFIG = 2; //Number of samples maintained to compute metrics
public long METADATA_MAX_AGE_CONFIG = 300000; //ms delay between forced metadata refreshes to discover new leaders/brokers/partitions
public long METADATA_FETCH_TIMEOUT_CONFIG = 60000; //ms Delay prior to throwing an exception when no metadata is received

//How often to report
public int REPORTING_PERIOD = 1000;
//Size of messages
public int MESSAGE_SIZE = 4096;
public int MESSAGE_SIZE = 4096 * 1024*1024;

public int RMI_REGISTRY_PORT = 1099;

Expand Down Expand Up @@ -150,7 +150,6 @@ public ConsumerConfig getKafkaConsumerProperties() {
/**
* Get a map of topic to thread count
* @param topic - number of topics
* @param threadPreTopic - number of threads per topic
* @return map of topic name to thread count
*/
public Map<String,Integer> getTopicThreadCounts(int topic) {
Expand All @@ -168,11 +167,11 @@ public Map<String,Integer> getTopicThreadCounts(int topic) {
* @throws IOException - thrown on failure to read file
*/
public static Properties getProperties() throws IOException {
String file = System.getProperty(PROPERTY_FILE_PROP);
// String file = System.getProperty(PROPERTY_FILE_PROP);
Properties properties = new Properties();
properties.load(new FileInputStream(file));
properties.putAll(System.getProperties());
properties.putAll(System.getenv());
//properties.load(new FileInputStream(file));
// properties.putAll(System.getProperties());
//properties.putAll(System.getenv());
return properties;
}
}
3 changes: 0 additions & 3 deletions src/main/java/org/dia/benchmark/kafka/NetworkAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more contributor
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

import org.dia.benchmark.kafka.Aggregator;
import org.dia.benchmark.kafka.Configuration;

/**
* This aggregator wraps another aggregator for network usage.
*
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/dia/benchmark/kafka/RmiAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more contributor

import java.rmi.Remote;

import org.dia.benchmark.kafka.Aggregator;

/**
* Interface used to support aggregation of measured data over rmi.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ Licensed to the Apache Software Foundation (ASF) under one or more contributor
import java.util.logging.Level;
import java.util.logging.Logger;

import org.dia.benchmark.kafka.Aggregator;
import org.dia.benchmark.kafka.Configuration;

/**
* This aggregator wraps another aggregator for network usage
* providing an RMI interface to other aggregators.
Expand All @@ -49,7 +46,9 @@ public void spawn(Class<?> clazz) throws Exception {
public void setup(Configuration config) throws Exception {
log.log(Level.INFO, "Setting up RMI aggregator");
child.setup(config);
new Thread(new BandwidthAggregator.Monitor(child)).start();
if (config.USE_MONITOR.equalsIgnoreCase("true")) {
new Thread(new BandwidthAggregator.Monitor(child)).start();
}
}
@Override
public void start() throws Exception {
Expand All @@ -68,7 +67,7 @@ public long count() throws Exception {
}

/**
* Test main program.
* Network main program.
* @param args - command line arguments
*/
public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ public class BandwidthConsumer extends BandwidthAggregator {
@Override
public void setup(Configuration config) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config.getKafkaConsumerProperties());

Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(config.getTopicThreadCounts(config.TOPIC_COUNT));

String name = config.TOPIC_PREFIX+config.TOPIC_INDEX;

log.log(Level.INFO, String.format("Setting up consumer on topic %s with %d threads",name,config.THREADS_PER_TOPIC));

iterator = messageStreams.get(name).get(0).iterator();
}
/**
Expand All @@ -54,6 +58,8 @@ public long act() {
log.log(Level.INFO,String.format("Thread(%s) consuming message",Thread.currentThread().getName()));
if (iterator.hasNext()) {
iterator.next();
count ++;
System.out.println("Got a message, count: "+count);
return 1;
}
return 0;
Expand Down
Loading

0 comments on commit 618cec7

Please sign in to comment.