KSQL can be used to flatten the schema of data in a Kafka message. This can be useful when a downstream system requires the schema to be flat, and not nested. For example you may have data on a Kafka topic that looks like this:
{
"user": {
"first_name": "Lars",
"last_name": "Treagus",
"email": "[email protected]"
},
"ip_address": "242.115.235.56",
"logon_date": "2018-02-05T19:45:59Z"
}
You can use KSQL to process every message as it arrives on the source topic and write it to a new Kafka topic with the nesting removed so that that the message looks like this:
{
"user_first_name": "Lars",
"user_last_name": "Treagus",
"user_email": "[email protected]"
"ip_address": "242.115.235.56",
"logon_date": "2018-02-05T19:45:59Z"
}
-
Docker
-
If running on Mac/Windows, at least 4GB allocated to Docker:
docker system info | grep Memory
Should return a value greater than 8GB - if not, the Kafka stack will probably not work.
Minimum version is Confluent Platform 5.0
-
Clone this repository
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/flattening-json-kafka-messages docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
-
Register the existing
user_logons
topic for use as a KSQL stream calleduser_logons
. Note theSTRUCT
data type for the nested field:CREATE STREAM user_logons \ (user STRUCT<\ first_name VARCHAR, \ last_name VARCHAR, \ email VARCHAR>, \ ip_address VARCHAR, \ logon_date VARCHAR) \ WITH (KAFKA_TOPIC='user_logons', \ VALUE_FORMAT='JSON');
-
Optionally, inspect the first few messages as they arrive:
SELECT * FROM user_logons LIMIT 5;
-
Write the flattened structure as a new Kafka topic, updated continually from new messages arriving on the source topic. Note the use of the
→
operator to access the nested columns.CREATE STREAM user_logons_all_cols \ WITH (KAFKA_TOPIC='user_logons_flat') AS \ SELECT user->first_name AS USER_FIRST_NAME, \ user->last_name AS USER_LAST_NAME, \ user->email AS USER_EMAIL, \ ip_address, \ logon_date \ FROM user_logons;
Note how the target Kafka topic is explicitly set. Without
KAFKA_TOPIC
specified, the name of the stream will be used.The new stream populates a Kafka topic. You can see this from
LIST TOPICS
:ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ------------------------------------------------------------------------------------------------ user_logons | true | 1 | 1 | 1 | 1 user_logons_flat | true | 4 | 1 | 0 | 0
The contents of the topic can be viewed by any Kafka client, or simply with
PRINT
from KSQL:ksql> PRINT 'user_logons_flat'; Format:JSON {"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti","USER_LAST_NAME":"Debrett","USER_EMAIL":"[email protected]","IP_ADDRESS":"115.102.56.33","LOGON_DATE":"2017-11-17T06:26:31Z"}
Press Ctrl-C to exit the
PRINT
command.