-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathProducer.scala
49 lines (39 loc) · 1.83 KB
/
Producer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.kafka.demo.specific
import java.util.Properties
import com.typesafe.scalalogging.Logger
import io.confluent.examples.clients.basicavro.Payment
import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer }
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
/*
* https://github.com/confluentinc/examples/blob/5.0.x/clients/avro/src/main/java/io/confluent/examples/clients/basicavro/ProducerExample.java
*/
object Producer {
private[this] val logger = Logger(getClass.getSimpleName)
private[this] val BOOTSTRAP_SERVERS_VALUE = "localhost:9092"
private[this] val SCHEMA_REGISTRY_URL_VALUE = "http://localhost:8081"
private[this] val TOPIC_NAME = "example.with-schema.payment"
private[this] def newProducer(): KafkaProducer[String, Payment] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_VALUE)
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.RETRIES_CONFIG, "0")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_VALUE)
new KafkaProducer[String, Payment](props)
}
def main(args: Array[String]): Unit = {
logger.info(s"Start to produce on $TOPIC_NAME")
val producer = newProducer()
(1 to 10)
.map { i =>
val orderId = s"id-$i"
val payment = Payment(orderId, 100d + i)
val record = new ProducerRecord[String, Payment](TOPIC_NAME, payment.id, payment)
record
}
.foreach(producer.send)
producer.close()
logger.info(s"Finish to produce on $TOPIC_NAME")
}
}