2 changes: 1 addition & 1 deletion label_studio/core/settings/base.py
@@ -903,4 +903,4 @@ def collect_versions_dummy(**kwargs):

# Used for async migrations. In LSE this is set to a real queue name; it is included here
# so we can use settings.SERVICE_QUEUE_NAME in async migrations in LSO
SERVICE_QUEUE_NAME = ''
SERVICE_QUEUE_NAME = get_env('SERVICE_QUEUE_NAME', 'default')
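
For context on this setting: async migrations dispatch their work through start_job_async_or_sync, and the queue name controls where that job lands ('default' in LSO, a real service queue in LSE). A minimal sketch of how a migration might route a job explicitly — the queue_name keyword is an assumption here, since the migrations in this diff only pass migration_name:

from django.conf import settings

from core.redis import start_job_async_or_sync


def forward_migration(migration_name):
    ...  # the actual schema work, as in the 0033 migration below


def forwards(apps, schema_editor):
    # settings.SERVICE_QUEUE_NAME resolves to 'default' in LSO; LSE overrides it
    # via the SERVICE_QUEUE_NAME env var to target a real service queue.
    start_job_async_or_sync(
        forward_migration,
        migration_name='0033_projects_soft_delete_indexes_async',
        queue_name=settings.SERVICE_QUEUE_NAME,  # assumed keyword, see note above
    )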
27 changes: 27 additions & 0 deletions label_studio/core/tests/test_db_utils.py
@@ -1,3 +1,30 @@
"""This module contains tests for database utility functions in core/utils/db.py"""
import logging

import pytest
from core.utils.db import batch_delete

from label_studio.core.utils import db as db_utils


class _BrokenConnection:
    vendor = 'testdb'

    @property
    def settings_dict(self):
        # Simulate an unexpected error when accessing connection.settings_dict
        raise RuntimeError('boom')


def test_current_db_key_exception_path(monkeypatch, caplog):
    # Arrange: replace connection with a broken one to trigger the except path
    monkeypatch.setattr(db_utils, 'connection', _BrokenConnection())

    # Act: call current_db_key and capture error logs
    with caplog.at_level(logging.ERROR, logger='label_studio.core.utils.db'):
        key = db_utils.current_db_key()

    # Assert: name fallback used and error message logged
    assert key == 'testdb:unknown'
    assert any('Error getting current DB key' in rec.message for rec in caplog.records)
60 changes: 58 additions & 2 deletions label_studio/core/utils/db.py
@@ -1,10 +1,13 @@
import itertools
import logging
import time
from typing import Optional, TypeVar
from typing import Dict, Optional, TypeVar

from django.db import OperationalError, models, transaction
from django.db import OperationalError, connection, models, transaction
from django.db.models import Model, QuerySet, Subquery
from django.db.models.signals import post_migrate
from django.db.utils import DatabaseError, ProgrammingError
from django.dispatch import receiver

logger = logging.getLogger(__name__)

@@ -118,3 +121,56 @@ def batch_delete(queryset, batch_size=500):
        total_deleted += deleted

    return total_deleted


# =====================
# Schema helpers
# =====================

_column_presence_cache: Dict[str, Dict[str, Dict[str, bool]]] = {}


def current_db_key() -> str:
    """Return a process-stable identifier for the current DB connection.

    Using vendor + NAME isolates caches between sqlite test DBs and postgres runs,
    avoiding stale lookups across pytest sessions or multi-DB setups.
    """
    try:
        name = str(connection.settings_dict.get('NAME'))
    except Exception as e:
        name = 'unknown'
        logger.error(f'Error getting current DB key: {e}')
    return f'{connection.vendor}:{name}'


def has_column_cached(table_name: str, column_name: str) -> bool:
    """Check whether a DB column exists for the given table, with per-process memoization.

    Notes:
    - Uses Django introspection; caches per (table, column) with case-insensitive column keys.
    - Safe during early migrations; returns False on database errors.
    """
    col_key = column_name.lower()
    db_cache = _column_presence_cache.get(current_db_key())
    table_cache = db_cache.get(table_name) if db_cache else None
    if table_cache and col_key in table_cache:
        return table_cache[col_key]

    try:
        with connection.cursor() as cursor:
            cols = connection.introspection.get_table_description(cursor, table_name)
        present = any(getattr(col, 'name', '').lower() == col_key for col in cols)
    except (DatabaseError, ProgrammingError):
        present = False

    _column_presence_cache.setdefault(current_db_key(), {}).setdefault(table_name, {})[col_key] = present
    return present


@receiver(post_migrate)
def signal_clear_column_presence_cache(**_kwargs):
    """If a migration adds a column, clear the column presence cache so that
    subsequent migrations can introspect the new column via has_column_cached()."""
    logger.debug('Clearing column presence cache in post_migrate signal')
    _column_presence_cache.clear()
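
As a quick illustration of the memoization contract, repeated lookups introspect once per (DB, table, column) and column keys are case-insensitive; a minimal sketch (the sqlite cache key in the comment is an assumed local-dev value):

from core.utils.db import has_column_cached

has_column_cached('project', 'deleted_at')   # first call: runs Django introspection
has_column_cached('project', 'DELETED_AT')   # second call: served from the cache

# The cache is keyed by DB key, then table, then lower-cased column, roughly:
# {'sqlite:/path/to/label_studio.sqlite3': {'project': {'deleted_at': True}}}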
40 changes: 40 additions & 0 deletions label_studio/projects/migrations/0032_project_deleted_at_project_deleted_by_and_more.py
@@ -0,0 +1,40 @@
# Generated by Django 5.1.12 on 2025-10-25 00:22

import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ("organizations", "0006_alter_organizationmember_deleted_at"),
        ("projects", "0031_alter_project_show_ground_truth_first"),
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
    ]

    operations = [
        migrations.AddField(
            model_name="project",
            name="deleted_at",
            field=models.DateTimeField(blank=True, null=True, verbose_name="deleted at"),
        ),
        migrations.AddField(
            model_name="project",
            name="deleted_by",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="deleted_projects",
                db_index=False,
                to=settings.AUTH_USER_MODEL,
                verbose_name="deleted by",
            ),
        ),
        migrations.AddField(
            model_name="project",
            name="purge_at",
            field=models.DateTimeField(blank=True, null=True, verbose_name="purge at"),
        ),
    ]
69 changes: 69 additions & 0 deletions label_studio/projects/migrations/0033_projects_soft_delete_indexes_async.py
@@ -0,0 +1,69 @@
from django.db import migrations, connection
from core.redis import start_job_async_or_sync
from core.models import AsyncMigrationStatus
import logging

logger = logging.getLogger(__name__)
migration_name = '0033_projects_soft_delete_indexes_async'


def forward_migration(migration_name):
    rec = AsyncMigrationStatus.objects.create(name=migration_name, status=AsyncMigrationStatus.STATUS_STARTED)
    if connection.vendor == 'postgresql':
        sqls = [
            'CREATE INDEX CONCURRENTLY IF NOT EXISTS project_org_deleted_idx ON project (organization_id, deleted_at)',
            'CREATE INDEX CONCURRENTLY IF NOT EXISTS project_deleted_at_idx ON project (deleted_at)',
            'CREATE INDEX CONCURRENTLY IF NOT EXISTS project_purge_at_idx ON project (purge_at)',
        ]
    else:
        sqls = [
            'CREATE INDEX IF NOT EXISTS project_org_deleted_idx ON project (organization_id, deleted_at)',
            'CREATE INDEX IF NOT EXISTS project_deleted_at_idx ON project (deleted_at)',
            'CREATE INDEX IF NOT EXISTS project_purge_at_idx ON project (purge_at)',
        ]
    with connection.cursor() as c:
        for sql in sqls:
            c.execute(sql)
    rec.status = AsyncMigrationStatus.STATUS_FINISHED
    rec.save()


def reverse_migration(migration_name):
    rec = AsyncMigrationStatus.objects.create(name=migration_name, status=AsyncMigrationStatus.STATUS_STARTED)
    if connection.vendor == 'postgresql':
        sqls = [
            'DROP INDEX CONCURRENTLY IF EXISTS project_org_deleted_idx',
            'DROP INDEX CONCURRENTLY IF EXISTS project_deleted_at_idx',
            'DROP INDEX CONCURRENTLY IF EXISTS project_purge_at_idx',
        ]
    else:
        sqls = [
            'DROP INDEX IF EXISTS project_org_deleted_idx',
            'DROP INDEX IF EXISTS project_deleted_at_idx',
            'DROP INDEX IF EXISTS project_purge_at_idx',
        ]
    with connection.cursor() as c:
        for sql in sqls:
            c.execute(sql)
    rec.status = AsyncMigrationStatus.STATUS_FINISHED
    rec.save()


def forwards(apps, schema_editor):
    start_job_async_or_sync(forward_migration, migration_name=migration_name)


def backwards(apps, schema_editor):
    start_job_async_or_sync(reverse_migration, migration_name=migration_name)


class Migration(migrations.Migration):
    atomic = False
    dependencies = [
        ('projects', '0032_project_deleted_at_project_deleted_by_and_more'),
    ]
    operations = [
        migrations.RunPython(forwards, backwards),
    ]
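
Because the index work may run on a worker, callers should not assume it has finished when migrate returns; a minimal sketch of checking the recorded status (is_index_migration_done is a hypothetical helper; the model fields come from the migration above):

from core.models import AsyncMigrationStatus


def is_index_migration_done(name='0033_projects_soft_delete_indexes_async'):
    # Each run creates a record; the most recent one reflects the current state.
    rec = AsyncMigrationStatus.objects.filter(name=name).order_by('-id').first()
    return rec is not None and rec.status == AsyncMigrationStatus.STATUS_FINISHED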


30 changes: 28 additions & 2 deletions label_studio/projects/models.py
@@ -25,7 +25,7 @@
    load_func,
    merge_labels_counters,
)
from core.utils.db import batch_update_with_retry, fast_first
from core.utils.db import batch_update_with_retry, fast_first, has_column_cached
from django.conf import settings
from django.contrib.postgres.search import SearchVectorField
from django.core.validators import MaxLengthValidator, MinLengthValidator
@@ -107,6 +107,17 @@ def with_counts_annotate(queryset, fields=None, exclude=None):
    return queryset


class ProjectVisibleManager(ProjectManager):
    """Default manager that hides soft-deleted projects (deleted_at IS NULL)."""

    def get_queryset(self):
        qs = super().get_queryset()
        # Avoid referencing columns that might not exist during early migrations
        if has_column_cached(self.model._meta.db_table, 'deleted_at'):
            return qs.filter(deleted_at__isnull=True)
        return qs


ProjectMixin = load_func(settings.PROJECT_MIXIN)


@@ -123,7 +134,9 @@ class SkipQueue(models.TextChoices):
        # ignore skipped tasks => skip is a valid annotation, task is completed (finished=True)
        IGNORE_SKIPPED = 'IGNORE_SKIPPED', 'Ignore skipped'

    objects = ProjectManager()
    # Managers: default (visible only) and explicit unfiltered
    objects = ProjectVisibleManager()
    all_objects = ProjectManager()
    __original_label_config = None

    title = models.CharField(
@@ -290,6 +303,19 @@ class SkipQueue(models.TextChoices):
        help_text='Custom task lock TTL in seconds. If not set, the default value is used',
    )

    # Soft-delete lifecycle (OSS fields, used by LSE logic)
    deleted_at = models.DateTimeField(_('deleted at'), null=True, blank=True)
    deleted_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name='deleted_projects',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        db_index=False,
        verbose_name=_('deleted by'),
    )
    purge_at = models.DateTimeField(_('purge at'), null=True, blank=True)

    def __init__(self, *args, **kwargs):
        super(Project, self).__init__(*args, **kwargs)
        self.__original_label_config = self.label_config
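
To illustrate the manager split above, a minimal sketch of the intended soft-delete flow (soft_delete is a hypothetical helper, not part of this diff):

from django.utils import timezone

from projects.models import Project


def soft_delete(project, user):
    # Hypothetical helper: mark the row instead of deleting it.
    project.deleted_at = timezone.now()
    project.deleted_by = user
    project.save(update_fields=['deleted_at', 'deleted_by'])


# After soft_delete(p, u): Project.objects hides p; Project.all_objects still returns it.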
114 changes: 114 additions & 0 deletions label_studio/projects/tests/test_soft_delete_manager.py
@@ -0,0 +1,114 @@
import types

import projects.models as project_models
import pytest
from django.db import connection
from projects.models import Project

from label_studio.core.utils import db as db_utils
from label_studio.core.utils.db import has_column_cached
from label_studio.organizations.tests.factories import OrganizationFactory
from label_studio.projects.tests.factories import ProjectFactory

pytestmark = pytest.mark.django_db


def test_project_manager_filters_deleted():
    """Project manager hides deleted rows by default; unfiltered manager returns all.

    Purpose: Verify default manager excludes soft-deleted rows and all_objects returns them.
    Setup: Create two projects in one org; mark one as deleted.
    Actions: Query via Project.objects and Project.all_objects.
    Validations: Visible list excludes deleted; unfiltered includes both.
    Edge cases: N/A.
    """
    org = OrganizationFactory()
    p1 = ProjectFactory(organization=org, title='active')
    _ = ProjectFactory(organization=org, title='deleted', deleted_at=p1.created_at)

    visible = list(Project.objects.order_by('id').values_list('title', flat=True))
    all_rows = list(Project.all_objects.order_by('id').values_list('title', flat=True))

    assert 'active' in visible and 'deleted' not in visible
    assert set(all_rows) >= {'active', 'deleted'}


def test_project_manager_for_user_respects_filter():
    """for_user applies org scope and soft-delete filter.

    Purpose: Ensure for_user(user) scopes to the user's active org and hides deleted rows.
    Setup: Two orgs; three projects (active+deleted in org1, active in org2).
    Actions: Call Project.objects.for_user(user) for the org1 user.
    Validations: Only the org1 active project is returned.
    Edge cases: N/A.
    """
    org1 = OrganizationFactory()
    org2 = OrganizationFactory()
    user = org1.created_by
    user.active_organization = org1
    user.save(update_fields=['active_organization'])

    p1 = ProjectFactory(organization=org1, title='org1-active')
    _ = ProjectFactory(organization=org1, title='org1-deleted', deleted_at=p1.created_at)
    _ = ProjectFactory(organization=org2, title='org2-active')

    titles = set(Project.objects.for_user(user).values_list('title', flat=True))
    assert 'org1-active' in titles
    assert 'org1-deleted' not in titles
    assert 'org2-active' not in titles


def test_visible_manager_skips_filter_without_column(monkeypatch):
    """Manager should not reference a missing deleted_at column during early migrations.

    Purpose: Avoid schema errors before the column exists.
    Setup: Force has_column_cached to return False; create active+deleted rows.
    Actions: Query via Project.objects.
    Validations: Both rows are returned (no filter applied).
    Edge cases: N/A.
    """
    monkeypatch.setattr(project_models, 'has_column_cached', lambda *_: False, raising=True)

    org = OrganizationFactory()
    p1 = ProjectFactory(organization=org, title='active')
    _ = ProjectFactory(organization=org, title='deleted', deleted_at=p1.created_at)

    # Without the column, the filter is skipped, so both come back
    titles = set(Project.objects.values_list('title', flat=True))
    assert {'active', 'deleted'} <= titles


def test_has_column_cached_memoization_and_clear(monkeypatch):
    """Column presence check is memoized and reset by post_migrate.

    Purpose: Ensure only one introspection call until the cache is cleared.
    Setup: Stub get_table_description to count calls.
    Actions: Call has_column_cached twice, then clear the cache and call again.
    Validations: One introspection call before the clear, a second one after it.
    Edge cases: N/A.
    """
    # Ensure the cache starts empty so the first call triggers introspection
    db_utils.signal_clear_column_presence_cache()

    calls = {'count': 0}

    def fake_get_table_description(cursor, table):
        calls['count'] += 1
        # Return objects that mimic description entries with a .name attribute
        col = types.SimpleNamespace(name='deleted_at')
        return [col]

    monkeypatch.setattr(connection.introspection, 'get_table_description', fake_get_table_description)

    # First call hits DB introspection
    assert has_column_cached('project', 'deleted_at') is True
    # Second call should be served from the cache
    assert has_column_cached('project', 'deleted_at') is True
    assert calls['count'] == 1

    # Clear the cache directly (instead of sending the signal with app_config)
    db_utils.signal_clear_column_presence_cache()

    # After the cache clear, another introspection happens
    assert has_column_cached('project', 'deleted_at') is True
    assert calls['count'] == 2