Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: |
uv sync --locked --no-install-package=django
uv pip install "${{ matrix.django }}"
- name: Run tests
- name: Run tests on PostgreSQL
env:
DB_SETTINGS: >-
{
Expand All @@ -86,5 +86,8 @@ jobs:
}
run: .venv/bin/pytest -v
continue-on-error: ${{ matrix.python == env.allowed_python_failure }}
- name: Run tests on SQLite
run: .venv/bin/pytest -v
continue-on-error: ${{ matrix.python == env.allowed_python_failure }}
- name: Check style
run: .venv/bin/ruff check
5 changes: 3 additions & 2 deletions django_cte/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .cte import CTEManager, CTEQuerySet, With # noqa
from .cte import CTE, with_cte, CTEManager, CTEQuerySet, With # noqa

__version__ = "1.3.3"
__version__ = "2.0.0"
__all__ = ["CTE", "with_cte"]
138 changes: 138 additions & 0 deletions django_cte/_deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
try:
from warnings import deprecated
except ImportError:
from warnings import warn

# Copied from Python 3.13, lightly modified for Python 3.9 compatibility.
# Can be removed when the oldest supported Python version is 3.13.
class deprecated:
"""Indicate that a class, function or overload is deprecated.

When this decorator is applied to an object, the type checker
will generate a diagnostic on usage of the deprecated object.

Usage:

@deprecated("Use B instead")
class A:
pass

@deprecated("Use g instead")
def f():
pass

@overload
@deprecated("int support is deprecated")
def g(x: int) -> int: ...
@overload
def g(x: str) -> int: ...

The warning specified by *category* will be emitted at runtime
on use of deprecated objects. For functions, that happens on calls;
for classes, on instantiation and on creation of subclasses.
If the *category* is ``None``, no warning is emitted at runtime.
The *stacklevel* determines where the
warning is emitted. If it is ``1`` (the default), the warning
is emitted at the direct caller of the deprecated object; if it
is higher, it is emitted further up the stack.
Static type checker behavior is not affected by the *category*
and *stacklevel* arguments.

The deprecation message passed to the decorator is saved in the
``__deprecated__`` attribute on the decorated object.
If applied to an overload, the decorator
must be after the ``@overload`` decorator for the attribute to
exist on the overload as returned by ``get_overloads()``.

See PEP 702 for details.

"""
def __init__(
self,
message: str,
/,
*,
category=DeprecationWarning,
stacklevel=1,
):
if not isinstance(message, str):
raise TypeError(
f"Expected an object of type str for 'message', not {type(message).__name__!r}"
)
self.message = message
self.category = category
self.stacklevel = stacklevel

def __call__(self, arg, /):
# Make sure the inner functions created below don't
# retain a reference to self.
msg = self.message
category = self.category
stacklevel = self.stacklevel
if category is None:
arg.__deprecated__ = msg
return arg
elif isinstance(arg, type):
import functools
from types import MethodType

original_new = arg.__new__

@functools.wraps(original_new)
def __new__(cls, /, *args, **kwargs):
if cls is arg:
warn(msg, category=category, stacklevel=stacklevel + 1)
if original_new is not object.__new__:
return original_new(cls, *args, **kwargs)
# Mirrors a similar check in object.__new__.
elif cls.__init__ is object.__init__ and (args or kwargs):
raise TypeError(f"{cls.__name__}() takes no arguments")
else:
return original_new(cls)

arg.__new__ = staticmethod(__new__)

original_init_subclass = arg.__init_subclass__
# We need slightly different behavior if __init_subclass__
# is a bound method (likely if it was implemented in Python)
if isinstance(original_init_subclass, MethodType):
original_init_subclass = original_init_subclass.__func__

@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)

arg.__init_subclass__ = classmethod(__init_subclass__)
# Or otherwise, which likely means it's a builtin such as
# object's implementation of __init_subclass__.
else:
@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)

arg.__init_subclass__ = __init_subclass__

arg.__deprecated__ = __new__.__deprecated__ = msg
__init_subclass__.__deprecated__ = msg
return arg
elif callable(arg):
import functools
import inspect

