-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathConsumer.scala
68 lines (57 loc) · 2.54 KB
/
Consumer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.kafka.demo.generic
import java.time.Duration
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.KafkaConsumer.Conf
import com.typesafe.scalalogging.Logger
import io.confluent.kafka.serializers.{
AbstractKafkaAvroSerDeConfig,
KafkaAvroDeserializer,
KafkaAvroDeserializerConfig
}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer._
import scala.collection.JavaConverters.asJavaCollectionConverter
import scala.util.{ Failure, Success, Try }
object Consumer {
private[this] val logger = Logger(getClass.getSimpleName)
private[this] val BOOTSTRAP_SERVERS_VALUE = "localhost:9092"
private[this] val SCHEMA_REGISTRY_URL_VALUE = "http://localhost:8081"
private[this] val TOPIC_NAME = "example.with-schema.customer"
private[this] val GROUP_ID_VALUE = "consumer-generic"
private[this] val TIMEOUT_MILLS = 100
private[this] def newConsumer(): KafkaConsumer[String, GenericRecord] =
KafkaConsumer(
Conf(new KafkaAvroDeserializer(), new KafkaAvroDeserializer(), BOOTSTRAP_SERVERS_VALUE, GROUP_ID_VALUE)
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_VALUE)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false")
)
.asInstanceOf[KafkaConsumer[String, GenericRecord]]
def main(args: Array[String]): Unit = {
logger.info(s"Start to consume from $TOPIC_NAME")
val consumer = newConsumer()
consumer.subscribe(List(TOPIC_NAME).asJavaCollection)
Try {
while (true) {
val records: ConsumerRecords[String, GenericRecord] = consumer.poll(Duration.ofMillis(TIMEOUT_MILLS))
records.iterator().forEachRemaining { record: ConsumerRecord[String, GenericRecord] =>
logger.info(s"""
|message
| offset=${record.offset}
| partition=${record.partition}
| key=${record.key}
| value=${record.value}
| schema=${record.value.getSchema}
""".stripMargin)
}
}
} match {
case Success(_) =>
logger.info(s"Finish to consume from $TOPIC_NAME")
case Failure(exception) =>
logger.error(s"Finish to consume from $TOPIC_NAME with error", exception)
}
consumer.close()
}
}