implement script to create kafka topics and update ksql files
Ivan Franchin committed Jun 9, 2020
1 parent fe6befd commit bcb11bf
Showing 7 changed files with 67 additions and 34 deletions.
57 changes: 36 additions & 21 deletions README.md
@@ -33,10 +33,24 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
> docker-compose build
> ```
- Wait a little bit until all containers are `Up (healthy)`. To check the status of the containers, run
- Wait a little bit until all containers are `Up (healthy)`. To check the status of the containers run
```
docker-compose ps
```
## Create Kafka Topics
In order to have topics in `Kafka` with more than `1` partition, we must create them manually rather than wait for the connectors to create them for us. To do so:
- Open a new terminal and make sure you are in `springboot-kafka-debezium-ksql` root folder
- Run the script below
```
./create-kafka-topics.sh
```
> **Note:** you can ignore the warnings
It will create the topics `mysql.researchdb.institutes`, `mysql.researchdb.researchers`, `mysql.researchdb.articles` and `mysql.researchdb.reviews` with `5` partitions.
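The four `kafka-topics` calls in `create-kafka-topics.sh` differ only in the topic name, so the same step can be sketched as a loop. This is a minimal sketch, not the script from this commit: `create_topic`, `TOPICS`, `PARTITIONS` and the `DRY_RUN` switch are hypothetical names introduced here, and by default it only prints the commands.

```shell
#!/usr/bin/env sh
# Sketch only: loop form of the topic-creation step.
# create_topic, TOPICS, PARTITIONS and DRY_RUN are hypothetical names.

TOPICS="mysql.researchdb.institutes mysql.researchdb.researchers mysql.researchdb.articles mysql.researchdb.reviews"
PARTITIONS=5

create_topic() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    # Dry run (default): print the command instead of executing it.
    echo "docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions $PARTITIONS --topic $1"
  else
    # Same invocation as in create-kafka-topics.sh.
    docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 \
      --replication-factor 1 --partitions "$PARTITIONS" --topic "$1"
  fi
}

# Create (or, in dry-run mode, print the command for) each topic in turn.
for topic in $TOPICS; do
  create_topic "$topic"
done
```

Running it with `DRY_RUN=0` executes the commands against the running containers instead of printing them.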
## Create connectors (3/4)
@@ -67,25 +81,19 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- Run the command below to start the application
```
./mvnw clean spring-boot:run --projects research-service
./mvnw clean spring-boot:run --projects research-service -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
```
> **Note:** It will create some articles, institutes and researchers. To skip this, set the properties `load-samples.articles.enabled`, `load-samples.institutes.enabled` and `load-samples.researchers.enabled` to `false` in `application.yml`.
- The Swagger link is http://localhost:9080/swagger-ui.html
- **IMPORTANT:** create at least one `review` so that the topic `mysql.researchdb.reviews` is created on Kafka. Below there is a request sample to create a review.
- **Important:** create at least one `review` so that the subjects `mysql.researchdb.reviews-key` and `mysql.researchdb.reviews-value` are created in `Schema Registry`. A sample request to create a review is shown below.
```
curl -i -X POST localhost:9080/api/reviews \
-H "Content-Type: application/json" \
-d "{ \"researcherId\": 1, \"articleId\": 1, \"comment\": \"Ln 56: replace the 'a' by 'an'\"}"
```
Otherwise, you will have the following exception while running `ksql-cli`
```
io.confluent.ksql.parser.exception.ParseFailedException: Exception while processing statement: Avro schema for message
values on topic mysql.researchdb.reviews does not exist in the Schema Registry.
```
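Whether the review subjects exist can be checked before starting `ksql-cli` by querying the Schema Registry subject list at `localhost:8081/subjects`, the endpoint this README uses elsewhere. The sketch below assumes the endpoint returns a JSON array of subject names; `has_subject` and `reviews_subjects_ready` are hypothetical helper names, not part of the project.

```shell
has_subject() {
  # $1: subject name; stdin: JSON array returned by GET /subjects.
  # -F treats the name as a fixed string, so its dots are not regex wildcards.
  grep -qF "\"$1\""
}

reviews_subjects_ready() {
  # Not called automatically; needs Schema Registry reachable on localhost:8081.
  subjects=$(curl -s localhost:8081/subjects) || return 1
  echo "$subjects" | has_subject "mysql.researchdb.reviews-key" &&
    echo "$subjects" | has_subject "mysql.researchdb.reviews-value"
}
```

For example, `reviews_subjects_ready && echo "ok to run ksql-cli"` prints the message only when both subjects are listed.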
## Run ksql-cli
- In a new terminal, inside `springboot-kafka-debezium-ksql` root folder, run the `docker` command below to start `ksql-cli`
@@ -134,7 +142,7 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- check whether the topic was created
```
DESCRIBE RESEARCHERS_INSTITUTES;
SELECT * from RESEARCHERS_INSTITUTES EMIT CHANGES LIMIT 5;
SELECT * FROM RESEARCHERS_INSTITUTES EMIT CHANGES LIMIT 5;
```
- Run the script below. It will create `REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES` topic
@@ -145,7 +153,7 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- Check whether the topic was created
```
DESCRIBE REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES;
SELECT * from REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES LIMIT 1;
SELECT * FROM REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES LIMIT 1;
```
## Create connectors (4/4)
@@ -168,21 +176,28 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- Run the command below to start the application
```
./mvnw clean spring-boot:run --projects kafka-research-consumer
./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
```
- This service runs on port `9081`. The `health` endpoint is: http://localhost:9081/actuator/health
- \[Optional\] We can start another `kafka-research-consumer` instance by opening another terminal and running
```
./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
```
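With two instances running, each one can be checked through its `health` endpoint. The sketch below is illustrative only: `health_url`, `is_up` and `check_instances` are hypothetical helpers, and `is_up` assumes the endpoint answers with compact JSON whose overall status comes first, such as `{"status":"UP"}`.

```shell
health_url() {
  # $1: port of a kafka-research-consumer instance (9081 or 9082 in this README).
  echo "http://localhost:$1/actuator/health"
}

is_up() {
  # stdin: JSON body of the health endpoint.
  # Assumes the compact form with the overall status first, e.g. {"status":"UP"}.
  grep -q '^{"status":"UP"'
}

check_instances() {
  # Not called automatically; requires the instances to be running.
  for port in "$@"; do
    if curl -s "$(health_url "$port")" | is_up; then
      echo "port $port: UP"
    else
      echo "port $port: not ready"
    fi
  done
}
```

Calling `check_instances 9081 9082` reports one line per instance.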
## Testing
- Go to the terminal where `ksql-cli` is running. On `ksql-cli` command line, run the following query
```
SELECT * from REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES;
SELECT * FROM REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES;
```
- In another terminal, call the `research-service` simulation endpoint
```
curl -X POST localhost:9080/api/simulation/reviews \
-H "Content-Type: application/json" \
-d "{\"total\": 200, \"sleep\": 100}"
-d "{\"total\": 100, \"sleep\": 100}"
```
- The GIF below shows it
@@ -214,11 +229,11 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- Get the list of subjects
```
curl http://localhost:8081/subjects
curl localhost:8081/subjects
```
- Get the latest version of the subject `mysql.researchdb.researchers-value`
```
curl http://localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest
curl localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest
```
- **Kafka Manager**
@@ -238,14 +253,14 @@ The goal of this project is to play with [`Kafka`](https://kafka.apache.org), [`
- Get all indices
```
curl http://localhost:9200/_cat/indices?v
curl "localhost:9200/_cat/indices?v"
```
- Search for documents
```
curl http://localhost:9200/articles/_search?pretty
curl http://localhost:9200/institutes/_search?pretty
curl http://localhost:9200/researchers/_search?pretty
curl http://localhost:9200/reviews/_search?pretty
curl "localhost:9200/articles/_search?pretty"
curl "localhost:9200/institutes/_search?pretty"
curl "localhost:9200/researchers/_search?pretty"
curl "localhost:9200/reviews/_search?pretty"
```
- **MySQL**
26 changes: 26 additions & 0 deletions create-kafka-topics.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

echo
echo "Create topic mysql.researchdb.institutes"
echo "----------------------------------------"
docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 5 --topic mysql.researchdb.institutes

echo
echo "Create topic mysql.researchdb.researchers"
echo "-----------------------------------------"
docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 5 --topic mysql.researchdb.researchers

echo
echo "Create topic mysql.researchdb.articles"
echo "--------------------------------------"
docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 5 --topic mysql.researchdb.articles

echo
echo "Create topic mysql.researchdb.reviews"
echo "-------------------------------------"
docker exec -t zookeeper kafka-topics --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 5 --topic mysql.researchdb.reviews

echo
echo "List topics"
echo "-----------"
docker exec -t zookeeper kafka-topics --list --bootstrap-server kafka:9092
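The script ends by listing the topics, which does not show how many partitions each one received. A possible follow-up check, sketched under assumptions: `describe_topic` and `partition_count` are hypothetical helpers, and the parser assumes the `--describe` output contains a `PartitionCount:` field followed by a number.

```shell
describe_topic() {
  # Not called automatically; requires the containers to be running.
  # $1: topic name.
  docker exec zookeeper kafka-topics --describe --bootstrap-server kafka:9092 --topic "$1"
}

partition_count() {
  # stdin: output of `kafka-topics --describe`; prints the PartitionCount value.
  # Optional whitespace after the colon is tolerated.
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}
```

For example, `describe_topic mysql.researchdb.reviews | partition_count` should print the partition count requested in the script.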
6 changes: 2 additions & 4 deletions docker/ksql/researchers-institutes.ksql
@@ -1,9 +1,7 @@
CREATE STREAM INSTITUTES_SRC_1 WITH (KAFKA_TOPIC='mysql.researchdb.institutes', VALUE_FORMAT='AVRO');
CREATE STREAM INSTITUTES_SRC WITH (PARTITIONS=5) AS SELECT * FROM INSTITUTES_SRC_1;
CREATE STREAM INSTITUTES_SRC WITH (KAFKA_TOPIC='mysql.researchdb.institutes', VALUE_FORMAT='AVRO', PARTITIONS=5);
CREATE STREAM INSTITUTES_SRC_REKEY WITH (PARTITIONS=5) AS SELECT * FROM INSTITUTES_SRC PARTITION BY ID;

CREATE STREAM RESEARCHERS_SRC_1 WITH (KAFKA_TOPIC='mysql.researchdb.researchers', VALUE_FORMAT='AVRO');
CREATE STREAM RESEARCHERS_SRC WITH (PARTITIONS=5) AS SELECT * FROM RESEARCHERS_SRC_1;
CREATE STREAM RESEARCHERS_SRC WITH (KAFKA_TOPIC='mysql.researchdb.researchers', VALUE_FORMAT='AVRO', PARTITIONS=5);
CREATE STREAM RESEARCHERS_SRC_REKEY WITH (PARTITIONS=5) AS SELECT * FROM RESEARCHERS_SRC PARTITION BY ID;

CREATE TABLE INSTITUTES_TABLE (id BIGINT, name VARCHAR, created_at BIGINT, updated_at BIGINT) WITH (KAFKA_TOPIC='INSTITUTES_SRC_REKEY', VALUE_FORMAT='AVRO', KEY='id', PARTITIONS=5);
6 changes: 2 additions & 4 deletions docker/ksql/reviews-researchers-institutes-articles.ksql
@@ -1,9 +1,7 @@
CREATE STREAM ARTICLES_SRC_1 WITH (KAFKA_TOPIC='mysql.researchdb.articles', VALUE_FORMAT='AVRO');
CREATE STREAM ARTICLES_SRC WITH (PARTITIONS=5) AS SELECT * FROM ARTICLES_SRC_1;
CREATE STREAM ARTICLES_SRC WITH (KAFKA_TOPIC='mysql.researchdb.articles', VALUE_FORMAT='AVRO', PARTITIONS=5);
CREATE STREAM ARTICLES_SRC_REKEY WITH (PARTITIONS=5) AS SELECT * FROM ARTICLES_SRC PARTITION BY ID;

CREATE STREAM REVIEWS_SRC_1 WITH (KAFKA_TOPIC='mysql.researchdb.reviews', VALUE_FORMAT='AVRO');
CREATE STREAM REVIEWS_SRC WITH (PARTITIONS=5) AS SELECT * FROM REVIEWS_SRC_1;
CREATE STREAM REVIEWS_SRC WITH (KAFKA_TOPIC='mysql.researchdb.reviews', VALUE_FORMAT='AVRO', PARTITIONS=5);
CREATE STREAM REVIEWS_SRC_REKEY WITH (PARTITIONS=5) AS SELECT * FROM REVIEWS_SRC PARTITION BY ID;

CREATE TABLE RESEARCHERS_INSTITUTES_TABLE (researcher_id BIGINT, researcher_first_name VARCHAR, researcher_last_name VARCHAR, institute_id BIGINT, institute_name VARCHAR) WITH (KAFKA_TOPIC='RESEARCHERS_INSTITUTES', VALUE_FORMAT='AVRO', KEY='researcher_id', PARTITIONS=5);
@@ -5,7 +5,7 @@
import org.springframework.data.elasticsearch.annotations.Document;

@Data
@Document(indexName = "reviews", type = "review")
@Document(indexName = "reviews", createIndex = false)
public class Review {

@Id
2 changes: 0 additions & 2 deletions kafka-research-consumer/src/main/resources/application.yml
@@ -1,5 +1,3 @@
server.port: 9081

spring:
application:
name: kafka-research-consumer
2 changes: 0 additions & 2 deletions research-service/src/main/resources/application.yml
@@ -1,5 +1,3 @@
server.port: 9080

spring:
application:
name: research-service