diff --git a/CHANGELOG.md b/CHANGELOG.md index 303d86b..42beae1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Please see [MIGRATING.md](./MIGRATING.md) for information on breaking changes. - psycopg3 is now used for internal operations. LDLite.connect_db_postgres will return a psycopg3 connection instead of psycopg2 in the next major release. - psycopg2 is now installed using the binary version. +- Refactored internal database handling logic ### Removed diff --git a/pylock.maximal.toml b/pylock.maximal.toml index d04a769..171af54 100644 --- a/pylock.maximal.toml +++ b/pylock.maximal.toml @@ -674,7 +674,7 @@ marker = "sys_platform == \"win32\" and python_version >= \"3.13\" and \"default dependencies = [] [tool.pdm] -hashes = {sha256 = "993642e8053f231239acbb7c445ace66bc6705d078056cf64d5ce3729d4d43c3"} +hashes = {sha256 = "9cd31bd06db4ed0781a4f5b9f507c9fb109b9e48b412ee0b76a1115f841b603b"} strategy = ["inherit_metadata", "static_urls"] [[tool.pdm.targets]] diff --git a/pylock.minimal.toml b/pylock.minimal.toml index 66189f3..218eb67 100644 --- a/pylock.minimal.toml +++ b/pylock.minimal.toml @@ -252,18 +252,19 @@ dependencies = [] [[packages]] name = "psycopg" -version = "3.0" -requires-python = ">=3.6" -sdist = {name = "psycopg-3.0.tar.gz", url = "https://files.pythonhosted.org/packages/2a/1d/77fd95c963b8e066de8b49e592e7e912ae53a24275f04a4d5279f67e454a/psycopg-3.0.tar.gz", hashes = {sha256 = "82bce906431f44e81d72fbe0b924a71eebec09189bcf0dd16003e33960e36efa"}} +version = "3.2.0" +requires-python = ">=3.8" +sdist = {name = "psycopg-3.2.0.tar.gz", url = "https://files.pythonhosted.org/packages/10/8e/6b14afef7d09f2769cded48422dbb46699612573092334274e56e4a989b5/psycopg-3.2.0.tar.gz", hashes = {sha256 = "f93c5376598da868a5f761a44f920b84ec937a15a46e85df2454328f13d7009a"}} wheels = [ - {name = "psycopg-3.0-py3-none-any.whl",url = "https://files.pythonhosted.org/packages/d8/31/9ce982570ebb20d092ba4d029ed2702634789bc0cc8deb0c09ba6e22e4db/psycopg-3.0-py3-none-any.whl",hashes = {sha256 = "65b9fb8838dae61040ad3e0cfc184d4ffd17f740ef4c0353d76050a6eb061a9c"}}, + {name = "psycopg-3.2.0-py3-none-any.whl",url = "https://files.pythonhosted.org/packages/e4/72/d7f826af204e7d826e00c5fb28ce6fca7ee4b66f784ea8d9184c5fd0a9e4/psycopg-3.2.0-py3-none-any.whl",hashes = {sha256 = "48a069cfe0ae852a331e1eb48e4da64a1a47723a6303559935801f0e0245ba22"}}, ] marker = "\"default\" in dependency_groups" [packages.tool.pdm] dependencies = [ - "typing-extensions; python_version < \"3.8\"", - "backports-zoneinfo; python_version < \"3.9\"", + "typing-extensions>=4.4", + "backports-zoneinfo>=0.2.0; python_version < \"3.9\"", + "tzdata; sys_platform == \"win32\"", ] [[packages]] @@ -463,6 +464,19 @@ marker = "\"test\" in dependency_groups" [packages.tool.pdm] dependencies = [] +[[packages]] +name = "typing-extensions" +version = "4.15.0" +requires-python = ">=3.9" +sdist = {name = "typing_extensions-4.15.0.tar.gz", url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hashes = {sha256 = "0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"}} +wheels = [ + {name = "typing_extensions-4.15.0-py3-none-any.whl",url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl",hashes = {sha256 = "f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"}}, +] +marker = "\"default\" in dependency_groups or \"lint\" in dependency_groups or \"test\" in dependency_groups" + +[packages.tool.pdm] +dependencies = [] + [[packages]] name = "colorama" version = "0.4.6" @@ -491,19 +505,6 @@ dependencies = [ "typing-extensions>=4.6.0; python_version < \"3.13\"", ] -[[packages]] -name = "typing-extensions" -version = "4.15.0" -requires-python = ">=3.9" -sdist = {name = "typing_extensions-4.15.0.tar.gz", url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hashes = {sha256 = "0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466"}} -wheels = [ - {name = "typing_extensions-4.15.0-py3-none-any.whl",url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl",hashes = {sha256 = "f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"}}, -] -marker = "\"default\" in dependency_groups or \"lint\" in dependency_groups or \"test\" in dependency_groups" - -[packages.tool.pdm] -dependencies = [] - [[packages]] name = "httpx" version = "0.28.1" @@ -846,8 +847,21 @@ dependencies = [ "enum34; python_version < \"3.4\"", ] +[[packages]] +name = "tzdata" +version = "2025.2" +requires-python = ">=2" +sdist = {name = "tzdata-2025.2.tar.gz", url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hashes = {sha256 = "b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"}} +wheels = [ + {name = "tzdata-2025.2-py2.py3-none-any.whl",url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl",hashes = {sha256 = "1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8"}}, +] +marker = "sys_platform == \"win32\" and \"default\" in dependency_groups" + +[packages.tool.pdm] +dependencies = [] + [tool.pdm] -hashes = {sha256 = "993642e8053f231239acbb7c445ace66bc6705d078056cf64d5ce3729d4d43c3"} +hashes = {sha256 = "9cd31bd06db4ed0781a4f5b9f507c9fb109b9e48b412ee0b76a1115f841b603b"} strategy = ["direct_minimal_versions", "inherit_metadata", "static_urls"] [[tool.pdm.targets]] diff --git a/pylock.toml b/pylock.toml index 7b801c5..513acd0 100644 --- a/pylock.toml +++ b/pylock.toml @@ -965,7 +965,7 @@ marker = "sys_platform == \"win32\" and \"default\" in dependency_groups" dependencies = [] [tool.pdm] -hashes = {sha256 = "993642e8053f231239acbb7c445ace66bc6705d078056cf64d5ce3729d4d43c3"} +hashes = {sha256 = "9cd31bd06db4ed0781a4f5b9f507c9fb109b9e48b412ee0b76a1115f841b603b"} strategy = ["inherit_metadata", "static_urls"] [[tool.pdm.targets]] diff --git a/pyproject.toml b/pyproject.toml index e6651d6..b23a302 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "XlsxWriter>=3.0.6", "httpx-folio>=0.2.3", "orjson>=2.2.1", - "psycopg>=3.0", + "psycopg>=3.2.0", ] requires-python = ">=3.9" readme = "README.md" diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py index d506d76..61c6081 100644 --- a/src/ldlite/__init__.py +++ b/src/ldlite/__init__.py @@ -38,6 +38,7 @@ import sqlite3 import sys +from itertools import count from typing import TYPE_CHECKING, NoReturn, cast import duckdb @@ -47,10 +48,17 @@ from tqdm import tqdm from ._csv import to_csv +from ._database import Prefix from ._folio import FolioClient from ._jsonx import Attr, drop_json_tables, transform_json from ._select import select -from ._sqlx import DBType, as_postgres, autocommit, encode_sql_str, json_type, sqlid +from ._sqlx import ( + DBType, + DBTypeDatabase, + as_postgres, + autocommit, + sqlid, +) from ._xlsx import to_xlsx if TYPE_CHECKING: @@ -341,21 +349,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 drop_json_tables(self.db, table) autocommit(self.db, self.dbtype, False) try: - cur = self.db.cursor() - try: - if len(schema_table) == 2: - cur.execute("CREATE SCHEMA IF NOT EXISTS " + sqlid(schema_table[0])) - cur.execute("DROP TABLE IF EXISTS " + sqlid(table)) - cur.execute( - "CREATE TABLE " - + sqlid(table) - + "(__id integer, jsonb " - + json_type(self.dbtype) - + ")", - ) - finally: - cur.close() - self.db.commit() # First get total number of records records = self._folio.iterate_records( path, @@ -364,62 +357,37 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 self.page_size, query=cast("QueryType", query), ) - (total_records, _) = next(records) - total = total_records if total_records is not None else 0 + total = min(total_records, limit or total_records) if self._verbose: print("ldlite: estimated row count: " + str(total), file=sys.stderr) - # Read result pages + + processed = count(0) pbar = None - pbartotal = 0 if not self._quiet: - if total == -1: - pbar = tqdm( - desc="reading", - leave=False, - mininterval=3, - smoothing=0, - colour="#A9A9A9", - bar_format="{desc} {elapsed} {bar}{postfix}", - ) - else: - pbar = tqdm( - desc="reading", - total=total, - leave=False, - mininterval=3, - smoothing=0, - colour="#A9A9A9", - bar_format="{desc} {bar}{postfix}", - ) + pbar = tqdm( + desc="reading", + total=total, + leave=False, + mininterval=3, + smoothing=0, + colour="#A9A9A9", + bar_format="{desc} {bar}{postfix}", + ) + + def on_processed() -> bool: + if pbar is not None: + pbar.update(1) + p = next(processed) + return limit is None or p >= limit + cur = self.db.cursor() - try: - count = 0 - for pkey, d in records: - cur.execute( - "INSERT INTO " - + sqlid(table) - + " VALUES(" - + str(pkey) - + "," - + encode_sql_str(self.dbtype, d) - + ")", - ) - count += 1 - if pbar is not None: - if pbartotal + 1 > total: - pbartotal = total - pbar.update(total - pbartotal) - else: - pbartotal += 1 - pbar.update(1) - if limit is not None and count == limit: - break - finally: - cur.close() + db = DBTypeDatabase(self.dbtype, self.db) + db.ingest_records(self.db, Prefix(table), on_processed, records) + self.db.commit() if pbar is not None: pbar.close() - self.db.commit() + newtables = [table] newattrs = {} if json_depth > 0: @@ -427,7 +395,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 self.db, self.dbtype, table, - count, + next(processed) - 1, self._quiet, json_depth, ) @@ -468,7 +436,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 colour="#A9A9A9", bar_format="{desc} {bar}{postfix}", ) - pbartotal = 0 for t, attr in indexable_attrs: cur = self.db.cursor() try: @@ -480,7 +447,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 finally: cur.close() if pbar is not None: - pbartotal += 1 pbar.update(1) if pbar is not None: pbar.close() diff --git a/src/ldlite/_database.py b/src/ldlite/_database.py new file mode 100644 index 0000000..f1988de --- /dev/null +++ b/src/ldlite/_database.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import closing +from typing import TYPE_CHECKING, Callable, Generic, TypeVar + +from psycopg import sql + +if TYPE_CHECKING: + from collections.abc import Iterator + + from _typeshed import dbapi + +DB = TypeVar("DB", bound="dbapi.DBAPIConnection") + + +class Prefix: + def __init__(self, table: str): + self._schema: str | None = None + sandt = table.split(".") + if len(sandt) == 1: + (self._prefix,) = sandt + else: + (self._schema, self._prefix) = sandt + + @property + def schema_name(self) -> sql.Identifier | None: + return None if self._schema is None else sql.Identifier(self._schema) + + @property + def raw_table_name(self) -> sql.Identifier: + return ( + sql.Identifier(self._schema, self._prefix) + if self._schema is not None + else sql.Identifier(self._prefix) + ) + + +class Database(ABC, Generic[DB]): + def __init__(self, conn_factory: Callable[[], DB]): + self._conn_factory = conn_factory + + @property + @abstractmethod + def _truncate_raw_table_sql(self) -> sql.SQL: ... + @property + @abstractmethod + def _create_raw_table_sql(self) -> sql.SQL: ... + @property + @abstractmethod + def _insert_record_sql(self) -> sql.SQL: ... + + def _prepare_raw_table( + self, + conn: DB, + prefix: Prefix, + ) -> None: + with closing(conn.cursor()) as cur: + if prefix.schema_name is not None: + cur.execute( + sql.SQL("CREATE SCHEMA IF NOT EXISTS {schema};") + .format(schema=prefix.schema_name) + .as_string(), + ) + + cur.execute( + self._create_raw_table_sql.format( + table=prefix.raw_table_name, + ).as_string(), + ) + cur.execute( + self._truncate_raw_table_sql.format( + table=prefix.raw_table_name, + ).as_string(), + ) + + def ingest_records( + self, + conn: DB, + prefix: Prefix, + on_processed: Callable[[], bool], + records: Iterator[tuple[int, str | bytes]], + ) -> None: + # the only implementation right now is a hack + # the db connection is managed outside of the factory + # for now it's taken as a parameter + # with self._conn_factory() as conn: + self._prepare_raw_table(conn, prefix) + with closing(conn.cursor()) as cur: + for pkey, d in records: + cur.execute( + self._insert_record_sql.format( + table=prefix.raw_table_name, + ).as_string(), + [pkey, d if isinstance(d, str) else d.decode("utf-8")], + ) + if not on_processed(): + return diff --git a/src/ldlite/_sqlx.py b/src/ldlite/_sqlx.py index 34615f0..6277fda 100644 --- a/src/ldlite/_sqlx.py +++ b/src/ldlite/_sqlx.py @@ -4,6 +4,10 @@ from enum import Enum from typing import TYPE_CHECKING, cast +from psycopg import sql + +from ._database import Database + if TYPE_CHECKING: import sqlite3 @@ -21,6 +25,38 @@ class DBType(Enum): SQLITE = 4 +class DBTypeDatabase(Database["dbapi.DBAPIConnection"]): + def __init__(self, dbtype: DBType, db: dbapi.DBAPIConnection): + self._dbtype = dbtype + super().__init__(lambda: db) + + @property + def _create_raw_table_sql(self) -> sql.SQL: + create_sql = "CREATE TABLE IF NOT EXISTS {table} (__id integer, jsonb text);" + if self._dbtype == DBType.POSTGRES: + create_sql = ( + "CREATE TABLE IF NOT EXISTS {table} (__id integer, jsonb jsonb);" + ) + + return sql.SQL(create_sql) + + @property + def _truncate_raw_table_sql(self) -> sql.SQL: + truncate_sql = "TRUNCATE TABLE {table};" + if self._dbtype == DBType.SQLITE: + truncate_sql = "DELETE FROM {table};" + + return sql.SQL(truncate_sql) + + @property + def _insert_record_sql(self) -> sql.SQL: + insert_sql = "INSERT INTO {table} VALUES(?, ?);" + if self._dbtype == DBType.POSTGRES: + insert_sql = "INSERT INTO {table} VALUES(%s, %s);" + + return sql.SQL(insert_sql) + + def as_duckdb( db: dbapi.DBAPIConnection, dbtype: DBType, @@ -51,15 +87,6 @@ def as_sqlite( return cast("sqlite3.Connection", db) -def strip_schema(table: str) -> str: - st = table.split(".") - if len(st) == 1: - return table - if len(st) == 2: - return st[1] - raise ValueError("invalid table name: " + table) - - def autocommit(db: dbapi.DBAPIConnection, dbtype: DBType, enable: bool) -> None: if (pgdb := as_postgres(db, dbtype)) is not None: pgdb.rollback() @@ -101,14 +128,6 @@ def varchar_type(dbtype: DBType) -> str: return "varchar" -def json_type(dbtype: DBType) -> str: - if dbtype == DBType.POSTGRES: - return "jsonb" - if dbtype == DBType.SQLITE: - return "text" - return "varchar" - - def encode_sql_str(dbtype: DBType, s: str | bytes) -> str: # noqa: C901, PLR0912 if isinstance(s, bytes): s = s.decode("utf-8")