Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f5a4efb
feat: Add autofix infrastructure for validation issues
Magssch Feb 4, 2026
945ab97
Refactoring
Magssch Feb 5, 2026
3a99bb2
Linting and static code checks
Magssch Feb 5, 2026
2f75557
Add future fix support for ChangedField
Magssch Feb 5, 2026
2521b2e
Minor refactoring
Magssch Feb 5, 2026
2cf0fa7
Refactoring
Magssch Feb 5, 2026
ff77992
Fix test
Magssch Feb 5, 2026
42c80e9
Merge branch 'main' into feat/autofix-part1
Magssch Feb 5, 2026
3234141
Change tests slightly
Magssch Feb 5, 2026
f468191
Minor refactoring
Magssch Feb 5, 2026
526e09e
Fix lint
Magssch Feb 5, 2026
0bb23a2
Minor refactor
Magssch Feb 9, 2026
89ce43e
Refactor fix infrastructure: merge modules, use model_copy, return fi…
Magssch Feb 10, 2026
a9a11e0
Refactoring to use provenance
Magssch Feb 10, 2026
03f17cb
Linting and static code checks
Magssch Feb 10, 2026
7e17641
Refactoring
Magssch Feb 10, 2026
4ead64e
Refactoring
Magssch Feb 10, 2026
86cb08a
Refactoring
Magssch Feb 10, 2026
5f35122
More minor refactoring
Magssch Feb 10, 2026
6878991
Revert unintended changes
Magssch Feb 10, 2026
c8ece61
Revert some changes
Magssch Feb 10, 2026
79b4a71
Refactoring
Magssch Feb 10, 2026
7097051
Add back tests and refactor
Magssch Feb 10, 2026
2129d7d
Fix tests
Magssch Feb 10, 2026
49e7b14
Fix lint
Magssch Feb 10, 2026
5cd19ef
Merge branch 'main' into feat/autofix-part1
Magssch Feb 10, 2026
58b3ded
Address comments from gemini review
Magssch Feb 10, 2026
9db1919
Minor refactoring
Magssch Feb 10, 2026
faf82c3
Minor refactor
Magssch Feb 10, 2026
9b6b05d
Linting and static code checks
Magssch Feb 10, 2026
7f4c971
Test refactoring
Magssch Feb 10, 2026
34bdd32
Fix lint
Magssch Feb 10, 2026
07baacb
Disable fix mechanism until exposed later
Magssch Feb 11, 2026
5c05009
Remove methods to be reintroduced later
Magssch Feb 16, 2026
ec70486
Make changes per review
Magssch Feb 16, 2026
36ac219
More refactoring
Magssch Feb 16, 2026
ddef15f
Linting and static code checks
Magssch Feb 16, 2026
d0d8f56
Revert unnecessary diff
Magssch Feb 16, 2026
28adb3d
Refactoring
Magssch Feb 16, 2026
f5cad88
Address review comments
Magssch Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cognite/neat/_data_model/_fix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel, ConfigDict, Field

from cognite.neat._data_model.deployer.data_classes import FieldChange
from cognite.neat._data_model.models.dms import SchemaResourceId


class FixAction(BaseModel):
    """An atomic, individually-applicable fix for a schema issue.

    The model is configured as frozen, so field values cannot be reassigned
    after an instance has been created.

    Attributes:
        resource_id: Reference to the resource being modified.
        changes: Tuple of field-level changes; defaults to an empty tuple.
        message: Human-readable description of what this fix does, or None when not provided.
        code: The validator code (e.g., "NEAT-DMS-PERFORMANCE-001") for grouping in UI.
    """

    # frozen=True makes pydantic reject attribute assignment on instances.
    model_config = ConfigDict(frozen=True)

    resource_id: SchemaResourceId
    # A tuple (not a list) so the default and the stored value are immutable sequences.
    changes: tuple[FieldChange, ...] = Field(default_factory=tuple)
    message: str | None = None
    # Required field: declared last, but has no default.
    code: str
16 changes: 16 additions & 0 deletions cognite/neat/_data_model/_shared.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any

from cognite.neat._data_model._fix import FixAction
from cognite.neat._data_model.deployer.data_classes import DeploymentResult
from cognite.neat._issues import IssueList

Expand All @@ -19,14 +20,29 @@ class OnSuccessIssuesChecker(OnSuccess, ABC):

def __init__(self) -> None:
    """Start with empty issue and fix collections and mark the checker as not yet run."""
    # The public `issues` and `pending_fixes` properties refuse access while this is False.
    self._has_run = False
    # Fix actions gathered while running; stays empty for checkers that do not support fixing.
    self._pending_fixes: list[FixAction] = []
    # Issues gathered while running.
    self._issues = IssueList()

