|
| 1 | +--- |
| 2 | +keywords: [streaming process, flow management, Flownode components, Flownode limitations, batching mode] |
| 3 | +description: Overview of Flownode's batching mode, a component providing continuous data aggregation capabilities to the database, including its architecture and query execution flow. |
| 4 | +--- |
| 5 | + |
| 6 | +# Flownode Batching Mode Developer Guide |
| 7 | + |
| 8 | +This guide provides a brief overview of the batching mode in `flownode`. It's intended for developers who want to understand the internal workings of this mode. |
| 9 | + |
| 10 | +## Overview |
| 11 | + |
| 12 | +The batching mode in `flownode` is designed for continuous data aggregation. It periodically executes a user-defined SQL query over small, discrete time windows. This is in contrast to a streaming mode where data is processed as it arrives. |
| 13 | + |
| 14 | +The core idea is to: |
| 15 | +1. Define a `flow` with a SQL query that aggregates data from a source table into a sink table. |
| 16 | +2. The query typically includes a time window function (e.g., `date_bin`) on a timestamp column. |
| 17 | +3. When new data is inserted into the source table, the system marks the corresponding time windows as "dirty." |
| 18 | +4. A background task periodically wakes up, identifies these dirty windows, and re-runs the aggregation query for those specific time ranges. |
| 19 | +5. The results are then inserted into the sink table, effectively updating the aggregated view. |
| 20 | + |
| 21 | +## Architecture |
| 22 | + |
| 23 | +The batching mode consists of several key components that work together to achieve this continuous aggregation. As shown in the diagram below: |
| 24 | + |
| 25 | + |
| 26 | + |
| 27 | +### `BatchingEngine` |
| 28 | + |
| 29 | +The `BatchingEngine` is the heart of the batching mode. It's a central component that manages all active flows. Its primary responsibilities are: |
| 30 | + |
| 31 | +- **Task Management**: It maintains a map of `FlowId` to `BatchingTask`. It handles the creation, deletion, and retrieval of these tasks. |
| 32 | +- **Event Dispatching**: When new data arrives (via `handle_inserts_inner`) or when time windows are explicitly marked as dirty (`handle_mark_dirty_time_window`), the `BatchingEngine` identifies which flows are affected and forwards the information to the corresponding `BatchingTask`s. |
| 33 | + |
| 34 | +### `BatchingTask` |
| 35 | + |
| 36 | +A `BatchingTask` represents a single, independent data flow. Each task is associated with one `flow` definition and runs in its own asynchronous loop. |
| 37 | + |
| 38 | +- **Configuration (`TaskConfig`)**: This struct holds the immutable configuration for a flow, such as the SQL query, source and sink table names, and time window expression. |
| 39 | +- **State (`TaskState`)**: This contains the dynamic, mutable state of the task, most importantly the `DirtyTimeWindows`. |
| 40 | +- **Execution Loop**: The task runs an infinite loop (`start_executing_loop`) that: |
| 41 | + 1. Checks for a shutdown signal. |
| 42 | + 2. Waits for a scheduled interval or until it's woken up. |
| 43 | + 3. Generates a new query plan (`gen_insert_plan`) based on the current set of dirty time windows. |
| 44 | + 4. Executes the query (`execute_logical_plan`) against the database. |
| 45 | + 5. Cleans up the processed dirty windows. |
| 46 | + |
| 47 | +### `TaskState` and `DirtyTimeWindows` |
| 48 | + |
| 49 | +- **`TaskState`**: This struct tracks the runtime state of a `BatchingTask`. It includes `dirty_time_windows`, which is crucial for determining what work needs to be done. |
| 50 | +- **`DirtyTimeWindows`**: This is a key data structure that keeps track of which time windows have received new data since the last query execution. It stores a set of non-overlapping time ranges. When a task's execution loop runs, it consults this structure to build a `WHERE` clause that filters the source table for only the dirty time windows. |
| 51 | + |
| 52 | +### `TimeWindowExpr` |
| 53 | + |
| 54 | +The `TimeWindowExpr` is a helper utility for dealing with time window functions like `TUMBLE`. |
| 55 | + |
| 56 | +- **Evaluation**: It can take a timestamp and evaluate the time window expression to determine the start and end of the window that the timestamp falls into. |
| 57 | +- **Window Size**: It can also determine the size (duration) of the time window from the expression. |
| 58 | + |
| 59 | +This is essential for both marking windows as dirty and for generating the correct filter conditions when querying the source table. |
| 60 | + |
| 61 | +## Query Execution Flow |
| 62 | + |
| 63 | +Here's a simplified step-by-step walkthrough of how a query is executed in batch mode: |
| 64 | + |
| 65 | +1. **Data Ingestion**: New data is written to a source table. |
| 66 | +2. **Marking Dirty**: The `BatchingEngine` receives a notification about the new data. It uses the `TimeWindowExpr` associated with each relevant flow to determine which time windows are affected by the new data points. These windows are then added to the `DirtyTimeWindows` set in the corresponding `TaskState`. |
| 67 | +3. **Task Wake-up**: The `BatchingTask`'s execution loop wakes up, either due to its periodic schedule or because it was notified of a large backlog of dirty windows. |
| 68 | +4. **Plan Generation**: The task calls `gen_insert_plan`. This method: |
| 69 | + - Inspects the `DirtyTimeWindows`. |
| 70 | + - Generates a series of `OR`'d `WHERE` clauses (e.g., `(ts >= 't1' AND ts < 't2') OR (ts >= 't3' AND ts < 't4') ...`) that cover the dirty windows. |
| 71 | + - Rewrites the original SQL query to include this new filter, ensuring that only the necessary data is processed. |
| 72 | +5. **Execution**: The modified query plan is sent to the `Frontend` for execution. The database processes the aggregation on the filtered data. |
| 73 | +6. **Upsert**: The results are inserted into the sink table. The sink table is typically defined with a primary key that includes the time window column, so new results for an existing window will overwrite (upsert) the old ones. |
| 74 | +7. **State Update**: The `DirtyTimeWindows` set is cleared of the windows that were just processed. The task then goes back to sleep until the next interval. |
0 commit comments