@functools.wraps(arg)
def wrapper(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return arg(*args, **kwargs)

if inspect.iscoroutinefunction(arg):
wrapper = inspect.markcoroutinefunction(wrapper)

arg.__deprecated__ = wrapper.__deprecated__ = msg
return wrapper
else:
raise TypeError(
"@deprecated decorator with non-None category must be applied to "
f"a class or callable, not {arg!r}"
)
105 changes: 56 additions & 49 deletions django_cte/cte.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
from django.db.models import Manager
from copy import copy

from django.db.models import Manager, sql
from django.db.models.expressions import Ref
from django.db.models.query import Q, QuerySet, ValuesIterable
from django.db.models.sql.datastructures import BaseTable

from .jitmixin import jit_mixin
from .join import QJoin, INNER
from .meta import CTEColumnRef, CTEColumns
from .query import CTEQuery
from ._deprecated import deprecated

__all__ = ["CTE", "with_cte"]


def with_cte(*ctes, select):
"""Add Common Table Expression(s) (CTEs) to a model or queryset

__all__ = ["With", "CTEManager", "CTEQuerySet"]
:param *ctes: One or more CTE objects.
:param select: A model class, queryset, or CTE to use as the base
query to which CTEs are attached.
:returns: A queryset with the given CTE added to it.
"""
if isinstance(select, CTE):
select = select.queryset()
elif not isinstance(select, QuerySet):
select = select._default_manager.all()
jit_mixin(select.query, CTEQuery)
select.query._with_ctes += ctes
return select


class With(object):
"""Common Table Expression query object: `WITH ...`
class CTE:
"""Common Table Expression

:param queryset: A queryset to use as the body of the CTE.
:param name: Optional name parameter for the CTE (default: "cte").
Expand Down Expand Up @@ -41,7 +62,7 @@ def __repr__(self):

@classmethod
def recursive(cls, make_cte_queryset, name="cte", materialized=False):
"""Recursive Common Table Expression: `WITH RECURSIVE ...`
"""Recursive Common Table Expression

:param make_cte_queryset: Function taking a single argument (a
not-yet-fully-constructed cte object) and returning a `QuerySet`
Expand All @@ -58,10 +79,11 @@ def recursive(cls, make_cte_queryset, name="cte", materialized=False):
def join(self, model_or_queryset, *filter_q, **filter_kw):
"""Join this CTE to the given model or queryset

This CTE will be refernced by the returned queryset, but the
This CTE will be referenced by the returned queryset, but the

corresponding `WITH ...` statement will not be prepended to the
queryset's SQL output; use `<CTEQuerySet>.with_cte(cte)` to
achieve that outcome.
queryset's SQL output; use `with_cte(cte, select=cte.join(...))`
to achieve that outcome.

:param model_or_queryset: Model class or queryset to which the
CTE should be joined.
Expand Down Expand Up @@ -96,15 +118,15 @@ def queryset(self):

This CTE will be referenced by the returned queryset, but the
corresponding `WITH ...` statement will not be prepended to the
queryset's SQL output; use `<CTEQuerySet>.with_cte(cte)` to
achieve that outcome.
queryset's SQL output; use `with_cte(cte, select=cte)` to do
that.

:returns: A queryset.
"""
cte_query = self.query
qs = cte_query.model._default_manager.get_queryset()

query = CTEQuery(cte_query.model)
query = jit_mixin(sql.Query(cte_query.model), CTEQuery)
query.join(BaseTable(self.name, None))
query.default_cols = cte_query.default_cols
query.deferred_loading = cte_query.deferred_loading
Expand All @@ -130,26 +152,38 @@ def _resolve_ref(self, name):
return Ref(name, self.query.resolve_ref(name))
return self.query.resolve_ref(name)

def resolve_expression(self, *args, **kw):
if self.query is None:
raise ValueError("Cannot resolve recursive CTE without a query.")
clone = copy(self)
clone.query = clone.query.resolve_expression(*args, **kw)
return clone


@deprecated("Use `django_cte.CTE` instead.")
class With(CTE):

@staticmethod
@deprecated("Use `django_cte.CTE.recursive` instead.")
def recursive(*args, **kw):
return CTE.recursive(*args, **kw)


@deprecated("CTEQuerySet is deprecated. "
"CTEs can now be applied to any queryset using `with_cte()`")
class CTEQuerySet(QuerySet):
"""QuerySet with support for Common Table Expressions"""

def __init__(self, model=None, query=None, using=None, hints=None):
# Only create an instance of a Query if this is the first invocation in
# a query chain.
if query is None:
query = CTEQuery(model)
super(CTEQuerySet, self).__init__(model, query, using, hints)
jit_mixin(self.query, CTEQuery)

@deprecated("Use `django_cte.with_cte(cte, select=...)` instead.")
def with_cte(self, cte):
"""Add a Common Table Expression to this queryset

The CTE `WITH ...` clause will be added to the queryset's SQL
output (after other CTEs that have already been added) so it
can be referenced in annotations, filters, etc.
"""
qs = self._clone()
qs.query._with_ctes.append(cte)
qs.query._with_ctes += cte,
return qs

def as_manager(cls):
Expand All @@ -161,36 +195,9 @@ def as_manager(cls):
as_manager.queryset_only = True
as_manager = classmethod(as_manager)

def _combinator_query(self, *args, **kw):
clone = super()._combinator_query(*args, **kw)
if clone.query.combinator:
ctes = clone.query._with_ctes = []
seen = {}
for query in clone.query.combined_queries:
for cte in getattr(query, "_with_ctes", []):
if seen.get(cte.name) is cte:
continue
if cte.name in seen:
raise ValueError(
f"Found two or more CTEs named '{cte.name}'. "
"Hint: assign a unique name to each CTE."
)
ctes.append(cte)
seen[cte.name] = cte
if ctes:
def without_ctes(query):
if getattr(query, "_with_ctes", None):
query = query.clone()
query._with_ctes = []
return query

clone.query.combined_queries = [
without_ctes(query)
for query in clone.query.combined_queries
]
return clone


@deprecated("CTEMAnager is deprecated. "
"CTEs can now be applied to any queryset using `with_cte()`")
class CTEManager(Manager.from_queryset(CTEQuerySet)):
"""Manager for models that perform CTE queries"""

Expand Down
45 changes: 0 additions & 45 deletions django_cte/expressions.py

This file was deleted.

Loading
Loading