@property
def pending_fixes(self) -> list[FixAction]:
    """Return the fix actions collected during the run.

    Subclasses that support fixing should populate ``_pending_fixes``.

    Returns:
        A new list containing the collected fix actions. A copy is returned so that
        callers cannot mutate the checker's internal state, in the same way the
        ``issues`` property hands out a copy of the collected issues.

    Raises:
        RuntimeError: If the checker has not been run yet.
    """
    if not self._has_run:
        raise RuntimeError(f"{type(self).__name__} has not been run yet.")
    # Shallow copy: the FixAction items themselves are shared, only the container is new.
    return list(self._pending_fixes)

@property
def issues(self) -> IssueList:
    """Return a copy of the issues collected during the run.

    Raises:
        RuntimeError: If the checker has not been run yet.
    """
    if self._has_run:
        # Wrap in a new IssueList so the internal collection is not handed out directly.
        return IssueList(self._issues)
    raise RuntimeError(f"{type(self).__name__} has not been run yet.")

def copy(self) -> "OnSuccessIssuesChecker":
    """
    Return a new handler instance carrying the same configuration but none of the run state.

    Re-running a handler after the data model has been modified relies on this.
    The base implementation always raises; handlers that can be re-run override it.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    handler_name = type(self).__name__
    raise NotImplementedError(f"{handler_name} does not support copying instances.")


class OnSuccessResultProducer(OnSuccess, ABC):
"""Abstract base class for post-activity success handlers that produce desired outcomes using the data model."""
Expand Down
3 changes: 2 additions & 1 deletion cognite/neat/_data_model/models/dms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from cognite.neat._data_model.models.dms._space import Space, SpaceRequest, SpaceResponse

from ._data_model import DataModelRequest, DataModelResponse
from ._http import DataModelBody, DataModelResource, ResourceId, T_DataModelResource, T_ResourceId
from ._http import DataModelBody, DataModelResource, ResourceId, SchemaResourceId, T_DataModelResource, T_ResourceId
from ._references import (
ContainerConstraintReference,
ContainerDirectReference,
Expand Down Expand Up @@ -171,6 +171,7 @@
"RequiresConstraintDefinition",
"Resource",
"ResourceId",
"SchemaResourceId",
"SequenceCDFExternalIdReference",
"SequenceCDFExternalIdReference",
"SingleEdgeProperty",
Expand Down
13 changes: 5 additions & 8 deletions cognite/neat/_data_model/models/dms/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@

T_DataModelResource = TypeVar("T_DataModelResource", bound=DataModelResource)

ResourceId: TypeAlias = (
SpaceReference
| DataModelReference
| ViewReference
| ContainerReference
| ContainerIndexReference
| ContainerConstraintReference
)
# Top-level schema resources (spaces, data models, views, containers)
SchemaResourceId: TypeAlias = SpaceReference | DataModelReference | ViewReference | ContainerReference

# All resource IDs including nested refs (used by deployer for API responses)
ResourceId: TypeAlias = SchemaResourceId | ContainerIndexReference | ContainerConstraintReference

T_ResourceId = TypeVar("T_ResourceId", bound=ResourceId)

Expand Down
4 changes: 2 additions & 2 deletions cognite/neat/_data_model/rules/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import ClassVar

from cognite.neat._data_model._analysis import ValidationResources
from cognite.neat._data_model.models.dms._schema import RequestSchema
from cognite.neat._data_model._fix import FixAction
from cognite.neat._issues import ConsistencyError, Recommendation


Expand All @@ -25,7 +25,7 @@ def validate(self) -> list[ConsistencyError] | list[Recommendation] | list[Consi
"""Execute rule validation."""
...

def fix(self) -> RequestSchema:
def fix(self) -> list[FixAction]:
    """Return the fix actions that address the issues found by the validator.

    The base implementation always raises; rules that support fixing override it.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    not_supported = "This rule does not implement fix()"
    raise NotImplementedError(not_supported)
11 changes: 11 additions & 0 deletions cognite/neat/_data_model/rules/dms/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,20 @@ def run(self, request_schema: RequestSchema) -> None:
continue
if self._can_run_validator(validator.code, validator.issue_type):
self._issues.extend(validator.validate())
if validator.fixable:
self._pending_fixes.extend(validator.fix())

self._has_run = True

