
Commit ba81052

Author: Ivan Franchin
Message: adapt project to run with JSON or Avro SerDes
Parent: eb0853a

34 files changed: +508 -152 lines changed

.env

+7
@@ -0,0 +1,7 @@
+##
+# CONNECT_KEY_CONVERTER - for Avro use: io.confluent.connect.avro.AvroConverter
+CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
+
+##
+# CONNECT_VALUE_CONVERTER - for Avro use: io.confluent.connect.avro.AvroConverter
+CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
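A usage sketch (assuming `docker-compose.yml` substitutes both variables, and relying on Docker Compose giving shell variables precedence over `.env`): the converters can be switched to Avro for a single run without editing this file.
```
# Shell variables override the .env defaults for this invocation only
CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
docker-compose up -d
```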

README.md

+67 -14
@@ -7,7 +7,7 @@ The main goal of this project is to play with [Kafka](https://kafka.apache.org),
 [Kafka Streams](https://docs.confluent.io/current/streams/index.html). For this, we have: `store-api` that
 inserts/updates records in [MySQL](https://www.mysql.com); `Kafka source connectors` that monitor
 inserted/updated records in MySQL and push messages related to those changes to Kafka; `Kafka sink connectors` that
-read messages from Kafka and insert documents in [Elasticsearch](https://www.elastic.co); finally, `store-streams` that
+read messages from Kafka and insert/update documents in [Elasticsearch](https://www.elastic.co); finally, `store-streams` that
 listens for messages in Kafka, treats them using Kafka Streams and push new messages back to Kafka.
 
 ## Microservices
@@ -24,23 +24,47 @@ in MySQL.
 Spring-boot application that connects to Kafka and uses Kafka Streams API to transform some "input" topics into a new
 "output" topic in Kafka.
 
+## Serialization/Deserialization (SerDes) formats
+
+In order to run this project, you can use [JSON](https://www.json.org) or
+[Avro](http://avro.apache.org/docs/current/gettingstartedjava.html) formats for data serialization/deserialization
+to/from Kafka. The default format is `JSON`. Throughout this document, I will point out what to do if you want to use
+`Avro`.
+
+**P.S. Avro SerDes is a work in progress and is not yet completely implemented!**
+
 ## Start Environment
 
 ### Docker Compose
 
 1. Open one terminal
 
 2. Inside `/springboot-kafka-connect-streams` root folder run
+
+**For JSON SerDes**
+```
+docker-compose up -d
+```
+
+**For Avro SerDes**
 ```
+export CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
+export CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
+
 docker-compose up -d
 ```
+> Note 1.
+> The file `docker-compose.yml` has two environment variables: `CONNECT_KEY_CONVERTER` and `CONNECT_VALUE_CONVERTER`.
+> Their default value, defined in the `.env` file, is `org.apache.kafka.connect.json.JsonConverter` for both `KEY` and `VALUE`.
+> So, they only need to be changed when configuring `For Avro SerDes`.
+
+> Note 2.
 > To stop and remove containers, networks and volumes type:
 > ```
 > docker-compose down -v
 > ```
 
-- Wait a little bit until all containers are `Up (healthy)`
-- In order to check the status of the containers run the command
+3. Wait a little bit until all containers are `Up (healthy)`. To check the status of the containers, run the command
 ```
 docker-compose ps
 ```
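The wait for `Up (healthy)` in step 3 can also be scripted. A rough sketch, assuming every service in `docker-compose.yml` defines a Docker healthcheck (which the `Up (healthy)` status implies):
```
# Poll each container's healthcheck until it reports "healthy"
for c in $(docker-compose ps -q); do
  until [ "$(docker inspect --format '{{.State.Health.Status}}' "$c")" = "healthy" ]; do
    echo "waiting for container $c ..."
    sleep 5
  done
done
```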
@@ -63,11 +87,24 @@ mvn spring-boot:run
 ### Create connectors
 
 1. In a terminal, run the following script to create the connectors on `kafka-connect`
+
+**For JSON SerDes**
+```
+./create-connectors-jsonconverter.sh
 ```
-./create-connectors.sh
-```
 
-2. At the end of the script, it shows (besides other info) the state connectors and their tasks. You must see something like
+**For Avro SerDes**
+```
+./create-connectors-avroconverter.sh
+```
+
+2. You can check the state of the connectors and their tasks on `Kafka Connect UI` (http://localhost:8086) or
+by running the following script
+```
+./check-connectors-state.sh
+```
+
+Once the connectors and their tasks are ready (RUNNING state), you should see something like
 ```
 {"name":"mysql-source-customers","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"kafka-connect:8083"}],"type":"source"}
 {"name":"mysql-source-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"kafka-connect:8083"}],"type":"source"}
@@ -77,36 +114,47 @@ mvn spring-boot:run
 {"name":"elasticsearch-sink-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"kafka-connect:8083"}],"type":"sink"}
 {"name":"elasticsearch-sink-orders","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"kafka-connect:8083"}],"type":"sink"}
 ```
-**Connectors and their tasks must have a RUNNING state**
-
-3. You can also check the state of the connectors and their tasks at http://localhost:8086
 
+On Kafka Connect UI, you should see
 ![kafka-connect-ui](images/kafka-connect-ui.png)
 
 4. If there is any problem, you can check `kafka-connect` container logs.
 ```
 docker logs kafka-connect -f
 ```
 
-5. Source connectors use `JSONConverter` to serialize data from MySQL to Kafka. The same way, sink connectors use
-`JSONConverter` to deserialize messages from Kafka to Elasticsearch.
+5. Connectors use *Converters* for data serialization and deserialization. If you are configuring `For JSON SerDes`, the
+converter used is `JsonConverter`. On the other hand, if the configuration is `For Avro SerDes`, the converter used is
+`AvroConverter`.
+
+**IMPORTANT**: if the Source Connector Converter serializes data, for instance, from `JSON` to `Bytes` (using
+`JsonConverter`), then the Sink Connector Converter must also use `JsonConverter` to deserialize the `Bytes`,
+otherwise an error will be thrown. The article
+[Kafka Connect Deep Dive – Converters and Serialization Explained](https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained)
+explains this very well.
 
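To illustrate the point in item 5 (this is not one of the repo's scripts): Kafka Connect also accepts per-connector converter overrides, which is one way to keep a source/sink pair in agreement. The Schema Registry address below is an assumption for this setup.
```
# Hypothetical per-connector override; schema-registry host/port are assumed
curl -X PUT -H "Content-Type: application/json" \
  localhost:8083/connectors/elasticsearch-sink-customers/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "store-mysql-customers",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "kafka-connect",
    "tasks.max": "1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }'
```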
 ### Start store-streams
 
 1. Open a new terminal
 
 2. In `/springboot-kafka-connect-streams/store-streams` folder, run
+
+**For JSON SerDes**
 ```
 mvn spring-boot:run
 ```
 
-3. It runs on port `9081`. The `health` endpoint is http://localhost:9081/actuator/health
-
+
+**(NOT READY!) For Avro SerDes**
+```
+mvn spring-boot:run -Dspring-boot.run.profiles=avroconverter
+```
 > Note: the command below generates Java classes from Avro files
 > ```
 > mvn generate-sources
 > ```
 
+3. This service runs on port `9081`. The `health` endpoint is http://localhost:9081/actuator/health
+
 ## Useful Links/Commands
 
 ### Elasticsearch
@@ -115,7 +163,7 @@ mvn spring-boot:run
 
 - You can use `curl` to check some documents, for example in `store-mysql-customers` index
 ```
-curl http://localhost:9200/store-mysql-customers/_search?pretty
+curl http://localhost:9200/store-streams-orders/_search?pretty
 ```
 
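A related check with Elasticsearch's catalog API lists every index the sink connectors have created, together with document counts:
```
curl http://localhost:9200/_cat/indices?v
```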
 ### Kafka Topics UI
@@ -156,6 +204,11 @@ docker run --tty --interactive --rm --network=springboot-kafka-connect-streams_d
 -t store-mysql-customers -C -c1
 ```
 
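When the environment was started `For Avro SerDes`, the same kafkacat check needs the Avro deserializer pointed at the Schema Registry. A hedged sketch: the broker and registry addresses are assumptions, and `-s`/`-r` require a kafkacat/kcat build with Avro support.
```
# Assumed addresses inside the compose network; adjust to this project's setup
kafkacat -b kafka:9092 \
  -t store-mysql-customers -C -c1 \
  -s value=avro -r http://schema-registry:8081
```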
+## TODO
+
+- adapt `store-streams` to run `For Avro SerDes`. I am having problems making `spring-cloud-stream-kafka-streams`
+and `Avro` work together.
+
 ## References
 
 - https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1 (2 and 3)

check-connectors-state.sh

+29
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+echo "-------------------------------"
+echo "Connector and their tasks state"
+echo "-------------------------------"
+
+curl localhost:8083/connectors/mysql-source-customers/status
+
+echo
+curl localhost:8083/connectors/mysql-source-products/status
+
+echo
+curl localhost:8083/connectors/mysql-source-orders/status
+
+echo
+curl localhost:8083/connectors/mysql-source-orders_products/status
+
+echo
+curl localhost:8083/connectors/elasticsearch-sink-customers/status
+
+echo
+curl localhost:8083/connectors/elasticsearch-sink-products/status
+
+echo
+curl localhost:8083/connectors/elasticsearch-sink-orders/status
+
+echo
+echo "You can also use Kafka Connect UI, link http://localhost:8086"
+echo
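For reference, creating a connector boils down to POSTing one of the JSON definitions below to the Kafka Connect REST API, which is presumably what the `create-connectors-*.sh` scripts do; the file name here is illustrative, not a path from this commit:
```
curl -X POST -H "Content-Type: application/json" \
  --data @mysql-source-customers.json \
  http://localhost:8083/connectors
```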
@@ -0,0 +1,10 @@
+{
+  "name": "elasticsearch-sink-customers",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "topics": "store-mysql-customers",
+    "connection.url": "http://elasticsearch:9200",
+    "type.name": "kafka-connect",
+    "tasks.max": "1"
+  }
+}
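To confirm this sink is writing to its target index, a document count against the topic-named index is a quick check (standard Elasticsearch API):
```
curl http://localhost:9200/store-mysql-customers/_count?pretty
```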
@@ -0,0 +1,10 @@
+{
+  "name": "elasticsearch-sink-orders",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "topics": "store-streams-orders",
+    "connection.url": "http://elasticsearch:9200",
+    "type.name": "kafka-connect",
+    "tasks.max": "1"
+  }
+}
@@ -0,0 +1,10 @@
+{
+  "name": "elasticsearch-sink-products",
+  "config": {
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "topics": "store-mysql-products",
+    "connection.url": "http://elasticsearch:9200",
+    "type.name": "kafka-connect",
+    "tasks.max": "1"
+  }
+}
@@ -0,0 +1,36 @@
+{
+  "name": "mysql-source-customers",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+    "connection.url": "jdbc:mysql://store-mysql:3306/storedb?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=UTC",
+    "connection.user": "root",
+    "connection.password": "secret",
+    "table.whitelist": "customers",
+    "mode": "timestamp+incrementing",
+    "timestamp.column.name": "updated_at",
+    "incrementing.column.name": "id",
+    "topic.prefix": "store-mysql-",
+    "tasks.max": "1",
+
+    "_comment": "--- SMT (Single Message Transform) ---",
+    "transforms": "setSchemaName, dropFields, maskFields, createKey, extractId",
+
+    "_comment": "--- Change the schema name ---",
+    "transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+    "transforms.setSchemaName.schema.name": "com.mycompany.commons.storeapp.avro.Customer",
+
+    "_comment": "--- Drop fields ---",
+    "transforms.dropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.dropFields.blacklist": "updated_at",
+
+    "_comment": "--- Mask fields ---",
+    "transforms.maskFields.type": "org.apache.kafka.connect.transforms.MaskField$Value",
+    "transforms.maskFields.fields": "phone",
+
+    "_comment": "--- Add key to the message based on the entity id field ---",
+    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
+    "transforms.createKey.fields": "id",
+    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+    "transforms.extractId.field": "id"
+  }
+}
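Since this connector renames the value schema via `SetSchemaMetadata`, one way to confirm what `AvroConverter` registered (when running `For Avro SerDes`) is the Schema Registry REST API. The localhost port is an assumption for this setup, and the `-value` suffix is the registry's default subject naming.
```
# Assumed: Schema Registry exposed on localhost:8081
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/store-mysql-customers-value/versions/latest
```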
@@ -0,0 +1,31 @@
+{
+  "name": "mysql-source-orders",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+    "connection.url": "jdbc:mysql://store-mysql:3306/storedb?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=UTC",
+    "connection.user": "root",
+    "connection.password": "secret",
+    "table.whitelist": "orders",
+    "mode": "timestamp",
+    "timestamp.column.name": "updated_at",
+    "topic.prefix": "store-mysql-",
+    "tasks.max": "1",
+
+    "_comment": "--- SMT (Single Message Transform) ---",
+    "transforms": "setSchemaName, dropFields, createKey, extractId",
+
+    "_comment": "--- Change the schema name ---",
+    "transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+    "transforms.setSchemaName.schema.name": "com.mycompany.commons.storeapp.avro.Order",
+
+    "_comment": "--- Drop fields ---",
+    "transforms.dropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.dropFields.blacklist": "updated_at",
+
+    "_comment": "--- Add key to the message based on the entity id field ---",
+    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
+    "transforms.createKey.fields": "id",
+    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+    "transforms.extractId.field": "id"
+  }
+}
@@ -0,0 +1,31 @@
+{
+  "name": "mysql-source-orders_products",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+    "connection.url": "jdbc:mysql://store-mysql:3306/storedb?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=UTC",
+    "connection.user": "root",
+    "connection.password": "secret",
+    "table.whitelist": "orders_products",
+    "mode": "timestamp",
+    "timestamp.column.name": "updated_at",
+    "topic.prefix": "store-mysql-",
+    "tasks.max": "1",
+
+    "_comment": "--- SMT (Single Message Transform) ---",
+    "transforms": "setSchemaName, dropFields, createKey, extractId",
+
+    "_comment": "--- Change the schema name ---",
+    "transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+    "transforms.setSchemaName.schema.name": "com.mycompany.commons.storeapp.avro.OrderProduct",
+
+    "_comment": "--- Drop fields ---",
+    "transforms.dropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.dropFields.blacklist": "created_at, updated_at",
+
+    "_comment": "--- Add key to the message based on the entity id field ---",
+    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
+    "transforms.createKey.fields": "order_id",
+    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+    "transforms.extractId.field": "order_id"
+  }
+}
@@ -0,0 +1,37 @@
+{
+  "name": "mysql-source-products",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+    "connection.url": "jdbc:mysql://store-mysql:3306/storedb?autoReconnect=true&useSSL=false&useUnicode=yes&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=UTC",
+    "connection.user": "root",
+    "connection.password": "secret",
+    "table.whitelist": "products",
+    "mode": "timestamp+incrementing",
+    "timestamp.column.name": "updated_at",
+    "incrementing.column.name": "id",
+    "topic.prefix": "store-mysql-",
+    "tasks.max": "1",
+
+    "_comment": "--- SMT (Single Message Transform) ---",
+    "transforms": "setSchemaName, dropFields, createKey, extractId",
+
+    "_comment": "--- Change the schema name ---",
+    "transforms.setSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+    "transforms.setSchemaName.schema.name": "com.mycompany.commons.storeapp.avro.Product",
+
+    "_comment": "--- Drop fields ---",
+    "transforms.dropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
+    "transforms.dropFields.blacklist": "updated_at, price",
+
+    "_comment": "--- Add key to the message based on the entity id field ---",
+    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
+    "transforms.createKey.fields": "id",
+    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+    "transforms.extractId.field": "id",
+
+    "_comment": "--- numeric.mapping doesn't work for DECIMAL fields #563 ---",
+    "_comment": "--- https://github.com/confluentinc/kafka-connect-jdbc/issues/563 ---",
+    "_comment": "--- price field dropped for now ---",
+    "numeric.mapping": "best_fit"
+  }
+}
