stream-processing: getting-started: fluent-bit-sql: general cleanup #1627

Merged
138 changes: 48 additions & 90 deletions stream-processing/getting-started/fluent-bit-sql.md
@@ -1,14 +1,14 @@
# Fluent Bit + SQL
# Fluent Bit and SQL

Fluent Bit stream processor uses common SQL to perform record queries. The following section describe the features available and examples of it.
Stream processing in Fluent Bit uses SQL to perform record queries.

## Statements
For more information, see the [stream processing README file](https://github.com/fluent/fluent-bit/tree/master/src/stream_processor).

You can find the detailed query language syntax in BNF form [here](https://github.com/fluent/fluent-bit/tree/master/src/stream_processor). The following section will be a brief introduction on how to write SQL queries for Fluent Bit stream processing.
## Statements

### SELECT Statement
Use the following SQL statements in Fluent Bit.

#### Synopsis
### `SELECT`

```sql
SELECT results_statement
@@ -18,201 +18,159 @@ SELECT results_statement
[GROUP BY groupby]
```

#### Description
Selects keys from records that originate from a specified stream, or from records that match a specific tag pattern.

Select keys from records coming from a stream or records matching a specific Tag pattern. Note that a simple `SELECT` statement **not** associated from a stream creation will send the results to the standard output interface \(stdout\), useful for debugging purposes.
{% hint style="info" %}
A `SELECT` statement not associated with stream creation will send the results to the standard output interface, which can be helpful for debugging purposes.
{% endhint %}

The query allows filtering the results by applying a condition using `WHERE` statement. We will explain `WINDOW` and `GROUP BY` statements later in aggregation functions section.
You can filter the results of this query by applying a condition with a `WHERE` statement. For information about the `WINDOW` and `GROUP BY` statements, see [Aggregation functions](#aggregation-functions).

#### Examples

Select all keys from records coming from a stream called _apache_:
Selects all keys from records that originate from a stream called `apache`:

```sql
SELECT * FROM STREAM:apache;
```

Select code key from records which Tag starts with _apache._:
Selects the `code` key from records whose tag begins with `apache.`:

```sql
SELECT code AS http_status FROM TAG:'apache.*';
```
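
As a sketch of the `WHERE` filtering described above, the following query keeps only records whose `code` key equals a given value. The value `500` is an illustrative assumption. The `--` line is an annotation only; remove it before running the query, because comment support in the stream processor isn't confirmed here.

```sql
-- Assumption: records in the apache stream carry a numeric code key.
SELECT code AS http_status FROM STREAM:apache WHERE code = 500;
```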

> Since the TAG selector allows the use of wildcards, we put the value between single quotes.

### CREATE STREAM Statement

#### Synopsis
### `CREATE STREAM`

```sql
CREATE STREAM stream_name
[WITH (property_name=value, [...])]
AS select_statement
```

#### Description

Create a new stream of data using the results from the `SELECT` statement. New stream created can be optionally re-ingested back into Fluent Bit pipeline if the property _Tag_ is set in the WITH statement.
Creates a new stream of data using the results from a `SELECT` statement. If the `Tag` property in the `WITH` statement is set, this new stream can optionally be re-ingested into the Fluent Bit pipeline.

#### Examples

Create a new stream called _hello_ from stream called _apache_:
Creates a new stream called `hello` from a stream called `apache`:

```sql
CREATE STREAM hello AS SELECT * FROM STREAM:apache;
```

Create a new stream called hello for all records which original Tag starts with _apache_:
Creates a new stream called `hello` for all records whose original tag name begins with `apache`:

```sql
CREATE STREAM hello AS SELECT * FROM TAG:'apache.*';
```
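
The `Tag` property mentioned in the description can be sketched as follows. The tag value `hello.results` is a hypothetical name chosen for illustration, and the `--` line is an annotation only that should be removed before use.

```sql
-- Assumption: hello.results is a hypothetical tag; choose one that your pipeline routes.
CREATE STREAM hello WITH (tag='hello.results') AS SELECT * FROM TAG:'apache.*';
```

Records produced by this stream would then re-enter the pipeline under that tag, where a matching filter or output can pick them up.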

## Aggregation Functions

Aggregation functions are used in `results_statement` on the keys, allowing to perform data calculation on groups of records. Group of records that aggregation functions apply on are determined by `WINDOW` keyword. When `WINDOW` is not specified, aggregation functions apply on the current buffer of records received, which may have non-deterministic number of elements. Aggregation functions can be applied on records in a window of a specific time interval \(see the syntax of `WINDOW` in select statement\).
## Aggregation functions

Fluent Bit streaming currently supports tumbling window, which is non-overlapping window type. That means, a window of size 5 seconds performs aggregation computations on records over a 5-second interval, and then starts new calculations for the next interval.
You can use aggregation functions in the `results_statement` on keys, which lets you perform data calculation on groups of records. These groups are determined by the `WINDOW` keyword. If `WINDOW` is unspecified, aggregation functions are applied to the current buffer of records received, which might have a non-deterministic number of elements. You can also apply aggregation functions to records in a window of a specific time interval.

In addition, the syntax support `GROUP BY` statement, which groups the results by the one or more keys, when they have the same values.
Fluent Bit uses a tumbling window, which is non-overlapping. For example, a window size of `5` performs aggregation computations on records during a five-second interval, then starts new calculations for the next interval.

### AVG
Additionally, you can use the `GROUP BY` statement to group results by one or more keys with matching values.
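
To illustrate how a tumbling window and `GROUP BY` combine, the following sketch computes the average request size per host over consecutive five-second windows. It reuses the `size` and `host` keys that appear in other examples on this page; whether your records carry those keys is an assumption. The `--` line is an annotation only and should be removed before use.

```sql
-- Assumption: each record in the apache stream has host and size keys.
SELECT host, AVG(size) FROM STREAM:apache WINDOW TUMBLING (5 SECOND) GROUP BY host;
```

Each window produces one result per distinct `host` value seen during that interval.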

#### Synopsis
### `AVG`

```sql
SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' ;
```

#### Description

Calculates the average of request sizes in POST requests.
Calculates the average size of `POST` requests.

### COUNT

#### Synopsis
### `COUNT`

```sql
SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (5 SECOND) GROUP BY host;
SELECT host, COUNT(*) FROM STREAM:apache WINDOW TUMBLING (X SECOND) GROUP BY host;
```

#### Description

Count the number of records in 5 second windows group by host IP addresses.
Counts the number of records in each `X`-second window, grouped by host IP address.

### MIN

#### Synopsis
### `MIN`

```sql
SELECT MIN(key) FROM STREAM:apache;
```

#### Description

Gets the minimum value of a key in a set of records.
Returns the minimum value of a key in a set of records.

### MAX

#### Synopsis
### `MAX`

```sql
SELECT MIN(key) FROM STREAM:apache;
SELECT MAX(key) FROM STREAM:apache;
```
Returns the maximum value of a key in a set of records.

#### Description

Gets the maximum value of a key in a set of records.

### SUM

#### Synopsis
### `SUM`

```sql
SELECT SUM(key) FROM STREAM:apache;
```

#### Description

Calculates the sum of all values of key in a set of records.
Calculates the sum of all values of a key in a set of records.

## Time Functions

Time functions adds a new key into the record with timing data

### NOW
Use time functions to add a new key with time data into a record.

#### Synopsis
### `NOW`

```sql
SELECT NOW() FROM STREAM:apache;
```

#### Description
Adds the current system time to a record using the format `%Y-%m-%d %H:%M:%S`. Output example: `2019-03-09 21:36:05`.

Add system time using format: %Y-%m-%d %H:%M:%S. Output example: 2019-03-09 21:36:05.

### UNIX\_TIMESTAMP

#### Synopsis
### `UNIX_TIMESTAMP`

```sql
SELECT UNIX_TIMESTAMP() FROM STREAM:apache;
```

#### Description

Add current Unix timestamp to the record. Output example: 1552196165 .
Adds the current Unix time to a record. Output example: `1552196165`.

## Record Functions

Record functions append new keys to the record using values from the record context.
Use record functions to append new keys to a record using values from the record's context.

### RECORD\_TAG

#### Synopsis
### `RECORD_TAG`

```sql
SELECT RECORD_TAG() FROM STREAM:apache;
```

#### Description

Append Tag string associated to the record as a new key.
Appends the tag string associated with the record as a new key.

### RECORD\_TIME

#### Synopsis
### `RECORD_TIME`

```sql
SELECT RECORD_TIME() FROM STREAM:apache;
```

## WHERE Condition
## `WHERE` condition

Similar to conventional SQL statements, `WHERE` condition is supported in Fluent Bit query language. The language supports conditions over keys and subkeys, for instance:
Similar to conventional SQL statements, Fluent Bit supports the `WHERE` condition. You can use this condition in both keys and subkeys. For example:

```sql
SELECT AVG(size) FROM STREAM:apache WHERE method = 'POST' AND status = 200;
```
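
The example above filters on top-level keys only. For subkeys, here is a minimal sketch, assuming bracket notation for nested maps and a hypothetical record layout in which `request` is a map that contains a `method` key. The `--` line is an annotation only and should be removed before use.

```sql
-- Assumption: request is a hypothetical nested map; brackets select its method subkey.
SELECT AVG(size) FROM STREAM:apache WHERE request['method'] = 'POST';
```

Check the stream processing README for the exact subkey grammar supported by your Fluent Bit version.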

It is possible to check the existence of a key in the record using record-specific function `@record.contains`:
You can confirm whether a key exists in a record by using the record-specific function `@record.contains`:

```sql
SELECT MAX(key) FROM STREAM:apache WHERE @record.contains(key);
```

And to check if the value of a key is/is not `NULL`:
To determine if the value of a key is `NULL`:

```sql
SELECT MAX(key) FROM STREAM:apache WHERE key IS NULL;
```

Or similar:

```sql
SELECT * FROM STREAM:apache WHERE user IS NOT NULL;
```

#### Description

Append a new key with the record Timestamp in _double_ format: seconds.nanoseconds. Output example: 1552196165.705683 .