def copy(self) -> "DmsDataModelRulesOrchestrator":
    """Build a newly constructed orchestrator from this instance's configuration.

    Only the constructor arguments are carried over; nothing collected by a
    previous run is passed to the new instance.
    """
    configuration = {
        "cdf_snapshot": self._cdf_snapshot,
        "limits": self._limits,
        "modus_operandi": self._modus_operandi,
        "can_run_validator": self._can_run_validator,
        "enable_alpha_validators": self._enable_alpha_validators,
    }
    return DmsDataModelRulesOrchestrator(**configuration)

def _gather_validation_resources(self, request_schema: RequestSchema) -> ValidationResources:
# we do not want to modify the original request schema during validation
copy = request_schema.model_copy(deep=True)
Expand Down
4 changes: 4 additions & 0 deletions cognite/neat/_data_model/transformers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ._base import Transformer
from ._fix_applicator import FixApplicator

__all__ = ["FixApplicator", "Transformer"]
12 changes: 12 additions & 0 deletions cognite/neat/_data_model/transformers/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from abc import ABC, abstractmethod

from cognite.neat._data_model.models.dms._schema import RequestSchema


class Transformer(ABC):
    """Interface for objects that turn one data model into another."""

    @abstractmethod
    def transform(self, data_model: RequestSchema) -> RequestSchema:
        """Produce the transformed data model.

        Args:
            data_model: The data model to transform.

        Returns:
            The modified data model.

        Raises:
            NotImplementedError: When a subclass delegates to this base implementation.
        """
        raise NotImplementedError
99 changes: 99 additions & 0 deletions cognite/neat/_data_model/transformers/_fix_applicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from collections import defaultdict

from cognite.neat._data_model._fix import FixAction
from cognite.neat._data_model.deployer.data_classes import (
AddedField,
ChangedField,
FieldChange,
PrimitiveField,
RemovedField,
)
from cognite.neat._data_model.models.dms import (
ContainerReference,
DataModelReference,
DataModelResource,
SchemaResourceId,
SpaceReference,
ViewReference,
)
from cognite.neat._data_model.models.dms._schema import RequestSchema
from cognite.neat._data_model.transformers._base import Transformer


