Skip to content


add ch6 spring version of sample
Browse files Browse the repository at this point in the history
  • Loading branch information
IISI-1204003 committed May 10, 2022
1 parent 40d5869 commit 751a2ef
Show file tree
Hide file tree
Showing 23 changed files with 1,293 additions and 0 deletions.
33 changes: 33 additions & 0 deletions chapter-05/patient-monitoring-app-spring/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

### STS ###

### IntelliJ IDEA ###

### NetBeans ###

### VS Code ###
10 changes: 10 additions & 0 deletions chapter-05/patient-monitoring-app-spring/
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

## Spring Cloud Stream - functional and reactive

* [Spring Cloud Function](
* [Cloud Events and Spring - part 1 (Oleg Zhurakousky)](
* [Cloud Events and Spring - part 2 (Oleg Zhurakousky)](
* [Spring Cloud Stream - demystified and simplified](
* [Spring Cloud Stream - functional and reactive (Oleg Zhurakousky)](
109 changes: 109 additions & 0 deletions chapter-05/patient-monitoring-app-spring/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="" xmlns:xsi=""
<relativePath/> <!-- lookup parent from repository -->
<description>Demo project for Spring Boot</description>
<!-- Spring-kafka-test include EmbeddedKafka -->


Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.magicalpipelines;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.HostInfo;

class App {
public static void main(String[] args) {
Topology topology =;

// we allow the following system properties to be overridden
String host = System.getProperty("host");
Integer port = Integer.parseInt(System.getProperty("port"));
String stateDir = System.getProperty("stateDir");
String endpoint = String.format("%s:%s", host, port);

// set the required properties for running Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev-consumer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endpoint);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// an example of setting the timestamp extractor using a streams config
// note that we override this in our topology implementation, this is
// just here for demonstration purposes
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// build the topology
System.out.println("Starting Patient Monitoring Application");
KafkaStreams streams = new KafkaStreams(topology, props);
// close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

// clean up local state since many of the tutorials write to the same location
// you should run this sparingly in production since it will force the state
// store to be rebuilt on start up

// start streaming

// start the REST service
HostInfo hostInfo = new HostInfo(host, port);
// RestService service = new RestService(hostInfo, streams);
// service.start();
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.magicalpipelines;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MyRestoreListener implements StateRestoreListener {

private static final Logger log = LoggerFactory.getLogger(MyRestoreListener.class);

public void onRestoreStart(
TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {"The following state store is being restored: {}", storeName);

public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {"Restore complete for the following state store: {}", storeName);

public void onBatchRestored(
TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
// this is very noisy. don't log anything
"A batch of {} records has been restored in the following state store: {}",
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package com.magicalpipelines;

import com.magicalpipelines.model.BodyTemp;
import com.magicalpipelines.model.CombinedVitals;
import com.magicalpipelines.model.Pulse;
import com.magicalpipelines.serialization.json.JsonSerdes;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PatientMonitoringTopology {
private static final Logger log = LoggerFactory.getLogger(PatientMonitoringTopology.class);

public static Topology build() {
// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();

// the following topology steps are numbered. these numbers correlate with
// the topology design shown in the book (Chapter 5: Windows and Time)

// 1.1
Consumed<String, Pulse> pulseConsumerOptions =
Consumed.with(Serdes.String(), JsonSerdes.Pulse())
.withTimestampExtractor(new VitalTimestampExtractor());

KStream<String, Pulse> pulseEvents =
// register the pulse-events stream"pulse-events", pulseConsumerOptions);

// 1.2
Consumed<String, BodyTemp> bodyTempConsumerOptions =
Consumed.with(Serdes.String(), JsonSerdes.BodyTemp())
.withTimestampExtractor(new VitalTimestampExtractor());

KStream<String, BodyTemp> tempEvents =
// register the body-temp-events stream"body-temp-events", bodyTempConsumerOptions);

// turn pulse into a rate (bpm)
TimeWindows tumblingWindow =

* Examples of other windows (not needed for the tutorial) are commented
* out below
* TimeWindows hoppingWindow =
* TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(4));
* SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(5));
* JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
* SlidingWindows slidingWindow =
* SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(0));

KTable<Windowed<String>, Long> pulseCounts =
// 2
// 3.1 - windowed aggregation
// 3.2 - windowed aggregation
// 4

// 5.1
// filter for any pulse that exceeds our threshold
KStream<String, Long> highPulse =
// this peek operator is not included in the book, but was added
// to this code example so you could view some additional information
// when running the application locally :)
(key, value) -> {
String id = new String(key.key());
Long start = key.window().start();
Long end = key.window().end();
"Patient {} had a heart rate of {} between {} and {}", id, value, start, end);
// 5.1
.filter((key, value) -> value >= 100)
// 6
(windowedKey, value) -> {
return KeyValue.pair(windowedKey.key(), value);

// 5.2
// filter for any temperature reading that exceeds our threshold
KStream<String, BodyTemp> highTemp =
(key, value) ->
value != null && value.getTemperature() != null && value.getTemperature() > 100.4);

// looking for step 6? it's chained right after 5.1

// 7
StreamJoined<String, Long, BodyTemp> joinParams =
StreamJoined.with(Serdes.String(), Serdes.Long(), JsonSerdes.BodyTemp());

JoinWindows joinWindows =
// timestamps must be 1 minute apart
// tolerate late arriving data for up to 10 seconds

ValueJoiner<Long, BodyTemp, CombinedVitals> valueJoiner =
(pulseRate, bodyTemp) -> new CombinedVitals(pulseRate.intValue(), bodyTemp);

KStream<String, CombinedVitals> vitalsJoined =
highPulse.join(highTemp, valueJoiner, joinWindows, joinParams);

// 8"alerts", Produced.with(Serdes.String(), JsonSerdes.CombinedVitals()));

// debug only
.print(Printed.<Windowed<String>, Long>toSysOut().withLabel("pulse-counts"));
highPulse.print(Printed.<String, Long>toSysOut().withLabel("high-pulse"));
highTemp.print(Printed.<String, BodyTemp>toSysOut().withLabel("high-temp"));
vitalsJoined.print(Printed.<String, CombinedVitals>toSysOut().withLabel("vitals-joined"));


0 comments on commit 751a2ef

Please sign in to comment.