From 06a5be8a5e43d4516198d8e621228917e7ebbdeb Mon Sep 17 00:00:00 2001 From: "Cleary, Paul" Date: Wed, 25 Nov 2015 17:54:51 -0500 Subject: [PATCH] Removing Kafka IT Spec We had previously copy pasted a lot of code in and attribution was not previously made. Removing the specs for the time being. --- .../money/kafka/KafkaConsumerApp.scala | 103 ----------- .../comcast/money/kafka/KafkaEmbedded.scala | 95 ---------- .../money/kafka/KafkaProducerApp.scala | 96 ----------- .../com/comcast/money/kafka/KafkaSpec.scala | 162 ------------------ .../money/kafka/ZooKeeperEmbedded.scala | 57 ------ project/MoneyBuild.scala | 12 +- 6 files changed, 2 insertions(+), 523 deletions(-) delete mode 100755 money-kafka/src/it/scala/com/comcast/money/kafka/KafkaConsumerApp.scala delete mode 100755 money-kafka/src/it/scala/com/comcast/money/kafka/KafkaEmbedded.scala delete mode 100755 money-kafka/src/it/scala/com/comcast/money/kafka/KafkaProducerApp.scala delete mode 100755 money-kafka/src/it/scala/com/comcast/money/kafka/KafkaSpec.scala delete mode 100755 money-kafka/src/it/scala/com/comcast/money/kafka/ZooKeeperEmbedded.scala diff --git a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaConsumerApp.scala b/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaConsumerApp.scala deleted file mode 100755 index 109d3fc5..00000000 --- a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaConsumerApp.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2012-2015 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka - -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream} -import kafka.message.MessageAndMetadata -import kafka.serializer.DefaultDecoder -import kafka.utils.Logging - -/** - * Demonstrates how to implement a simple Kafka consumer application to read data from Kafka. - * - * Don't read too much into the actual implementation of this class. Its sole purpose is to showcase the use of the - * Kafka API. - * - * @param topic The Kafka topic to read data from. - * @param zookeeperConnect The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. - * @param numThreads The number of threads used by the consumer application to read from Kafka. - * @param config Additional consumer configuration settings. - */ -class KafkaConsumerApp( - val topic: String, - val zookeeperConnect: String, - val numThreads: Int, - config: Properties = new Properties - ) extends Logging { - - private val effectiveConfig = { - val c = new Properties - c.load(this.getClass.getResourceAsStream("/consumer-defaults.properties")) - c.putAll(config) - c.put("zookeeper.connect", zookeeperConnect) - c - } - - private val executor = Executors.newFixedThreadPool(numThreads) - private val consumerConnector = Consumer.create(new ConsumerConfig(effectiveConfig)) - - info(s"Connecting to topic $topic via ZooKeeper $zookeeperConnect") - - def startConsumers(f: (MessageAndMetadata[Array[Byte], Array[Byte]], ConsumerTaskContext) => Unit) { - val topicCountMap = Map(topic -> numThreads) - val valueDecoder = new DefaultDecoder - val keyDecoder = valueDecoder - val consumerMap = consumerConnector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder) - val consumerThreads = consumerMap.get(topic) match { - case Some(streams) => 
streams.view.zipWithIndex map { - case (stream, threadId) => - new ConsumerTask(stream, new ConsumerTaskContext(threadId), f) - } - case _ => Seq() - } - consumerThreads foreach executor.submit - } - - def shutdown() { - consumerConnector.shutdown() - executor.shutdown() - } - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - shutdown() - } - }) - -} - -class ConsumerTask[K, V, C <: ConsumerTaskContext](stream: KafkaStream[K, V], context: C, - f: (MessageAndMetadata[K, V], C) => Unit) extends Runnable with Logging { - - override def run() { - info(s"Consumer thread ${context.threadId} started") - stream foreach { - case msg: MessageAndMetadata[_, _] => - trace(s"Thread ${context.threadId} received message: " + msg) - f(msg, context) - case _ => trace(s"Received unexpected message type from broker") - } - info(s"Shutting down consumer thread ${context.threadId}") - } - -} - -case class ConsumerTaskContext(threadId: Int) diff --git a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaEmbedded.scala b/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaEmbedded.scala deleted file mode 100755 index 7d5c2c46..00000000 --- a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaEmbedded.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2012-2015 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.comcast.money.kafka - -import java.io.File -import java.util.Properties - -import kafka.server.{KafkaConfig, KafkaServerStartable} -import kafka.utils.Logging -import org.apache.commons.io.FileUtils - -/** - * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by default. - * - * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance running at - * `127.0.0.1:2181`. You can specify a different ZooKeeper instance by setting the `zookeeper.connect` parameter in the - * broker's configuration. - * - * @param config Broker configuration settings. Used to modify, for example, on which port the broker should listen to. - * Note that you cannot change the `log.dirs` setting currently. - */ -class KafkaEmbedded(config: Properties = new Properties) extends Logging { - - private val defaultZkConnect = "127.0.0.1:2181" - private val logDir = { - val random = (new scala.util.Random).nextInt - val path = Seq(System.getProperty("java.io.tmpdir"), "kafka-test", "logs-" + random).mkString(File.separator) - new File(path) - } - - private val effectiveConfig = { - val c = new Properties - c.load(this.getClass.getResourceAsStream("/broker-defaults.properties")) - c.putAll(config) - c.setProperty("log.dirs", logDir.getAbsolutePath) - c - } - - private val kafkaConfig = new KafkaConfig(effectiveConfig) - private val kafka = new KafkaServerStartable(kafkaConfig) - - /** - * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. - * - * You can use this to tell Kafka producers and consumers how to connect to this instance. - */ - val brokerList = kafka.serverConfig.hostName + ":" + kafka.serverConfig.port - - /** - * The ZooKeeper connection string aka `zookeeper.connect`. 
- */ - val zookeeperConnect = { - val zkConnectLookup = Option(effectiveConfig.getProperty("zookeeper.connect")) - zkConnectLookup match { - case Some(zkConnect) => zkConnect - case _ => - warn(s"zookeeper.connect is not configured -- falling back to default setting $defaultZkConnect") - defaultZkConnect - } - } - - /** - * Start the broker. - */ - def start() { - debug(s"Starting embedded Kafka broker at $brokerList (using ZooKeeper server at $zookeeperConnect) ...") - kafka.startup() - debug("Embedded Kafka broker startup completed") - } - - /** - * Stop the broker. - */ - def stop() { - debug("Shutting down embedded Kafka broker...") - kafka.shutdown() - FileUtils.deleteQuietly(logDir) - debug("Embedded Kafka broker shutdown completed") - } - -} diff --git a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaProducerApp.scala b/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaProducerApp.scala deleted file mode 100755 index bb74253b..00000000 --- a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaProducerApp.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2012-2015 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka - -import java.util.Properties - -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} - -/** - * Demonstrates how to implement a simple Kafka producer application to send data to Kafka. 
- * - * Don't read too much into the actual implementation of this class. Its sole purpose is to showcase the use of the - * Kafka API. - * - * @param topic The Kafka topic to send data to. - * @param brokerList Value for Kafka's `metadata.broker.list` setting. - * @param producerConfig Additional producer configuration settings. - */ -case class KafkaProducerApp( - val topic: String, - val brokerList: String, - producerConfig: Properties = new Properties - ) { - - private val producer = { - val effectiveConfig = { - val c = new Properties - c.load(this.getClass.getResourceAsStream("/producer-defaults.properties")) - c.putAll(producerConfig) - c.put("metadata.broker.list", brokerList) - c - } - new Producer[Array[Byte], Array[Byte]](new ProducerConfig(effectiveConfig)) - } - - // The configuration field of the wrapped producer is immutable (including its nested fields), so it's safe to expose - // it directly. - val config = producer.config - - private def toMessage(key: Option[Array[Byte]], value: Array[Byte]): KeyedMessage[Array[Byte], Array[Byte]] = - key match { - case Some(key) => new KeyedMessage(topic, key, value) - case _ => new KeyedMessage(topic, value) - } - - def send(key: Array[Byte], value: Array[Byte]): Unit = producer.send(toMessage(Some(key), value)) - - def send(value: Array[Byte]): Unit = producer.send(toMessage(None, value)) - - def shutdown(): Unit = producer.close() - -} - -/** - * Creates KafkaProducerApp instances. - * - * We require such a factory because of how Storm and notably - * [[http://storm.incubator.apache.org/documentation/Serialization.html serialization within Storm]] work. - * Without such a factory we cannot properly unit tests Storm bolts that need to write to Kafka. - * - * Preferably we would simply pass a Kafka producer directly to a Storm bolt. During testing we could then mock this - * collaborator. However this intuitive approach fails at (Storm) runtime because Kafka producers are not serializable. 
- * The alternative approach of instantiating the Kafka producer from within the bolt (e.g. using a `@transient lazy val` - * field) does work at runtime but prevents us from verifying the correct interaction between our bolt's code and its - * collaborator, the Kafka producer, because we cannot easily mock the producer in this setup. The chosen approach of - * the factory method, while introducing some level of unwanted indirection and complexity, is a pragmatic approach to - * make our Storm code work correctly at runtime and to make it testable. - * - * @param topic The Kafka topic to send data to. - * @param brokerList Value for Kafka's `metadata.broker.list` setting. - * @param config Additional producer configuration settings. - */ -abstract class KafkaProducerAppFactory(topic: String, brokerList: String, config: Properties) extends Serializable { - def newInstance(): KafkaProducerApp -} - -class BaseKafkaProducerAppFactory(topic: String, brokerList: String, config: Properties = new Properties) - extends KafkaProducerAppFactory(topic, brokerList, config) { - - override def newInstance() = new KafkaProducerApp(topic, brokerList, config) - -} diff --git a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaSpec.scala b/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaSpec.scala deleted file mode 100755 index 2f06c72b..00000000 --- a/money-kafka/src/it/scala/com/comcast/money/kafka/KafkaSpec.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright 2012-2015 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka - -import java.util.Properties - -import _root_.kafka.message.MessageAndMetadata -import _root_.kafka.utils.{Logging, ZKStringSerializer} -import com.comcast.money.core.Money -import com.comcast.money.wire.avro._ -import com.twitter.bijection.Injection -import com.twitter.bijection.avro.SpecificAvroCodecs -import kafka.admin.AdminUtils -import org.I0Itec.zkclient.ZkClient -import org.scalatest._ - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.reflectiveCalls - -class KafkaSpec extends FunSpec with Matchers with BeforeAndAfterAll with GivenWhenThen with Logging { - - implicit val specificAvroBinaryInjectionForTweet = SpecificAvroCodecs.toBinary[Span] - - private val testTopic = "money" - private val testTopicNumPartitions = 1 - private val testTopicReplicationFactor = 1 - //private val zookeeperPort = InstanceSpec.getRandomPort - private val zookeeperPort = 2181 - //private val kafkaPort = InstanceSpec.getRandomPort - private val kafkaPort = 9092 - - private var zookeeperEmbedded: Option[ZooKeeperEmbedded] = None - private var zkClient: Option[ZkClient] = None - private var kafkaEmbedded: Option[KafkaEmbedded] = None - - override def beforeAll() { - // Start embedded ZooKeeper server - zookeeperEmbedded = Some(new ZooKeeperEmbedded(zookeeperPort)) - - for {z <- zookeeperEmbedded} { - // Start embedded Kafka broker - val brokerConfig = new Properties - brokerConfig.put("zookeeper.connect", z.connectString) - brokerConfig.put("port", kafkaPort.toString) - kafkaEmbedded = Some(new KafkaEmbedded(brokerConfig)) - for {k <- kafkaEmbedded} k.start() - - // Create test topic - val sessionTimeout = 30.seconds - val connectionTimeout = 30.seconds - zkClient = Some(new ZkClient(z.connectString, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, - ZKStringSerializer)) - for 
{ - zc <- zkClient - } { - val topicConfig = new Properties - AdminUtils.createTopic(zc, testTopic, testTopicNumPartitions, testTopicReplicationFactor, topicConfig) - } - } - - Money.tracer - } - - override def afterAll() { - for {k <- kafkaEmbedded} k.stop() - - for { - zc <- zkClient - } { - info("ZooKeeper client: shutting down...") - zc.close() - info("ZooKeeper client: shutdown completed") - } - - for {z <- zookeeperEmbedded} z.stop() - } - - import scala.collection.JavaConversions._ - val fixture = { - new { - val notes = seqAsJavaList(Seq(new Note("foo", 1L, new NoteValue(NoteType.String, "foo")), new Note("foo", 1L, new NoteValue(NoteType.Double, "4.0")))) - val s1 = new Span("root", "money", "localhost", 1000L, true, 1L, new SpanId("1", 2L, 3L), notes) - val s2 = new Span("service-1", "money", "com.service1", 1000L, true, 1L, new SpanId("1", 3L, 4L), notes) - val s3 = new Span("service-2", "money", "com.service2", 1000L, true, 1L, new SpanId("1", 3L, 5L), notes) - - val messages = Seq(s1, s2, s3) - } - } - - describe("Kafka") { - - it("should asynchronously send and receive a Span in Avro format") { - for { - z <- zookeeperEmbedded - k <- kafkaEmbedded - } { - Given("a ZooKeeper instance") - And("a Kafka broker instance") - And("some spans") - val f = fixture - val spans = f.messages - And("a single-threaded Kafka consumer group") - // The Kafka consumer group must be running before the first messages are being sent to the topic. 
- val consumer = { - val numConsumerThreads = 1 - val config = { - val c = new Properties - c.put("group.id", "test-consumer") - c - } - new KafkaConsumerApp(testTopic, z.connectString, numConsumerThreads, config) - } - val actualSpans = new mutable.SynchronizedQueue[Span] - consumer.startConsumers( - (m: MessageAndMetadata[Array[Byte], Array[Byte]], c: ConsumerTaskContext) => { - val span = Injection.invert(m.message) - for {t <- span} { - println(s"Consumer thread ${c.threadId}: received Span $t from partition ${m.partition} of topic ${m.topic} (offset: ${m.offset})") - actualSpans += t - } - }) - val waitForConsumerStartup = 300.millis - debug(s"Waiting $waitForConsumerStartup for Kafka consumer threads to launch") - Thread.sleep(waitForConsumerStartup.toMillis) - debug("Finished waiting for Kafka consumer threads to launch") - - When("I send a trace using the kafka emitter") - Money.tracer.startSpan("foo") - Money.tracer.stopSpan() - - Then("the consumer app should receive the spans") - // XXX: REMEMBER, SPANS WAIT ONE FULL SECOND AFTER STOPPING TO EMIT!!! 
- val waitForConsumerToReadStormOutput = 2000.millis - debug(s"Waiting $waitForConsumerToReadStormOutput for Kafka consumer threads to read messages") - Thread.sleep(waitForConsumerToReadStormOutput.toMillis) - debug("Finished waiting for Kafka consumer threads to read messages") - - And("the span that was sent from money should have been received") - actualSpans(0).getName shouldEqual "foo" - - // Cleanup - debug("Shutting down Kafka consumer threads") - consumer.shutdown() - } - } - } -} diff --git a/money-kafka/src/it/scala/com/comcast/money/kafka/ZooKeeperEmbedded.scala b/money-kafka/src/it/scala/com/comcast/money/kafka/ZooKeeperEmbedded.scala deleted file mode 100755 index f8d4a076..00000000 --- a/money-kafka/src/it/scala/com/comcast/money/kafka/ZooKeeperEmbedded.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2012-2015 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka - -import kafka.utils.Logging -import org.apache.curator.test.TestingServer - -/** - * Runs an in-memory, "embedded" instance of a ZooKeeper server. - * - * The ZooKeeper server instance is automatically started when you create a new instance of this class. - * - * @param port The port (aka `clientPort`) to listen to. Default: 2181. 
- */ -class ZooKeeperEmbedded(val port: Int = 2181) extends Logging { - - debug(s"Starting embedded ZooKeeper server on port ${port}...") - - private val server = new TestingServer(port) - - /** - * Stop the instance. - */ - def stop() { - debug("Shutting down embedded ZooKeeper server...") - server.close() - debug("Embedded ZooKeeper server shutdown completed") - } - - /** - * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. - * - * You can use this to e.g. tell Kafka and Storm how to connect to this instance. - */ - val connectString: String = server.getConnectString - - /** - * The hostname of the ZooKeeper instance. Example: `127.0.0.1` - */ - val hostname: String = connectString.splitAt(connectString lastIndexOf ':')._1 // "foo:1:2:3" -> ("foo:1:2", ":3) - -} diff --git a/project/MoneyBuild.scala b/project/MoneyBuild.scala index 160e6d64..2f86cb6d 100644 --- a/project/MoneyBuild.scala +++ b/project/MoneyBuild.scala @@ -142,8 +142,6 @@ object MoneyBuild extends Build { chillAvro, chillBijection, commonsIo, - curator, - zkClient, akkaTestkit(v), scalaTest, mockito @@ -210,7 +208,7 @@ object MoneyBuild extends Build { def basicSettings = Defaults.itSettings ++ SbtScalariform.scalariformSettings ++ Seq( organization := "com.comcast.money", - version := "0.8.9", + version := "0.8.10-SNAPSHOT", crossScalaVersions := Seq("2.10.6", "2.11.7"), scalaVersion := "2.10.6", resolvers ++= Seq( @@ -295,7 +293,7 @@ object MoneyBuild extends Build { val javaxServlet = "javax.servlet" % "servlet-api" % "2.5" // Kafka, exclude dependencies that we will not need, should work for 2.10 and 2.11 - val kafka = ("org.apache.kafka" % "kafka_2.10" % "0.8.1.1") + val kafka = ("org.apache.kafka" %% "kafka" % "0.8.2.2") .exclude("javax.jms", "jms") .exclude("com.sun.jdmk", "jmxtools") .exclude("com.sun.jmx", "jmxri") @@ -319,12 +317,6 @@ object MoneyBuild extends Build { val springAop3 = "org.springframework" % 
"spring-aop" % "3.2.6.RELEASE" val springContext = "org.springframework" % "spring-context" % "4.1.1.RELEASE" - val curator = ("org.apache.curator" % "curator-test" % "2.4.0") - .exclude("org.slf4j", "slf4j-log4j12") - - val zkClient = ("com.101tec" % "zkclient" % "0.4") - .exclude("org.apache.zookeeper", "zookeeper") - // Test val mockito = "org.mockito" % "mockito-core" % "1.9.5" % "test" val scalaTest = "org.scalatest" %% "scalatest" % "2.2.3" % "it,test"