16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,22 @@ Please see [MIGRATING.md](./MIGRATING.md) for information on breaking changes.

### Removed

## [3.2.0] - September 2025

### Added

- Source Storage endpoints now stream only if streaming is available.
- Connections returned from the LDLite.connect_db* methods are now isolated from those used internally (a sketch follows below).
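
A minimal sketch of the isolation pattern, assuming a PostgreSQL backend; `open_connections` and its names are illustrative, not LDLite's actual API:

```python
# Illustrative only: the caller gets a dedicated connection while internal
# work goes through a separate factory, so neither side can disturb the
# other's transactions or autocommit state.
import psycopg
import psycopg2

def open_connections(dsn: str):
    def internal():  # psycopg 3, used only for internal work
        return psycopg.connect(dsn)

    caller = psycopg2.connect(dsn)  # psycopg 2, handed back to the caller
    caller.set_session(autocommit=True)
    return caller, internal
```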

### Changed

- psycopg3 is now used for internal operations. LDLite.connect_db_postgresql will return a psycopg3 connection instead of psycopg2 in the next major release.
- psycopg2 is now installed as the binary version.
- Refactored internal table management to be safer and more resilient.
- Ingesting data into postgres now uses COPY FROM, which significantly improves download performance (see the sketch below).
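
A minimal sketch of COPY-based loading with psycopg 3, assuming a raw table of `(__id, jsonb)` rows; the table name and helper are illustrative, not LDLite's internal code:

```python
# Illustrative only: COPY streams all rows over one round trip instead of
# issuing an INSERT per record, which is where the speedup comes from.
import psycopg

def copy_records(dsn: str, rows) -> None:
    with psycopg.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("CREATE TABLE IF NOT EXISTS g (__id bigint, jsonb jsonb)")
        with cur.copy("COPY g (__id, jsonb) FROM STDIN") as copy:
            for pkey, record in rows:  # record: a JSON string
                copy.write_row((pkey, record))
```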

### Removed

## [3.1.4] - September 2025

### Fixed
106 changes: 73 additions & 33 deletions pylock.maximal.toml

Large diffs are not rendered by default.

222 changes: 154 additions & 68 deletions pylock.minimal.toml

Large diffs are not rendered by default.

243 changes: 161 additions & 82 deletions pylock.toml

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions pyproject.toml
@@ -4,19 +4,28 @@ build-backend = "pdm.backend"

[project]
name = "ldlite"
version = "3.1.4"
version = "3.2.0"
description = "Lightweight analytics tool for FOLIO services"
authors = [
{name = "Katherine Bargar", email = "[email protected]"},
{name = "Nassib Nassar", email = "[email protected]"},
]
dependencies = [
"duckdb>=0.6.1",
"psycopg2>=2.9.5",
# 0.6.1 was the original pinned dependency version
# 1.4 slightly changes data formats and fails tests
"duckdb>=0.6.1,<1.4",
# 2.9.5 was the original pinned dependency version
"psycopg2-binary>=2.9.5",
# 4.64.1 was the original pinned dependency version
"tqdm>=4.64.1",
# 3.0.6 was the original pinned dependency version
"XlsxWriter>=3.0.6",
# 0.2 has query parameter handling, 0.2.3 is a required bug fix version
"httpx-folio>=0.2.3",
"orjson>=2.2.1",
# 3.9 introduces orjson.Fragment
"orjson>=3.9",
# 3.2 changes the sql.SQL.format signature
"psycopg>=3.2.0",
]
requires-python = ">=3.9"
readme = "README.md"
201 changes: 95 additions & 106 deletions src/ldlite/__init__.py
@@ -38,18 +38,27 @@

import sqlite3
import sys
from itertools import count
from typing import TYPE_CHECKING, NoReturn, cast

import duckdb
import psycopg
import psycopg2
from httpx_folio.auth import FolioParams
from tqdm import tqdm

from ._csv import to_csv
from ._database import Prefix
from ._folio import FolioClient
from ._jsonx import Attr, drop_json_tables, transform_json
from ._jsonx import Attr, transform_json
from ._select import select
from ._sqlx import DBType, as_postgres, autocommit, encode_sql_str, json_type, sqlid
from ._sqlx import (
    DBType,
    DBTypeDatabase,
    as_postgres,
    autocommit,
    sqlid,
)
from ._xlsx import to_xlsx

if TYPE_CHECKING:
@@ -73,6 +82,7 @@ def __init__(self) -> None:
        self._quiet = False
        self.dbtype: DBType = DBType.UNDEFINED
        self.db: dbapi.DBAPIConnection | None = None
        self._db: DBTypeDatabase | None = None
        self._folio: FolioClient | None = None
        self.page_size = 1000
        self._okapi_timeout = 60
@@ -122,8 +132,13 @@ def _connect_db_duckdb(
        self.dbtype = DBType.DUCKDB
        fn = filename if filename is not None else ":memory:"
        db = duckdb.connect(database=fn)
        self.db = cast("dbapi.DBAPIConnection", db)
        return db
        self.db = cast("dbapi.DBAPIConnection", db.cursor())
        self._db = DBTypeDatabase(
            DBType.DUCKDB,
            lambda: cast("dbapi.DBAPIConnection", db.cursor()),
        )

        return db.cursor()
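
Side note on the change above: in DuckDB's Python API, cursor() creates an independent duplicate connection to the same database, which is what keeps the caller's handle isolated from LDLite's internal one. A minimal illustration (not LDLite code):

```python
import duckdb

db = duckdb.connect(":memory:")
a = db.cursor()  # independent handle, same database
b = db.cursor()
a.execute("CREATE TABLE t (x INTEGER)")
b.execute("INSERT INTO t VALUES (1)")
print(b.execute("SELECT x FROM t").fetchall())  # [(1,)]
```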

    def connect_db_postgresql(self, dsn: str) -> psycopg2.extensions.connection:
        """Connects to a PostgreSQL database for storing data.
@@ -132,15 +147,24 @@ def connect_db_postgresql(self, dsn: str) -> psycopg2.extensions.connection:
        connection to the database which can be used to submit SQL queries.
        The returned connection defaults to autocommit mode.

        This will return a psycopg3 connection in the next major release of LDLite.

        Example:
            db = ld.connect_db_postgresql(dsn='dbname=ld host=localhost user=ldlite')

        """
        self.dbtype = DBType.POSTGRES
        db = psycopg2.connect(dsn)
        db = psycopg.connect(dsn)
        self.db = cast("dbapi.DBAPIConnection", db)
        autocommit(self.db, self.dbtype, True)
        return db
        self._db = DBTypeDatabase(
            DBType.POSTGRES,
            lambda: cast("dbapi.DBAPIConnection", psycopg.connect(dsn)),
        )

        ret_db = psycopg2.connect(dsn)
        ret_db.rollback()
        ret_db.set_session(autocommit=True)
        return ret_db

    def experimental_connect_db_sqlite(
        self,
@@ -163,9 +187,15 @@

"""
self.dbtype = DBType.SQLITE
fn = filename if filename is not None else ":memory:"
fn = filename if filename is not None else "file::memory:?cache=shared"
self.db = sqlite3.connect(fn)
autocommit(self.db, self.dbtype, True)
self._db = DBTypeDatabase(
DBType.SQLITE,
lambda: cast("dbapi.DBAPIConnection", sqlite3.connect(fn)),
)

db = sqlite3.connect(fn)
autocommit(db, self.dbtype, True)
return self.db
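
Context for the "file::memory:?cache=shared" default above: a plain :memory: database is private to each connection, so LDLite's internal connections would otherwise see an empty database; the shared-cache URI lets every in-process connection share one in-memory database. A minimal illustration (note that Python's sqlite3 needs uri=True for URI filenames):

```python
import sqlite3

a = sqlite3.connect("file::memory:?cache=shared", uri=True)
b = sqlite3.connect("file::memory:?cache=shared", uri=True)
a.execute("CREATE TABLE t (x INTEGER)")
a.commit()
print(b.execute("SELECT name FROM sqlite_master").fetchall())  # [('t',)]
```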

    def _check_folio(self) -> None:
@@ -206,22 +236,16 @@ def drop_tables(self, table: str) -> None:
        ld.drop_tables('g')

        """
        if self.db is None:
        if self.db is None or self._db is None:
            self._check_db()
            return
        autocommit(self.db, self.dbtype, True)
        schema_table = table.strip().split(".")
        if len(schema_table) < 1 or len(schema_table) > 2:
        if len(schema_table) != 1 and len(schema_table) != 2:
            raise ValueError("invalid table name: " + table)
        self._check_db()
        cur = self.db.cursor()
        try:
            cur.execute("DROP TABLE IF EXISTS " + sqlid(table))
        except (RuntimeError, psycopg2.Error):
            pass
        finally:
            cur.close()
        drop_json_tables(self.db, table)
        if len(schema_table) == 2 and self.dbtype == DBType.SQLITE:
            table = schema_table[0] + "_" + schema_table[1]
        prefix = Prefix(table)
        self._db.drop_prefix(prefix)
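
A hedged sketch of prefix-based cleanup, assuming the raw table and its JSON-derived tables share a name prefix (e.g. g, g__t, g__t2); the real drop_prefix lives in ._database and may differ:

```python
from psycopg import connect, sql

def drop_prefix(dsn: str, base: str) -> None:
    # Illustrative only: drop the raw table plus any tables derived from it.
    with connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_name = %s OR table_name LIKE %s",
            (base, base + r"\_\_%"),
        )
        for (name,) in cur.fetchall():
            cur.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(name)))
```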

    def set_folio_max_retries(self, max_retries: int) -> None:
        """Sets the maximum number of retries for FOLIO requests.
@@ -321,32 +345,15 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
        if self._folio is None:
            self._check_folio()
            return []
        if self.db is None:
        if self.db is None or self._db is None:
            self._check_db()
            return []
        if len(schema_table) == 2 and self.dbtype == DBType.SQLITE:
            table = schema_table[0] + "_" + schema_table[1]
            schema_table = [table]
        prefix = Prefix(table)
        if not self._quiet:
            print("ldlite: querying: " + path, file=sys.stderr)
        drop_json_tables(self.db, table)
        autocommit(self.db, self.dbtype, False)
        try:
            cur = self.db.cursor()
            try:
                if len(schema_table) == 2:
                    cur.execute("CREATE SCHEMA IF NOT EXISTS " + sqlid(schema_table[0]))
                cur.execute("DROP TABLE IF EXISTS " + sqlid(table))
                cur.execute(
                    "CREATE TABLE "
                    + sqlid(table)
                    + "(__id integer, jsonb "
                    + json_type(self.dbtype)
                    + ")",
                )
            finally:
                cur.close()
            self.db.commit()
            # First get total number of records
            records = self._folio.iterate_records(
                path,
@@ -355,70 +362,61 @@
self.page_size,
query=cast("QueryType", query),
)

(total_records, _) = next(records)
total = total_records if total_records is not None else 0
total = min(total_records, limit or total_records)
if self._verbose:
print("ldlite: estimated row count: " + str(total), file=sys.stderr)
# Read result pages
pbar = None
pbartotal = 0

p_count = count(1)
processed = 0
pbar: tqdm | PbarNoop # type:ignore[type-arg]
if not self._quiet:
if total == -1:
pbar = tqdm(
desc="reading",
leave=False,
mininterval=3,
smoothing=0,
colour="#A9A9A9",
bar_format="{desc} {elapsed} {bar}{postfix}",
)
else:
pbar = tqdm(
desc="reading",
total=total,
leave=False,
mininterval=3,
smoothing=0,
colour="#A9A9A9",
bar_format="{desc} {bar}{postfix}",
)
cur = self.db.cursor()
try:
count = 0
for pkey, d in records:
cur.execute(
"INSERT INTO "
+ sqlid(table)
+ " VALUES("
+ str(pkey)
+ ","
+ encode_sql_str(self.dbtype, d)
+ ")",
)
count += 1
if pbar is not None:
if pbartotal + 1 > total:
pbartotal = total
pbar.update(total - pbartotal)
else:
pbartotal += 1
pbar.update(1)
if limit is not None and count == limit:
break
finally:
cur.close()
if pbar is not None:
pbar.close()
self.db.commit()
pbar = tqdm(
desc="reading",
total=total,
leave=False,
mininterval=3,
smoothing=0,
colour="#A9A9A9",
bar_format="{desc} {bar}{postfix}",
)
else:

class PbarNoop:
def update(self, _: int) -> None: ...
def close(self) -> None: ...

pbar = PbarNoop()

def on_processed() -> bool:
pbar.update(1)
nonlocal processed
processed = next(p_count)
return True

def on_processed_limit() -> bool:
pbar.update(1)
nonlocal processed, limit
processed = next(p_count)
return limit is None or processed < limit

self._db.ingest_records(
prefix,
on_processed_limit if limit is not None else on_processed,
records,
)
pbar.close()
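
A hedged sketch of the callback contract assumed by ingest_records above: it fires once per stored record, and a False return stops ingestion, which is how limit short-circuits the stream (names are illustrative; the real implementation is in ._database):

```python
def ingest_records(prefix, on_record, records):
    # Illustrative only: stream records into the raw table and stop as soon
    # as the per-record callback returns False (this is how `limit` works).
    stored = []  # stands in for the real table write
    for pkey, data in records:
        stored.append((prefix, pkey, data))
        if not on_record():
            break
    return stored
```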

            self._db.drop_extracted_tables(prefix)
            newtables = [table]
            newattrs = {}
            if json_depth > 0:
                autocommit(self.db, self.dbtype, False)
                jsontables, jsonattrs = transform_json(
                    self.db,
                    self.dbtype,
                    table,
                    count,
                    processed,
                    self._quiet,
                    json_depth,
                )
@@ -429,12 +427,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
            newattrs[table] = {"__id": Attr("__id", "bigint")}

            if not keep_raw:
                cur = self.db.cursor()
                try:
                    cur.execute("DROP TABLE " + sqlid(table))
                    self.db.commit()
                finally:
                    cur.close()
                self._db.drop_raw_table(prefix)

        finally:
            autocommit(self.db, self.dbtype, True)
@@ -459,22 +452,18 @@
colour="#A9A9A9",
bar_format="{desc} {bar}{postfix}",
)
pbartotal = 0
for t, attr in indexable_attrs:
cur = self.db.cursor()
try:
cur.execute(
"CREATE INDEX ON " + sqlid(t) + " (" + sqlid(attr.name) + ")",
)
except (RuntimeError, psycopg2.Error):
except (RuntimeError, psycopg.Error):
pass
finally:
cur.close()
if pbar is not None:
pbartotal += 1
pbar.update(1)
if pbar is not None:
pbar.close()
pbar.update(1)
pbar.close()
# Return table names
if not self._quiet:
print("ldlite: created tables: " + ", ".join(newtables), file=sys.stderr)