From 84251128444dfed7355a74311ab465d72502a4ef Mon Sep 17 00:00:00 2001 From: Dmitry Maslennikov Date: Wed, 2 Nov 2022 17:19:41 +0400 Subject: [PATCH] autocommit, isolation levels, getlastrowid, patch for fetchone --- sqlalchemy_iris/base.py | 111 ++++++++++++++++++++++++++++++-- sqlalchemy_iris/requirements.py | 55 ++++++++-------- test/test_suite.py | 78 ++++++++++++++++++---- 3 files changed, 198 insertions(+), 46 deletions(-) diff --git a/sqlalchemy_iris/base.py b/sqlalchemy_iris/base.py index 4c49c45..b95ecf1 100644 --- a/sqlalchemy_iris/base.py +++ b/sqlalchemy_iris/base.py @@ -1,5 +1,10 @@ import datetime from telnetlib import BINARY +from iris.dbapi._DBAPI import Cursor +from iris.dbapi._ResultSetRow import _ResultSetRow +from iris.dbapi._DBAPI import SQLType as IRISSQLType +import iris._IRISNative as irisnative +import iris.dbapi._DBAPI as dbapi from . import information_schema as ischema from sqlalchemy import exc from sqlalchemy.orm import aliased @@ -9,6 +14,7 @@ from sqlalchemy.sql import util as sql_util from sqlalchemy.sql import between from sqlalchemy.sql import func +from sqlalchemy.sql import expression from sqlalchemy import sql, text from sqlalchemy import util from sqlalchemy import types as sqltypes @@ -501,8 +507,41 @@ def __init__(self, dialect): dialect, omit_schema=False) +class CursorWrapper(Cursor): + def __init__(self, connection): + super(CursorWrapper, self).__init__(connection) + + def fetchone(self): + retval = super(CursorWrapper, self).fetchone() + if retval is None: + return None + if not isinstance(retval, _ResultSetRow.DataRow): + return retval + + # Workaround for fetchone, which returns values in row not from 0 + row = [] + for c in self._columns: + value = retval[c.name] + # Workaround for issue, when int returned as string + if value is not None and c.type in (IRISSQLType.INTEGER, IRISSQLType.BIGINT,) and type(value) is not int: + value = int(value) + row.append(value) + return row + + class IRISExecutionContext(default.DefaultExecutionContext): - pass + + def get_lastrowid(self): + cursor = self.create_cursor() + cursor.execute("SELECT LAST_IDENTITY()") + lastrowid = cursor.fetchone()[0] + cursor.close() + return lastrowid + + def create_cursor(self): + # cursor = self._dbapi_connection.cursor() + cursor = CursorWrapper(self._dbapi_connection) + return cursor HOROLOG_ORDINAL = datetime.date(1840, 12, 31).toordinal() @@ -594,7 +633,7 @@ class IRISDialect(default.DefaultDialect): supports_sequences = False - postfetch_lastrowid = False + postfetch_lastrowid = True non_native_boolean_check_constraint = False supports_simple_order_by_label = False supports_empty_insert = False @@ -612,11 +651,58 @@ class IRISDialect(default.DefaultDialect): def __init__(self, **kwargs): default.DefaultDialect.__init__(self, **kwargs) + self._auto_parallel = 1 + + _isolation_lookup = set( + [ + "READ UNCOMMITTED", + "READ COMMITTED", + "READ VERIFIED", + ] + ) + + def _get_option(self, connection, option): + cursor = CursorWrapper(connection) + # cursor = connection.cursor() + cursor.execute('SELECT %SYSTEM_SQL.Util_GetOption(?)', [option, ]) + row = cursor.fetchone() + if row: + return row[0] + return None + + def _set_option(self, connection, option, value): + cursor = CursorWrapper(connection) + # cursor = connection.cursor() + cursor.execute('SELECT %SYSTEM_SQL.Util_SetOption(?, ?)', [option, value, ]) + row = cursor.fetchone() + if row: + return row[0] + return None + + def get_isolation_level(self, connection): + level = int(self._get_option(connection, 'IsolationMode')) + if level == 0: + return 'READ UNCOMMITTED' + elif level == 1: + return 'READ COMMITTED' + elif level == 3: + return 'READ VERIFIED' + return None + + def set_isolation_level(self, connection, level_str): + if level_str == "AUTOCOMMIT": + connection.setAutoCommit(True) + else: + connection.setAutoCommit(False) + level = 0 + if level_str == 'READ COMMITTED': + level = 1 + elif level_str == 'READ VERIFIED': + level = 3 + self._set_option(connection, 'IsolationMode', level) @classmethod def dbapi(cls): - import iris._IRISNative as irisnative - import iris.dbapi._DBAPI as dbapi dbapi.connect = irisnative.connect dbapi.paramstyle = "format" return dbapi @@ -629,6 +715,8 @@ def create_connect_args(self, url): opts["username"] = url.username if url.username else '' opts["password"] = url.password if url.password else '' + opts['autoCommit'] = False + return ([], opts) def _fix_for_params(self, query, params, many=False): @@ -659,6 +747,21 @@ def do_executemany(self, cursor, query, params, context=None): query, params = self._fix_for_params(query, params, True) cursor.executemany(query, params) + def do_begin(self, connection): + pass + + def do_rollback(self, connection): + connection.rollback() + + def do_commit(self, connection): + connection.commit() + + def do_savepoint(self, connection, name): + connection.execute(expression.SavepointClause(name)) + + def do_release_savepoint(self, connection, name): + pass + def get_schema(self, schema=None): if schema is None: return 'SQLUser' diff --git a/sqlalchemy_iris/requirements.py b/sqlalchemy_iris/requirements.py index 9150a5e..fddf824 100644 --- a/sqlalchemy_iris/requirements.py +++ b/sqlalchemy_iris/requirements.py @@ -298,7 +298,7 @@ def implements_get_lastrowid(self): method without reliance on RETURNING. """ - # return exclusions.open() + return exclusions.open() return exclusions.closed() @property @@ -658,7 +658,7 @@ def binary_literals(self): @property def autocommit(self): """target dialect supports 'AUTOCOMMIT' as an isolation_level""" - return exclusions.closed() + return exclusions.open() @property def isolation_level(self): @@ -668,33 +668,18 @@ def isolation_level(self): the get_isolation_levels() method be implemented. """ - return exclusions.closed() + return exclusions.open() def get_isolation_levels(self, config): - """Return a structure of supported isolation levels for the current - testing dialect. - - The structure indicates to the testing suite what the expected - "default" isolation should be, as well as the other values that - are accepted. The dictionary has two keys, "default" and "supported". - The "supported" key refers to a list of all supported levels and - it should include AUTOCOMMIT if the dialect supports it. - - If the :meth:`.DefaultRequirements.isolation_level` requirement is - not open, then this method has no return value. - - E.g.:: - - >>> testing.requirements.get_isolation_levels() - { - "default": "READ_COMMITTED", - "supported": [ - "SERIALIZABLE", "READ UNCOMMITTED", - "READ COMMITTED", "REPEATABLE READ", - "AUTOCOMMIT" - ] - } - """ + return { + "default": "READ UNCOMMITTED", + "supported": [ + "AUTOCOMMIT", + "READ UNCOMMITTED", + "READ COMMITTED", + "READ VERIFIED", + ] + } @property def json_type(self): @@ -878,7 +863,7 @@ def selectone(self): def savepoints(self): """Target database must support savepoints.""" - return exclusions.closed() + return exclusions.open() @property def two_phase_transactions(self): @@ -1125,3 +1110,17 @@ def autoincrement_without_sequence(self): """ return exclusions.open() # return exclusions.closed() + + # + # SQLAlchemy Tests + # pytest --dburi iris://_SYSTEM:SYS@localhost:1972/USER \ + # --requirements sqlalchemy_iris.requirements:Requirements + # + + @property + def memory_process_intensive(self): + return exclusions.closed() + + @property + def array_type(self): + return exclusions.closed() diff --git a/test/test_suite.py b/test/test_suite.py index 1bb4ece..8d14100 100644 --- a/test/test_suite.py +++ b/test/test_suite.py @@ -1,7 +1,8 @@ from sqlalchemy.testing.suite import QuotedNameArgumentTest as _QuotedNameArgumentTest from sqlalchemy.testing.suite import FetchLimitOffsetTest as _FetchLimitOffsetTest from sqlalchemy.testing.suite import CompoundSelectTest as _CompoundSelectTest -from sqlalchemy.testing import fixtures, AssertsExecutionResults, AssertsCompiledSQL +from sqlalchemy.testing import fixtures +# from sqlalchemy.testing import AssertsExecutionResults, AssertsCompiledSQL from sqlalchemy import testing from sqlalchemy import Table, Column, Integer, String, select import pytest @@ -56,25 +57,74 @@ def test_simple_limit_offset_no_order(self, connection, cases): ) -class MiscTest(AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest): +# class MiscTest(AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest): - __backend__ = True +# __backend__ = True + +# __only_on__ = "iris" + +# @classmethod +# def define_tables(cls, metadata): +# Table( +# "some_table", +# metadata, +# Column("id", Integer, primary_key=True), +# Column("x", Integer), +# Column("y", Integer), +# Column("z", String(50)), +# ) + +# # def test_compile(self): +# # table = self.tables.some_table + +# # stmt = select(table.c.id, table.c.x).offset(20).limit(10) - __only_on__ = "iris" + +class TransactionTest(fixtures.TablesTest): + __backend__ = True @classmethod def define_tables(cls, metadata): Table( - "some_table", + "users", metadata, - Column("id", Integer, primary_key=True), - Column("x", Integer), - Column("y", Integer), - Column("z", String(50)), + Column("user_id", Integer, primary_key=True), + Column("user_name", String(20)), + test_needs_acid=True, ) - # def test_compile(self): - # table = self.tables.some_table - - # stmt = select(table.c.id, table.c.x).offset(20).limit(10) - + @testing.fixture + def local_connection(self): + with testing.db.connect() as conn: + yield conn + + def test_commits(self, local_connection): + users = self.tables.users + connection = local_connection + transaction = connection.begin() + connection.execute(users.insert(), dict(user_id=1, user_name="user1")) + transaction.commit() + + transaction = connection.begin() + connection.execute(users.insert(), dict(user_id=2, user_name="user2")) + connection.execute(users.insert(), dict(user_id=3, user_name="user3")) + transaction.commit() + + transaction = connection.begin() + result = connection.exec_driver_sql("select * from users") + assert len(result.fetchall()) == 3 + transaction.commit() + connection.close() + + def test_rollback(self, local_connection): + """test a basic rollback""" + + users = self.tables.users + connection = local_connection + transaction = connection.begin() + connection.execute(users.insert(), dict(user_id=1, user_name="user1")) + connection.execute(users.insert(), dict(user_id=2, user_name="user2")) + connection.execute(users.insert(), dict(user_id=3, user_name="user3")) + transaction.rollback() + result = connection.exec_driver_sql("select * from users") + assert len(result.fetchall()) == 0