
Commit

add reactive programming api endpoint
IISI-1204003 committed May 10, 2022
1 parent 2969a40 commit 40d5869
Showing 9 changed files with 380 additions and 157 deletions.
6 changes: 6 additions & 0 deletions chapter-04/video-game-leaderboard-spring/pom.xml
@@ -40,10 +40,16 @@
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>
<!--
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

This file was deleted.

@@ -1,30 +1,19 @@
package com.magicalpipelines;

import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerde;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;

import com.magicalpipelines.model.Player;
import com.magicalpipelines.model.Product;
import com.magicalpipelines.model.ScoreEvent;
import com.magicalpipelines.model.join.Enriched;
import com.magicalpipelines.model.join.ScoreWithPlayer;
import com.magicalpipelines.serialization.json.JsonSerdes;


public interface Topology {
@@ -35,13 +24,53 @@ public interface Topology {
* stream - ScoreWithPlayer is the value type for the score events stream - String is the lookup
* key type
*/
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
final KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
(leftKey, scoreWithPlayer) -> {
return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
};


// join the withPlayers stream to the product global ktable
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
/***
* join the withPlayers stream to the product global ktable <br/> <br/>
* This join needs to combine a ScoreWithPlayer (from the output of the first join) with a Product (from the products GlobalKTable) <br/> <br/>
* A ValueJoiner, expressed as a lambda, used for the KStream-GlobalKTable join <br/> <br/>
***/
final ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
(scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);


/***
* KStream to KTable Join (players Join) <br/> <br/>
* join params for scoreEvents -> players join
* ***/
final Joined<String, ScoreEvent, Player> playerJoinParams =
Joined.with(
Serdes.String(),
JsonSerdes.ScoreEvent(),
JsonSerdes.Player());
/***
* join scoreEvents -> players <br/> <br/>
* ValueJoiner combines a ScoreEvent (from the score-events KStream) and a Player (from the players KTable) into a ScoreWithPlayer instance <br/> <br/>
* (score, player) -> new ScoreWithPlayer(score, player), or equivalently a constructor reference: ScoreWithPlayer::new
* ***/
final ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner
// = (score,player) -> new ScoreWithPlayer(score, player);
= ScoreWithPlayer::new ;



/**
* The initial value of our aggregation will be a new HighScores instance <br/><br/>
* It tells Kafka Streams how to initialize our new data class. Initializing a class is simple; we just need to instantiate it
* */
final Initializer<HighScores> highScoresInitializer = HighScores::new;

/**
* The logic for aggregating high scores is implemented in the HighScores.add method <br/>
* This is accomplished using the Aggregator interface, which, like Initializer, is a functional interface that can be implemented using a lambda. The implementing function needs to accept three parameters:<br/>
* (1)The record key<br/>
* (2)The record value <br/>
* (3)The current aggregate value
* */
final Aggregator<String, Enriched, HighScores> highScoresAdder =
(key, value, aggregate) -> aggregate.add(value);
}
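
The aggregator above delegates to HighScores.add, but the HighScores class itself is not part of this diff. A minimal sketch of the shape that aggregator assumes follows: a sorted, bounded collection whose add method returns the aggregate. The top-3 limit, the toList accessor, and the assumption that Enriched is Comparable by score are illustrative guesses, not taken from this commit.

package com.magicalpipelines;

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

import com.magicalpipelines.model.join.Enriched;

public class HighScores {
  // Assumes Enriched implements Comparable and orders higher scores first.
  private final TreeSet<Enriched> highScores = new TreeSet<>();

  public HighScores add(final Enriched enriched) {
    highScores.add(enriched);
    // Keep only the top 3 scores per product (illustrative limit).
    if (highScores.size() > 3) {
      highScores.remove(highScores.last());
    }
    // Returning this lets the Aggregator keep reusing the same instance as the running aggregate.
    return this;
  }

  public List<Enriched> toList() {
    return new ArrayList<>(highScores);
  }
}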
@@ -5,22 +5,19 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;

import com.magicalpipelines.model.Player;
@@ -41,8 +38,9 @@ public static void main(String[] args) {
}

@Bean
public HostInfo getHostInfo() {
return new HostInfo("localhost",8080);
public HostInfo getHostInfo(@Autowired final KafkaBinderConfigurationProperties properties) {
final String data = properties.getConfiguration().get(StreamsConfig.APPLICATION_SERVER_CONFIG);
return HostInfo.buildFromEndpoint(data);
}
//For spring-cloud-stream to auto-register Serializer/Deserializer;
/***
@@ -58,67 +56,52 @@ public Serde<HighScores> highScoresJsonSerde() {
* **/
@Bean
public BiFunction<KStream<String, ScoreEvent>, KTable<String, Player>, KStream<String, ScoreWithPlayer>> withPlayeers() {
// join params for scoreEvents -> players join
final Joined<String, ScoreEvent, Player> playerJoinParams = Joined.with(Serdes.String(),
JsonSerdes.ScoreEvent(), JsonSerdes.Player());

// join scoreEvents -> players
final ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner = (score,
player) -> new ScoreWithPlayer(score, player);

return (scoreEvents, players) -> scoreEvents
// now marked for re-partitioning
.selectKey((k, v) -> v.getPlayerId().toString())
// join scoreEvents -> players
.join(players, scorePlayerJoiner, playerJoinParams)
.join(players, Topology.scorePlayerJoiner, Topology.playerJoinParams)
.peek((k, v) -> log.info("Done -> {}", v));
}

// create the global product table
@Bean
public BiFunction<KStream<String, ScoreWithPlayer>, GlobalKTable<String, Product> ,KStream<String, HighScores> > jointProducts() {
final KeyValueMapper<String, ScoreWithPlayer, String> keyMapper = Topology.keyMapper;
final ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =Topology.productJoiner;

return (withPlayers ,products)->{
withPlayers.print(Printed.<String, ScoreWithPlayer>toSysOut().withLabel("withPlayers"));

return (withPlayers ,products)->{

KStream<String, Enriched> withProducts =
withPlayers
.join( products, Topology.keyMapper, Topology.productJoiner)
.peek((k, v) -> log.info("withProducts -> {}", v));

KStream<String, Enriched> withProducts = withPlayers.join(products, keyMapper, productJoiner);

withProducts.print(Printed.<String, Enriched>toSysOut().withLabel("with-products"));


/** Group the enriched product stream */
final KGroupedStream<String, Enriched> grouped =
withProducts.groupBy(
KGroupedStream<String, Enriched> grouped =
withProducts
.groupBy(
(key, value) -> value.getProductId().toString(),
Grouped.with(Serdes.String(), JsonSerdes.Enriched()));


// alternatively, use the following if you want to name the grouped repartition topic:
Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
// alternatively, use the following if you want to name the grouped repartition topic:
// Grouped.with("grouped-enriched", Serdes.String(), JsonSerdes.Enriched()))

/** The initial value of our aggregation will be a new HighScores instances */
Initializer<HighScores> highScoresInitializer = HighScores::new;

/** The logic for aggregating high scores is implemented in the HighScores.add method */
Aggregator<String, Enriched, HighScores> highScoresAdder =
(key, value, aggregate) -> aggregate.add(value);


/** Perform the aggregation, and materialize the underlying state store for querying */
KTable<String, HighScores> highScores =
grouped.aggregate(
highScoresInitializer,
highScoresAdder,
Topology.highScoresInitializer,
Topology.highScoresAdder,
Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>>
// give the state store an explicit name to make it available for interactive
// queries
as("leader-boards")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerdes.HighScores()));
final KStream<String, HighScores> stream = highScores.toStream() ;

stream.print(Printed.<String, HighScores>toSysOut().withLabel("high-scores"));

KStream<String, HighScores> stream = highScores
.toStream()
.peek((k, v) -> log.info("high-scores -> {}", v));
return stream ;
};
}
@@ -0,0 +1,58 @@
package com.magicalpipelines.api;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

import com.magicalpipelines.HighScores;

@Component
public class KafkaComponent {
@Autowired
private InteractiveQueryService interactiveQueryService;
@Autowired
private ApplicationContext context;
protected KafkaStreams getStream() {
// Print the registered StreamsBuilderFactoryBean names so the binding-specific bean name used below can be verified.
String[] clazzNames = context.getBeanNamesForType( StreamsBuilderFactoryBean.class);
for(String name:clazzNames) {
System.out.println(name);
}
// The "&" prefix asks the context for the StreamsBuilderFactoryBean itself rather than the StreamsBuilder it creates.
final StreamsBuilderFactoryBean streamsBuilderFactoryBean =
context.getBean("&stream-builder-jointProducts", StreamsBuilderFactoryBean.class);

KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
return kafkaStreams ;
}

protected ReadOnlyKeyValueStore<String, HighScores> getStore() {
// Randomly alternate between the two lookup strategies below so both code paths get exercised.
int tmp =(int)(Math.random()*10000);

if ( (tmp % 2) == 0)
return getStoreV1();
else
return getStoreV2();
}

/** Look up the state store via spring-cloud-stream's InteractiveQueryService. */
protected ReadOnlyKeyValueStore<String, HighScores> getStoreV1() {
return interactiveQueryService.getQueryableStore(
// state store name
"leader-boards",
// state store type
QueryableStoreTypes.keyValueStore());
}

/** Look up the state store directly on the underlying KafkaStreams instance. */
protected ReadOnlyKeyValueStore<String, HighScores> getStoreV2() {

return getStream().store(StoreQueryParameters.fromNameAndType(
// state store name
"leader-boards",
// state store type
QueryableStoreTypes.keyValueStore()));
}
}
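
The reactive endpoint that gives this commit its title sits in one of the changed files not expanded above. As a rough illustration, a WebFlux controller backed by the leader-boards store could look like the sketch below; the LeaderboardController class name and the /leaderboard paths are assumptions for illustration, not taken from this commit.

package com.magicalpipelines.api;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.state.KeyValueIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import com.magicalpipelines.HighScores;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class LeaderboardController {

  @Autowired
  private KafkaComponent kafkaComponent;

  /** Return the high scores for a single product id; an empty Mono if the key is absent. */
  @GetMapping("/leaderboard/{productId}")
  public Mono<HighScores> scores(@PathVariable final String productId) {
    return Mono.fromSupplier(() -> kafkaComponent.getStore().get(productId));
  }

  /** Return all leaderboard aggregates currently held in the local state store. */
  @GetMapping("/leaderboard")
  public Flux<HighScores> allScores() {
    return Flux.defer(() -> {
      final List<HighScores> results = new ArrayList<>();
      try (KeyValueIterator<String, HighScores> range = kafkaComponent.getStore().all()) {
        range.forEachRemaining(kv -> results.add(kv.value));
      }
      return Flux.fromIterable(results);
    });
  }
}

Note that KafkaComponent.getStore() only reaches the local state store; in a multi-instance deployment the HostInfo bean registered above would still be needed to route a query to the instance that owns a given key.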