class FixApplicator(Transformer):
    """Applies the changes in FixAction objects to a schema.

    Fix actions are grouped per resource, checked for conflicting field paths and then
    applied to a deep copy of the schema, so the schema passed to ``transform`` is not
    modified. Only primitive changes addressed as ``"<field_name>.<identifier>"`` are
    supported, where ``<field_name>`` names a dict-valued (or unset) attribute on the
    resource and ``<identifier>`` is the key inside that dict.
    """

    def __init__(self, fix_actions: list[FixAction]) -> None:
        """Store the fix actions to apply.

        Args:
            fix_actions: The fix actions to apply when ``transform`` is called.
        """
        # Snapshot the list: the caller may keep extending or clearing its own list after
        # handing it over, and that must not change what this applicator applies.
        self._fix_actions = list(fix_actions)

    def transform(self, data_model: RequestSchema) -> RequestSchema:
        """Apply fix actions and return the fixed schema (a deep copy of the original).

        Args:
            data_model: The schema to fix. It is copied, never modified.

        Returns:
            A deep copy of ``data_model`` with all fix actions applied.

        Raises:
            RuntimeError: If a fix action targets an unsupported resource type, a resource
                that is not part of the schema, or if the changes for one resource conflict
                or cannot be applied.
        """
        result = data_model.model_copy(deep=True)

        if not self._fix_actions:
            return result

        # Group per resource so conflicts can be detected across all actions touching it.
        fix_by_resource_id: dict[SchemaResourceId, list[FixAction]] = defaultdict(list)
        for action in self._fix_actions:
            fix_by_resource_id[action.resource_id].append(action)

        # Lookups are built from the copy, so every change below lands on the copy only.
        resources_list_lookup: dict[type, dict[SchemaResourceId, DataModelResource]] = {
            ViewReference: {view.as_reference(): view for view in result.views},
            ContainerReference: {container.as_reference(): container for container in result.containers},
            SpaceReference: {space.as_reference(): space for space in result.spaces},
            DataModelReference: {result.data_model.as_reference(): result.data_model},
        }

        for resource_id, actions in fix_by_resource_id.items():
            resource_lookup = resources_list_lookup.get(type(resource_id))
            if resource_lookup is None:
                raise RuntimeError(
                    f"{type(self).__name__}: Unsupported resource type {type(resource_id)}. This is a bug in NEAT."
                )
            resource = resource_lookup.get(resource_id)
            if resource is None:
                raise RuntimeError(
                    f"{type(self).__name__}: Resource {resource_id} not found in schema. This is a bug in NEAT."
                )

            all_changes_for_resource = [change for action in actions for change in action.changes]
            self._check_no_field_path_conflicts(all_changes_for_resource)
            self._apply_changes_to_resource(resource, all_changes_for_resource)

        return result

    def _apply_changes_to_resource(self, resource: DataModelResource, changes: list[FieldChange]) -> None:
        """Apply field changes to the resource in place.

        Args:
            resource: The resource to modify.
            changes: The changes to apply, each addressed as ``"<field_name>.<identifier>"``.

        Raises:
            RuntimeError: If a change is not a primitive field change, has a malformed
                field path, targets an attribute that is not a dict, or is of a primitive
                change type this applicator does not know how to apply.
        """
        for change in changes:
            if not isinstance(change, PrimitiveField):
                raise RuntimeError(
                    f"{type(self).__name__}: Only primitive field changes are supported, "
                    f"got {type(change).__name__}. This is a bug in NEAT."
                )
            # Split on the first dot only; the identifier itself may contain dots.
            field_name, separator, identifier = change.field_path.partition(".")
            # Both halves must be non-empty: ".x" would address an attribute named "" and
            # "x." would add or remove an entry keyed by the empty string.
            if not (separator and field_name and identifier):
                raise RuntimeError(
                    f"{type(self).__name__}: Invalid field_path '{change.field_path}' "
                    "(expected 'field_name.identifier' format). This is a bug in NEAT."
                )
            field_map = getattr(resource, field_name, None)
            if field_map is None:
                # An unset field is treated as an empty mapping so entries can be added to it.
                field_map = {}
                setattr(resource, field_name, field_map)
            elif not isinstance(field_map, dict):
                # Fail with context instead of an opaque AttributeError/TypeError further down.
                raise RuntimeError(
                    f"{type(self).__name__}: Field '{field_name}' on {type(resource).__name__} "
                    f"is not a mapping (got {type(field_map).__name__}). This is a bug in NEAT."
                )
            if isinstance(change, RemovedField):
                # Removing an entry that is already gone is not an error.
                field_map.pop(identifier, None)
            elif isinstance(change, AddedField | ChangedField):
                field_map[identifier] = change.new_value
            else:
                raise RuntimeError(
                    f"{type(self).__name__}: Unsupported primitive change type "
                    f"{type(change).__name__} for '{change.field_path}'. This is a bug in NEAT."
                )
            # An emptied mapping is stored as None rather than as an empty dict.
            if not field_map:
                setattr(resource, field_name, None)

    def _check_no_field_path_conflicts(self, changes: list[FieldChange]) -> None:
        """Raise if any changes touch a field_path already modified by a previous change.

        Args:
            changes: All changes collected for a single resource.

        Raises:
            RuntimeError: If two changes share exactly the same ``field_path``.
        """
        seen_paths: set[str] = set()
        for change in changes:
            if change.field_path in seen_paths:
                raise RuntimeError(
                    f"{type(self).__name__}: Conflicting fixes — multiple changes "
                    f"to '{change.field_path}'. This is a bug in NEAT."
                )
            seen_paths.add(change.field_path)
3 changes: 2 additions & 1 deletion cognite/neat/_state_machine/_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from cognite.neat._data_model.exporters import DMSExporter
from cognite.neat._data_model.importers import DMSImporter
from cognite.neat._data_model.transformers import Transformer

from ._base import State

