/*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Stream processing unit test of {@link WordCountLambdaExample}, using TopologyTestDriver.
*
* See {@link WordCountLambdaExample} for further documentation.
*/
public class WordCountLambdaExampleTest {

  private TopologyTestDriver testDriver;
  private StringDeserializer stringDeserializer = new StringDeserializer();
  private LongDeserializer longDeserializer = new LongDeserializer();
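  // Builds the ConsumerRecords that TopologyTestDriver.pipeInput() feeds into the
  // topology; both key and value of the test input are serialized as strings.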
  private ConsumerRecordFactory<String, String> recordFactory =
      new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

  @Before
  public void setup() {
    final StreamsBuilder builder = new StreamsBuilder();
    // Create the actual stream processing pipeline under test.
    WordCountLambdaExample.createWordCountStream(builder);
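    // TopologyTestDriver runs the topology in-process: no Kafka broker is started,
    // so the bootstrap server address below is a placeholder that is never contacted.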
    testDriver = new TopologyTestDriver(builder.build(),
        WordCountLambdaExample.getStreamsConfiguration("localhost:9092"));
  }

  @After
  public void tearDown() {
    try {
      testDriver.close();
    } catch (final RuntimeException e) {
      // https://issues.apache.org/jira/browse/KAFKA-6647 causes an exception on Windows; ignore it.
      // The logged stacktrace cannot be avoided.
      System.out.println("Ignoring exception; the test fails on Windows due to this exception: " + e.getLocalizedMessage());
    }
  }

  /**
   * Read one record from the output topic.
   *
   * @return ProducerRecord containing the word as key and its count as value
   */
  private ProducerRecord<String, Long> readOutput() {
    return testDriver.readOutput(WordCountLambdaExample.outputTopic, stringDeserializer, longDeserializer);
  }

  /**
   * Read all counts from the output topic into a map.
   * A word whose count has been updated appears more than once in the output;
   * the latest record for each word replaces any earlier one in the map.
   *
   * @return map of words to their counts
   */
  private Map<String, Long> getOutputCounts() {
    final Map<String, Long> output = new HashMap<>();
    ProducerRecord<String, Long> outputRow;
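    // readOutput() returns null once the output topic has been fully drained.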
    while ((outputRow = readOutput()) != null) {
      output.put(outputRow.key(), outputRow.value());
    }
    return output;
  }

  /**
   * Simple test validating the count of one word.
   */
  @Test
  public void testOneWord() {
    final String nullKey = null;
    // Feed the word "Hello" into the input topic with no Kafka key; the timestamp is irrelevant here.
    testDriver.pipeInput(recordFactory.create(WordCountLambdaExample.inputTopic, nullKey, "Hello", 1L));
    // Read and validate the output record.
    final ProducerRecord<String, Long> output = readOutput();
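    // The topology lowercases its input, so "Hello" is counted under the key "hello".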
    OutputVerifier.compareKeyValue(output, "hello", 1L);
    // There must be no more output in the topic.
    assertThat(readOutput()).isNull();
  }

  /**
   * Test word counts over a list of sentences.
   */
  @Test
  public void shouldCountWords() {
    final List<String> inputValues = Arrays.asList(
        "Hello Kafka Streams",
        "All streams lead to Kafka",
        "Join Kafka Summit",
        "И теперь пошли русские слова"
    );
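    // Expected counts are all lowercase because the topology lowercases its input;
    // the Russian sentence ("And now come the Russian words") exercises non-ASCII input.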
    final Map<String, Long> expectedWordCounts = new HashMap<>();
    expectedWordCounts.put("hello", 1L);
    expectedWordCounts.put("all", 1L);
    expectedWordCounts.put("streams", 2L);
    expectedWordCounts.put("lead", 1L);
    expectedWordCounts.put("to", 1L);
    expectedWordCounts.put("join", 1L);
    expectedWordCounts.put("kafka", 3L);
    expectedWordCounts.put("summit", 1L);
    expectedWordCounts.put("и", 1L);
    expectedWordCounts.put("теперь", 1L);
    expectedWordCounts.put("пошли", 1L);
    expectedWordCounts.put("русские", 1L);
    expectedWordCounts.put("слова", 1L);
    final List<KeyValue<String, String>> records =
        inputValues.stream().map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
    testDriver.pipeInput(recordFactory.create(WordCountLambdaExample.inputTopic, records, 1L, 100L));

    final Map<String, Long> actualWordCounts = getOutputCounts();
    assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts);
  }
}