Skip to content

Commit

Permalink
add test code inchpater 2
Browse files Browse the repository at this point in the history
  • Loading branch information
IISI-1204003 committed Apr 28, 2022
1 parent e22ded2 commit 0f098b9
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 120 deletions.
9 changes: 8 additions & 1 deletion chapter-02/hello-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ repositories {
// You can declare any Maven/Ivy/file repository here.
jcenter()
}
ext {
kafkaVersion = '2.7.2'
}

dependencies {
implementation 'org.apache.kafka:kafka-streams:2.7.2'
implementation "org.apache.kafka:kafka-streams:${kafkaVersion}"

// Used for fatjar excuting showing console when using command "java -jar ./build/libs/hello-streams-all.jar"
implementation 'org.slf4j:slf4j-api:1.7.36'
Expand All @@ -39,6 +42,10 @@ dependencies {

// Use JUnit Jupiter Engine for testing.
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2'

// test dependencies
testImplementation "org.apache.kafka:kafka-streams-test-utils:${kafkaVersion}"
testImplementation 'org.assertj:assertj-core:3.22.0'
}

application {
Expand Down
105 changes: 50 additions & 55 deletions chapter-02/hello-streams/src/test/java/com/example/DslExampleTest.java
Original file line number Diff line number Diff line change
@@ -1,73 +1,68 @@
package com.example;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class DslExampleTest {
private TopologyTestDriver testDriver;
private TestInputTopic<Void, String> inputTopic;


private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

@BeforeEach
protected void setUp() throws Exception {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));


// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();
private TopologyTestDriver testDriver;
private TestInputTopic<Void, String> inputTopic;

private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

@BeforeEach
protected void setUp() throws Exception {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));

// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();

// read from the source topic, "users"
KStream<Void, String> stream = builder.stream("users");

// for each record that appears in the source topic,
// print the value (as SayHelloProcessor)
stream.foreach(
(key, value) -> {
System.out.println("(DSL) Hello, " + value);
});

// the builder is used to construct the topology
Topology topology = builder.build();

// read from the source topic, "users"
KStream<Void, String> stream = builder.stream("users");
// create a test driver. we will use this to pipe data to our topology
testDriver = Utils.createTestDriverAndTestTopic(topology);

// for each record that appears in the source topic,
// print the value (as SayHelloProcessor)
stream.foreach(
(key, value) -> {
System.out.println("(DSL) Hello, " + value);
});


// the builder is used to construct the topology
Topology topology = builder.build();
// create the test input topic
inputTopic =
testDriver.createInputTopic(
"users", Serdes.Void().serializer(), Serdes.String().serializer());
}



// create a test driver. we will use this to pipe data to our topology
testDriver = Utils.createTestDriverAndTestTopic(topology);

// create the test input topic
inputTopic =
testDriver.createInputTopic(
"users", Serdes.Void().serializer(), Serdes.String().serializer());

}
@Test
public void testMain() {
// stimulate to use topic named "users"
inputTopic.pipeInput("Robert");
assertThat(outContent.toString()).isEqualTo("(DSL) Hello, Robert" + System.lineSeparator());
}

@Test
public void testMain() {
//stimulate to use topic named "users"
inputTopic.pipeInput( "Robert");
assertThat(outContent.toString()).isEqualTo("(DSL) Hello, Robert"+System.lineSeparator() );
}
@AfterEach
public void restoreStreams() {
System.setOut(originalOut);
System.setErr(originalErr);
}
@AfterEach
public void restoreStreams() {
System.setOut(originalOut);
System.setErr(originalErr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
Expand All @@ -14,40 +13,43 @@
import org.junit.jupiter.api.Test;

public class ProcessorApiExampleTest {
private TopologyTestDriver testDriver;
private TestInputTopic<Void, String> inputTopic;

private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

@BeforeEach
protected void setUp() throws Exception {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));

Topology topology = new Topology();
topology.addSource("UserSource", "users");
topology.addProcessor("SayHello", SayHelloProcessor::new, "UserSource");

// create a test driver. we will use this to pipe data to our topology
testDriver = Utils.createTestDriverAndTestTopic(topology);

// create the test input topic
inputTopic = testDriver.createInputTopic("users", Serdes.Void().serializer(), Serdes.String().serializer());
}

@Test
public void testMain() {
// stimulate to use topic named "users"
inputTopic.pipeInput("Robert");
assertThat(outContent.toString()).isEqualTo("(Processor API) Hello, Robert" + System.lineSeparator());
}

@AfterEach
public void restoreStreams() {
System.setOut(originalOut);
System.setErr(originalErr);
}
private TopologyTestDriver testDriver;
private TestInputTopic<Void, String> inputTopic;

private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private final PrintStream originalOut = System.out;
private final PrintStream originalErr = System.err;

@BeforeEach
protected void setUp() throws Exception {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));

Topology topology = new Topology();
topology.addSource("UserSource", "users");
topology.addProcessor("SayHello", SayHelloProcessor::new, "UserSource");

// create a test driver. we will use this to pipe data to our topology
testDriver = Utils.createTestDriverAndTestTopic(topology);

// create the test input topic
inputTopic =
testDriver.createInputTopic(
"users", Serdes.Void().serializer(), Serdes.String().serializer());
}

@Test
public void testMain() {
// stimulate to use topic named "users"
inputTopic.pipeInput("Robert");
assertThat(outContent.toString())
.isEqualTo("(Processor API) Hello, Robert" + System.lineSeparator());
}

@AfterEach
public void restoreStreams() {
System.setOut(originalOut);
System.setErr(originalErr);
}
}
53 changes: 26 additions & 27 deletions chapter-02/hello-streams/src/test/java/com/example/Utils.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
package com.example;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class Utils {
/***
* set the required properties for running Kafka Streams
* **/
static Properties createPros() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return config;
}
/****
* create a test driver. we will use this to pipe data to our topology
* **/
static TopologyTestDriver createTestDriverAndTestTopic(final Topology topology) {
// set the required properties for running Kafka Streams
Properties config = createPros();
// create a test driver. we will use this to pipe data to our topology
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
return testDriver;
}

/***
* set the required properties for running Kafka Streams
* **/
static Properties createPros() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return config;
}

/****
* create a test driver. we will use this to pipe data to our topology
* **/
static TopologyTestDriver createTestDriverAndTestTopic(final Topology topology) {
// set the required properties for running Kafka Streams
Properties config = createPros();

// create a test driver. we will use this to pipe data to our topology
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

return testDriver;
}
}

0 comments on commit 0f098b9

Please sign in to comment.