Skip to content

Commit

Permalink
feat: add connector for Parseable (apache#32052)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdheipSingh authored Jan 31, 2025
1 parent 1064ad5 commit 9e5876d
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 0 deletions.
18 changes: 18 additions & 0 deletions docs/docs/configuration/databases.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ are compatible with Superset.
| [MySQL](/docs/configuration/databases#mysql) | `pip install mysqlclient` | `mysql://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
| [OceanBase](/docs/configuration/databases#oceanbase) | `pip install oceanbase_py` | `oceanbase://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
| [Oracle](/docs/configuration/databases#oracle) | `pip install cx_Oracle` | `oracle://` |
| [Parseable](/docs/configuration/databases#parseable) | `pip install sqlalchemy-parseable` | `parseable://<UserName>:<DBPassword>@<Database Host>/<Stream Name>` |
| [PostgreSQL](/docs/configuration/databases#postgres) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
| [Presto](/docs/configuration/databases#presto) | `pip install pyhive` | `presto://` |
| [Rockset](/docs/configuration/databases#rockset) | `pip install rockset-sqlalchemy` | `rockset://<api_key>:@<api_server>` |
Expand Down Expand Up @@ -1074,6 +1075,23 @@ The connection string is formatted as follows:
oracle://<username>:<password>@<hostname>:<port>
```

#### Parseable

[Parseable](https://www.parseable.io) is a distributed log analytics database that provides SQL-like query interface for log data. The recommended connector library is [sqlalchemy-parseable](https://github.com/parseablehq/sqlalchemy-parseable).

The connection string is formatted as follows:

```
parseable://<username>:<password>@<hostname>:<port>/<stream_name>
```

For example:

```
parseable://admin:[email protected]:443/ingress-nginx
```

Note: The stream_name in the URI represents the Parseable logstream you want to query. You can use both HTTP (port 80) and HTTPS (port 443) connections.


#### Apache Pinot
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ ocient = [
"geojson",
]
oracle = ["cx-Oracle>8.0.0, <8.1"]
parseable = ["sqlalchemy-parseable>=0.1.3,<0.2.0"]
pinot = ["pinotdb>=5.0.0, <6.0.0"]
playwright = ["playwright>=1.37.0, <2"]
postgres = ["psycopg2-binary==2.9.6"]
Expand Down
84 changes: 84 additions & 0 deletions superset/db_engine_specs/parseable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import Any, TYPE_CHECKING

from sqlalchemy import types

from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec

if TYPE_CHECKING:
from superset.connectors.sqla.models import TableColumn
from superset.models.core import Database


class ParseableEngineSpec(BaseEngineSpec):
"""Engine spec for Parseable log analytics database."""

engine = "parseable"
engine_name = "Parseable"

_time_grain_expressions = {
None: "{col}",
TimeGrain.SECOND: "date_trunc('second', {col})",
TimeGrain.MINUTE: "date_trunc('minute', {col})",
TimeGrain.HOUR: "date_trunc('hour', {col})",
TimeGrain.DAY: "date_trunc('day', {col})",
TimeGrain.WEEK: "date_trunc('week', {col})",
TimeGrain.MONTH: "date_trunc('month', {col})",
TimeGrain.QUARTER: "date_trunc('quarter', {col})",
TimeGrain.YEAR: "date_trunc('year', {col})",
}

@classmethod
def epoch_to_dttm(cls) -> str:
return "to_timestamp({col})"

@classmethod
def epoch_ms_to_dttm(cls) -> str:
return "to_timestamp({col} / 1000)"

@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: dict[str, Any] | None = None
) -> str | None:
sqla_type = cls.get_sqla_column_type(target_type)

if isinstance(sqla_type, types.TIMESTAMP):
return f"'{dttm.strftime('%Y-%m-%dT%H:%M:%S.000')}'"
return None

@classmethod
def alter_new_orm_column(cls, orm_col: TableColumn) -> None:
"""Handle p_timestamp column specifically for Parseable."""
if orm_col.column_name == "p_timestamp":
orm_col.python_date_format = "epoch_ms"
orm_col.is_dttm = True

@classmethod
def get_extra_params(cls, database: Database) -> dict[str, Any]:
"""Additional parameters for Parseable connections."""
return {
"engine_params": {
"connect_args": {
"timeout": 300, # 5 minutes timeout
}
}
}
77 changes: 77 additions & 0 deletions tests/unit_tests/db_engine_specs/test_parseable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Optional

import pytest

from tests.unit_tests.db_engine_specs.utils import assert_convert_dttm
from tests.unit_tests.fixtures.common import dttm # noqa: F401


def test_epoch_to_dttm() -> None:
"""
DB Eng Specs (parseable): Test epoch to dttm
"""
from superset.db_engine_specs.parseable import ParseableEngineSpec

assert ParseableEngineSpec.epoch_to_dttm() == "to_timestamp({col})"


def test_epoch_ms_to_dttm() -> None:
"""
DB Eng Specs (parseable): Test epoch ms to dttm
"""
from superset.db_engine_specs.parseable import ParseableEngineSpec

assert ParseableEngineSpec.epoch_ms_to_dttm() == "to_timestamp({col} / 1000)"


def test_alter_new_orm_column() -> None:
"""
DB Eng Specs (parseable): Test alter orm column
"""
from superset.connectors.sqla.models import SqlaTable, TableColumn
from superset.db_engine_specs.parseable import ParseableEngineSpec
from superset.models.core import Database

database = Database(database_name="parseable", sqlalchemy_uri="parseable://db")
tbl = SqlaTable(table_name="tbl", database=database)
col = TableColumn(column_name="p_timestamp", type="TIMESTAMP", table=tbl)
ParseableEngineSpec.alter_new_orm_column(col)
assert col.python_date_format == "epoch_ms"
assert col.is_dttm is True


@pytest.mark.parametrize(
"target_type,expected_result",
[
("TIMESTAMP", "'2019-01-02T03:04:05.000'"),
("UnknownType", None),
],
)
def test_convert_dttm(
target_type: str,
expected_result: Optional[str],
dttm: datetime, # noqa: F811
) -> None:
"""
DB Eng Specs (parseable): Test conversion to date time
"""
from superset.db_engine_specs.parseable import ParseableEngineSpec

assert_convert_dttm(ParseableEngineSpec, target_type, expected_result, dttm)

0 comments on commit 9e5876d

Please sign in to comment.