Skip to content

Commit a1148b3

Browse files
Initial Commit
1 parent 1b829ec commit a1148b3

File tree

8 files changed

+1223
-0
lines changed

8 files changed

+1223
-0
lines changed

AvroConsumer-V1/AvroConsumer.java

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.consumer.*;
3+
4+
5+
public class AvroConsumer{
6+
7+
public static void main(String[] args) throws Exception{
8+
9+
String topicName = "AvroClicks";
10+
11+
String groupName = "RG";
12+
Properties props = new Properties();
13+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
14+
props.put("group.id", groupName);
15+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16+
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
17+
props.put("schema.registry.url", "http://localhost:8081");
18+
props.put("specific.avro.reader", "true");
19+
20+
KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
21+
consumer.subscribe(Arrays.asList(topicName));
22+
try{
23+
while (true){
24+
ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
25+
for (ConsumerRecord<String, ClickRecord> record : records){
26+
System.out.println("Session id="+ record.value().getSessionId()
27+
+ " Channel=" + record.value().getChannel()
28+
+ " Referrer=" + record.value().getReferrer());
29+
}
30+
}
31+
}catch(Exception ex){
32+
ex.printStackTrace();
33+
}
34+
finally{
35+
consumer.close();
36+
}
37+
}
38+
39+
}

0 commit comments

Comments
 (0)