This repository was archived by the owner on May 5, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: add datatype.parse_sqltype testcases
Signed-off-by: Đặng Minh Dũng <[email protected]>
- Loading branch information
Showing
4 changed files
with
191 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
import tests.assertions # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from assertpy import add_extension, assert_that | ||
from sqlalchemy.sql.sqltypes import ARRAY | ||
|
||
from sqlalchemy_trino.datatype import SQLType, MAP, ROW | ||
|
||
|
||
def assert_sqltype(this: SQLType, that: SQLType): | ||
if isinstance(this, type): | ||
this = this() | ||
if isinstance(that, type): | ||
that = that() | ||
assert_that(type(this)).is_same_as(type(that)) | ||
if isinstance(this, ARRAY): | ||
assert_sqltype(this.item_type, that.item_type) | ||
if this.dimensions is None or this.dimensions == 1: | ||
assert_that(that.dimensions).is_in(None, 1) | ||
else: | ||
assert_that(this.dimensions).is_equal_to(this.dimensions) | ||
elif isinstance(this, MAP): | ||
assert_sqltype(this.key_type, that.key_type) | ||
assert_sqltype(this.value_type, that.value_type) | ||
elif isinstance(this, ROW): | ||
assert_that(len(this.attr_types)).is_equal_to(len(that.attr_types)) | ||
for name, this_attr in this.attr_types.items(): | ||
that_attr = this.attr_types[name] | ||
assert_sqltype(this_attr, that_attr) | ||
else: | ||
assert_that(str(this)).is_equal_to(str(that)) | ||
|
||
|
||
@add_extension | ||
def is_sqltype(self, that): | ||
this = self.val | ||
assert_sqltype(this, that) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import pytest | ||
from assertpy import assert_that | ||
from sqlalchemy.sql.sqltypes import * | ||
from sqlalchemy.sql.type_api import TypeEngine | ||
|
||
from sqlalchemy_trino import datatype | ||
from sqlalchemy_trino.datatype import MAP, ROW | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
datatype._type_map.items(), | ||
ids=datatype._type_map.keys() | ||
) | ||
def test_parse_simple_type(type_str: str, sql_type: TypeEngine): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
if not isinstance(actual_type, type): | ||
actual_type = type(actual_type) | ||
assert_that(actual_type).is_equal_to(sql_type) | ||
|
||
|
||
parse_type_options_testcases = { | ||
'VARCHAR(10)': VARCHAR(10), | ||
'DECIMAL(20)': DECIMAL(20), | ||
'DECIMAL(20, 3)': DECIMAL(20, 3), | ||
} | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
parse_type_options_testcases.items(), | ||
ids=parse_type_options_testcases.keys() | ||
) | ||
def test_parse_type_options(type_str: str, sql_type: TypeEngine): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
assert_that(actual_type).is_sqltype(sql_type) | ||
|
||
|
||
parse_array_testcases = { | ||
'array(integer)': ARRAY(INTEGER()), | ||
'array(varchar(10))': ARRAY(VARCHAR(10)), | ||
'array(decimal(20,3))': ARRAY(DECIMAL(20, 3)), | ||
'array(array(varchar(10)))': ARRAY(VARCHAR(10), dimensions=2), | ||
} | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
parse_array_testcases.items(), | ||
ids=parse_array_testcases.keys() | ||
) | ||
def test_parse_array(type_str: str, sql_type: ARRAY): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
assert_that(actual_type).is_sqltype(sql_type) | ||
|
||
|
||
parse_map_testcases = { | ||
'map(char, integer)': MAP(CHAR(), INTEGER()), | ||
'map(varchar(10), varchar(10))': MAP(VARCHAR(10), VARCHAR(10)), | ||
'map(varchar(10), decimal(20,3))': MAP(VARCHAR(10), DECIMAL(20, 3)), | ||
'map(char, array(varchar(10)))': MAP(CHAR(), ARRAY(VARCHAR(10))), | ||
'map(varchar(10), array(varchar(10)))': MAP(VARCHAR(10), ARRAY(VARCHAR(10))), | ||
'map(varchar(10), array(array(varchar(10))))': MAP(VARCHAR(10), ARRAY(VARCHAR(10), dimensions=2)), | ||
} | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
parse_map_testcases.items(), | ||
ids=parse_map_testcases.keys() | ||
) | ||
def test_parse_map(type_str: str, sql_type: ARRAY): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
assert_that(actual_type).is_sqltype(sql_type) | ||
|
||
|
||
parse_row_testcases = { | ||
'row(a integer, b varchar)': ROW(dict(a=INTEGER(), b=VARCHAR())), | ||
'row(a varchar(20), b decimal(20,3))': ROW(dict(a=VARCHAR(20), b=DECIMAL(20, 3))), | ||
'row(x array(varchar(10)), y array(array(varchar(10))), z decimal(20,3))': | ||
ROW(dict(x=ARRAY(VARCHAR(10)), y=ARRAY(VARCHAR(10), dimensions=2), z=DECIMAL(20, 3))), | ||
} | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
parse_row_testcases.items(), | ||
ids=parse_row_testcases.keys() | ||
) | ||
def test_parse_row(type_str: str, sql_type: ARRAY): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
assert_that(actual_type).is_sqltype(sql_type) | ||
|
||
|
||
parse_datetime_testcases = { | ||
'date': DATE(), | ||
'time': TIME(), | ||
'time with time zone': TIME(timezone=True), | ||
'timestamp': TIMESTAMP(), | ||
'timestamp with time zone': TIMESTAMP(timezone=True), | ||
} | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'type_str, sql_type', | ||
parse_datetime_testcases.items(), | ||
ids=parse_datetime_testcases.keys() | ||
) | ||
def test_parse_datetime(type_str: str, sql_type: ARRAY): | ||
actual_type = datatype.parse_sqltype(type_str) | ||
assert_that(actual_type).is_sqltype(sql_type) |