diff --git a/label_studio/core/settings/base.py b/label_studio/core/settings/base.py
index 310490bc57ad..d76fdbd89032 100644
--- a/label_studio/core/settings/base.py
+++ b/label_studio/core/settings/base.py
@@ -933,6 +933,7 @@ def collect_versions_dummy(**kwargs):
# Base FSM (Finite State Machine) Configuration for Label Studio
FSM_CACHE_TTL = 300 # Cache TTL in seconds (5 minutes)
FSM_SYNC_PROJECT_STATE = 'fsm.project_transitions.sync_project_state'
+FSM_INFERENCE_FUNCTION = 'fsm.state_inference._get_or_infer_state'
# Used for async migrations. In LSE this is set to a real queue name, including here so we
# can use settings.SERVICE_QUEUE_NAME in async migrations in LSO
diff --git a/label_studio/fsm/README.md b/label_studio/fsm/README.md
index 00aaca3d8006..eba8679581d6 100644
--- a/label_studio/fsm/README.md
+++ b/label_studio/fsm/README.md
@@ -309,4 +309,5 @@ When contributing:
- Keep framework code generic and reusable
- Add performance tests for UUID7 optimizations
- Document extension points and customization options
-- Ensure extensibility is preserved
\ No newline at end of file
+- Ensure extensibility is preserved
+- When adding pytests, use and extend reusable helper functions in `fsm/tests/helpers.py` when appropriate
diff --git a/label_studio/fsm/functions.py b/label_studio/fsm/functions.py
index b2ea11b872c8..641fcd025099 100644
--- a/label_studio/fsm/functions.py
+++ b/label_studio/fsm/functions.py
@@ -4,9 +4,12 @@
This module contains reusable functions for FSM state management that are
used across different parts of the codebase.
"""
-
import logging
+from core.current_request import CurrentContext
+from fsm.state_inference import get_or_infer_state
+from fsm.utils import get_or_initialize_state
+
logger = logging.getLogger(__name__)
@@ -32,7 +35,6 @@ def backfill_fsm_states_for_tasks(storage_id, tasks_created, link_class):
return
try:
- from lse_fsm.state_inference import backfill_state_for_entity
from tasks.models import Task
# Get tasks created in this sync
@@ -48,12 +50,11 @@ def backfill_fsm_states_for_tasks(storage_id, tasks_created, link_class):
# Backfill initial CREATED state for each task
for task in tasks:
- backfill_state_for_entity(task, 'task', create_record=True)
+
+ inferred_state = get_or_infer_state(task)
+ get_or_initialize_state(task, user=CurrentContext.get_user(), inferred_state=inferred_state)
logger.info(f'Storage sync: FSM states created for {len(task_ids)} tasks')
- except ImportError:
- # LSE not available (OSS), skip FSM sync
- logger.debug('LSE not available, skipping FSM state backfill for storage sync')
except Exception as e:
# Don't fail storage sync if FSM sync fails
logger.error(f'FSM sync after storage sync failed: {e}', exc_info=True)
diff --git a/label_studio/fsm/project_transitions.py b/label_studio/fsm/project_transitions.py
index ebc17066d525..9fa000408917 100644
--- a/label_studio/fsm/project_transitions.py
+++ b/label_studio/fsm/project_transitions.py
@@ -11,9 +11,10 @@
from django.conf import settings
from fsm.registry import register_state_transition
from fsm.state_choices import ProjectStateChoices
+from fsm.state_inference import get_or_infer_state
from fsm.state_manager import StateManager
from fsm.transitions import ModelChangeTransition, TransitionContext
-from fsm.utils import get_or_initialize_state, infer_entity_state_from_data
+from fsm.utils import get_or_initialize_state
@register_state_transition('project', 'project_created', triggers_on_create=True, triggers_on_update=False)
@@ -135,7 +136,7 @@ def transition(self, context: TransitionContext) -> Dict[str, Any]:
def sync_project_state(project, user=None, reason=None, context_data=None):
current_state = StateManager.get_current_state_value(project)
- inferred_state = infer_entity_state_from_data(project)
+ inferred_state = get_or_infer_state(project)
if current_state is None:
get_or_initialize_state(project, user=user, inferred_state=inferred_state)
diff --git a/label_studio/fsm/state_inference.py b/label_studio/fsm/state_inference.py
new file mode 100644
index 000000000000..10e807517642
--- /dev/null
+++ b/label_studio/fsm/state_inference.py
@@ -0,0 +1,77 @@
+import logging
+from typing import Optional
+
+from core.utils.common import load_func
+from django.conf import settings
+
+logger = logging.getLogger(__name__)
+
+
+def _get_or_infer_state(entity) -> Optional[str]:
+ """
+ Infer what the FSM state should be based on entity's current data.
+
+ This is used for "cold start" scenarios where entities exist in the database
+ but don't have FSM state records yet (e.g., after FSM deployment to production
+ with pre-existing data).
+
+ Args:
+ entity: The entity to infer state for (Task, Project, or Annotation)
+
+ Returns:
+ Inferred state value, or None if entity type not supported
+
+ Examples:
+ >>> task = Task.objects.get(id=123)
+ >>> task.is_labeled = True
+ >>> _get_or_infer_state(task)
+ 'COMPLETED'
+
+ >>> project = Project.objects.get(id=456)
+ >>> _get_or_infer_state(project)
+ 'CREATED'
+ """
+ from fsm.state_choices import AnnotationStateChoices, ProjectStateChoices, TaskStateChoices
+
+ entity_type = entity._meta.model_name.lower()
+
+ if entity_type == 'task':
+ # Task state depends on whether it has been labeled
+ return TaskStateChoices.COMPLETED if entity.is_labeled else TaskStateChoices.CREATED
+ elif entity_type == 'project':
+ # Project state depends on task completion
+ # If no tasks exist, project is CREATED
+ # If any tasks are completed, project is at least IN_PROGRESS
+ # If all tasks are completed, project is COMPLETED
+ tasks = entity.tasks.all()
+ if not tasks.exists():
+ return ProjectStateChoices.CREATED
+
+ # Count labeled tasks to determine project state
+ total_tasks = tasks.count()
+ labeled_tasks = tasks.filter(is_labeled=True).count()
+
+ if labeled_tasks == 0:
+ return ProjectStateChoices.CREATED
+ elif labeled_tasks == total_tasks:
+ return ProjectStateChoices.COMPLETED
+ else:
+ return ProjectStateChoices.IN_PROGRESS
+ elif entity_type == 'annotation':
+ # Annotations are SUBMITTED when created
+ return AnnotationStateChoices.SUBMITTED
+ else:
+ logger.warning(
+ f'Cannot infer state for unknown entity type: {entity_type}',
+ extra={
+ 'event': 'fsm.infer_state_unknown_type',
+ 'entity_type': entity_type,
+ 'entity_id': entity.pk,
+ },
+ )
+ return None
+
+
+def get_or_infer_state(entity) -> Optional[str]:
+ func = load_func(settings.FSM_INFERENCE_FUNCTION)
+ return func(entity)
diff --git a/label_studio/fsm/tests/conftest.py b/label_studio/fsm/tests/conftest.py
index f44042b3c90d..3a7c8efb9ef4 100644
--- a/label_studio/fsm/tests/conftest.py
+++ b/label_studio/fsm/tests/conftest.py
@@ -12,9 +12,25 @@
from fsm.registry import state_choices_registry, state_model_registry, transition_registry
from fsm.state_manager import StateManager
+from label_studio.tests import conftest as ls_tests_conftest
+
logger = logging.getLogger(__name__)
+# Re-export core fixtures from main LS test suite so FSM tests behave like OSS tests.
+# NOTE: We alias the same underlying functions so pytest registers them as fixtures
+# in this module as well (including their autouse behavior).
+django_live_url = ls_tests_conftest.get_server_url
+business_client = ls_tests_conftest.business_client
+
+# Storage-related fixtures to mock cloud providers for import storages
+aws_credentials = ls_tests_conftest.aws_credentials
+s3 = ls_tests_conftest.s3
+s3_with_images = ls_tests_conftest.s3_with_images
+gcs_client = ls_tests_conftest.gcs_client
+azure_client = ls_tests_conftest.azure_client
+
+
@pytest.fixture(autouse=True, scope='function')
def fsm_test_isolation():
"""
diff --git a/label_studio/fsm/tests/helpers.py b/label_studio/fsm/tests/helpers.py
new file mode 100644
index 000000000000..78aca279fe43
--- /dev/null
+++ b/label_studio/fsm/tests/helpers.py
@@ -0,0 +1,301 @@
+"""
+Test helper functions and assertion utilities for LSE FSM tests.
+
+This module provides reusable helper functions for FSM testing, including:
+- FSM state assertions for all entity types
+- SDK client setup utilities
+- Common test data factories
+- State verification helpers
+"""
+
+import logging
+from typing import Optional
+
+from core.current_request import CurrentContext
+from django.core.cache import cache
+from fsm.state_manager import get_state_manager
+from label_studio_sdk.client import LabelStudio
+from projects.models import Project
+from tasks.models import Annotation, AnnotationDraft, Task
+
+logger = logging.getLogger(__name__)
+
+# Get the configured StateManager
+StateManager = get_state_manager()
+
+
+# ============================================================================
+# SDK Client Setup
+# ============================================================================
+
+
+def create_sdk_client(django_live_url: str, business_client) -> LabelStudio:
+ """
+ Create and configure an SDK client for testing.
+
+ Args:
+ django_live_url: Base URL for the Django test server
+ business_client: Business client fixture with API key
+
+ Returns:
+ Configured LabelStudio SDK client
+ """
+ return LabelStudio(base_url=django_live_url, api_key=business_client.api_key)
+
+
+def setup_fsm_context(user):
+ """
+ Set up CurrentContext for FSM operations.
+
+ This ensures FSM transitions execute properly during tests by:
+ - Setting the current user in CurrentContext
+ - Clearing any existing cache
+
+ Args:
+ user: User instance to set as current user
+ """
+ CurrentContext.clear()
+ cache.clear()
+ CurrentContext.set_user(user)
+ logger.info(
+ f'FSM test context set up for user {user.id}',
+ extra={'event': 'fsm.test_context_setup', 'user_id': user.id},
+ )
+
+
+# ============================================================================
+# State Assertion Helpers
+# ============================================================================
+
+
+def assert_task_state(task_id: int, expected_state: str, msg: Optional[str] = None):
+ """
+ Assert that a task has the expected FSM state.
+
+ This test validates step by step:
+ - Task exists in database
+ - Task has an FSM state record
+ - FSM state matches expected value
+
+ Args:
+ task_id: Task ID to check
+ expected_state: Expected TaskStateChoices value
+ msg: Optional custom assertion message
+
+ Raises:
+ AssertionError: If state doesn't match expected value
+ """
+ task = Task.objects.get(id=task_id)
+ actual_state = StateManager.get_current_state_value(task)
+
+ error_msg = msg or f'Task {task_id} state mismatch: expected {expected_state}, got {actual_state}'
+ assert actual_state == expected_state, error_msg
+
+ logger.info(
+ f'✓ Task {task_id} has correct state: {actual_state}',
+ extra={
+ 'event': 'fsm.test_assertion_pass',
+ 'entity_type': 'task',
+ 'entity_id': task_id,
+ 'state': actual_state,
+ },
+ )
+
+
+def assert_annotation_state(annotation_id: int, expected_state: str, msg: Optional[str] = None):
+ """
+ Assert that an annotation has the expected FSM state.
+
+ This test validates step by step:
+ - Annotation exists in database
+ - Annotation has an FSM state record
+ - FSM state matches expected value
+
+ Args:
+ annotation_id: Annotation ID to check
+ expected_state: Expected AnnotationStateChoices value
+ msg: Optional custom assertion message
+
+ Raises:
+ AssertionError: If state doesn't match expected value
+ """
+ annotation = Annotation.objects.get(id=annotation_id)
+ actual_state = StateManager.get_current_state_value(annotation)
+
+ error_msg = msg or f'Annotation {annotation_id} state mismatch: expected {expected_state}, got {actual_state}'
+ assert actual_state == expected_state, error_msg
+
+ logger.info(
+ f'✓ Annotation {annotation_id} has correct state: {actual_state}',
+ extra={
+ 'event': 'fsm.test_assertion_pass',
+ 'entity_type': 'annotation',
+ 'entity_id': annotation_id,
+ 'state': actual_state,
+ },
+ )
+
+
+def assert_draft_state(draft_id: int, expected_state: str, msg: Optional[str] = None):
+ """
+ Assert that a draft has the expected FSM state.
+
+ This test validates step by step:
+ - Draft exists in database
+ - Draft has an FSM state record
+ - FSM state matches expected value
+
+ Args:
+ draft_id: AnnotationDraft ID to check
+ expected_state: Expected AnnotationDraftStateChoices value
+ msg: Optional custom assertion message
+
+ Raises:
+ AssertionError: If state doesn't match expected value
+ """
+ draft = AnnotationDraft.objects.get(id=draft_id)
+ actual_state = StateManager.get_current_state_value(draft)
+
+ error_msg = msg or f'Draft {draft_id} state mismatch: expected {expected_state}, got {actual_state}'
+ assert actual_state == expected_state, error_msg
+
+ logger.info(
+ f'✓ Draft {draft_id} has correct state: {actual_state}',
+ extra={
+ 'event': 'fsm.test_assertion_pass',
+ 'entity_type': 'draft',
+ 'entity_id': draft_id,
+ 'state': actual_state,
+ },
+ )
+
+
+def assert_project_state(project_id: int, expected_state: str, msg: Optional[str] = None):
+ """
+ Assert that a project has the expected FSM state.
+
+ This test validates step by step:
+ - Project exists in database
+ - Project has an FSM state record
+ - FSM state matches expected value
+
+ Args:
+ project_id: Project ID to check
+ expected_state: Expected ProjectStateChoices value
+ msg: Optional custom assertion message
+
+ Raises:
+ AssertionError: If state doesn't match expected value
+ """
+ project = Project.objects.get(id=project_id)
+ actual_state = StateManager.get_current_state_value(project)
+
+ error_msg = msg or f'Project {project_id} state mismatch: expected {expected_state}, got {actual_state}'
+ assert actual_state == expected_state, error_msg
+
+ logger.info(
+ f'✓ Project {project_id} has correct state: {actual_state}',
+ extra={
+ 'event': 'fsm.test_assertion_pass',
+ 'entity_type': 'project',
+ 'entity_id': project_id,
+ 'state': actual_state,
+ },
+ )
+
+
+def assert_state_exists(entity, entity_type: str = None):
+ """
+ Assert that an entity has an FSM state record.
+
+ This test validates:
+ - Entity has a state record in the FSM system
+ - State is not None
+
+ Args:
+ entity: Entity instance (Task, Annotation, etc.)
+ entity_type: Optional entity type string for better error messages
+
+ Raises:
+ AssertionError: If entity has no FSM state record
+ """
+ actual_state = StateManager.get_current_state_value(entity)
+ entity_type = entity_type or entity._meta.label_lower
+
+ assert actual_state is not None, f'{entity_type} {entity.id} has no FSM state record'
+
+ logger.info(
+ f'✓ {entity_type} {entity.id} has FSM state: {actual_state}',
+ extra={
+ 'event': 'fsm.test_state_exists',
+ 'entity_type': entity_type,
+ 'entity_id': entity.id,
+ 'state': actual_state,
+ },
+ )
+
+
+def assert_state_not_exists(entity, entity_type: str = None):
+ """
+ Assert that an entity does NOT have an FSM state record.
+
+ Useful for testing scenarios where FSM should be skipped.
+
+ Args:
+ entity: Entity instance (Task, Annotation, etc.)
+ entity_type: Optional entity type string for better error messages
+
+ Raises:
+ AssertionError: If entity has an FSM state record
+ """
+ actual_state = StateManager.get_current_state_value(entity)
+ entity_type = entity_type or entity._meta.label_lower
+
+ assert actual_state is None, f'{entity_type} {entity.id} unexpectedly has FSM state: {actual_state}'
+
+ logger.info(
+ f'✓ {entity_type} {entity.id} correctly has no FSM state',
+ extra={
+ 'event': 'fsm.test_no_state',
+ 'entity_type': entity_type,
+ 'entity_id': entity.id,
+ },
+ )
+
+
+# ============================================================================
+# Common Test Label Configs
+# ============================================================================
+
+SIMPLE_TEXT_CLASSIFICATION_CONFIG = """
+
+
+
+
+
+
+
+
+"""
+
+IMAGE_CLASSIFICATION_CONFIG = """
+
+
+
+
+
+
+
+
+"""
+
+NER_CONFIG = """
+
+
+
+
+
+
+
+
+"""
diff --git a/label_studio/fsm/tests/test_storage_sync_workflows.py b/label_studio/fsm/tests/test_storage_sync_workflows.py
new file mode 100644
index 000000000000..6fbc298eb4f5
--- /dev/null
+++ b/label_studio/fsm/tests/test_storage_sync_workflows.py
@@ -0,0 +1,381 @@
+"""
+Comprehensive tests for Storage Sync FSM workflows.
+
+This test module validates FSM state transitions for tasks created from
+storage sync operations, following patterns from label_studio/tests/sdk/test_storages.py
+
+Test Coverage:
+1. S3 Import Storage Sync - create tasks from S3
+2. GCS Import Storage Sync - create tasks from GCS
+3. Azure Import Storage Sync - create tasks from Azure
+4. Local Storage Sync - create tasks from local files
+"""
+
+import json
+
+import pytest
+from fsm.state_choices import AnnotationStateChoices, TaskStateChoices
+from fsm.tests.helpers import (
+ IMAGE_CLASSIFICATION_CONFIG,
+ NER_CONFIG,
+ assert_annotation_state,
+ assert_state_exists,
+ assert_task_state,
+ create_sdk_client,
+ setup_fsm_context,
+)
+from tasks.models import Annotation, Task
+
+pytestmark = pytest.mark.django_db
+
+
+class TestS3StorageSyncWorkflows:
+ """Test FSM state management for S3 storage sync operations."""
+
+ def test_s3_sync_recursive_creates_tasks_with_fsm_states(self, django_live_url, business_client):
+ """Test S3 recursive sync creates tasks with FSM states.
+
+ This test validates step by step:
+ - Creating S3 storage with recursive_scan=True
+ - Syncing to import files from subdirectories
+ - Verifying all created tasks have FSM states
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create project
+ project = ls.projects.create(title='FSM S3 Recursive Sync Test', label_config=IMAGE_CLASSIFICATION_CONFIG)
+
+ # Create S3 import storage with recursive scan
+ storage = ls.import_storage.s3.create(
+ project=project.id,
+ bucket='pytest-s3-images',
+ regex_filter='.*',
+ use_blob_urls=True,
+ recursive_scan=True,
+ )
+
+ # Trigger sync
+ sync_result = ls.import_storage.s3.sync(id=storage.id)
+
+ assert sync_result.status == 'completed'
+
+ # Get tasks created by sync
+ tasks = list(ls.tasks.list(project=project.id))
+
+ # Verify each task has FSM state
+ for task in tasks:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+ assert_task_state(task.id, TaskStateChoices.CREATED)
+
+ def test_s3_sync_multiple_times_maintains_fsm_states(self, django_live_url, business_client):
+ """Test multiple S3 syncs maintain FSM state consistency.
+
+ This test validates step by step:
+ - Running initial S3 sync
+ - Verifying tasks have FSM states
+ - Running sync again (should be idempotent)
+ - Verifying FSM states remain consistent
+
+ Critical validation: Re-syncing should not create duplicate
+ tasks or corrupt existing FSM states.
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create project
+ project = ls.projects.create(title='FSM S3 Resync Test', label_config=IMAGE_CLASSIFICATION_CONFIG)
+
+ # Create S3 import storage
+ storage = ls.import_storage.s3.create(
+ project=project.id, bucket='pytest-s3-images', regex_filter='.*', use_blob_urls=True, recursive_scan=False
+ )
+
+ # First sync
+ ls.import_storage.s3.sync(id=storage.id)
+ tasks_after_first_sync = list(ls.tasks.list(project=project.id))
+ first_sync_count = len(tasks_after_first_sync)
+
+ # Verify FSM states
+ for task in tasks_after_first_sync:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+
+ # Second sync (should be idempotent)
+ ls.import_storage.s3.sync(id=storage.id)
+ tasks_after_second_sync = list(ls.tasks.list(project=project.id))
+
+ # Task count should remain the same (idempotent)
+ assert len(tasks_after_second_sync) == first_sync_count
+
+ # FSM states should still exist and be correct
+ for task in tasks_after_second_sync:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+ assert_task_state(task.id, TaskStateChoices.CREATED)
+
+
+class TestGCSStorageSyncWorkflows:
+ """Test FSM state management for GCS storage sync operations."""
+
+ def test_gcs_sync_creates_tasks_with_fsm_states(self, django_live_url, business_client):
+ """Test GCS storage sync creates tasks with FSM states.
+
+ This test validates step by step:
+ - Setting up GCS import storage
+ - Triggering sync operation
+ - Verifying created tasks have FSM state records
+
+ Critical validation: Tasks created from GCS storage sync should
+ have FSM states just like S3 and direct API creation.
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create project
+ project = ls.projects.create(title='FSM GCS Sync Test', label_config=IMAGE_CLASSIFICATION_CONFIG)
+
+ storage = ls.import_storage.gcs.create(
+ project=project.id, bucket='pytest-gs-bucket', regex_filter='.*', use_blob_urls=True
+ )
+
+ # Trigger sync
+ sync_result = ls.import_storage.gcs.sync(id=storage.id)
+ assert sync_result.status == 'completed'
+
+ # Get tasks created by sync
+ tasks = list(ls.tasks.list(project=project.id))
+
+ # Verify each task has FSM state
+ for task in tasks:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+ assert_task_state(task.id, TaskStateChoices.CREATED)
+
+
+class TestAzureStorageSyncWorkflows:
+ """Test FSM state management for Azure storage sync operations."""
+
+ def test_azure_sync_creates_tasks_with_fsm_states(self, django_live_url, business_client):
+ """Test Azure storage sync creates tasks with FSM states.
+
+ This test validates step by step:
+ - Setting up Azure import storage
+ - Triggering sync operation
+ - Verifying created tasks have FSM state records
+
+ Critical validation: Tasks created from Azure storage sync should
+ have FSM states consistent with other storage types.
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create project
+ project = ls.projects.create(title='FSM Azure Sync Test', label_config=IMAGE_CLASSIFICATION_CONFIG)
+
+ # Try to create Azure import storage
+ try:
+ storage = ls.import_storage.azure.create(
+ project=project.id, container='pytest-azure-container', regex_filter='.*', use_blob_urls=False
+ )
+
+ # Trigger sync
+ sync_result = ls.import_storage.azure.sync(id=storage.id)
+ assert sync_result.status in ('initialized', 'queued', 'completed')
+
+ # Get tasks created by sync
+ tasks = list(ls.tasks.list(project=project.id))
+
+ # Verify each task has FSM state
+ for task in tasks:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+ assert_task_state(task.id, TaskStateChoices.CREATED)
+ except Exception as e:
+ # Azure might not be configured in test environment
+ pytest.skip(f'Azure storage not available in test environment: {e}')
+
+
+class TestLocalStorageSyncWorkflows:
+ """Test FSM state management for local storage sync operations."""
+
+ def test_local_storage_sync_creates_tasks_with_fsm_states(
+ self, django_live_url, business_client, tmp_path, settings
+ ):
+ """Test local storage sync creates tasks with FSM states.
+
+ This test validates step by step:
+ - Setting up local import storage
+ - Triggering sync operation
+ - Verifying created tasks have FSM state records
+
+ Critical validation: Tasks created from local storage sync should
+ have FSM states like cloud storage types.
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create project
+ project = ls.projects.create(title='FSM Local Storage Sync Test', label_config=IMAGE_CLASSIFICATION_CONFIG)
+ local_root = tmp_path / 'local-storage'
+ local_root.mkdir()
+ (local_root / 'image1.jpg').write_text('123')
+ (local_root / 'subdir').mkdir()
+ (local_root / 'subdir' / 'image2.jpg').write_text('456')
+
+ settings.LOCAL_FILES_DOCUMENT_ROOT = tmp_path
+ settings.LOCAL_FILES_SERVING_ENABLED = True
+
+ storage = ls.import_storage.local.create(
+ project=project.id, path=str(local_root), regex_filter='.*', use_blob_urls=True
+ )
+
+ # Trigger sync
+ sync_result = ls.import_storage.local.sync(id=storage.id)
+ assert sync_result.status == 'completed'
+
+ # Get tasks created by sync
+ tasks = list(ls.tasks.list(project=project.id))
+
+ # Verify each task has FSM state
+ for task in tasks:
+ task_obj = Task.objects.get(id=task.id)
+ assert_state_exists(task_obj, 'task')
+ assert_task_state(task.id, TaskStateChoices.CREATED)
+
+
+@pytest.mark.skip(reason='TODO')
+class TestStorageSyncWithAnnotations:
+ """Test FSM state management for storage sync with pre-labeled data."""
+
+ def test_sync_with_preannotated_tasks_including_predictions_and_annotations(
+ self, django_live_url, business_client, tmp_path, settings
+ ):
+ """Test local storage sync imports tasks with predictions and annotations.
+
+ This test validates step by step:
+ - Creating a local import storage pointing to a directory containing a JSON file
+ with Label Studio tasks that include both "annotations" and "predictions"
+ - Running a storage sync to create tasks, predictions, and annotations in the database
+ - Verifying each created Task has an FSM state
+ - Verifying each imported Annotation has an FSM state (Submitted/Completed)
+ - Verifying Prediction rows are created and linked to the imported tasks
+
+ Critical validation: Storage sync must support fully pre-labeled task JSON
+ payloads (tasks + predictions + annotations) and still initialize FSM states
+ correctly for tasks and annotations.
+ """
+ setup_fsm_context(business_client.user)
+ ls = create_sdk_client(django_live_url, business_client)
+
+ # Create a project whose label config matches the annotation/prediction results we import.
+ project = ls.projects.create(title='FSM Sync with Annots+Preds Test', label_config=NER_CONFIG)
+
+ # Make the test deterministic: storage import sets task.is_labeled based on project.maximum_annotations.
+ # If we ever change defaults, explicitly keep this test in the 1-annotation overlap regime.
+ from projects.models import Project
+
+ Project.objects.filter(id=project.id).update(maximum_annotations=1)
+
+ # Local files storage requires the storage path to be under LOCAL_FILES_DOCUMENT_ROOT and
+ # requires local file serving to be explicitly enabled (security guard).
+ local_root = tmp_path / 'local-storage-json'
+ local_root.mkdir()
+ settings.LOCAL_FILES_DOCUMENT_ROOT = tmp_path
+ settings.LOCAL_FILES_SERVING_ENABLED = True
+
+ tasks_payload = [
+ {
+ 'data': {'text': 'John Doe works at Acme Corp.'},
+ 'predictions': [
+ {
+ 'model_version': 'test-model-v1',
+ 'score': 0.9,
+ 'result': [
+ {
+ 'value': {'start': 0, 'end': 8, 'text': 'John Doe', 'labels': ['Person']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ }
+ ],
+ }
+ ],
+ 'annotations': [
+ {
+ 'result': [
+ {
+ 'value': {'start': 0, 'end': 8, 'text': 'John Doe', 'labels': ['Person']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ },
+ {
+ 'value': {'start': 18, 'end': 27, 'text': 'Acme Corp', 'labels': ['Organization']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ },
+ ]
+ }
+ ],
+ },
+ {
+ 'data': {'text': 'Jane visited Paris.'},
+ 'predictions': [
+ {
+ 'model_version': 'test-model-v1',
+ 'result': [
+ {
+ 'value': {'start': 0, 'end': 4, 'text': 'Jane', 'labels': ['Person']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ },
+ {
+ 'value': {'start': 13, 'end': 18, 'text': 'Paris', 'labels': ['Location']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ },
+ ],
+ }
+ ],
+ 'annotations': [
+ {
+ 'result': [
+ {
+ 'value': {'start': 0, 'end': 4, 'text': 'Jane', 'labels': ['Person']},
+ 'from_name': 'label',
+ 'to_name': 'text',
+ 'type': 'labels',
+ }
+ ]
+ }
+ ],
+ },
+ ]
+ (local_root / 'tasks.json').write_text(json.dumps(tasks_payload))
+
+ storage = ls.import_storage.local.create(
+ project=project.id,
+ path=str(local_root),
+ regex_filter=r'.*\.json$',
+ use_blob_urls=False,
+ )
+
+ # Trigger sync
+ sync_result = ls.import_storage.local.sync(id=storage.id)
+ assert sync_result.status == 'completed'
+
+ # FSM assertions only (avoid coupling to DB counters/content beyond what we need to locate entities)
+ for task in Task.objects.filter(project_id=project.id).order_by('id'):
+ assert_state_exists(task, 'task')
+ # Imported tasks include annotations, therefore task.is_labeled=True and inferred state is COMPLETED.
+ assert_task_state(task.id, TaskStateChoices.COMPLETED)
+
+ for annotation in Annotation.objects.filter(project_id=project.id).order_by('id'):
+ assert_state_exists(annotation, 'annotation')
+ # Cold-start inference for imported annotations is SUBMITTED.
+ assert_annotation_state(annotation.id, AnnotationStateChoices.SUBMITTED)
diff --git a/label_studio/fsm/tests/test_utils.py b/label_studio/fsm/tests/test_utils.py
index 02ecbe838ee0..f1ae5715dcd3 100644
--- a/label_studio/fsm/tests/test_utils.py
+++ b/label_studio/fsm/tests/test_utils.py
@@ -568,7 +568,7 @@ def test_get_or_initialize_state_accepts_reason_parameter(self):
mock_get_sm.return_value = mock_sm
# Mock state inference
- with patch('fsm.utils.infer_entity_state_from_data', return_value='IN_PROGRESS'):
+ with patch('fsm.state_inference._get_or_infer_state', return_value='IN_PROGRESS'):
with patch('fsm.utils._get_initialization_transition_name', return_value='init_transition'):
# Call with reason
get_or_initialize_state(
@@ -612,7 +612,7 @@ def test_get_or_initialize_state_accepts_context_data_parameter(self):
mock_get_sm.return_value = mock_sm
# Mock state inference
- with patch('fsm.utils.infer_entity_state_from_data', return_value='IN_PROGRESS'):
+ with patch('fsm.state_inference._get_or_infer_state', return_value='IN_PROGRESS'):
with patch('fsm.utils._get_initialization_transition_name', return_value='init_transition'):
# Call with context_data
get_or_initialize_state(
@@ -658,7 +658,7 @@ def test_get_or_initialize_state_with_both_reason_and_context_data(self):
mock_get_sm.return_value = mock_sm
# Mock state inference
- with patch('fsm.utils.infer_entity_state_from_data', return_value='IN_PROGRESS'):
+ with patch('fsm.state_inference._get_or_infer_state', return_value='IN_PROGRESS'):
with patch('fsm.utils._get_initialization_transition_name', return_value='init_transition'):
# Call with both reason and context_data
get_or_initialize_state(
@@ -698,7 +698,7 @@ def test_get_or_initialize_state_defaults_context_data_to_empty_dict(self):
mock_get_sm.return_value = mock_sm
# Mock state inference
- with patch('fsm.utils.infer_entity_state_from_data', return_value='IN_PROGRESS'):
+ with patch('fsm.state_inference._get_or_infer_state', return_value='IN_PROGRESS'):
with patch('fsm.utils._get_initialization_transition_name', return_value='init_transition'):
# Call without context_data
get_or_initialize_state(
diff --git a/label_studio/fsm/utils.py b/label_studio/fsm/utils.py
index ea6249c9fc09..ce7179b3de2a 100644
--- a/label_studio/fsm/utils.py
+++ b/label_studio/fsm/utils.py
@@ -279,11 +279,9 @@ def is_fsm_enabled(user=None) -> bool:
def get_current_state_safe(entity, user=None) -> Optional[str]:
"""
Safely get current state with error handling.
-
Args:
entity: The entity to get state for
user: The user making the request (for feature flag checking)
-
Returns:
Current state string or None if failed
"""
@@ -309,87 +307,21 @@ def get_current_state_safe(entity, user=None) -> Optional[str]:
return None
-def infer_entity_state_from_data(entity) -> Optional[str]:
- """
- Infer what the FSM state should be based on entity's current data.
-
- This is used for "cold start" scenarios where entities exist in the database
- but don't have FSM state records yet (e.g., after FSM deployment to production
- with pre-existing data).
-
- Args:
- entity: The entity to infer state for (Task, Project, or Annotation)
-
- Returns:
- Inferred state value, or None if entity type not supported
-
- Examples:
- >>> task = Task.objects.get(id=123)
- >>> task.is_labeled = True
- >>> infer_entity_state_from_data(task)
- 'COMPLETED'
-
- >>> project = Project.objects.get(id=456)
- >>> infer_entity_state_from_data(project)
- 'CREATED'
- """
- from fsm.state_choices import AnnotationStateChoices, ProjectStateChoices, TaskStateChoices
-
- entity_type = entity._meta.model_name.lower()
-
- if entity_type == 'task':
- # Task state depends on whether it has been labeled
- return TaskStateChoices.COMPLETED if entity.is_labeled else TaskStateChoices.CREATED
- elif entity_type == 'project':
- # Project state depends on task completion
- # If no tasks exist, project is CREATED
- # If any tasks are completed, project is at least IN_PROGRESS
- # If all tasks are completed, project is COMPLETED
- tasks = entity.tasks.all()
- if not tasks.exists():
- return ProjectStateChoices.CREATED
-
- # Count labeled tasks to determine project state
- total_tasks = tasks.count()
- labeled_tasks = tasks.filter(is_labeled=True).count()
-
- if labeled_tasks == 0:
- return ProjectStateChoices.CREATED
- elif labeled_tasks == total_tasks:
- return ProjectStateChoices.COMPLETED
- else:
- return ProjectStateChoices.IN_PROGRESS
- elif entity_type == 'annotation':
- # Annotations are SUBMITTED when created
- return AnnotationStateChoices.SUBMITTED
- else:
- logger.warning(
- f'Cannot infer state for unknown entity type: {entity_type}',
- extra={
- 'event': 'fsm.infer_state_unknown_type',
- 'entity_type': entity_type,
- 'entity_id': entity.pk,
- },
- )
- return None
-
-
-def get_or_initialize_state(entity, user=None, inferred_state=None, reason=None, context_data=None) -> Optional[str]:
+def get_or_initialize_state(entity, user, inferred_state: str, reason=None, context_data=None) -> Optional[str]:
"""
Get current state, or initialize it if it doesn't exist.
This function handles "cold start" scenarios where pre-existing entities
don't have FSM state records. It will:
- 1. Try to get the current state
- 2. If None, infer the state from entity data
- 3. Initialize the state with an appropriate transition
- 4. Return the state value (never returns None if initialization succeeds)
+ 1. If the state already exists, use that
+ 2. If the state doesn't exist, infer the state from the entity and initialize it with an appropriate transition
+ 2. Return the state value (never returns None if initialization succeeds)
Args:
entity: The entity to get or initialize state for
- user: User for FSM context (optional)
- inferred_state: Pre-computed inferred state (optional, will compute if not provided)
- reason: Custom reason for the state initialization (optional, overrides default)
+ user: User for FSM context
+ inferred_state: Pre-computed inferred state
+ reason: Custom reason for the state initialization (optional, overrides default reason)
context_data: Additional context data to store with state record (optional)
Returns:
@@ -397,7 +329,9 @@ def get_or_initialize_state(entity, user=None, inferred_state=None, reason=None,
Examples:
>>> task = Task.objects.get(id=123) # Pre-existing task without state
- >>> state = get_or_initialize_state(task, user=request.user)
+ >>> from fsm.state_inference import get_or_infer_state
+ >>> inferred_state = get_or_infer_state(task)
+ >>> state = get_or_initialize_state(task, user=request.user, inferred_state=inferred_state)
>>> # state is now 'COMPLETED' or 'CREATED' based on task.is_labeled
>>> # and a state record has been created
"""
@@ -420,10 +354,6 @@ def get_or_initialize_state(entity, user=None, inferred_state=None, reason=None,
# State already exists, return it
return current_state
- # No state exists - need to initialize it
- if inferred_state is None:
- inferred_state = infer_entity_state_from_data(entity)
-
if inferred_state is None:
logger.warning(
f'Cannot initialize state for {entity._meta.model_name} {entity.pk} - inference failed',
diff --git a/label_studio/io_storages/base_models.py b/label_studio/io_storages/base_models.py
index e62884507ff8..ac6f07a39630 100644
--- a/label_studio/io_storages/base_models.py
+++ b/label_studio/io_storages/base_models.py
@@ -29,6 +29,7 @@
from django.shortcuts import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
+from fsm.functions import backfill_fsm_states_for_tasks
from io_storages.utils import StorageObject, get_uri_via_regex, parse_bucket_uri
from rest_framework.exceptions import ValidationError
from rq.job import Job
@@ -646,9 +647,6 @@ def _scan_and_create_links(self, link_class):
)
# Create initial FSM states for all tasks created during storage sync
- # CurrentContext is now available because we use start_job_async_or_sync
- from fsm.functions import backfill_fsm_states_for_tasks
-
backfill_fsm_states_for_tasks(self.id, tasks_created, link_class)
self.project.update_tasks_states(
diff --git a/label_studio/tasks/serializers.py b/label_studio/tasks/serializers.py
index 7fc500c2d6c3..5d231da49893 100644
--- a/label_studio/tasks/serializers.py
+++ b/label_studio/tasks/serializers.py
@@ -12,8 +12,8 @@
from django.db import IntegrityError, transaction
from drf_spectacular.utils import extend_schema_field
from fsm.serializer_fields import FSMStateField
-from fsm.state_manager import get_state_manager
-from fsm.utils import is_fsm_enabled
+from fsm.state_inference import get_or_infer_state
+from fsm.utils import get_or_initialize_state, is_fsm_enabled
from label_studio_sdk.label_interface import LabelInterface
from projects.models import Project
from rest_flex_fields import FlexFieldsModelSerializer
@@ -716,14 +716,16 @@ def _backfill_fsm_states(self, tasks):
Backfill FSM states for tasks created via bulk_create().
bulk_create() bypasses the model's save() method, so FSM transitions
- don't fire automatically. This sets initial CREATED state for newly imported tasks.
+ don't fire automatically. This sets initial state for newly imported tasks.
"""
- if not tasks or not is_fsm_enabled(user=None):
+ # TODO: extend this to importing other bulk objects
+ user = CurrentContext.get_user()
+ if not tasks or not is_fsm_enabled(user):
return
- StateManager = get_state_manager()
for task in tasks:
- StateManager.execute_transition(entity=task, transition_name='task_created', user=None)
+ inferred_state = get_or_infer_state(task)
+ get_or_initialize_state(task, user=user, inferred_state=inferred_state)
@staticmethod
def post_process_annotations(user, db_annotations, action):