Expand Down Expand Up @@ -46,7 +47,7 @@ class PhysicalState(State):
"""

def transition(self, event: Any) -> State:
    """Stay in the physical state for exporters and transformers; forbid any other event."""
    keeps_physical_state = isinstance(event, (DMSExporter, Transformer))
    return PhysicalState() if keeps_physical_state else ForbiddenState(self)
2 changes: 2 additions & 0 deletions cognite/neat/_store/_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from typing import Any

from cognite.neat._data_model._fix import FixAction
from cognite.neat._data_model.deployer.data_classes import DeploymentResult
from cognite.neat._issues import ConsistencyError, IssueList, ModelSyntaxError
from cognite.neat._state_machine import State
Expand All @@ -22,6 +23,7 @@ class Change:
target_entity: str | None = field(default="FailedEntity")
issues: IssueList | None = field(default=None)
errors: IssueList | None = field(default=None)
fixes: list[FixAction] | None = field(default=None)
# for time being setting to Any, can be refined later
result: DeploymentResult | None = field(default=None)
description: str | None = field(default=None)
Expand Down
38 changes: 37 additions & 1 deletion cognite/neat/_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from cognite.neat._client.client import NeatClient
from cognite.neat._client.data_classes import SpaceStatisticsResponse
from cognite.neat._config import NeatConfig
from cognite.neat._data_model._fix import FixAction
from cognite.neat._data_model._shared import OnSuccess, OnSuccessIssuesChecker, OnSuccessResultProducer
from cognite.neat._data_model._snapshot import SchemaSnapshot
from cognite.neat._data_model.deployer.data_classes import DeploymentResult
Expand All @@ -16,6 +17,7 @@
from cognite.neat._data_model.importers import DMSImporter, DMSTableImporter
from cognite.neat._data_model.models.dms import RequestSchema as PhysicalDataModel
from cognite.neat._data_model.models.dms._limits import SchemaLimits
from cognite.neat._data_model.transformers import FixApplicator, Transformer
from cognite.neat._exceptions import DataModelCreateException, DataModelImportException
from cognite.neat._issues import IssueList
from cognite.neat._state_machine._states import EmptyState, PhysicalState, State
Expand Down Expand Up @@ -59,7 +61,7 @@ def cdf_snapshot(self) -> SchemaSnapshot:
self._cdf_snapshot = SchemaSnapshot.fetch_entire_cdf(self._client)
return self._cdf_snapshot

def read_physical(self, reader: DMSImporter, on_success: OnSuccess | None = None) -> None:
def read_physical(self, reader: DMSImporter, on_success: OnSuccess | None = None, fix: bool = False) -> None:
"""Read object from the store"""
self._can_agent_do_activity(reader)

Expand All @@ -73,6 +75,37 @@ def read_physical(self, reader: DMSImporter, on_success: OnSuccess | None = None

self.provenance.append(change)

if (
data_model
and fix
and self._config.alpha.fix_validation_issues
and isinstance(on_success, OnSuccessIssuesChecker)
and on_success.pending_fixes
):
self.transform_physical(FixApplicator(on_success.pending_fixes), on_success.copy())

def transform_physical(self, transformer: Transformer, on_success: OnSuccess | None = None) -> None:
    """
    Transform the current physical data model.

    The transformer is applied to the most recently stored physical data model. When the
    activity yields a model, it is appended to the stored models and the state machine is
    advanced. The resulting change is recorded in the provenance either way.

    Args:
        transformer: The `Transformer` object to apply to the current physical data model to transform it.
        on_success: The `OnSuccess` handler to run after the transformation has been applied.

    Raises:
        RuntimeError: If the store holds no physical data model to transform.
    """
    # NOTE(review): read_physical and write_physical call self._can_agent_do_activity(...)
    # before doing any work; confirm whether transformers should pass the same gate.
    try:
        current_model = self.physical_data_model[-1]
    except IndexError as error:
        # Surface a descriptive error instead of a bare IndexError from the empty store.
        raise RuntimeError(
            f"Cannot run {type(transformer).__name__}: there is no physical data model to transform."
        ) from error

    change, transformed_model = self._do_activity(transformer.transform, on_success, data_model=current_model)

    if transformed_model:
        change.target_entity = self.physical_data_model.generate_reference(
            cast(PhysicalDataModel, transformed_model)
        )
        self.physical_data_model.append(transformed_model)
        self.state = self.state.transition(transformer)
        change.target_state = self.state

    # Recorded even when no model was produced, so the attempted activity stays traceable.
    self.provenance.append(change)

def write_physical(self, writer: DMSExporter, on_success: OnSuccess | None = None, **kwargs: Any) -> None:
"""Write object into the store"""
self._can_agent_do_activity(writer)
Expand Down Expand Up @@ -180,6 +213,7 @@ def _do_activity(
created_data_model: PhysicalDataModel | None = None
issues = IssueList()
errors = IssueList()
fixes: list[FixAction] = []
deployment_result: DeploymentResult | None = None

try:
Expand All @@ -188,6 +222,7 @@ def _do_activity(
on_success.run(created_data_model)
if isinstance(on_success, OnSuccessIssuesChecker):
issues.extend(on_success.issues)
fixes.extend(on_success.pending_fixes)
elif isinstance(on_success, OnSuccessResultProducer):
deployment_result = on_success.result
else:
Expand All @@ -213,6 +248,7 @@ def _do_activity(
agent=type(activity.__self__).__name__ if hasattr(activity, "__self__") else "UnknownAgent",
issues=issues,
errors=errors,
fixes=fixes,
result=deployment_result,
activity=Change.standardize_activity_name(activity.__name__, start, end),
), created_data_model
Expand Down
Loading