# Tutorial: Using as-of joins to enrich solar panel telemetry data

In this tutorial, we will use as-of joins to enrich solar panel metrics with weather forecast data.
The enriched data can be used to optimize battery management and better analyze panel performance (for example, to predict battery discharge on cloudy days).


## What You Will Learn

This example will show how to use a Quix Streams `Application` to enrich data from one Kafka topic using another.


## Outline of the Problem

For our example, we have a solar battery farm generating electricity. While running, the batteries emit their current power output and internal temperature several times a second.

We want to merge the telemetry data with the current weather forecast in order to better analyze and predict the farm performance, all in real time.


## Our Example

We will use two [Quix Streams `Sources`](../../connectors/sources/README.md):

- One to generate mock telemetry measurements for three solar panels (the "telemetry" topic).
- Another one to generate forecast data every 30 seconds for a given location (the "forecast" topic).

We need to match the telemetry events with the latest effective forecasts so that the enriched data is as fresh as possible.

The data will be processed by the Solar Farm `Application`, which will do the join and send the results to the output topic.


## Before Getting Started

1. You will see links scattered throughout this tutorial.
    - Tutorial code links are marked **>>> LIKE THIS <<<**.
    - ***All other links provided are completely optional***.
    - They are great ways to learn more about various concepts if you need them!

2. This tutorial uses a [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`](../../api-reference/topics.md) to ingest data.
    - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka).
    - This approach saves users from having to run a [producer](../../producer.md) alongside the `Application`.
    - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`).



## Generating Sample Data

In our [**>>> Enrichment Application <<<**](tutorial_app.py), we use two `Sources`:

- `WeatherForecastGenerator` that generates weather forecast data every 30 seconds.
Format:
`{"timestamp": <float>, "forecast_temp": <float>, "forecast_cloud": <float>}`
<br>
<br>

- `BatteryTelemetryGenerator` that generates battery measurements for three solar panels every second.
Format:
`{"timestamp": <float>, "panel_id": <str>, "location_id": <str>, "temperature_C": <float>, "power_watt": <float>}`
<br>
<br>

Each Source will have **a unique name**, which is used as part of the underlying topic name.

Both datasets will use `location_id` as the message key - it is an important part of the join operation.
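
The actual generators ship with [tutorial_app.py](tutorial_app.py). As a rough, hypothetical sketch of what such a generator `Source` might look like - assuming the standard `Source.run()` / `serialize()` / `produce()` pattern for custom sources, with made-up field values and intervals:

```python
import random
import time

from quixstreams.sources import Source


class BatteryTelemetryGenerator(Source):
    """
    Illustrative sketch only; the real generator in tutorial_app.py may differ.
    """

    def run(self):
        # Keep producing while the Source is running
        while self.running:
            event = {
                "timestamp": time.time(),
                "panel_id": f"panel-{random.randint(1, 3)}",
                "location_id": "location-1",
                "temperature_C": random.uniform(15.0, 35.0),
                "power_watt": random.uniform(0.0, 1.0),
            }
            # Key the message by location_id so both topics share the same keying
            msg = self.serialize(key=event["location_id"], value=event)
            self.produce(key=msg.key, value=msg.value)
            time.sleep(1.0)
```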


## Enrichment Application

Now let's go over the `main()` portion of our
[**>>> Enrichment Application <<<**](tutorial_app.py) in detail!


### Create an Application

Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything!

We provide it our connection settings, consumer group (ideally unique per Application),
and where the consumer group should start from on the (internal) Source topic.

!!! TIP

    Once you are more familiar with Kafka, we recommend
    [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls).

#### Our Application

```python
import os
from quixstreams import Application

app = Application(
    broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
    consumer_group="solar_farm_enrichment",
    auto_offset_reset="earliest",
    # Disable changelog topics for this app, but it's recommended to keep them "on" in production
    use_changelog_topics=False,
)
```



### Specify Topics

`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`.

Create one for each topic used by your `Application`.

!!! NOTE

    Any missing topics will be automatically created for you upon running an `Application`.


#### Our Topics

We have one output topic, named `"telemetry-with-forecast"`:

```python
output_topic = app.topic(name="telemetry-with-forecast")
```
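
Topics can also carry extra configuration. For instance - a hedged variant, since the tutorial app itself sticks with the defaults - you could declare the value serializer explicitly:

```python
# Explicit JSON serializer; "json" is the default, so this is equivalent to the snippet above
output_topic = app.topic(name="telemetry-with-forecast", value_serializer="json")
```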



### The StreamingDataFrames (SDF)

Now we need to define our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF".

SDF allows manipulating the message value in a dataframe-like fashion using various operations.

After initializing with either a `Topic` or `Source`, we continue reassigning to the
same variable as we add operations.

!!! NOTE

    A few `StreamingDataFrame` operations are
    ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations),
    like `.print()`.

#### Initializing the dataframes

```python
telemetry_sdf = app.dataframe(source=BatteryTelemetryGenerator(name="telemetry"))
forecast_sdf = app.dataframe(source=WeatherForecastGenerator(name="forecast"))
```

First, we initialize our SDFs with our `BatteryTelemetryGenerator` and `WeatherForecastGenerator` sources,
which means we will be consuming data from a non-Kafka origin.


!!! TIP

    You can consume from a Kafka topic instead by passing a `Topic` object
    with `app.dataframe(topic=<Topic>)`.

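As a concrete (hypothetical) sketch of that Topic-based setup - the topic names here are illustrative:

```python
# Hypothetical: consume from existing Kafka topics instead of Sources
telemetry_topic = app.topic(name="telemetry", value_deserializer="json")
forecast_topic = app.topic(name="forecast", value_deserializer="json")

telemetry_sdf = app.dataframe(topic=telemetry_topic)
forecast_sdf = app.dataframe(topic=forecast_topic)
```
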
Let's go over the SDF operations in this example in detail.


### Joining the Dataframes

```python
from datetime import timedelta


def merge_events(telemetry: dict, forecast: dict) -> dict:
    """
    Merge the matching events into a new one
    """
    forecast = {"forecast." + k: v for k, v in forecast.items()}
    return {**telemetry, **forecast}

# Join the telemetry data with the latest effective forecasts (forecast timestamp always <= telemetry timestamp)
# using join_asof().
enriched_sdf = telemetry_sdf.join_asof(
    forecast_sdf,
    how="inner",  # Join using "inner" strategy
    on_merge=merge_events,  # Use a custom function to merge events together because of the overlapping keys
    grace_ms=timedelta(days=7),  # Store forecast updates in state for 7d
)
```

Now we join the telemetry data with the forecasts in an "as-of" manner, so each telemetry event is matched with the latest forecast effective at the time
when the telemetry was produced.

In particular:

- We use `how="inner"`, which means that results are emitted only when a match is found.
It can also be set to `how="left"` to emit records even if there is no matching forecast for the telemetry event.

- We also provide a custom `merge_events` function to define what the join result will look like (see the short sketch after this list).
This step is optional when the column names in the two dataframes don't overlap.
In our case, the `timestamp` column is present on both sides, so we have to resolve it.

- We set `grace_ms=timedelta(days=7)` to keep the forecast data in the state store for at least 7 days.
This interval can be changed if out-of-order data is expected in the stream (for example, some batteries produce telemetry with a large delay).
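
To make the key prefixing concrete, here is a small standalone illustration of what `merge_events` returns for one matched pair (all values are made up):

```python
telemetry_event = {
    "timestamp": 1748444438.0,
    "panel_id": "panel-1",
    "location_id": "location-1",
    "temperature_C": 21.0,
    "power_watt": 0.6,
}
forecast_event = {"timestamp": 1748444420.9, "forecast_temp": 29.0, "forecast_cloud": 88.0}

merged = merge_events(telemetry_event, forecast_event)
# Both "timestamp" values survive because the forecast keys get the "forecast." prefix:
# {
#     "timestamp": 1748444438.0, "panel_id": "panel-1", "location_id": "location-1",
#     "temperature_C": 21.0, "power_watt": 0.6,
#     "forecast.timestamp": 1748444420.9, "forecast.forecast_temp": 29.0,
#     "forecast.forecast_cloud": 88.0,
# }
```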


#### How the joining works

Because a join is a stateful operation that requires access to multiple topic partitions within the same process,
the dataframes must meet certain requirements to be joined:

1. The underlying topics of the dataframes must have the same number of partitions.
Quix Streams validates this when `join_asof` is called.
In this tutorial, both topics have one partition.
<br>
<br>

2. The message keys in these topics must be distributed across partitions using the same algorithm.
In our case, messages are produced using the default built-in partitioner.

Under the hood, `join_asof` works like this:

- Records from the right side (`forecast_sdf`) are written to the state store without emitting any updates downstream.
- Records on the left side (`telemetry_sdf`) query the forecasts store for values with the same **key** and a **timestamp lower than or equal to the left record's timestamp**.
  Only the left side emits data downstream.
  - If a match is found, the two records are merged into a new one according to the `on_merge` logic.
- The retention of the right store is controlled by `grace_ms`:
  each "right" record bumps the maximum timestamp for its key in the store, and values with the same key and timestamps below "<current timestamp> - <grace_ms>" are deleted.
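
As a mental model only - the real implementation uses persistent state stores and also applies the `grace_ms` expiration described above - the lookup can be pictured roughly like this:

```python
from typing import Any, Optional

# Simplified, in-memory illustration of the as-of lookup; not the actual store implementation.
right_store: dict = {}  # key -> [(timestamp, value), ...]


def on_right_record(key: Any, timestamp: float, value: dict) -> None:
    # Right side: remember the value in "state", emit nothing downstream
    right_store.setdefault(key, []).append((timestamp, value))


def on_left_record(key: Any, timestamp: float, value: dict) -> Optional[dict]:
    # Left side: find the latest right value with the same key and timestamp <= left timestamp
    candidates = [(ts, v) for ts, v in right_store.get(key, []) if ts <= timestamp]
    if not candidates:
        return None  # with how="inner", nothing is emitted when there is no match
    _, latest_forecast = max(candidates, key=lambda item: item[0])
    # merge_events is the function defined in the join snippet above
    return merge_events(value, latest_forecast)
```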


You can find more details on the [Joins](../../joins.md) page.


### Printing the enriched data

```python
# Convert timestamps to strings for readability
enriched_sdf["timestamp"] = enriched_sdf["timestamp"].apply(timestamp_to_str)
enriched_sdf["forecast.timestamp"] = enriched_sdf["forecast.timestamp"].apply(
    timestamp_to_str
)

# Print the enriched data
enriched_sdf.print_table(live=False)
```

Now we have a joined dataframe, and we want to verify that the data looks as we expect.

We first convert the timestamps from numbers to strings for readability, and then print the
results as a table.
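
The `timestamp_to_str` helper is defined in [tutorial_app.py](tutorial_app.py); a minimal equivalent, assuming the timestamps are Unix timestamps in seconds, might look like this:

```python
from datetime import datetime


def timestamp_to_str(timestamp: float) -> str:
    # Hypothetical helper: render a Unix timestamp (in seconds) as an ISO 8601 string
    return datetime.fromtimestamp(timestamp).isoformat()
```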

The output should look like this:

```
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ _key          ┃ _timestamp    ┃ timestamp                  ┃ panel_id ┃ location_id ┃ temperature_C ┃ power_watt ┃ forecast.timestamp         ┃ forecast.forecast_temp ┃ forecast.forecast_cloud ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ b'location-1' │ 1748444438007 │ 2025-05-28T15:00:38.007725 │ panel-1  │ location-1  │ 21            │ 0.6        │ 2025-05-28T15:00:20.929200 │ 29                     │ 88                      │
│ b'location-1' │ 1748444438007 │ 2025-05-28T15:00:38.007855 │ panel-2  │ location-1  │ 17            │ 1.0        │ 2025-05-28T15:00:20.929200 │ 29                     │ 88                      │
│ b'location-1' │ 1748444438007 │ 2025-05-28T15:00:38.007886 │ panel-3  │ location-1  │ 35            │ 0.6        │ 2025-05-28T15:00:20.929200 │ 29                     │ 88                      │
│ b'location-1' │ 1748444439009 │ 2025-05-28T15:00:39.009509 │ panel-1  │ location-1  │ 20            │ 0.2        │ 2025-05-28T15:00:20.929200 │ 29                     │ 88                      │
│ b'location-1' │ 1748444439009 │ 2025-05-28T15:00:39.009870 │ panel-2  │ location-1  │ 23            │ 0.8        │ 2025-05-28T15:00:20.929200 │ 29                     │ 88                      │
└───────────────┴───────────────┴────────────────────────────┴──────────┴─────────────┴───────────────┴────────────┴────────────────────────────┴────────────────────────┴─────────────────────────┘
```

Now we can examine how the battery metrics correlate with the weather forecast in the area!

### Producing the output messages

```python
# Produce results to the output topic
enriched_sdf.to_topic(output_topic)
```

To produce the enriched data, we call [`StreamingDataFrame.to_topic()`](../../processing.md#writing-data-to-kafka-topics) and pass the previously defined `output_topic` to it.
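
By default, the outgoing messages keep their current keys (`location_id` in our case). If you needed a different key on the output topic, `to_topic()` also accepts a `key` callable; a hypothetical variant:

```python
# Hypothetical: re-key the output messages by panel_id instead of location_id
enriched_sdf.to_topic(output_topic, key=lambda value: value["panel_id"])
```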


### Running the Application

Running a `Source`-based `Application` requires calling `Application.run()` within an
`if __name__ == "__main__"` block.

#### Our Application Run Block

Our entire `Application` (and all its spawned objects) resides within a
`main()` function, executed as required:

```python
if __name__ == "__main__":
    main()
```
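
For reference, here is a condensed sketch of how the snippets above fit together inside `main()`; the full version in [tutorial_app.py](tutorial_app.py) also converts the timestamps and prints the table:

```python
def main():
    app = Application(
        broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
        consumer_group="solar_farm_enrichment",
        auto_offset_reset="earliest",
        use_changelog_topics=False,
    )
    output_topic = app.topic(name="telemetry-with-forecast")

    telemetry_sdf = app.dataframe(source=BatteryTelemetryGenerator(name="telemetry"))
    forecast_sdf = app.dataframe(source=WeatherForecastGenerator(name="forecast"))

    enriched_sdf = telemetry_sdf.join_asof(
        forecast_sdf, how="inner", on_merge=merge_events, grace_ms=timedelta(days=7)
    )
    enriched_sdf.to_topic(output_topic)

    # Blocks and processes messages until the process is stopped
    app.run()
```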


## Try it Yourself!

### 1. Run Kafka
First, have a running Kafka cluster.

To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally).

### 2. Download files
- [tutorial_app.py](tutorial_app.py)

### 3. Install Quix Streams
In your desired Python environment, execute: `pip install quixstreams`

### 4. Run the application
In your desired Python environment, execute: `python tutorial_app.py`.