Flattening JSON Kafka Messages


KSQL can be used to flatten the schema of data in a Kafka message. This is useful when a downstream system requires a flat schema rather than a nested one. For example, you may have data on a Kafka topic that looks like this:

{
  "user": {
    "first_name": "Lars",
    "last_name": "Treagus",
    "email": "[email protected]"
  },
  "ip_address": "",
  "logon_date": "2018-02-05T19:45:59Z"
}

You can use KSQL to process every message as it arrives on the source topic and write it to a new Kafka topic with the nesting removed, so that the message looks like this:

{
  "user_first_name": "Lars",
  "user_last_name": "Treagus",
  "user_email": "[email protected]",
  "ip_address": "",
  "logon_date": "2018-02-05T19:45:59Z"
}
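
To make the before/after shapes concrete, here is a minimal Python sketch of the same transformation applied to a single message. It is only a model of the result, not how KSQL performs the work; the `flatten` helper and its `sep` parameter are hypothetical names, and the sample record is a trimmed version of the one above.

```python
import json


def flatten(record, parent_key="", sep="_"):
    """Return a new dict in which nested dicts are collapsed into prefixed keys."""
    flat = {}
    for key, value in record.items():
        # Prefix the key with its parent's name, e.g. user + first_name -> user_first_name.
        name = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# Trimmed sample message (the email field is left out here).
nested = {
    "user": {"first_name": "Lars", "last_name": "Treagus"},
    "ip_address": "",
    "logon_date": "2018-02-05T19:45:59Z",
}

print(json.dumps(flatten(nested), indent=2))
```

The recursion handles any depth of nesting, whereas the KSQL statement later in this recipe names each nested column explicitly.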


  • Docker

  • If running on Mac/Windows, at least 4GB allocated to Docker:

    docker system info | grep Memory

    Should return a value greater than 8GB - if not, the Kafka stack will probably not work.
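
If you want to script the memory check, the following Python sketch parses a line of the kind `docker system info | grep Memory` prints. The sample line and the `docker_memory_gib` helper are assumptions for illustration; the exact output format can vary between Docker versions, and the threshold is whichever minimum you want to enforce.

```python
import re

# Conversion factors from the unit suffix to GiB.
_UNITS_TO_GIB = {"KiB": 1 / (1024 * 1024), "MiB": 1 / 1024, "GiB": 1.0, "TiB": 1024.0}


def docker_memory_gib(info_line):
    """Parse a 'Total Memory: <number><unit>' line and return the amount in GiB."""
    match = re.search(r"Total Memory:\s*([\d.]+)\s*(KiB|MiB|GiB|TiB)", info_line)
    if match is None:
        raise ValueError(f"no memory figure found in {info_line!r}")
    return float(match.group(1)) * _UNITS_TO_GIB[match.group(2)]


# Assumed sample output; substitute the line your own Docker installation prints.
sample = " Total Memory: 7.776GiB"
required_gib = 4  # adjust to the minimum you want to enforce

print(docker_memory_gib(sample) >= required_gib)
```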

Try it at home!

Minimum version is Confluent Platform 5.0

  1. Clone this repository

    git clone
  2. Launch:

    cd ksql-recipes-try-it-at-home/flattening-json-kafka-messages
    docker-compose up -d
  3. Run KSQL CLI:

    docker-compose exec ksql-cli ksql http://ksql-server:8088
  4. Register the existing user_logons topic for use as a KSQL stream called user_logons. Note the STRUCT data type for the nested field:

    CREATE STREAM user_logons \
            (user      STRUCT<\
                              first_name VARCHAR, \
                              last_name  VARCHAR, \
                              email      VARCHAR>, \
            ip_address VARCHAR, \
            logon_date VARCHAR) \
            WITH (KAFKA_TOPIC='user_logons', \
                  VALUE_FORMAT='JSON');
  5. Optionally, inspect the first few messages as they arrive:

    SELECT * FROM user_logons LIMIT 5;
  6. Write the flattened structure as a new Kafka topic, updated continually from new messages arriving on the source topic. Note the use of the -> operator to access the nested columns.

    CREATE STREAM user_logons_all_cols \
            WITH (KAFKA_TOPIC='user_logons_flat') AS \
            SELECT user->first_name AS USER_FIRST_NAME, \
                    user->last_name AS USER_LAST_NAME, \
                    user->email AS USER_EMAIL, \
                    ip_address, \
                    logon_date \
                    FROM user_logons;

    Note how the target Kafka topic is explicitly set. Without KAFKA_TOPIC specified, the name of the stream will be used.

    The new stream populates a Kafka topic. You can see this from LIST TOPICS:

    ksql> LIST TOPICS;
     Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
     user_logons        | true       | 1          | 1                  | 1         | 1
     user_logons_flat   | true       | 4          | 1                  | 0         | 0

    The contents of the topic can be viewed by any Kafka client, or simply with PRINT from KSQL:

    ksql> PRINT 'user_logons_flat';
    {"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti","USER_LAST_NAME":"Debrett","USER_EMAIL":"[email protected]","IP_ADDRESS":"","LOGON_DATE":"2017-11-17T06:26:31Z"}

    Press Ctrl-C to exit the PRINT command.
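
As a rough illustration of what the `->` operator does in step 6, the Python sketch below walks a `user->first_name` style path through a nested message and then confirms that the projected record contains no nested values. This is a model of the behavior, not KSQL's implementation; `PROJECTION`, `get_path`, `project`, and `nested_fields` are hypothetical names, the email address is a placeholder, and returning `None` for a missing step is an assumption about how absent fields should be treated.

```python
import json

# Hypothetical mapping mirroring the SELECT list in step 6: output column -> source path.
PROJECTION = {
    "USER_FIRST_NAME": "user->first_name",
    "USER_LAST_NAME": "user->last_name",
    "USER_EMAIL": "user->email",
    "IP_ADDRESS": "ip_address",
    "LOGON_DATE": "logon_date",
}


def get_path(record, path, sep="->"):
    """Follow a 'user->first_name' style path; return None if any step is missing."""
    current = record
    for part in path.split(sep):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def project(record, projection=PROJECTION):
    """Build a flat record by resolving each output column's source path."""
    return {alias: get_path(record, path) for alias, path in projection.items()}


def nested_fields(record):
    """Return the names of fields whose values are JSON objects or arrays."""
    return [name for name, value in record.items() if isinstance(value, (dict, list))]


# Sample source message; the email address is a placeholder.
message = json.loads(
    '{"user": {"first_name": "Hetti", "last_name": "Debrett", '
    '"email": "hetti@example.com"}, '
    '"ip_address": "", "logon_date": "2017-11-17T06:26:31Z"}'
)

flat = project(message)
print(json.dumps(flat))
print(nested_fields(flat))
```

Running `nested_fields` against records read from the flattened topic is one simple way to check that a downstream system will only ever see scalar columns.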