31 changes: 31 additions & 0 deletions docs/integrate/kafka/attic.md
@@ -0,0 +1,31 @@
---
orphan: true
---
:::{rubric} About
:::

[Apache Kafka] is an open-source distributed event streaming platform used by
thousands of companies for high-performance data pipelines, streaming analytics,
data integration, and mission-critical applications.

:::{dropdown} **Managed Kafka**
Several companies provide managed Kafka services (see the [overview of managed Kafka offerings]
for examples; note that offerings and features change frequently).

- [Aiven for Apache Kafka]
- [Amazon Managed Streaming for Apache Kafka (MSK)]
- [Apache Kafka on Azure]
- [Azure Event Hubs for Apache Kafka] (Kafka protocol–compatible service, not Apache Kafka)
- [Confluent Cloud]
- [DoubleCloud Managed Service for Apache Kafka]
:::


[Aiven for Apache Kafka]: https://aiven.io/kafka
[Amazon Managed Streaming for Apache Kafka (MSK)]: https://aws.amazon.com/msk/
[Apache Kafka]: https://kafka.apache.org/
[Apache Kafka on Azure]: https://azuremarketplace.microsoft.com/marketplace/consulting-services/canonical.0001-com-ubuntu-managed-kafka
[Azure Event Hubs for Apache Kafka]: https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview
[Confluent Cloud]: https://www.confluent.io/confluent-cloud/
[DoubleCloud Managed Service for Apache Kafka]: https://double.cloud/services/managed-kafka/
[overview of managed Kafka offerings]: https://keen.io/blog/managed-apache-kafka-vs-diy/
162 changes: 162 additions & 0 deletions docs/integrate/kafka/docker-python.md
@@ -0,0 +1,162 @@
(kafka-docker-python)=
# Using Kafka with Python

This walkthrough demonstrates how to load data from a Kafka topic into a
CrateDB table, using a Python consumer and CrateDB's HTTP interface.

## Starting services

Start Kafka and CrateDB using Docker Compose.
```yaml
# docker-compose.yml
services:
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # PLAINTEXT serves clients inside the Compose network; EXTERNAL serves
      # clients on the Docker host, such as the Python consumer below.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ENABLE_KRAFT=yes
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports:
      - "29092:29092" # external listener for clients on the Docker host
    networks: [ demo ]

  cratedb:
    image: crate:latest
    container_name: cratedb
    command: ["crate", "-Ccluster.name=crate-demo", "-Chttp.cors.enabled=true", "-Chttp.cors.allow-origin=*"]
    ports:
      - "4200:4200" # HTTP API / Admin UI
      - "5432:5432" # PostgreSQL wire protocol
    networks: [ demo ]

networks:
  demo: {}
```

```shell
docker compose up -d
```

## Provisioning CrateDB and Kafka

* CrateDB Admin UI: `http://localhost:4200`
* Kafka broker: `kafka:9092` inside the Compose network, `localhost:29092` from the Docker host

### Create a demo table in CrateDB

The easiest way is to open the CrateDB Admin UI at `http://localhost:4200` and execute this statement in the console:

```sql
CREATE TABLE IF NOT EXISTS sensor_readings (
device_id TEXT, ts TIMESTAMPTZ,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
PRIMARY KEY (device_id, ts)
);
```

Alternatively, submit the same statement using `curl`:

```bash
curl -sS -H 'Content-Type: application/json' \
-X POST http://localhost:4200/_sql \
-d '{
"stmt":"CREATE TABLE IF NOT EXISTS sensor_readings (device_id TEXT, ts TIMESTAMPTZ, temperature DOUBLE PRECISION, humidity DOUBLE PRECISION, PRIMARY KEY (device_id, ts))"
}'
```

### Create a Kafka topic and send a couple of messages

A Kafka topic can be created in several ways; here, use `docker exec`:

```bash
docker exec -it kafka kafka-topics.sh --create \
--topic sensors --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1

docker exec -i kafka kafka-console-producer.sh \
  --bootstrap-server kafka:9092 --topic sensors <<'EOF'
{"device_id":"alpha","ts":"2025-08-19T12:00:00Z","temperature":21.4,"humidity":48.0}
{"device_id":"alpha","ts":"2025-08-19T12:01:00Z","temperature":21.5,"humidity":47.6}
{"device_id":"beta","ts":"2025-08-19T12:00:00Z","temperature":19.8,"humidity":55.1}
EOF
```

Messages are newline-delimited JSON for simplicity.
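
As an alternative to the console producer, the same readings can be produced from Python. This is a minimal sketch using the `confluent-kafka` package; it assumes the external listener published on `localhost:29092` by the Compose file above.

```python
# quick_producer.py -- optional Python alternative to the console producer
import json
from confluent_kafka import Producer

p = Producer({"bootstrap.servers": "localhost:29092"})

readings = [
    {"device_id": "alpha", "ts": "2025-08-19T12:00:00Z", "temperature": 21.4, "humidity": 48.0},
    {"device_id": "alpha", "ts": "2025-08-19T12:01:00Z", "temperature": 21.5, "humidity": 47.6},
    {"device_id": "beta", "ts": "2025-08-19T12:00:00Z", "temperature": 19.8, "humidity": 55.1},
]

for r in readings:
    # Key by device_id so all readings of one device land in the same partition.
    p.produce("sensors", key=r["device_id"], value=json.dumps(r))

p.flush()
```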

## Loading data

### Create a simple consumer using Python

```python
# quick_consumer.py
import json

import requests
from confluent_kafka import Consumer

c = Consumer({
    # External listener published by Docker Compose for clients on the host.
    "bootstrap.servers": "localhost:29092",
    "group.id": "demo",
    "auto.offset.reset": "earliest",
})
c.subscribe(["sensors"])

# CrateDB accepts bulk inserts on its HTTP SQL endpoint via `bulk_args`.
SQL_ENDPOINT = "http://localhost:4200/_sql"

def insert_batch(rows):
    # Bulk insert via HTTP; one statement, many parameter sets.
    body = {
        "stmt": "INSERT INTO sensor_readings (device_id, ts, temperature, humidity) VALUES (?, ?, ?, ?) ON CONFLICT (device_id, ts) DO UPDATE SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity",
        "bulk_args": rows,
    }
    r = requests.post(SQL_ENDPOINT, json=body, timeout=10)
    r.raise_for_status()

batch = []
try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            # No new messages: flush whatever has accumulated so far.
            if batch:
                insert_batch(batch)
                batch.clear()
            continue
        if msg.error():
            print("Kafka error:", msg.error())
            continue

        rec = json.loads(msg.value())
        batch.append([rec["device_id"], rec["ts"], rec["temperature"], rec["humidity"]])

        if len(batch) >= 500:
            insert_batch(batch)
            batch.clear()
finally:
    # Flush any remaining rows before shutting down the consumer.
    if batch:
        insert_batch(batch)
    c.close()
```

Run this with:

```bash
pip install confluent-kafka requests
python quick_consumer.py
```

:::{tip}
This illustrates the custom-client path: transform or filter records as needed, perform idempotent upserts on (device\_id, ts), and batch writes for throughput.
:::

## Verifying the data

Use `curl` to submit a `SELECT` statement that verifies data has been stored in CrateDB.
```bash
curl -sS -H 'Content-Type: application/json' -X POST http://localhost:4200/_sql \
-d '{"stmt":"SELECT device_id, ts, temperature, humidity FROM sensor_readings ORDER BY ts LIMIT 10"}' \
| jq
```
112 changes: 88 additions & 24 deletions docs/integrate/kafka/index.md
@@ -1,3 +1,8 @@
---
description: >-
Apache Kafka is a widely used open-source distributed event-store and
streaming platform.
---
(kafka)=
# Kafka

@@ -6,39 +11,104 @@
<br>
<a href="https://github.com/crate/cratedb-examples/actions/workflows/framework-flink-kafka-java.yml" target="_blank" rel="noopener noreferrer">
<img src="https://img.shields.io/github/actions/workflow/status/crate/cratedb-examples/framework-flink-kafka-java.yml?branch=main&label=Apache%20Kafka,%20Apache%20Flink" loading="lazy" alt="CI status: Apache Kafka, Apache Flink"></a>
<a href="https://github.com/crate/cratedb-examples/actions/workflows/application-ingestr.yml" target="_blank" rel="noopener noreferrer">
<img src="https://img.shields.io/github/actions/workflow/status/crate/cratedb-examples/application-ingestr.yml?branch=main&label=Ingestr%2BKafka" loading="lazy" alt="CI status: Ingestr + Kafka"></a>
```
```{div} .clearfix
```

:::{include} /_include/links.md
:::

:::{rubric} About
:::{div} sd-text-muted
Apache Kafka is a widely used open-source distributed event-store and streaming platform.
:::

[Apache Kafka] is an open-source distributed event streaming platform used by
thousands of companies for high-performance data pipelines, streaming analytics,
data integration, and mission-critical applications.

:::{dropdown} **Managed Kafka**
Several companies provide managed Kafka services (see the [overview of managed Kafka offerings]
for a more complete list).
## Overview

- [Aiven for Apache Kafka]
- [Amazon Managed Streaming for Apache Kafka (MSK)]
- [Apache Kafka on Azure]
- [Azure Event Hubs for Apache Kafka]
- [Confluent Cloud]
- [DoubleCloud Managed Service for Apache Kafka]
:::
[Apache Kafka] is a distributed event log for high-throughput, durable, and scalable data streams. CrateDB is a distributed SQL database optimized for time-series, IoT, and analytics at scale. Together, they form a robust pipeline for moving operational events from producers into a queryable store with SQL and real-time analytics.

## Benefits of CrateDB + Apache Kafka

* **Buffering & decoupling** – Kafka absorbs bursty writes and isolates producers from database load. This is particularly useful in heavy-load ingestion scenarios.
* **Scalability end-to-end** – Partitioned topics and a sharded cluster let you scale producers, brokers, consumers, and CrateDB independently.
* **Near-real-time analytics** – New events are available in CrateDB seconds (or even milliseconds) after production, exposed via SQL to standard BI tools.
* **Operational resilience** – Use Kafka as a durable buffer between producers and CrateDB. Idempotent upserts reduce duplication risks and improve recovery from retries.

:::{rubric} Learn
## Common ingestion options

:::{important}
The Apache Kafka PostgreSQL connector is largely compatible with CrateDB. Note that CrateDB does not support transactions; this generally matters only during failures or connection issues that require retries or rollbacks.
:::

::::{grid}
### Kafka Connect → CrateDB (recommended for most cases)

Use a sink connector to map Kafka topics directly to CrateDB tables, with built-in support for batching, upserts, and retries.

This approach also lets you leverage the rich ecosystem of Kafka Connect and Flink connectors — not just for other databases, but also for ERP, CRM, social media platforms, and many other systems.
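
As an illustration, registering a JDBC sink connector through the Kafka Connect REST API could look roughly like the following sketch. The connector class, converter, and connection settings are placeholders that depend on your Kafka Connect distribution; the Kafka Connect tutorial linked below shows a complete configuration.

```python
# Sketch: register a JDBC sink connector via the Kafka Connect REST API.
# Hostnames, ports, and connector settings are illustrative placeholders.
import requests

connector = {
    "name": "cratedb-sensors-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "sensors",
        # CrateDB speaks the PostgreSQL wire protocol on port 5432.
        "connection.url": "jdbc:postgresql://cratedb:5432/doc?user=crate",
        "insert.mode": "upsert",   # idempotent writes
        "pk.mode": "record_value",
        "pk.fields": "device_id,ts",
        "auto.create": "false",    # create the target table in CrateDB beforehand
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector, timeout=10)
resp.raise_for_status()
print(resp.json())
```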

### Custom consumer application

Instead of using Kafka Connect, you can write your own Kafka consumer in Java, Python, Rust, or another language. The client reads records from Kafka and writes them into CrateDB using standard SQL (over the PostgreSQL wire protocol) or via the HTTP API.

This option gives you full control: you can transform data on the fly, filter out unwanted events, or route records into different CrateDB tables based on business logic. It’s usually chosen when you need flexibility beyond what a connector offers, but it does mean you’re responsible for batching, retries, and error handling yourself.
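
For example, a minimal sketch of the PostgreSQL-wire path could look like this. It assumes CrateDB listens on `localhost:5432`, that the `sensor_readings` table from the tutorial exists, and uses `psycopg2` purely as one example client library.

```python
# Sketch: write a batch of records to CrateDB over the PostgreSQL wire protocol.
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, user="crate", dbname="doc")
conn.autocommit = True  # CrateDB has no transactions, so run statements in autocommit mode

rows = [
    ("alpha", "2025-08-19T12:02:00Z", 21.6, 47.2),
    ("beta", "2025-08-19T12:02:00Z", 19.9, 54.8),
]

with conn.cursor() as cur:
    cur.executemany(
        """
        INSERT INTO sensor_readings (device_id, ts, temperature, humidity)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (device_id, ts) DO UPDATE
        SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity
        """,
        rows,
    )

conn.close()
```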

### Stream processors (Flink/Kafka Streams/Spark)

For more advanced pipelines, you can process events while they’re still in Kafka before they ever reach CrateDB. Frameworks like Flink, Kafka Streams, or Spark let you enrich records, join multiple streams together, run aggregations, or apply windowing functions in real time.

The processed results are then written into CrateDB, where they’re immediately available for SQL queries and dashboards. This approach is powerful when raw events need to be cleaned, combined, or summarised before storing them, though it adds moving parts compared to a simple connector.

## Typical use cases

* **Time-series pipelines (sensors, logs, metrics, events)**

Stream high-volume data from IoT devices, application logs, or monitoring systems into Kafka, then land it in CrateDB for storage and real-time querying. Ideal for scenarios where you need to keep years of historical data but still run live analytics on the latest events.
* **CDC / operational data feeds (Debezium → Kafka → CrateDB)**

Capture data changes from operational databases using Debezium and push them into Kafka. CrateDB acts as the analytics layer, letting you query an always-up-to-date view of your transactional data without putting load on the source systems.
* **Real-time dashboards, anomaly detection, and alerting**

Combine Kafka’s streaming capabilities with CrateDB’s fast SQL engine to power dashboards that update in near real time. Detect unusual patterns (e.g. equipment failure, fraud, traffic spikes) and trigger alerts directly from queries over live data.
* **ETL / ELT landing zone for downstream analytics**

Use Kafka as the ingestion backbone and CrateDB as a staging or analytics store. Raw events can be enriched or aggregated in-flight, written to CrateDB for exploration, and later exported to long-term storage or data warehouses for deeper batch analytics.

## Deployment options

How you run Kafka and CrateDB depends a lot on your environment and preferences. The most common approaches are:

* **Containerised on-premise** – Run both Kafka and CrateDB on Docker or Kubernetes in your own data centre or private cloud. This gives you the most control, but also means you manage scaling, upgrading, and monitoring.
* **Managed Kafka services** – Use a provider such as Confluent Cloud or AWS MSK to offload Kafka operations. Some services (e.g., Azure Event Hubs) provide Kafka‑compatible endpoints rather than Kafka itself. Any of these can connect to a CrateDB deployment you operate or to CrateDB Cloud.
* **Managed CrateDB** – Crate\.io offers CrateDB Cloud, which can pair with either self-managed Kafka or managed Kafka services. This option reduces database operations to a minimum.
* **Hybrid setups** – A common pattern is managed Kafka + self-managed CrateDB, or vice versa, depending on where you want to keep operational control.

In practice, teams usually start containerised (for dev/test or early projects) and move to managed services as scale or reliability requirements grow.

## Key design considerations

* **Topic & partition strategy** – Align Kafka partitions with expected throughput and consumer parallelism; use stable keys (e.g., device\_id) to preserve ordering where needed.
* **Table modelling in CrateDB** – Choose primary keys and partitioning (e.g., by month on a timestamp column) to balance write speed and query performance, and define the shard count per table (see the sketch after this list).
* **Upserts & deduplication** – Include a stable event key (id, or source plus timestamp) to make writes idempotent where possible.
* **Batching & back-pressure** – Tune sink batch size and retries to match CrateDB ingest capacity while keeping latency low.
* **Schema & types** – Keep payloads consistent and map Avro/JSON types carefully to CrateDB types (timestamps/time zones, numerics, arrays).
* **Retention & replay** – Kafka retention defines how far back you can reprocess; plan storage and compaction accordingly.
* **Observability** – Monitor producer lag, consumer lag, sink error rates, CrateDB shard health, and query latency.
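
To make the table-modelling points concrete, the following sketch creates a sharded, monthly-partitioned variant of the sensor table through CrateDB's HTTP endpoint. The table name, shard count, and partition granularity are illustrative assumptions, not recommendations for your workload.

```python
# Sketch: create a sharded, monthly-partitioned table via CrateDB's HTTP endpoint.
# Table name, shard count, and partition granularity are illustrative only.
import requests

DDL = """
CREATE TABLE IF NOT EXISTS sensor_readings_parted (
    device_id TEXT,
    ts TIMESTAMPTZ,
    -- Generated month bucket used as the partition key.
    month TIMESTAMPTZ GENERATED ALWAYS AS date_trunc('month', ts),
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION,
    -- Partition columns must be part of the primary key when one is defined;
    -- the primary key also enables idempotent upserts via ON CONFLICT.
    PRIMARY KEY (device_id, ts, month)
) CLUSTERED INTO 6 SHARDS PARTITIONED BY (month)
"""

resp = requests.post("http://localhost:4200/_sql", json={"stmt": DDL}, timeout=10)
resp.raise_for_status()
print(resp.json())
```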

## Learn more

::::{grid} 2
:gutter: 2

:::{grid-item-card} Tutorial: Use Docker and Python
:link: kafka-docker-python
:link-type: ref
This walkthrough demonstrates how to load data from a Kafka topic into a
CrateDB table, using a Python consumer and CrateDB's HTTP interface.
:::

:::{grid-item-card} Tutorial: Build a data ingestion pipeline
:::{grid-item-card} Tutorial: Use Confluent Kafka Connect
:link: kafka-connect
:link-type: ref
The tutorial explains how to build a data ingestion pipeline using Apache
@@ -61,6 +131,7 @@ An executable stack with Apache Kafka, Apache Flink, and CrateDB. Uses Java.

```{toctree}
:hidden:
docker-python
kafka-connect
```

@@ -69,12 +140,5 @@ kafka-connect
```


[Aiven for Apache Kafka]: https://aiven.io/kafka
[Amazon Managed Streaming for Apache Kafka (MSK)]: https://aws.amazon.com/msk/
[Apache Kafka]: https://kafka.apache.org/
[Apache Kafka on Azure]: https://azuremarketplace.microsoft.com/marketplace/consulting-services/canonical.0001-com-ubuntu-managed-kafka
[Azure Event Hubs for Apache Kafka]: https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview
[Confluent Cloud]: https://www.confluent.io/confluent-cloud/
[CrateDB and Apache Kafka]: https://cratedb.com/integrations/cratedb-and-kafka
[DoubleCloud Managed Service for Apache Kafka]: https://double.cloud/services/managed-kafka/
[overview of managed Kafka offerings]: https://keen.io/blog/managed-apache-kafka-vs-diy/
6 changes: 3 additions & 3 deletions docs/integrate/kafka/kafka-connect.md
@@ -1,5 +1,5 @@
(kafka-connect)=
# Data Ingestion Pipeline with Kafka and CrateDB
# Using Confluent Kafka Connect

This guide describes a dockerized procedure for integrating CrateDB with Kafka
Connect. By following these steps, you will set up a pipeline to ingest data
@@ -233,15 +233,15 @@ data transfer:
Launch all the services defined in the `docker-compose.yml` file:

```bash
docker-compose up -d
docker compose up -d
```

## Verify the Running Containers

Ensure that all services are up and running by listing the active containers:

```bash
docker-compose ps
docker compose ps
```

You should see the following containers: