From 1b13339bb93baa608f0462e54eaf899ad832deff Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Tue, 4 Nov 2025 15:20:56 +0000 Subject: [PATCH 1/6] Adds CursorPaginator --- ninja/conf.py | 4 + ninja/pagination.py | 391 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 393 insertions(+), 2 deletions(-) diff --git a/ninja/conf.py b/ninja/conf.py index 1584a5ce0..fd0d423cc 100644 --- a/ninja/conf.py +++ b/ninja/conf.py @@ -11,6 +11,10 @@ class Settings(BaseModel): PAGINATION_CLASS: str = Field( "ninja.pagination.LimitOffsetPagination", alias="NINJA_PAGINATION_CLASS" ) + PAGINATION_DEFAULT_ORDERING: tuple[str, ...] = Field( + ("-created",), alias="NINJA_PAGINATION_DEFAULT_ORDERING" + ) + PAGINATION_MAX_OFFSET: int = Field(100, alias="NINJA_PAGINATION_MAX_OFFSET") PAGINATION_PER_PAGE: int = Field(100, alias="NINJA_PAGINATION_PER_PAGE") PAGINATION_MAX_PER_PAGE_SIZE: int = Field(100, alias="NINJA_MAX_PER_PAGE_SIZE") PAGINATION_MAX_LIMIT: int = Field(inf, alias="NINJA_PAGINATION_MAX_LIMIT") # type: ignore diff --git a/ninja/pagination.py b/ninja/pagination.py index 258a0937d..2a311717d 100644 --- a/ninja/pagination.py +++ b/ninja/pagination.py @@ -1,18 +1,32 @@ +import binascii import inspect from abc import ABC, abstractmethod +from base64 import b64decode, b64encode from functools import partial, wraps from math import inf -from typing import Any, AsyncGenerator, Callable, List, Optional, Tuple, Type, Union +from typing import ( + Annotated, + Any, + AsyncGenerator, + Callable, + List, + Optional, + Tuple, + Type, + Union, +) +from urllib import parse from django.db.models import QuerySet from django.http import HttpRequest from django.utils.module_loading import import_string +from pydantic import BaseModel, field_validator from typing_extensions import get_args as get_collection_args from ninja import Field, Query, Router, Schema from ninja.conf import settings from ninja.constants import NOT_SET -from ninja.errors import ConfigError +from ninja.errors import ConfigError, ValidationError from ninja.operation import Operation from ninja.signature.details import is_collection_type from ninja.utils import ( @@ -179,6 +193,379 @@ async def apaginate_queryset( } # noqa: E203 +class CursorPagination(AsyncPaginationBase): + max_page_size: int + page_size: int + + items_attribute: str = "results" + + def __init__( + self, + *, + ordering: tuple[str, ...] = settings.PAGINATION_DEFAULT_ORDERING, + page_size: int = settings.PAGINATION_PER_PAGE, + max_page_size: int = settings.PAGINATION_MAX_PER_PAGE_SIZE, + **kwargs: Any, + ) -> None: + self.ordering = ordering + # take the first ordering parameter as the attribute for establishing + # position + self._order_attribute = ( + ordering[0][1:] if ordering[0].startswith("-") else ordering[0] + ) + self._order_attribute_reversed = ordering[0].startswith("-") + + self.page_size = page_size + self.max_page_size = max_page_size + + super().__init__(**kwargs) + + class Input(Schema): + page_size: int | None = None + cursor: str | None = None + + class Output(Schema): + previous: str | None + next: str | None + results: list[Any] + + class Cursor(BaseModel): + """ + Represents pagination state. + + This is encoded in a base64 query parameter. + + """ + + p: Annotated[ + str | None, + Field( + title="position", + description="String identifier for the current position in the dataset", + ), + ] = None + r: Annotated[ + bool, + Field( + title="reverse", description="Whether to reverse the ordering direction" + ), + ] = False + # offset enables the use of a non-unique ordering field + # e.g. if created time of two items is exactly the same, we can use the offset + # to figure out the position exactly + o: Annotated[ + int, + Field( + ge=0, + lt=settings.PAGINATION_MAX_OFFSET, + title="offset", + description="Number of items to skip from the current position", + ), + ] = 0 + + @field_validator("*", mode="before") + @classmethod + def validate_individual_queryparam(cls, value: Any) -> Any: + """ + Handle query string parsing quirks where single values become lists. + + URL parsing libraries wrap single query parameters in lists, we only + care about a single value + """ + if isinstance(value, list): + return value[0] + return value + + @classmethod + def from_encoded_param( + cls, encoded_param: str | None, context: Any = None + ) -> "CursorPagination.Cursor": + """ + Deserialize cursor from URL-safe base64 token. + """ + if not encoded_param: + return cls() + try: + decoded = b64decode( + encoded_param.encode("ascii"), validate=True + ).decode("ascii") + except (ValueError, binascii.Error) as e: + raise ValidationError([{"cursor": "Invalid Cursor"}]) from e + + parsed_querystring = parse.parse_qs(decoded, keep_blank_values=True) + return cls.model_validate(parsed_querystring, context=context) + + def encode_as_param(self) -> str: + """ + Serialize cursor to URL-safe base64 token. + """ + data = self.model_dump( + exclude_defaults=True, exclude_none=True, exclude_unset=True + ) + query_string = parse.urlencode(data, doseq=True) + return b64encode(query_string.encode("ascii")).decode("ascii") + + @staticmethod + def _reverse_order(order: tuple[str, ...]) -> tuple[str, ...]: + """ + Flip ordering direction for backward pagination. + + Example: + ("-created", "id") becomes ("created", "-id") + ("name", "-updated") becomes ("-name", "updated") + """ + return tuple( + marker[1:] if marker.startswith("-") else f"-{marker}" for marker in order + ) + + def _get_position(self, item: Any) -> str: + """ + Extract the string representation of the attribute value used for ordering, + which serves as the position identifier. + + """ + return str(getattr(item, self._order_attribute)) + + def _get_page_size(self, requested_page_size: int | None) -> int: + """ + Determine the actual page size to use, respecting configured limits. + + Uses the default page size when no specific size is requested, otherwise + clamps the requested size within the allowed range to prevent resource + exhaustion attacks. + """ + if requested_page_size is None: + return self.page_size + return min(self.max_page_size, max(1, requested_page_size)) + + def _build_next_cursor( + self, + current_cursor: Cursor, + results: list[Any], + additional_position: str | None = None, + ) -> Cursor | None: + """ + Build cursor for next page + """ + if (additional_position is None and not current_cursor.r) or not results: + return None + + if not current_cursor.r: + # next position is provided by the additional position in a forward cursor + next_position = additional_position + else: + # default to the last item + # this will result in this item being included in the next set of results + # when flipping from a reversed cursor query to a forward cursor query + next_position = self._get_position(results[-1]) + + offset = 0 + + if current_cursor.p == next_position and not current_cursor.r: + offset += current_cursor.o + len(results) + else: + # Count duplicates at page end to find the offset + for item in reversed(results): + item_position_value = self._get_position(item) + if item_position_value != next_position: + break + offset += 1 + + return self.Cursor(o=offset, r=False, p=next_position) + + def _build_previous_cursor( + self, + current_cursor: Cursor, + results: list[Any], + additional_position: str | None = None, + ) -> Cursor | None: + """ + Build cursor for previous page + """ + if ( + current_cursor.r and additional_position is None + ) or current_cursor.p is None: + return None + + if not results: + # End of dataset - create reverse cursor to go backward + return self.Cursor(o=0, r=True, p=current_cursor.p) + + if current_cursor.r: + # previous position is provided by the additional position in a + # reversed cursor + previous_position = additional_position + + else: + # default to the first item + # this will result in this item being included in the previous set of + # results when flipping from a forward cursor query to a reversed + # cursor query + previous_position = self._get_position(results[0]) + + offset = 0 + + if current_cursor.p == previous_position and current_cursor.r: + offset += current_cursor.o + len(results) + else: + # Count duplicates at page end to find the offset + for item in results: + item_position_value = self._get_position(item) + if item_position_value != previous_position: + break + offset += 1 + + return self.Cursor(o=offset, r=True, p=previous_position) + + @staticmethod + def _add_cursor_to_URL(url: str, cursor: Cursor | None) -> str | None: + """ + Build pagination URLs with an encoded cursor. + + Ignore any previous cursors but preserve any other query parameters + + Example: + Given URL "https://api.example.com/pages?tag=hiring" and a cursor + with position "2024-01-01T10:00:00Z", returns: + "https://api.example.com/pages?cursor=cD0yMDI0LTAxLTAxVDEwJTNBMDA%3D&tag=hiring" + """ + + if cursor is None: + return None + (scheme, netloc, path, query, fragment) = parse.urlsplit(url) + query_dict = parse.parse_qs(query, keep_blank_values=True) + query_dict["cursor"] = [cursor.encode_as_param()] + query = parse.urlencode(sorted(query_dict.items()), doseq=True) + return parse.urlunsplit((scheme, netloc, path, query, fragment)) + + def _order_queryset(self, queryset: QuerySet, cursor: Cursor) -> QuerySet: + """ + Apply ordering to queryset based on cursor direction. + + For backward pagination (cursor.r=True), flips the ordering direction + to traverse the dataset in reverse. + """ + if cursor.r: + return queryset.order_by(*self._reverse_order(self.ordering)) + + return queryset.order_by(*self.ordering) + + def _find_position(self, queryset: QuerySet, cursor: Cursor) -> QuerySet: + """ + Filter queryset to start from the cursor position. + """ + if cursor.p is None: + return queryset + + cmp = "gte" if cursor.r == self._order_attribute_reversed else "lte" + filters = {f"{self._order_attribute}__{cmp}": cursor.p} + return queryset.filter(**filters) + + def paginate_queryset( + self, queryset: QuerySet, pagination: Input, request: HttpRequest, **params: Any + ) -> Any: + """ + Execute cursor-based pagination with stable positioning. + + We fetch page_size + 1 items to detect whether more pages exist without + requiring a separate count query. The extra item is discarded from results + but used for next/previous cursor generation. + """ + page_size = self._get_page_size(pagination.page_size) + cursor = self.Cursor.from_encoded_param(pagination.cursor) + + queryset = self._order_queryset(queryset, cursor) + queryset = self._find_position(queryset, cursor) + + # fetch results here and turn into a list + results_plus_one = list(queryset[cursor.o : cursor.o + page_size + 1]) + additional_position = ( + self._get_position(results_plus_one[-1]) + if len(results_plus_one) > page_size + else None + ) + + if cursor.r: + results = list(reversed(results_plus_one[:page_size])) + else: + results = results_plus_one[:page_size] + + next_cursor = self._build_next_cursor( + current_cursor=cursor, + results=results, + additional_position=additional_position, + ) + + previous_cursor = self._build_previous_cursor( + current_cursor=cursor, + results=results, + additional_position=additional_position, + ) + + base_url = request.build_absolute_uri() + + return { + "next": self._add_cursor_to_URL(base_url, next_cursor), + "previous": self._add_cursor_to_URL(base_url, previous_cursor), + self.items_attribute: results, + } + + async def apaginate_queryset( + self, + queryset: QuerySet, + pagination: Input, + request: HttpRequest, + **params: Any, + ) -> Any: + """ + Execute async cursor-based pagination with stable positioning. + + We fetch page_size + 1 items to detect whether more pages exist without + requiring a separate count query. The extra item is discarded from results + but used for next/previous cursor generation. + """ + page_size = self._get_page_size(pagination.page_size) + cursor = self.Cursor.from_encoded_param(pagination.cursor) + + queryset = self._order_queryset(queryset, cursor) + queryset = self._find_position(queryset, cursor) + + # fetch results here and turn into a list + results_plus_one = [ + obj async for obj in queryset[cursor.o : cursor.o + page_size + 1] + ] + additional_position = ( + self._get_position(results_plus_one[-1]) + if len(results_plus_one) > page_size + else None + ) + + if cursor.r: + results = list(reversed(results_plus_one[:page_size])) + else: + results = results_plus_one[:page_size] + + next_cursor = self._build_next_cursor( + current_cursor=cursor, + results=results, + additional_position=additional_position, + ) + + previous_cursor = self._build_previous_cursor( + current_cursor=cursor, + results=results, + additional_position=additional_position, + ) + + base_url = request.build_absolute_uri() + + return { + "next": self._add_cursor_to_URL(base_url, next_cursor), + "previous": self._add_cursor_to_URL(base_url, previous_cursor), + self.items_attribute: results, + } + + def paginate( func_or_pgn_class: Any = NOT_SET, **paginator_params: Any ) -> Callable[..., Any]: From db691ae48fb5cbc48225fe86a701ac1ae6b33bed Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Tue, 4 Nov 2025 15:21:13 +0000 Subject: [PATCH 2/6] Adds tests for CursorPaginator --- tests/test_pagination_cursor.py | 931 ++++++++++++++++++++++++++++++++ 1 file changed, 931 insertions(+) create mode 100644 tests/test_pagination_cursor.py diff --git a/tests/test_pagination_cursor.py b/tests/test_pagination_cursor.py new file mode 100644 index 000000000..c6bde2fce --- /dev/null +++ b/tests/test_pagination_cursor.py @@ -0,0 +1,931 @@ +from datetime import date, timedelta +from http import HTTPStatus +from typing import List + +import pytest +from django.test import override_settings +from someapp.api import EventSchema # pyright: ignore[reportMissingImports] +from someapp.models import Category, Event # pyright: ignore[reportMissingImports] + +from ninja import NinjaAPI +from ninja.pagination import CursorPagination, paginate +from ninja.testing import TestAsyncClient, TestClient + +api = NinjaAPI() + + +@api.get("/cursor_events", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",), page_size=10) +def cursor_events(request, **kwargs): + return Event.objects.all() + + +@api.get("/cursor_events_reverse", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("-start_date",), page_size=10) +def cursor_events_reverse(request, **kwargs): + return Event.objects.all() + + +@api.get("/cursor_events_end_date_offset", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("end_date", "start_date"), page_size=2) +def cursor_events_end_date_offset(request, **kwargs): + return Event.objects.all() + + +@api.get("/cursor_events_default_size", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",)) +def cursor_events_default_size(request, **kwargs): + return Event.objects.all() + + +@api.get("/cursor_events_with_params", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",), page_size=10) +def cursor_events_with_params(request, title_filter: str = "", **kwargs): + # API method with a filter in query parameter + if title_filter: + return Event.objects.filter(title__icontains=title_filter) + return Event.objects.all() + + +# Async versions of all endpoints +@api.get("/async_cursor_events", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",), page_size=10) +async def async_cursor_events(request, **kwargs): + return Event.objects.all() + + +@api.get("/async_cursor_events_reverse", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("-start_date",), page_size=10) +async def async_cursor_events_reverse(request, **kwargs): + return Event.objects.all() + + +@api.get("/async_cursor_events_end_date_offset", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("end_date", "start_date"), page_size=2) +async def async_cursor_events_end_date_offset(request, **kwargs): + return Event.objects.all() + + +@api.get("/async_cursor_events_default_size", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",)) +async def async_cursor_events_default_size(request, **kwargs): + return Event.objects.all() + + +@api.get("/async_cursor_events_with_params", response=List[EventSchema]) +@paginate(CursorPagination, ordering=("start_date",), page_size=10) +async def async_cursor_events_with_params(request, title_filter: str = "", **kwargs): + # API method with a filter in query parameter + if title_filter: + return Event.objects.filter(title__icontains=title_filter) + return Event.objects.all() + + +client = TestClient(api) +async_client = TestAsyncClient(api) + + +@pytest.fixture(autouse=True) +def clean_db(transactional_db): + """Clean up categories and events before and after each test.""" + Category.objects.all().delete() + Event.objects.all().delete() + yield + Category.objects.all().delete() + Event.objects.all().delete() + + +@pytest.fixture() +def start_date(): + return date(2023, 1, 1) + + +@pytest.fixture(autouse=True) +def events(transactional_db, start_date): + """Create a number of test events.""" + + events = [] + for i in range(1, 11): + event = Event( + title=f"Event {i}", # 1-10 + start_date=start_date + timedelta(days=i), # sequential start dates + end_date=start_date + + timedelta( + days=((i - 1) // 3) + 1 + ), # end dates 1, 1, 1, 2, 2, 2, 3, 3, 3, 4 + ) + events.append(event) + + return Event.objects.bulk_create(events) + + +@pytest.fixture(autouse=True) +def special_events(transactional_db, start_date, events): + """Create a number of special events occurring after the last ones""" + n_events = len(events) + special_events = [] + for i in range(n_events + 1, n_events + 4): + event = Event( + title=f"Special Event {i}", # 11-13 + start_date=start_date + timedelta(days=i), # sequential start dates + end_date=start_date + + timedelta(days=((i - 1) // 3) + 1), # end dates 4, 4, 5 + ) + special_events.append(event) + + return Event.objects.bulk_create(special_events) + + +def test_cursor_pagination_first_page(): + """Test first page of cursor pagination.""" + + response = client.get("/cursor_events?page_size=5").json() + + assert len(response["results"]) == 5 + assert response["results"][0]["title"] == "Event 1" + assert response["results"][-1]["title"] == "Event 5" + assert response["next"] is not None + assert response["previous"] is None + + +def test_cursor_pagination_with_cursor(): + """Test navigation using cursor.""" + + # Get first page to obtain next cursor + first_response = client.get("/cursor_events").json() + + assert len(first_response["results"]) == 10 + assert first_response["next"] is not None + + if first_response["next"]: + # we can't rely on `next` being a well-formed URL, because the testclient + # httprequest mock does not pass the path, so we extract the cursor instead + next_cursor = first_response["next"].split("cursor=")[1].split("&")[0] + + # Use cursor to get next page + response = client.get(f"/cursor_events?cursor={next_cursor}").json() + + assert len(response["results"]) == 3 # Remaining 3 events + assert response["next"] is None + assert response["previous"] is not None + + +def test_cursor_pagination_reverse_ordering(): + """Test cursor pagination with reverse ordering.""" + + response = client.get("/cursor_events_reverse").json() + + assert len(response["results"]) == 10 + # With reverse ordering, should start from the end + assert response["results"][0]["title"] == "Special Event 13" + assert response["next"] is not None + assert response["previous"] is None + + +def test_cursor_pagination_end_date_offset(): + """Test cursor pagination handles duplicate end_date values""" + + response = client.get("/cursor_events_end_date_offset").json() + + # Should handle events with same end_date by using id as secondary ordering + assert len(response["results"]) == 2 + + # Verify ordering is consistent across pages + all_results = [] + current_response = response + + while True: + all_results.extend(current_response["results"]) + if not current_response["next"]: + break + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response = client.get( + f"/cursor_events_end_date_offset?cursor={next_cursor}" + ).json() + + # Should have all events and maintain consistent ordering + assert len(all_results) == 13 + # Verify no duplicates + titles = [result["title"] for result in all_results] + assert len(titles) == len(set(titles)) + + # Verify results are sorted by start_dates, then by id (implicit in creation order) + start_dates = [result["start_date"] for result in all_results] + assert start_dates == sorted(start_dates), "Results should be sorted by end_date" + + +def test_cursor_pagination_end_date_offset_backwards(): + """Test cursor pagination handles duplicate end_date values iterating backwards""" + + # Start from the last page by getting all pages first + response = client.get("/cursor_events_end_date_offset").json() + + # Navigate to the last page + current_response = response + while current_response["next"]: + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response = client.get( + f"/cursor_events_end_date_offset?cursor={next_cursor}" + ).json() + + # Now iterate backwards using previous links + all_results_backwards = [] + while True: + # Insert at beginning to maintain reverse order + all_results_backwards = current_response["results"] + all_results_backwards + if not current_response["previous"]: + break + prev_cursor = current_response["previous"].split("cursor=")[1].split("&")[0] + current_response = client.get( + f"/cursor_events_end_date_offset?cursor={prev_cursor}" + ).json() + + # Should have all events and maintain consistent ordering + assert len(all_results_backwards) == 13 + # Verify no duplicates + titles = [result["title"] for result in all_results_backwards] + assert len(titles) == len(set(titles)) + + # Verify results are sorted by start_date when iterating backwards + start_dates = [result["start_date"] for result in all_results_backwards] + assert start_dates == sorted( + start_dates + ), "Results should be sorted by start_date when iterating backwards" + + +def test_cursor_pagination_default_page_size(): + """Test cursor pagination with default page size.""" + # Create test events + + response = client.get("/cursor_events_default_size").json() + + # Should use default page size which is 1000 + assert len(response["results"]) == 13 + assert response["results"][0]["title"] == "Event 1" + assert response["results"][-1]["title"] == "Special Event 13" + assert response["next"] is None + assert response["previous"] is None + + +def test_cursor_pagination_custom_page_size_override(): + """Test overriding page size in request.""" + # Create test events + + response = client.get("/cursor_events_default_size?page_size=3").json() + + assert len(response["results"]) == 3 + assert response["results"][0]["title"] == "Event 1" + assert response["results"][-1]["title"] == "Event 3" + assert response["next"] is not None + assert response["previous"] is None + + +def test_cursor_pagination_with_custom_params(): + """Test cursor pagination with additional query parameters.""" + + response = client.get( + "/cursor_events_with_params?title_filter=Special&page_size=5" + ).json() + + # Should filter to events with "Special" in title + assert len(response["results"]) == 3 + assert all("Special" in item["title"] for item in response["results"]) + + +def test_cursor_pagination_invalid_cursor(): + """Test handling of invalid cursor values.""" + + response = client.get("/cursor_events?cursor=invalid&page_size=3") + + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + +def test_cursor_pagination_empty_cursor(): + """Test handling of empty cursor.""" + + response = client.get("/cursor_events?cursor=&page_size=3").json() + + # Should default to first page + assert len(response["results"]) == 3 + assert response["results"][0]["title"] == "Event 1" + + +def test_cursor_pagination_no_page_size(): + """Test cursor pagination without specifying page_size.""" + + response = client.get("/cursor_events").json() + + # Should use default page size from settings + assert len(response["results"]) >= 1 + assert "next" in response + assert "previous" in response + assert "results" in response + + +def test_cursor_pagination_openapi_schema(): + """Test that cursor pagination generates correct OpenAPI schema.""" + schema = api.get_openapi_schema()["paths"]["/api/cursor_events"]["get"] + + parameters = {param["name"]: param for param in schema["parameters"]} + + # Check page_size parameter + assert "page_size" in parameters + page_size_param = parameters["page_size"] + assert page_size_param["in"] == "query" + assert page_size_param["required"] is False + + # Check cursor parameter + assert "cursor" in parameters + cursor_param = parameters["cursor"] + assert cursor_param["in"] == "query" + assert cursor_param["required"] is False + assert {"type": "string"} in cursor_param["schema"]["anyOf"] + assert {"type": "null"} in cursor_param["schema"]["anyOf"] + + +def test_cursor_pagination_response_schema(): + """Test that cursor pagination generates correct response schema.""" + schema = api.get_openapi_schema()["paths"]["/api/cursor_events"]["get"] + response_schema = schema["responses"][HTTPStatus.OK]["content"]["application/json"][ + "schema" + ] + + # Should have cursor pagination structure (may be a $ref) + if "$ref" in response_schema: + # Extract the schema name and check it exists in components + ref_path = response_schema["$ref"] + assert ref_path.startswith("#/components/schemas/") + schema_name = ref_path.split("/")[-1] + components_schema = api.get_openapi_schema()["components"]["schemas"][ + schema_name + ] + + # Check the referenced schema has the right properties + assert "properties" in components_schema + properties = components_schema["properties"] + else: + # Direct inline schema + assert "properties" in response_schema + properties = response_schema["properties"] + + assert "results" in properties + assert "next" in properties + assert "previous" in properties + + # Results should be array of items + assert properties["results"]["type"] == "array" + + +def test_cursor_pagination_large_page_size(): + """Test edge cases for cursor pagination.""" + + # Very large page_size + response = client.get("/cursor_events?page_size=1000").json() + # all available items (10 events + 3 special events) + assert len(response["results"]) == 13 + assert response["next"] is None + + +def test_cursor_pagination_page_size_of_one(): + # Page size of 1 + response = client.get("/cursor_events?page_size=1").json() + assert len(response["results"]) == 1 + assert response["results"][0]["title"] == "Event 1" + assert response["next"] is not None + + # Request all pages and check length and order + all_results = [] + current_response = response + + while True: + all_results.extend(current_response["results"]) + if not current_response["next"]: + break + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response = client.get( + f"/cursor_events?cursor={next_cursor}&page_size=1" + ).json() + + # Check total length + assert len(all_results) == 13 # 10 events + 3 special events + + # Check order - should be sorted by start_date + titles = [result["title"] for result in all_results] + expected_titles = [f"Event {i}" for i in range(1, 11)] + [ + f"Special Event {i}" for i in range(11, 14) + ] + assert titles == expected_titles + + +def test_cursor_pagination_empty_queryset(): + """Test cursor pagination with empty queryset.""" + # Explicitly clear all events for this test + Category.objects.all().delete() + Event.objects.all().delete() + + response = client.get("/cursor_events?page_size=5").json() + + assert len(response["results"]) == 0 + assert response["next"] is None + assert response["previous"] is None + + +@override_settings(NINJA_PAGINATION_PER_PAGE=20) +def test_cursor_pagination_settings_override(): + """Test that Django settings affect cursor pagination.""" + + response = client.get("/cursor_events_default_size").json() + assert "results" in response + assert len(response["results"]) == 13 + + +def test_cursor_pagination_deleted_position(): + """Test cursor pagination when the cursor position is deleted between requests.""" + + # Get first page with page_size=3 to get a cursor + first_response = client.get("/cursor_events?page_size=3").json() + assert len(first_response["results"]) == 3 + assert first_response["next"] is not None + + # Extract the cursor for the next page + next_cursor = first_response["next"].split("cursor=")[1].split("&")[0] + + # Delete the event that the cursor is pointing to + # The cursor should be pointing to Event 4 + event_to_delete = Event.objects.get(title="Event 4") + event_to_delete.delete() + + # Now try to use the cursor - it should still work gracefully + # even though the position it was pointing to no longer exists + response = client.get(f"/cursor_events?cursor={next_cursor}&page_size=3").json() + + # Should still return results, just continuing from where it can + assert len(response["results"]) >= 1 + assert "next" in response + assert "previous" in response + + # Verify we get the remaining events after the deleted position + titles = [result["title"] for result in response["results"]] + assert "Event 4" not in titles + # Should now contain the deleted Event 5 + assert titles[0] == "Event 5" + + +def test_cursor_pagination_deleted_position_previous(): + """Test cursor pagination when the cursor position is deleted between requests using previous cursor.""" + + # Get to the last page first + response = client.get("/cursor_events?page_size=3").json() + + # Navigate to get a page with a previous cursor + while response["next"]: + next_cursor = response["next"].split("cursor=")[1].split("&")[0] + response = client.get(f"/cursor_events?cursor={next_cursor}&page_size=3").json() + + # Now we should have a previous cursor + assert response["previous"] is not None + prev_cursor = response["previous"].split("cursor=")[1].split("&")[0] + + # Delete an event that will be referenced by the previous cursor + + event_to_delete = Event.objects.get(title="Special Event 13") + event_to_delete.delete() + + # Now try to use the previous cursor - it should still work gracefully + # even though the position it was pointing to no longer exists + prev_response = client.get( + f"/cursor_events?cursor={prev_cursor}&page_size=3" + ).json() + + # Should still return results, just continuing from where it can + assert len(prev_response["results"]) >= 1 + assert "next" in prev_response + assert "previous" in prev_response + + # Verify we get the remaining events and don't include the deleted one + titles = [result["title"] for result in prev_response["results"]] + assert "Special Event 13" not in titles + + +def test_cursor_pagination_last_item_deleted(): + """Test cursor pagination when the cursor is pointed at the last item, but it is deleted.""" + + # Navigate through all pages to get to the last one + next_cursor = "" + + while True: + response = client.get(f"/cursor_events?cursor={next_cursor}&page_size=3").json() + if response["next"] is None: + break + next_cursor = response["next"].split("cursor=")[1].split("&")[0] + + # Delete the last item (Special Event 13) + last_event = Event.objects.get(title="Special Event 13") + last_event.delete() + + # Now try to use the cursor that was pointing to the last page + # It should handle the deletion gracefully + response = client.get(f"/cursor_events?cursor={next_cursor}&page_size=3").json() + + # Should return empty results + assert len(response["results"]) == 0 + assert response["next"] is None + assert response["previous"] is not None + + +# Async versions of all tests + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_first_page(): + """Test first page of cursor pagination with async.""" + + response = await async_client.get("/async_cursor_events?page_size=5") + response_data = response.json() + + assert len(response_data["results"]) == 5 + assert response_data["results"][0]["title"] == "Event 1" + assert response_data["results"][-1]["title"] == "Event 5" + assert response_data["next"] is not None + assert response_data["previous"] is None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_with_cursor(): + """Test navigation using cursor with async.""" + + # Get first page to obtain next cursor + first_response = await async_client.get("/async_cursor_events") + first_response_data = first_response.json() + + assert len(first_response_data["results"]) == 10 + assert first_response_data["next"] is not None + + if first_response_data["next"]: + # we can't rely on `next` being a well-formed URL, because the testclient + # httprequest mock does not pass the path, so we extract the cursor instead + next_cursor = first_response_data["next"].split("cursor=")[1].split("&")[0] + + # Use cursor to get next page + response = await async_client.get(f"/async_cursor_events?cursor={next_cursor}") + response_data = response.json() + + assert len(response_data["results"]) == 3 # Remaining 3 events + assert response_data["next"] is None + assert response_data["previous"] is not None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_reverse_ordering(): + """Test cursor pagination with reverse ordering with async.""" + + response = await async_client.get("/async_cursor_events_reverse") + response_data = response.json() + + assert len(response_data["results"]) == 10 + # With reverse ordering, should start from the end + assert response_data["results"][0]["title"] == "Special Event 13" + assert response_data["next"] is not None + assert response_data["previous"] is None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_end_date_offset(): + """Test cursor pagination handles duplicate end_date values with async""" + + response = await async_client.get("/async_cursor_events_end_date_offset") + response_data = response.json() + + # Should handle events with same end_date by using id as secondary ordering + assert len(response_data["results"]) == 2 + + # Verify ordering is consistent across pages + all_results = [] + current_response = response_data + + while True: + all_results.extend(current_response["results"]) + if not current_response["next"]: + break + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response_obj = await async_client.get( + f"/async_cursor_events_end_date_offset?cursor={next_cursor}" + ) + current_response = current_response_obj.json() + + # Should have all events and maintain consistent ordering + assert len(all_results) == 13 + # Verify no duplicates + titles = [result["title"] for result in all_results] + assert len(titles) == len(set(titles)) + + # Verify results are sorted by start_dates, then by id (implicit in creation order) + start_dates = [result["start_date"] for result in all_results] + assert start_dates == sorted(start_dates), "Results should be sorted by end_date" + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_end_date_offset_backwards(): + """Test cursor pagination handles duplicate end_date values iterating backwards with async""" + + # Start from the last page by getting all pages first + response = await async_client.get("/async_cursor_events_end_date_offset") + response_data = response.json() + + # Navigate to the last page + current_response = response_data + while current_response["next"]: + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response_obj = await async_client.get( + f"/async_cursor_events_end_date_offset?cursor={next_cursor}" + ) + current_response = current_response_obj.json() + + # Now iterate backwards using previous links + all_results_backwards = [] + while True: + # Insert at beginning to maintain reverse order + all_results_backwards = current_response["results"] + all_results_backwards + if not current_response["previous"]: + break + prev_cursor = current_response["previous"].split("cursor=")[1].split("&")[0] + current_response_obj = await async_client.get( + f"/async_cursor_events_end_date_offset?cursor={prev_cursor}" + ) + current_response = current_response_obj.json() + + # Should have all events and maintain consistent ordering + assert len(all_results_backwards) == 13 + # Verify no duplicates + titles = [result["title"] for result in all_results_backwards] + assert len(titles) == len(set(titles)) + + # Verify results are sorted by start_date when iterating backwards + start_dates = [result["start_date"] for result in all_results_backwards] + assert start_dates == sorted( + start_dates + ), "Results should be sorted by start_date when iterating backwards" + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_default_page_size(): + """Test cursor pagination with default page size with async.""" + # Create test events + + response = await async_client.get("/async_cursor_events_default_size") + response_data = response.json() + + # Should use default page size which is 1000 + assert len(response_data["results"]) == 13 + assert response_data["results"][0]["title"] == "Event 1" + assert response_data["results"][-1]["title"] == "Special Event 13" + assert response_data["next"] is None + assert response_data["previous"] is None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_custom_page_size_override(): + """Test overriding page size in request with async.""" + # Create test events + + response = await async_client.get("/async_cursor_events_default_size?page_size=3") + response_data = response.json() + + assert len(response_data["results"]) == 3 + assert response_data["results"][0]["title"] == "Event 1" + assert response_data["results"][-1]["title"] == "Event 3" + assert response_data["next"] is not None + assert response_data["previous"] is None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_with_custom_params(): + """Test cursor pagination with additional query parameters with async.""" + + response = await async_client.get( + "/async_cursor_events_with_params?title_filter=Special&page_size=5" + ) + response_data = response.json() + + # Should filter to events with "Special" in title + assert len(response_data["results"]) == 3 + assert all("Special" in item["title"] for item in response_data["results"]) + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_invalid_cursor(): + """Test handling of invalid cursor values with async.""" + + response = await async_client.get("/async_cursor_events?cursor=invalid&page_size=3") + + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_empty_cursor(): + """Test handling of empty cursor with async.""" + + response = await async_client.get("/async_cursor_events?cursor=&page_size=3") + response_data = response.json() + + # Should default to first page + assert len(response_data["results"]) == 3 + assert response_data["results"][0]["title"] == "Event 1" + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_no_page_size(): + """Test cursor pagination without specifying page_size with async.""" + + response = await async_client.get("/async_cursor_events") + response_data = response.json() + + # Should use default page size from settings + assert len(response_data["results"]) >= 1 + assert "next" in response_data + assert "previous" in response_data + assert "results" in response_data + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_large_page_size(): + """Test edge cases for cursor pagination with async.""" + + # Very large page_size + response = await async_client.get("/async_cursor_events?page_size=1000") + response_data = response.json() + # all available items (10 events + 3 special events) + assert len(response_data["results"]) == 13 + assert response_data["next"] is None + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_page_size_of_one(): + # Page size of 1 with async + response = await async_client.get("/async_cursor_events?page_size=1") + response_data = response.json() + assert len(response_data["results"]) == 1 + assert response_data["results"][0]["title"] == "Event 1" + assert response_data["next"] is not None + + # Request all pages and check length and order + all_results = [] + current_response = response_data + + while True: + all_results.extend(current_response["results"]) + if not current_response["next"]: + break + next_cursor = current_response["next"].split("cursor=")[1].split("&")[0] + current_response_obj = await async_client.get( + f"/async_cursor_events?cursor={next_cursor}&page_size=1" + ) + current_response = current_response_obj.json() + + # Check total length + assert len(all_results) == 13 # 10 events + 3 special events + + # Check order - should be sorted by start_date + titles = [result["title"] for result in all_results] + expected_titles = [f"Event {i}" for i in range(1, 11)] + [ + f"Special Event {i}" for i in range(11, 14) + ] + assert titles == expected_titles + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_empty_queryset(): + """Test cursor pagination with empty queryset with async.""" + # Explicitly clear all events for this test using async-safe methods + await Category.objects.all().adelete() + await Event.objects.all().adelete() + + response = await async_client.get("/async_cursor_events?page_size=5") + response_data = response.json() + + assert len(response_data["results"]) == 0 + assert response_data["next"] is None + assert response_data["previous"] is None + + +@pytest.mark.asyncio +@override_settings(NINJA_PAGINATION_PER_PAGE=20) +async def test_async_cursor_pagination_settings_override(): + """Test that Django settings affect cursor pagination with async.""" + + response = await async_client.get("/async_cursor_events_default_size") + response_data = response.json() + assert "results" in response_data + assert len(response_data["results"]) == 13 + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_deleted_position(): + """Test cursor pagination when the cursor position is deleted between requests with async.""" + + # Get first page with page_size=3 to get a cursor + first_response = await async_client.get("/async_cursor_events?page_size=3") + first_response_data = first_response.json() + assert len(first_response_data["results"]) == 3 + assert first_response_data["next"] is not None + + # Extract the cursor for the next page + next_cursor = first_response_data["next"].split("cursor=")[1].split("&")[0] + + # Delete the event that the cursor is pointing to + # The cursor should be pointing to Event 4 + event_to_delete = await Event.objects.aget(title="Event 4") + await event_to_delete.adelete() + + # Now try to use the cursor - it should still work gracefully + # even though the position it was pointing to no longer exists + response = await async_client.get( + f"/async_cursor_events?cursor={next_cursor}&page_size=3" + ) + response_data = response.json() + + # Should still return results, just continuing from where it can + assert len(response_data["results"]) >= 1 + assert "next" in response_data + assert "previous" in response_data + + # Verify we get the remaining events after the deleted position + titles = [result["title"] for result in response_data["results"]] + assert "Event 4" not in titles + # Should now contain the deleted Event 5 + assert titles[0] == "Event 5" + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_deleted_position_previous(): + """Test cursor pagination when the cursor position is deleted between requests using previous cursor with async.""" + + # Get to the last page first + response = await async_client.get("/async_cursor_events?page_size=3") + response_data = response.json() + + # Navigate to get a page with a previous cursor + while response_data["next"]: + next_cursor = response_data["next"].split("cursor=")[1].split("&")[0] + response = await async_client.get( + f"/async_cursor_events?cursor={next_cursor}&page_size=3" + ) + response_data = response.json() + + # Now we should have a previous cursor + assert response_data["previous"] is not None + prev_cursor = response_data["previous"].split("cursor=")[1].split("&")[0] + + # Delete an event that will be referenced by the previous cursor + + event_to_delete = await Event.objects.aget(title="Special Event 13") + await event_to_delete.adelete() + + # Now try to use the previous cursor - it should still work gracefully + # even though the position it was pointing to no longer exists + prev_response = await async_client.get( + f"/async_cursor_events?cursor={prev_cursor}&page_size=3" + ) + prev_response_data = prev_response.json() + + # Should still return results, just continuing from where it can + assert len(prev_response_data["results"]) >= 1 + assert "next" in prev_response_data + assert "previous" in prev_response_data + + # Verify we get the remaining events and don't include the deleted one + titles = [result["title"] for result in prev_response_data["results"]] + assert "Special Event 13" not in titles + + +@pytest.mark.asyncio +async def test_async_cursor_pagination_last_item_deleted(): + """Test cursor pagination when the cursor is pointed at the last item, but it is deleted with async.""" + + # Navigate through all pages to get to the last one + next_cursor = "" + + while True: + response = await async_client.get( + f"/async_cursor_events?cursor={next_cursor}&page_size=3" + ) + response_data = response.json() + if response_data["next"] is None: + break + next_cursor = response_data["next"].split("cursor=")[1].split("&")[0] + + # Delete the last item (Special Event 13) + last_event = await Event.objects.aget(title="Special Event 13") + await last_event.adelete() + + # Now try to use the cursor that was pointing to the last page + # It should handle the deletion gracefully + response = await async_client.get( + f"/async_cursor_events?cursor={next_cursor}&page_size=3" + ) + response_data = response.json() + + # Should return empty results + assert len(response_data["results"]) == 0 + assert response_data["next"] is None + assert response_data["previous"] is not None From 8113379a9839c76b7978b96b92ef71fe9a5a38e7 Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Tue, 4 Nov 2025 17:18:16 +0000 Subject: [PATCH 3/6] Adds docs for CursorPaginator --- docs/docs/guides/response/pagination.md | 61 +++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/docs/docs/guides/response/pagination.md b/docs/docs/guides/response/pagination.md index f9c771a23..8a3f821a4 100644 --- a/docs/docs/guides/response/pagination.md +++ b/docs/docs/guides/response/pagination.md @@ -88,6 +88,67 @@ Example query: This allows you to temporarily override the page size setting in your request. The request will use the specified `page_size` value if provided. Otherwise, it will use either the value specified in the decorator or the value from `PAGINATION_MAX_PER_PAGE_SIZE` in settings.py if no decorator value is set. +### CursorPagination + +Cursor-based pagination provides stable pagination for datasets that may change frequently. Cursor pagination uses base64 encoded tokens to mark positions in the dataset, ensuring consistent results even when items are added or removed. + +```python hl_lines="1 4" +from ninja.pagination import paginate, CursorPagination + +@api.get('/events', response=List[EventSchema]) +@paginate(CursorPagination) +def list_events(request): + return Event.objects.all() +``` + +Example query: + +``` +/api/events?cursor=eyJwIjoiMjAyNC0wMS0wMSIsInIiOmZhbHNlLCJvIjowfQ== +``` + +this class has two input parameters: + +- `cursor` - base64 token representing the current position (optional, starts from beginning if not provided) +- `page_size` - number of items per page (optional) + +You can specify the `page_size` value to temporarily override in the request: + +``` +/api/events?cursor=eyJwIjoiMjAyNC0wMS0wMSIsInIiOmZhbHNlLCJvIjowfQ==&page_size=5 +``` + +This class has a few parameters, which determine how the cursor position is ascertained and the parameter encoded: + +- `ordering` - tuple of field names to order the queryset. Use `-` prefix for descending order. The first one of which will be used to encode the position. The ordering field should be unique if possible. A string representation of this field will be used to point to the current position of the cursor. Timestamps work well if each item in the collection is created independently. The paginator can handle some non-uniqueness by adding an offset. Defaults to `("-created",)`, change in `NINJA_PAGINATION_DEFAULT_ORDERING` + +- `page_size` - default page size for endpoint. Defaults to `100`, change in `NINJA_PAGINATION_PER_PAGE` +- `max_page_size` - maximum allowed page size for endpoint. Defaults to `100`, change in `NINJA_PAGINATION_MAX_PER_PAGE_SIZE` + +Finally, there is a `NINJA_PAGINATION_MAX_OFFSET` setting to limit malicious cursor requests. It defaults to `100`. + +The class parameters can be set globally via settings as well as per view: + +```python hl_lines="2" +@api.get("/events") +@paginate(CursorPagination, ordering=("start_date", "end_date"), page_size=20, max_page_size=100) +def list_events(request): + return Event.objects.all() +``` + +The response includes navigation links and results: + +```json +{ + "next": "http://api.example.com/events?cursor=eyJwIjoiMjAyNC0wMS0wMiIsInIiOmZhbHNlLCJvIjowfQ==", + "previous": "http://api.example.com/events?cursor=eyJwIjoiMjAyNC0wMS0wMSIsInIiOnRydWUsIm8iOjB9", + "results": [ + { "id": 1, "title": "Event 1", "start_date": "2024-01-01" }, + { "id": 2, "title": "Event 2", "start_date": "2024-01-02" } + ] +} +``` + ## Accessing paginator parameters in view function If you need access to `Input` parameters used for pagination in your view function - use `pass_parameter` argument From cd9dcc51003b7a21c992862429fd073237b8707c Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Thu, 6 Nov 2025 10:49:48 +0000 Subject: [PATCH 4/6] default to `("-pk",)` --- docs/docs/guides/response/pagination.md | 2 +- ninja/conf.py | 2 +- ninja/pagination.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/guides/response/pagination.md b/docs/docs/guides/response/pagination.md index 8a3f821a4..b3dec1c4f 100644 --- a/docs/docs/guides/response/pagination.md +++ b/docs/docs/guides/response/pagination.md @@ -120,7 +120,7 @@ You can specify the `page_size` value to temporarily override in the request: This class has a few parameters, which determine how the cursor position is ascertained and the parameter encoded: -- `ordering` - tuple of field names to order the queryset. Use `-` prefix for descending order. The first one of which will be used to encode the position. The ordering field should be unique if possible. A string representation of this field will be used to point to the current position of the cursor. Timestamps work well if each item in the collection is created independently. The paginator can handle some non-uniqueness by adding an offset. Defaults to `("-created",)`, change in `NINJA_PAGINATION_DEFAULT_ORDERING` +- `ordering` - tuple of field names to order the queryset. Use `-` prefix for descending order. The first one of which will be used to encode the position. The ordering field should be unique if possible. A string representation of this field will be used to point to the current position of the cursor. Timestamps work well if each item in the collection is created independently. The paginator can handle some non-uniqueness by adding an offset. Defaults to `("-pk",)`, change in `NINJA_PAGINATION_DEFAULT_ORDERING` - `page_size` - default page size for endpoint. Defaults to `100`, change in `NINJA_PAGINATION_PER_PAGE` - `max_page_size` - maximum allowed page size for endpoint. Defaults to `100`, change in `NINJA_PAGINATION_MAX_PER_PAGE_SIZE` diff --git a/ninja/conf.py b/ninja/conf.py index fd0d423cc..838de638e 100644 --- a/ninja/conf.py +++ b/ninja/conf.py @@ -12,7 +12,7 @@ class Settings(BaseModel): "ninja.pagination.LimitOffsetPagination", alias="NINJA_PAGINATION_CLASS" ) PAGINATION_DEFAULT_ORDERING: tuple[str, ...] = Field( - ("-created",), alias="NINJA_PAGINATION_DEFAULT_ORDERING" + ("-pk",), alias="NINJA_PAGINATION_DEFAULT_ORDERING" ) PAGINATION_MAX_OFFSET: int = Field(100, alias="NINJA_PAGINATION_MAX_OFFSET") PAGINATION_PER_PAGE: int = Field(100, alias="NINJA_PAGINATION_PER_PAGE") diff --git a/ninja/pagination.py b/ninja/pagination.py index 2a311717d..a53129ed3 100644 --- a/ninja/pagination.py +++ b/ninja/pagination.py @@ -311,7 +311,7 @@ def _reverse_order(order: tuple[str, ...]) -> tuple[str, ...]: Flip ordering direction for backward pagination. Example: - ("-created", "id") becomes ("created", "-id") + ("-created", "pk") becomes ("created", "-pk") ("name", "-updated") becomes ("-name", "updated") """ return tuple( From 89021f4b1c59a72ec2861ea6dfa75274d4f2b488 Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Thu, 6 Nov 2025 11:00:53 +0000 Subject: [PATCH 5/6] downgrade type annotations --- ninja/conf.py | 4 +-- ninja/pagination.py | 74 +++++++++++++++++++++------------------------ 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/ninja/conf.py b/ninja/conf.py index 838de638e..3f2332250 100644 --- a/ninja/conf.py +++ b/ninja/conf.py @@ -1,5 +1,5 @@ from math import inf -from typing import Dict, Optional, Set +from typing import Dict, Optional, Set, Tuple from django.conf import settings as django_settings from pydantic import BaseModel, ConfigDict, Field @@ -11,7 +11,7 @@ class Settings(BaseModel): PAGINATION_CLASS: str = Field( "ninja.pagination.LimitOffsetPagination", alias="NINJA_PAGINATION_CLASS" ) - PAGINATION_DEFAULT_ORDERING: tuple[str, ...] = Field( + PAGINATION_DEFAULT_ORDERING: Tuple[str, ...] = Field( ("-pk",), alias="NINJA_PAGINATION_DEFAULT_ORDERING" ) PAGINATION_MAX_OFFSET: int = Field(100, alias="NINJA_PAGINATION_MAX_OFFSET") diff --git a/ninja/pagination.py b/ninja/pagination.py index a53129ed3..e964ab29d 100644 --- a/ninja/pagination.py +++ b/ninja/pagination.py @@ -5,7 +5,6 @@ from functools import partial, wraps from math import inf from typing import ( - Annotated, Any, AsyncGenerator, Callable, @@ -202,7 +201,7 @@ class CursorPagination(AsyncPaginationBase): def __init__( self, *, - ordering: tuple[str, ...] = settings.PAGINATION_DEFAULT_ORDERING, + ordering: Tuple[str, ...] = settings.PAGINATION_DEFAULT_ORDERING, page_size: int = settings.PAGINATION_PER_PAGE, max_page_size: int = settings.PAGINATION_MAX_PER_PAGE_SIZE, **kwargs: Any, @@ -221,13 +220,13 @@ def __init__( super().__init__(**kwargs) class Input(Schema): - page_size: int | None = None - cursor: str | None = None + page_size: Optional[int] = None + cursor: Optional[str] = None class Output(Schema): - previous: str | None - next: str | None - results: list[Any] + previous: Optional[str] + next: Optional[str] + results: List[Any] class Cursor(BaseModel): """ @@ -237,31 +236,28 @@ class Cursor(BaseModel): """ - p: Annotated[ - str | None, - Field( - title="position", - description="String identifier for the current position in the dataset", - ), - ] = None - r: Annotated[ - bool, - Field( - title="reverse", description="Whether to reverse the ordering direction" - ), - ] = False + p: Optional[str] = Field( + None, + title="position", + description="String identifier for the current position in the dataset", + ) + + r: bool = Field( + False, + title="reverse", + description="Whether to reverse the ordering direction", + ) + # offset enables the use of a non-unique ordering field # e.g. if created time of two items is exactly the same, we can use the offset # to figure out the position exactly - o: Annotated[ - int, - Field( - ge=0, - lt=settings.PAGINATION_MAX_OFFSET, - title="offset", - description="Number of items to skip from the current position", - ), - ] = 0 + o: int = Field( + 0, + ge=0, + lt=settings.PAGINATION_MAX_OFFSET, + title="offset", + description="Number of items to skip from the current position", + ) @field_validator("*", mode="before") @classmethod @@ -278,7 +274,7 @@ def validate_individual_queryparam(cls, value: Any) -> Any: @classmethod def from_encoded_param( - cls, encoded_param: str | None, context: Any = None + cls, encoded_param: Optional[str], context: Any = None ) -> "CursorPagination.Cursor": """ Deserialize cursor from URL-safe base64 token. @@ -306,7 +302,7 @@ def encode_as_param(self) -> str: return b64encode(query_string.encode("ascii")).decode("ascii") @staticmethod - def _reverse_order(order: tuple[str, ...]) -> tuple[str, ...]: + def _reverse_order(order: Tuple[str, ...]) -> Tuple[str, ...]: """ Flip ordering direction for backward pagination. @@ -326,7 +322,7 @@ def _get_position(self, item: Any) -> str: """ return str(getattr(item, self._order_attribute)) - def _get_page_size(self, requested_page_size: int | None) -> int: + def _get_page_size(self, requested_page_size: Optional[int]) -> int: """ Determine the actual page size to use, respecting configured limits. @@ -341,9 +337,9 @@ def _get_page_size(self, requested_page_size: int | None) -> int: def _build_next_cursor( self, current_cursor: Cursor, - results: list[Any], - additional_position: str | None = None, - ) -> Cursor | None: + results: List[Any], + additional_position: Optional[str] = None, + ) -> Optional[Cursor]: """ Build cursor for next page """ @@ -376,9 +372,9 @@ def _build_next_cursor( def _build_previous_cursor( self, current_cursor: Cursor, - results: list[Any], - additional_position: str | None = None, - ) -> Cursor | None: + results: List[Any], + additional_position: Optional[str] = None, + ) -> Optional[Cursor]: """ Build cursor for previous page """ @@ -418,7 +414,7 @@ def _build_previous_cursor( return self.Cursor(o=offset, r=True, p=previous_position) @staticmethod - def _add_cursor_to_URL(url: str, cursor: Cursor | None) -> str | None: + def _add_cursor_to_URL(url: str, cursor: Optional[Cursor]) -> Optional[str]: """ Build pagination URLs with an encoded cursor. From 5528fdf60819f811371d705f2664e78c8ef12f35 Mon Sep 17 00:00:00 2001 From: Alejandro Giacometti Date: Mon, 1 Dec 2025 14:05:16 +0000 Subject: [PATCH 6/6] Explicit default values --- ninja/pagination.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ninja/pagination.py b/ninja/pagination.py index e964ab29d..04e7e7785 100644 --- a/ninja/pagination.py +++ b/ninja/pagination.py @@ -237,13 +237,13 @@ class Cursor(BaseModel): """ p: Optional[str] = Field( - None, + default=None, title="position", description="String identifier for the current position in the dataset", ) r: bool = Field( - False, + default=False, title="reverse", description="Whether to reverse the ordering direction", ) @@ -252,7 +252,7 @@ class Cursor(BaseModel): # e.g. if created time of two items is exactly the same, we can use the offset # to figure out the position exactly o: int = Field( - 0, + default=0, ge=0, lt=settings.PAGINATION_MAX_OFFSET, title="offset",