-
Notifications
You must be signed in to change notification settings - Fork 3
[EWT-1250] Sqlalchemy/Superset expectations from python-DBAPI #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c43c8a4
c21261a
fa5d1b1
df3fff5
b802b80
65120fb
2c9b5f8
96e44dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ | |
GeometryRepresentation, | ||
) | ||
from wherobots.db.cursor import Cursor | ||
from wherobots.db.errors import NotSupportedError, OperationalError | ||
from wherobots.db.errors import OperationalError | ||
|
||
|
||
@dataclass | ||
|
@@ -78,10 +78,10 @@ def close(self): | |
self.__ws.close() | ||
|
||
def commit(self): | ||
raise NotSupportedError | ||
pass | ||
|
||
def rollback(self): | ||
raise NotSupportedError | ||
pass | ||
|
||
def cursor(self) -> Cursor: | ||
return Cursor(self.__execute_sql, self.__cancel_query) | ||
|
@@ -155,12 +155,20 @@ def __listen(self): | |
|
||
query.state = ExecutionState.COMPLETED | ||
if result_format == ResultsFormat.JSON: | ||
query.handler(json.loads(result_bytes.decode("utf-8"))) | ||
data = json.loads(result_bytes.decode("utf-8")) | ||
columns = data["columns"] | ||
column_types = data.get("column_types") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is |
||
rows = data["rows"] | ||
query.handler((columns, column_types, rows)) | ||
elif result_format == ResultsFormat.ARROW: | ||
buffer = pyarrow.py_buffer(result_bytes) | ||
stream = pyarrow.input_stream(buffer, result_compression) | ||
with pyarrow.ipc.open_stream(stream) as reader: | ||
query.handler(reader.read_pandas()) | ||
schema = reader.schema | ||
columns = schema.names | ||
column_types = [field.type for field in schema] | ||
rows = reader.read_pandas().values.tolist() | ||
query.handler((columns, column_types, rows)) | ||
else: | ||
query.handler( | ||
OperationalError(f"Unsupported results format {result_format}") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,19 @@ | |
|
||
_TYPE_MAP = { | ||
"object": "STRING", | ||
"string": "STRING", | ||
"int32": "NUMBER", | ||
"int64": "NUMBER", | ||
"float32": "NUMBER", | ||
"float64": "NUMBER", | ||
"datetime64[ns]": "DATETIME", | ||
"timedelta[ns]": "DATETIME", | ||
"double": "NUMBER", | ||
"bool": "NUMBER", # Assuming boolean is stored as number | ||
"bytes": "BINARY", | ||
"struct": "STRUCT", | ||
"list": "LIST", | ||
"geometry": "GEOMETRY", | ||
} | ||
|
||
|
||
|
@@ -54,20 +61,21 @@ def __get_results(self) -> Optional[List[Tuple[Any, ...]]]: | |
if isinstance(result, DatabaseError): | ||
raise result | ||
|
||
self.__rowcount = len(result) | ||
self.__results = result | ||
if not result.empty: | ||
columns, column_types, rows = result | ||
self.__rowcount = len(rows) | ||
self.__results = rows | ||
if rows: | ||
self.__description = [ | ||
( | ||
col_name, # name | ||
_TYPE_MAP.get(str(result[col_name].dtype), "STRING"), # type_code | ||
_TYPE_MAP.get(str(column_types[i]), "STRING"), # type_code | ||
None, # display_size | ||
result[col_name].memory_usage(), # internal_size | ||
None, # internal_size | ||
peterfoldes marked this conversation as resolved.
Show resolved
Hide resolved
|
||
None, # precision | ||
None, # scale | ||
True, # null_ok; Assuming all columns can accept NULL values | ||
) | ||
for col_name in result.columns | ||
for i, col_name in enumerate(columns) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use https://docs.python.org/3/library/functions.html#zip to avoid jumping through hoops with an index (it's much nicer to read, and also more efficient): self.__description = [
(
col_name,
_TYPE_MAP.get(col_type, 'STRING'),
...
)
for (col_name, col_type) in zip(columns, column_types)
] |
||
] | ||
|
||
return self.__results | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,6 @@ | |
""" | ||
|
||
import logging | ||
import os | ||
import urllib.parse | ||
import queue | ||
import requests | ||
|
@@ -51,6 +50,7 @@ def connect( | |
results_format: Union[ResultsFormat, None] = None, | ||
data_compression: Union[DataCompression, None] = None, | ||
geometry_representation: Union[GeometryRepresentation, None] = None, | ||
ws_url: str = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this required, instead of calling The reason this parameter wasn't included in |
||
) -> Connection: | ||
if not token and not api_key: | ||
raise ValueError("At least one of `token` or `api_key` is required") | ||
|
@@ -67,6 +67,24 @@ def connect( | |
runtime = runtime or DEFAULT_RUNTIME | ||
region = region or DEFAULT_REGION | ||
|
||
if ws_url: | ||
logging.info( | ||
"Using existing %s/%s runtime in %s from %s ...", | ||
runtime.name, | ||
runtime.value, | ||
region.value, | ||
host, | ||
) | ||
session_uri = ws_url | ||
return connect_direct( | ||
uri=http_to_ws(session_uri), | ||
headers=headers, | ||
read_timeout=read_timeout, | ||
results_format=results_format, | ||
data_compression=data_compression, | ||
geometry_representation=geometry_representation, | ||
) | ||
|
||
logging.info( | ||
"Requesting %s/%s runtime in %s from %s ...", | ||
runtime.name, | ||
|
Uh oh!
There was an error while loading. Please reload this page.