From 168fd2b56a5d886dcd1351be4f3eb64db2e41752 Mon Sep 17 00:00:00 2001 From: Rahul Date: Sun, 21 Jun 2026 22:15:24 +0530 Subject: [PATCH 1/2] feat: 3-layer TaxDiagnosticResult model + migrate TDS/ITC/GST-RCM guards (close #39) --- qwed_tax/__pycache__/models.cpython-311.pyc | Bin 5969 -> 5969 bytes qwed_tax/audit.py | 20 + qwed_tax/diagnostics.py | 342 ++++++++++++++++++ qwed_tax/guards/indirect_tax_guard.py | 30 ++ qwed_tax/guards/tds_guard.py | 38 +- .../jurisdictions/india/guards/gst_guard.py | 42 +++ tests/test_diagnostics.py | 311 ++++++++++++++++ 7 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 qwed_tax/diagnostics.py create mode 100644 tests/test_diagnostics.py diff --git a/qwed_tax/__pycache__/models.cpython-311.pyc b/qwed_tax/__pycache__/models.cpython-311.pyc index 2c730ce9f8101d0ce63acd8068b838c308150435..0034c5547b38f1369f4d0924c61c33cfb0117c05 100644 GIT binary patch delta 20 acmcbpcTtafIWI340}!~0SZw5W69)h| str: + """Compute a deterministic proof reference hash from an audit trace. + + This binds a VERIFIED verdict to the specific audit_trace that justified it. + If the trace changes (different rule, different inputs, different outcome), + the hash changes — making verdict/trace drift structurally detectable. + + Args: + trace: The dict returned by build_trace(). + + Returns: + sha256-prefixed hex digest string, e.g. "sha256:abcdef...". + """ + payload = json.dumps(trace, sort_keys=True) + digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return f"sha256:{digest}" diff --git a/qwed_tax/diagnostics.py b/qwed_tax/diagnostics.py new file mode 100644 index 0000000..f21b5da --- /dev/null +++ b/qwed_tax/diagnostics.py @@ -0,0 +1,342 @@ +""" +QWED-Tax Structured Verification Diagnostics. + +Implements the 3-layer TaxDiagnosticResult model (Issue #39): + + Layer 1 — Agent-Safe Diagnostics + agent_message: str + Agent/model-facing summary. No statute sections, no rule IDs, + no detection logic leaked. Allows agents to correct failures + without exposing verification internals. + + Layer 2 — Developer Diagnostics + developer_fields: dict + Structured developer evidence with tax-specific fields: + constraint_id, statute, jurisdiction, expected/actual, + advisory_checks, deduction, allowable_credit, safe_harbour_range, + residency, net_payable, audit_trace. + + Layer 3 — Proof Diagnostics + proof_ref: Optional[str] + sha256 hash of retained proof artifact (audit_trace output). + Present only when status == VERIFIED and proof was established. + None for UNVERIFIABLE / BLOCKED — this is the authority bit. + +Constraints (non-negotiable, per #39): +- Diagnostics are NOT explainability — no confidence scores, no chain-of-thought. +- All diagnostic fields must originate from verification results, constraints, + rule evaluation, or proof systems. +- Agent-safe diagnostics must never expose detection logic, rule IDs, statute + sections, or security bypass guidance. +- VERIFIED requires proof_ref is not None — structurally enforced. +- Non-VERIFIED rejects proof_ref — structurally enforced. +- Frozen dataclass — prevents post-construction mutation. +- Advisory checks (advisory_only=True) never set status or proof_ref. + +This module does NOT depend on qwed-verification — QWED-Tax is a separate +package. The model follows the same 3-layer pattern but uses tax-specific +developer_fields and leverages the existing audit.py RuleRef + build_trace() +foundation. +""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional + + +class TaxDiagnosticStatus(str, Enum): + """Tax verification diagnostic status. + + Three states only — no HEURISTIC, AMBIGUOUS, or CORRECTION_NEEDED. + Richer distinctions live in developer_fields.constraint_id, not status. + + VERIFIED: + The tax decision was deterministically proven. proof_ref MUST be present. + Downstream gates MAY admit for control flow. + + UNVERIFIABLE: + The tax decision could not be proven. proof_ref MUST be None. + Reasons: insufficient evidence, ambiguous input, no claim to compare, + computation-only mode, unknown rule. + Downstream gates MUST NOT admit for control flow. + + BLOCKED: + Verification could not even be attempted. proof_ref MUST be None. + Reasons: missing declarations, parse error, schema validation failure, + unsupported service/entity type, invalid input format. + Downstream gates MUST NOT admit for control flow. + """ + VERIFIED = "VERIFIED" + UNVERIFIABLE = "UNVERIFIABLE" + BLOCKED = "BLOCKED" + + +@dataclass(frozen=True) +class TaxAdvisoryCheck: + """A non-proof-bearing analysis result attached as advisory metadata. + + Advisory checks may carry useful information for developers or auditors, + but they MUST NOT influence the verification verdict. The constraint: + + advisory_only = True + + is structurally enforced: advisory checks populate + developer_fields.advisory_checks, never status or proof_ref. + """ + name: str + advisory_only: bool = True + constraint_id: Optional[str] = None + details: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + if self.advisory_only is not True: + raise ValueError( + "TaxAdvisoryCheck.advisory_only must be True — " + "advisory checks must never influence the verification verdict." + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "advisory_only": self.advisory_only, + "constraint_id": self.constraint_id, + "details": self.details, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "TaxAdvisoryCheck": + raw_advisory_only = data.get("advisory_only", True) + if isinstance(raw_advisory_only, bool): + advisory_only = raw_advisory_only + elif isinstance(raw_advisory_only, int) and raw_advisory_only in (0, 1): + advisory_only = bool(raw_advisory_only) + else: + raise ValueError( + "TaxAdvisoryCheck.advisory_only must be a bool or integer 0/1" + ) + + return cls( + name=data.get("name", ""), + advisory_only=advisory_only, + constraint_id=data.get("constraint_id"), + details=data.get("details", {}), + ) + + +def compute_proof_ref(evidence: Dict[str, Any]) -> str: + """Compute a deterministic proof reference hash from retained evidence. + + The proof_ref binds the verdict (status=VERIFIED) to the specific evidence + that justified it. If the evidence changes, the hash changes — making + verdict/evidence drift structurally detectable. + + For audit_trace-based guards: pass the build_trace() output as evidence. + For Decimal guards: pass the computed + claimed values + comparison result. + For Z3 guards: pass the assertion stack + solver result. + + Args: + evidence: The proof artifact dict (must be JSON-serializable). + + Returns: + sha256-prefixed hex digest string, e.g. "sha256:abcdef...". + + Raises: + ValueError: If evidence is not JSON-serializable (fail-closed). + """ + try: + payload = json.dumps(evidence, sort_keys=True) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Proof evidence must be JSON-serializable for proof_ref hashing: {exc}" + ) from exc + digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return f"sha256:{digest}" + + +@dataclass(frozen=True) +class TaxDiagnosticResult: + """Unified 3-layer tax verification diagnostic result (Issue #39). + + Replaces the ad-hoc Dict[str, Any] returns and the multiple incompatible + result models (TaxResult, VerificationResult) across QWED-Tax guards. + + Three layers: + 1. agent_message — Layer 1 (agent-safe, no internals) + 2. developer_fields — Layer 2 (structured developer evidence) + 3. proof_ref — Layer 3 (cryptographic proof artifact hash) + + Authority contract: + proof_ref is not None → authoritative, admissible for control flow + proof_ref is None → non-authoritative, NOT admissible for control flow + + Constraints enforced in __post_init__: + - status == VERIFIED requires proof_ref is not None + - status == UNVERIFIABLE or BLOCKED requires proof_ref is None + - agent_message must be non-empty + """ + + status: TaxDiagnosticStatus + agent_message: str + developer_fields: Dict[str, Any] = field(default_factory=dict) + proof_ref: Optional[str] = None + + def __post_init__(self) -> None: + if not self.agent_message or not self.agent_message.strip(): + raise ValueError( + "agent_message must be non-empty — Layer 1 diagnostics are mandatory" + ) + + if self.status is TaxDiagnosticStatus.VERIFIED and not self.proof_ref: + raise ValueError( + "VERIFIED status requires proof_ref is not None and non-empty — " + "a tax claim cannot be marked proven without a proof artifact hash. " + "Use UNVERIFIABLE if no proof was established." + ) + + if self.status is not TaxDiagnosticStatus.VERIFIED and self.proof_ref is not None: + raise ValueError( + f"{self.status.value} status requires proof_ref is None — " + "non-VERIFIED states are non-authoritative by construction." + ) + + @property + def is_verified(self) -> bool: + """True only when status is VERIFIED (which implies proof_ref is not None).""" + return self.status is TaxDiagnosticStatus.VERIFIED + + @property + def is_authoritative(self) -> bool: + """Authority bit — True when proof_ref is present (admissible for control flow).""" + return self.proof_ref is not None + + @property + def is_fail_closed(self) -> bool: + """True when status is UNVERIFIABLE or BLOCKED (non-pass, fail-closed).""" + return self.status in (TaxDiagnosticStatus.UNVERIFIABLE, TaxDiagnosticStatus.BLOCKED) + + @property + def constraint_id(self) -> Optional[str]: + """The primary constraint identifier from developer_fields, if present.""" + return self.developer_fields.get("constraint_id") + + @property + def audit_trace(self) -> Optional[Dict[str, Any]]: + """The audit_trace from developer_fields, if present.""" + return self.developer_fields.get("audit_trace") + + @property + def advisory_checks(self) -> List[TaxAdvisoryCheck]: + """Advisory checks from developer_fields, deserialized to TaxAdvisoryCheck.""" + raw = self.developer_fields.get("advisory_checks", []) + if not isinstance(raw, list): + return [] + result = [] + for item in raw: + if isinstance(item, dict): + try: + result.append(TaxAdvisoryCheck.from_dict(item)) + except ValueError: + continue + elif isinstance(item, TaxAdvisoryCheck): + result.append(item) + return result + + def to_dict(self) -> Dict[str, Any]: + """Serialize to dict for API/SDK responses.""" + fields = dict(self.developer_fields) + checks = fields.get("advisory_checks") + if isinstance(checks, list): + fields["advisory_checks"] = [ + item.to_dict() if isinstance(item, TaxAdvisoryCheck) else item + for item in checks + ] + return { + "status": self.status.value, + "agent_message": self.agent_message, + "developer_fields": fields, + "proof_ref": self.proof_ref, + "is_authoritative": self.is_authoritative, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "TaxDiagnosticResult": + """Deserialize from dict.""" + status = data.get("status", "UNVERIFIABLE") + if isinstance(status, str): + try: + status = TaxDiagnosticStatus(status) + except ValueError: + valid = ", ".join(s.value for s in TaxDiagnosticStatus) + raise ValueError( + f"from_dict: invalid status {status!r} — " + f"must be one of: {valid}." + ) from None + + agent_message = data.get("agent_message") + if not agent_message or not str(agent_message).strip(): + raise ValueError( + "from_dict: 'agent_message' is missing or empty — " + "Layer 1 diagnostics are mandatory." + ) + + return cls( + status=status, + agent_message=agent_message, + developer_fields=data.get("developer_fields", {}), + proof_ref=data.get("proof_ref"), + ) + + @classmethod + def verified( + cls, + agent_message: str, + developer_fields: Dict[str, Any], + evidence: Dict[str, Any], + ) -> "TaxDiagnosticResult": + """Construct a VERIFIED result with proof_ref computed from evidence.""" + return cls( + status=TaxDiagnosticStatus.VERIFIED, + agent_message=agent_message, + developer_fields=developer_fields, + proof_ref=compute_proof_ref(evidence), + ) + + @classmethod + def unverifiable( + cls, + agent_message: str, + developer_fields: Optional[Dict[str, Any]] = None, + ) -> "TaxDiagnosticResult": + """Construct an UNVERIFIABLE result (non-pass, non-authoritative).""" + return cls( + status=TaxDiagnosticStatus.UNVERIFIABLE, + agent_message=agent_message, + developer_fields=developer_fields or {}, + proof_ref=None, + ) + + @classmethod + def blocked( + cls, + agent_message: str, + developer_fields: Optional[Dict[str, Any]] = None, + ) -> "TaxDiagnosticResult": + """Construct a BLOCKED result (verification could not be attempted).""" + return cls( + status=TaxDiagnosticStatus.BLOCKED, + agent_message=agent_message, + developer_fields=developer_fields or {}, + proof_ref=None, + ) + + +__all__ = [ + "TaxDiagnosticStatus", + "TaxDiagnosticResult", + "TaxAdvisoryCheck", + "compute_proof_ref", +] diff --git a/qwed_tax/guards/indirect_tax_guard.py b/qwed_tax/guards/indirect_tax_guard.py index 8322fd5..69a0b33 100644 --- a/qwed_tax/guards/indirect_tax_guard.py +++ b/qwed_tax/guards/indirect_tax_guard.py @@ -9,6 +9,7 @@ ITC_PERSONAL_CONSUMPTION, build_trace, ) +from qwed_tax.diagnostics import TaxDiagnosticResult from qwed_tax.numeric import decimal_text, parse_decimal_input @@ -152,3 +153,32 @@ def verify_gstin_format(self, gstin: str) -> Dict[str, Any]: return {"verified": False, "error": "Invalid GSTIN checksum."} return {"verified": True} + + @staticmethod + def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: + """Convert a legacy verify_itc_eligibility() dict to TaxDiagnosticResult.""" + verified = result.get("verified", False) + audit_trace = result.get("audit_trace") + + if not verified: + return TaxDiagnosticResult.blocked( + agent_message="Input tax credit eligibility could not be verified.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "ITC_UNKNOWN", + "audit_trace": audit_trace, + "reason": result.get("reason"), + "eligible_itc": result.get("eligible_itc"), + }, + ) + + return TaxDiagnosticResult.verified( + agent_message="Input tax credit eligibility verified.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "ITC_VERIFIED", + "statute": audit_trace["statute"] if audit_trace else None, + "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "audit_trace": audit_trace, + "eligible_itc": result.get("eligible_itc"), + }, + evidence=audit_trace or {}, + ) diff --git a/qwed_tax/guards/tds_guard.py b/qwed_tax/guards/tds_guard.py index ed73bc0..9d4fa63 100644 --- a/qwed_tax/guards/tds_guard.py +++ b/qwed_tax/guards/tds_guard.py @@ -1,7 +1,8 @@ from decimal import Decimal from typing import Any, Dict -from qwed_tax.audit import TDS_194C, TDS_194H, TDS_194I, TDS_194J, build_trace +from qwed_tax.audit import TDS_194C, TDS_194H, TDS_194I, TDS_194J, build_trace, trace_proof_ref +from qwed_tax.diagnostics import TaxDiagnosticResult, TaxDiagnosticStatus from qwed_tax.numeric import decimal_text, parse_decimal_input class TDSGuard: @@ -80,3 +81,38 @@ def calculate_deduction(self, service_type: str, invoice_amount: Any, ytd_paymen }, ), } + + @staticmethod + def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: + """Convert a legacy calculate_deduction() dict to TaxDiagnosticResult. + + Backward-compatible migration helper. Guards that already produce + audit_trace can be converted with zero logic change. + """ + verified = result.get("verified", False) + audit_trace = result.get("audit_trace") + + if not verified: + return TaxDiagnosticResult.blocked( + agent_message="Tax deduction verification could not be completed.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "TDS_UNKNOWN", + "audit_trace": audit_trace, + "error": result.get("error"), + "deduction": result.get("deduction"), + "net_payable": result.get("net_payable"), + }, + ) + + return TaxDiagnosticResult.verified( + agent_message="Tax deduction verified.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "TDS_VERIFIED", + "statute": audit_trace["statute"] if audit_trace else None, + "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "audit_trace": audit_trace, + "deduction": result.get("deduction"), + "net_payable": result.get("net_payable"), + }, + evidence=audit_trace or {}, + ) diff --git a/qwed_tax/jurisdictions/india/guards/gst_guard.py b/qwed_tax/jurisdictions/india/guards/gst_guard.py index 8059d06..7a03f64 100644 --- a/qwed_tax/jurisdictions/india/guards/gst_guard.py +++ b/qwed_tax/jurisdictions/india/guards/gst_guard.py @@ -14,6 +14,7 @@ RuleRef, build_trace, ) +from qwed_tax.diagnostics import TaxDiagnosticResult from qwed_tax.numeric import decimal_text, parse_decimal_input class EntityType(str, Enum): @@ -228,6 +229,47 @@ def _build_calculation_response( ), } + @staticmethod + def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: + """Convert a legacy verify_rcm_applicability() dict to TaxDiagnosticResult.""" + verified = result.get("verified", False) + audit_trace = result.get("audit_trace") + computed_only = result.get("computed_only", False) + + if not verified: + if computed_only: + return TaxDiagnosticResult.unverifiable( + agent_message="RCM liability was computed but not verified against a claim.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "RCM_COMPUTED_ONLY", + "audit_trace": audit_trace, + "is_rcm": result.get("is_rcm"), + "liability": result.get("liability"), + }, + ) + return TaxDiagnosticResult.blocked( + agent_message="RCM applicability could not be verified.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "RCM_BLOCKED", + "audit_trace": audit_trace, + "error": result.get("error"), + "is_rcm": result.get("is_rcm"), + }, + ) + + return TaxDiagnosticResult.verified( + agent_message="RCM applicability verified.", + developer_fields={ + "constraint_id": audit_trace["rule_id"] if audit_trace else "RCM_VERIFIED", + "statute": audit_trace["statute"] if audit_trace else None, + "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "audit_trace": audit_trace, + "is_rcm": result.get("is_rcm"), + "liability": result.get("liability"), + }, + evidence=audit_trace or {}, + ) + @staticmethod def _try_coerce(enum_cls, value): """Return an enum member or None if the value doesn't match any member.""" diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py new file mode 100644 index 0000000..6db3cd2 --- /dev/null +++ b/tests/test_diagnostics.py @@ -0,0 +1,311 @@ +"""Tests for TaxDiagnosticResult 3-layer model (#39).""" + +import pytest + +from qwed_tax.audit import TDS_194J, build_trace, trace_proof_ref +from qwed_tax.diagnostics import ( + TaxAdvisoryCheck, + TaxDiagnosticResult, + TaxDiagnosticStatus, + compute_proof_ref, +) +from qwed_tax.guards.indirect_tax_guard import InputCreditGuard +from qwed_tax.guards.tds_guard import TDSGuard +from qwed_tax.jurisdictions.india.guards.gst_guard import EntityType, GSTGuard, ServiceType + + +# --------------------------------------------------------------------------- +# TaxDiagnosticResult model tests +# --------------------------------------------------------------------------- + +class TestTaxDiagnosticResultModel: + """Core model invariants — 3 layers, frozen, status/proof_ref contract.""" + + def test_verified_requires_proof_ref(self): + with pytest.raises(ValueError, match="VERIFIED status requires proof_ref"): + TaxDiagnosticResult( + status=TaxDiagnosticStatus.VERIFIED, + agent_message="ok", + developer_fields={}, + proof_ref=None, + ) + + def test_non_verified_rejects_proof_ref(self): + with pytest.raises(ValueError, match="UNVERIFIABLE status requires proof_ref is None"): + TaxDiagnosticResult( + status=TaxDiagnosticStatus.UNVERIFIABLE, + agent_message="cannot verify", + developer_fields={}, + proof_ref="sha256:abc", + ) + + def test_blocked_rejects_proof_ref(self): + with pytest.raises(ValueError, match="BLOCKED status requires proof_ref is None"): + TaxDiagnosticResult( + status=TaxDiagnosticStatus.BLOCKED, + agent_message="blocked", + developer_fields={}, + proof_ref="sha256:abc", + ) + + def test_empty_agent_message_rejected(self): + with pytest.raises(ValueError, match="agent_message must be non-empty"): + TaxDiagnosticResult( + status=TaxDiagnosticStatus.BLOCKED, + agent_message="", + ) + + def test_frozen_dataclass(self): + result = TaxDiagnosticResult.blocked("blocked") + with pytest.raises(Exception): + result.agent_message = "mutated" + + def test_verified_factory_produces_proof_ref(self): + result = TaxDiagnosticResult.verified( + agent_message="verified", + developer_fields={"constraint_id": "TEST_RULE"}, + evidence={"rule_id": "TEST_RULE", "outcome": "PASS"}, + ) + assert result.status is TaxDiagnosticStatus.VERIFIED + assert result.proof_ref is not None + assert result.proof_ref.startswith("sha256:") + + def test_unverifiable_factory(self): + result = TaxDiagnosticResult.unverifiable( + "cannot verify", + {"constraint_id": "TEST"}, + ) + assert result.status is TaxDiagnosticStatus.UNVERIFIABLE + assert result.proof_ref is None + assert result.is_fail_closed is True + + def test_blocked_factory(self): + result = TaxDiagnosticResult.blocked( + "blocked", + {"constraint_id": "TEST"}, + ) + assert result.status is TaxDiagnosticStatus.BLOCKED + assert result.proof_ref is None + assert result.is_fail_closed is True + + def test_is_verified_property(self): + verified = TaxDiagnosticResult.verified("ok", {}, {"x": 1}) + assert verified.is_verified is True + assert verified.is_authoritative is True + + unverified = TaxDiagnosticResult.unverifiable("no") + assert unverified.is_verified is False + assert unverified.is_authoritative is False + + def test_to_dict_and_from_dict_roundtrip(self): + result = TaxDiagnosticResult.verified( + agent_message="verified", + developer_fields={"constraint_id": "TDS_194J", "deduction": "3000"}, + evidence={"rule_id": "TDS_194J"}, + ) + d = result.to_dict() + assert d["status"] == "VERIFIED" + assert d["proof_ref"].startswith("sha256:") + assert d["is_authoritative"] is True + + restored = TaxDiagnosticResult.from_dict(d) + assert restored.status is TaxDiagnosticStatus.VERIFIED + assert restored.agent_message == "verified" + assert restored.proof_ref == result.proof_ref + + def test_from_dict_rejects_invalid_status(self): + with pytest.raises(ValueError, match="invalid status"): + TaxDiagnosticResult.from_dict({ + "status": "HEURISTIC", + "agent_message": "test", + }) + + def test_from_dict_rejects_empty_agent_message(self): + with pytest.raises(ValueError, match="agent_message"): + TaxDiagnosticResult.from_dict({ + "status": "BLOCKED", + "agent_message": "", + }) + + def test_constraint_id_property(self): + result = TaxDiagnosticResult.blocked("blocked", {"constraint_id": "ITC_BLOCKED_17_5"}) + assert result.constraint_id == "ITC_BLOCKED_17_5" + + def test_audit_trace_property(self): + trace = build_trace(TDS_194J, "DEDUCTION_REQUIRED", {"amount": "50000"}) + result = TaxDiagnosticResult.verified( + "verified", + {"constraint_id": "TDS_194J", "audit_trace": trace}, + trace, + ) + assert result.audit_trace == trace + + +# --------------------------------------------------------------------------- +# TaxAdvisoryCheck tests +# --------------------------------------------------------------------------- + +class TestTaxAdvisoryCheck: + def test_advisory_only_must_be_true(self): + with pytest.raises(ValueError, match="advisory_only must be True"): + TaxAdvisoryCheck(name="heuristic", advisory_only=False) + + def test_to_dict_and_from_dict_roundtrip(self): + check = TaxAdvisoryCheck(name="heuristic_check", constraint_id="ADVISORY_1") + d = check.to_dict() + assert d["advisory_only"] is True + restored = TaxAdvisoryCheck.from_dict(d) + assert restored.name == "heuristic_check" + assert restored.advisory_only is True + + def test_advisory_checks_in_result(self): + check = TaxAdvisoryCheck(name="heuristic", constraint_id="ADVISORY_1") + result = TaxDiagnosticResult.unverifiable( + "cannot verify", + {"advisory_checks": [check]}, + ) + checks = result.advisory_checks + assert len(checks) == 1 + assert checks[0].name == "heuristic" + + +# --------------------------------------------------------------------------- +# compute_proof_ref tests +# --------------------------------------------------------------------------- + +class TestComputeProofRef: + def test_deterministic_hash(self): + evidence = {"rule_id": "TDS_194J", "outcome": "PASS"} + ref1 = compute_proof_ref(evidence) + ref2 = compute_proof_ref(evidence) + assert ref1 == ref2 + assert ref1.startswith("sha256:") + + def test_different_evidence_different_hash(self): + ref1 = compute_proof_ref({"rule_id": "TDS_194J"}) + ref2 = compute_proof_ref({"rule_id": "TDS_194C"}) + assert ref1 != ref2 + + def test_non_serializable_raises(self): + with pytest.raises(ValueError, match="JSON-serializable"): + compute_proof_ref({"non_serializable": object()}) + + +# --------------------------------------------------------------------------- +# trace_proof_ref tests +# --------------------------------------------------------------------------- + +class TestTraceProofRef: + def test_trace_proof_ref_matches_compute_proof_ref(self): + trace = build_trace(TDS_194J, "DEDUCTION_REQUIRED", {"amount": "50000"}) + ref1 = trace_proof_ref(trace) + ref2 = compute_proof_ref(trace) + assert ref1 == ref2 + + def test_different_traces_different_hash(self): + trace1 = build_trace(TDS_194J, "DEDUCTION_REQUIRED", {"amount": "50000"}) + trace2 = build_trace(TDS_194J, "BELOW_THRESHOLD", {"amount": "50000"}) + assert trace_proof_ref(trace1) != trace_proof_ref(trace2) + + +# --------------------------------------------------------------------------- +# Guard migration tests — TDS, ITC, GST-RCM +# --------------------------------------------------------------------------- + +class TestTDSGuardMigration: + """TDSGuard.to_diagnostic() converts legacy dict to TaxDiagnosticResult.""" + + def setup_method(self): + self.guard = TDSGuard() + + def test_deduction_required_to_diagnostic(self): + result = self.guard.calculate_deduction("PROFESSIONAL_FEES", "50000", "0") + diag = TDSGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.VERIFIED + assert diag.proof_ref is not None + assert diag.constraint_id == "TDS_194J" + assert diag.developer_fields["deduction"] is not None + + def test_below_threshold_to_diagnostic(self): + result = self.guard.calculate_deduction("PROFESSIONAL_FEES", "1000", "0") + diag = TDSGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.VERIFIED + assert diag.proof_ref is not None + assert diag.constraint_id == "TDS_194J" + + def test_unknown_service_to_diagnostic(self): + result = self.guard.calculate_deduction("UNKNOWN_TYPE", "50000", "0") + diag = TDSGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.BLOCKED + assert diag.proof_ref is None + assert diag.constraint_id == "TDS_UNKNOWN" + + +class TestITCGuardMigration: + """InputCreditGuard.to_diagnostic() converts legacy dict to TaxDiagnosticResult.""" + + def setup_method(self): + self.guard = InputCreditGuard() + + def test_eligible_to_diagnostic(self): + result = self.guard.verify_itc_eligibility("OFFICE_SUPPLIES", "1000", "180") + diag = InputCreditGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.VERIFIED + assert diag.proof_ref is not None + assert diag.constraint_id == "ITC_ELIGIBLE" + + def test_blocked_to_diagnostic(self): + result = self.guard.verify_itc_eligibility("FOOD_AND_BEVERAGE", "1000", "180") + diag = InputCreditGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.BLOCKED + assert diag.proof_ref is None + assert diag.constraint_id == "ITC_BLOCKED_17_5" + + def test_gift_threshold_to_diagnostic(self): + result = self.guard.verify_itc_eligibility("GIFT_TO_EMPLOYEE", "40000", "7200") + diag = InputCreditGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.VERIFIED + assert diag.constraint_id == "ITC_GIFT_THRESHOLD" + + +class TestGSTRCMGuardMigration: + """GSTGuard.to_diagnostic() converts legacy dict to TaxDiagnosticResult.""" + + def setup_method(self): + self.guard = GSTGuard() + + def test_verification_mode_match_to_diagnostic(self): + result = self.guard.verify_rcm_applicability( + ServiceType.GTA, EntityType.INDIVIDUAL, EntityType.BODY_CORPORATE, + claimed_is_rcm=True, + ) + diag = GSTGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.VERIFIED + assert diag.proof_ref is not None + assert diag.constraint_id == "RCM_GTA" + + def test_verification_mode_mismatch_to_diagnostic(self): + result = self.guard.verify_rcm_applicability( + ServiceType.GTA, EntityType.INDIVIDUAL, EntityType.BODY_CORPORATE, + claimed_is_rcm=False, + ) + diag = GSTGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.BLOCKED + assert diag.proof_ref is None + + def test_computation_mode_to_diagnostic(self): + result = self.guard.verify_rcm_applicability( + ServiceType.GTA, EntityType.INDIVIDUAL, EntityType.BODY_CORPORATE, + ) + diag = GSTGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.UNVERIFIABLE + assert diag.proof_ref is None + assert diag.developer_fields.get("constraint_id") == "RCM_GTA" + + def test_unknown_service_to_diagnostic(self): + result = self.guard.verify_rcm_applicability( + "MYSTERY", "INDIVIDUAL", "BODY_CORPORATE", + ) + diag = GSTGuard.to_diagnostic(result) + assert diag.status is TaxDiagnosticStatus.BLOCKED + assert diag.proof_ref is None From a5227cc26ac8c3f7927f90b307fd51bbcfd2bda0 Mon Sep 17 00:00:00 2001 From: Rahul Date: Sun, 21 Jun 2026 23:54:20 +0530 Subject: [PATCH 2/2] fix: address CodeQL, Sentry, Greptile P1, and CodeRabbit review comments --- qwed_tax/audit.py | 7 ++++- qwed_tax/diagnostics.py | 26 ++++++++++++++++--- qwed_tax/guards/indirect_tax_guard.py | 14 +++++++--- qwed_tax/guards/tds_guard.py | 18 ++++++++----- .../jurisdictions/india/guards/gst_guard.py | 14 +++++++--- tests/test_diagnostics.py | 6 +++-- 6 files changed, 64 insertions(+), 21 deletions(-) diff --git a/qwed_tax/audit.py b/qwed_tax/audit.py index ae4e00f..1e28dae 100644 --- a/qwed_tax/audit.py +++ b/qwed_tax/audit.py @@ -110,6 +110,11 @@ def trace_proof_ref(trace: Dict[str, Any]) -> str: Returns: sha256-prefixed hex digest string, e.g. "sha256:abcdef...". """ - payload = json.dumps(trace, sort_keys=True) + try: + payload = json.dumps(trace, sort_keys=True) + except (TypeError, ValueError) as exc: + raise ValueError( + f"Audit trace must be JSON-serializable for proof_ref hashing: {exc}" + ) from exc digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() return f"sha256:{digest}" diff --git a/qwed_tax/diagnostics.py b/qwed_tax/diagnostics.py index f21b5da..3c912fa 100644 --- a/qwed_tax/diagnostics.py +++ b/qwed_tax/diagnostics.py @@ -185,11 +185,19 @@ class TaxDiagnosticResult: proof_ref: Optional[str] = None def __post_init__(self) -> None: - if not self.agent_message or not self.agent_message.strip(): + if not isinstance(self.status, TaxDiagnosticStatus): + valid = ", ".join(s.value for s in TaxDiagnosticStatus) + raise ValueError(f"status must be a TaxDiagnosticStatus ({valid})") + + if not isinstance(self.agent_message, str) or not self.agent_message.strip(): raise ValueError( - "agent_message must be non-empty — Layer 1 diagnostics are mandatory" + "agent_message must be a non-empty string — " + "Layer 1 diagnostics are mandatory" ) + if not isinstance(self.developer_fields, dict): + raise ValueError("developer_fields must be a dict") + if self.status is TaxDiagnosticStatus.VERIFIED and not self.proof_ref: raise ValueError( "VERIFIED status requires proof_ref is not None and non-empty — " @@ -275,18 +283,28 @@ def from_dict(cls, data: Dict[str, Any]) -> "TaxDiagnosticResult": f"from_dict: invalid status {status!r} — " f"must be one of: {valid}." ) from None + elif not isinstance(status, TaxDiagnosticStatus): + valid = ", ".join(s.value for s in TaxDiagnosticStatus) + raise ValueError( + f"from_dict: invalid status type {type(status).__name__} — " + f"must be one of: {valid}." + ) agent_message = data.get("agent_message") - if not agent_message or not str(agent_message).strip(): + if not isinstance(agent_message, str) or not agent_message.strip(): raise ValueError( "from_dict: 'agent_message' is missing or empty — " "Layer 1 diagnostics are mandatory." ) + developer_fields = data.get("developer_fields", {}) + if not isinstance(developer_fields, dict): + raise ValueError("from_dict: 'developer_fields' must be a dict.") + return cls( status=status, agent_message=agent_message, - developer_fields=data.get("developer_fields", {}), + developer_fields=developer_fields, proof_ref=data.get("proof_ref"), ) diff --git a/qwed_tax/guards/indirect_tax_guard.py b/qwed_tax/guards/indirect_tax_guard.py index 69a0b33..1b56a6e 100644 --- a/qwed_tax/guards/indirect_tax_guard.py +++ b/qwed_tax/guards/indirect_tax_guard.py @@ -171,14 +171,20 @@ def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: }, ) + if audit_trace is None: + raise ValueError( + "VERIFIED result requires audit_trace — " + "use UNVERIFIABLE if no evidence was established." + ) + return TaxDiagnosticResult.verified( agent_message="Input tax credit eligibility verified.", developer_fields={ - "constraint_id": audit_trace["rule_id"] if audit_trace else "ITC_VERIFIED", - "statute": audit_trace["statute"] if audit_trace else None, - "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "constraint_id": audit_trace["rule_id"], + "statute": audit_trace.get("statute"), + "jurisdiction": audit_trace.get("jurisdiction"), "audit_trace": audit_trace, "eligible_itc": result.get("eligible_itc"), }, - evidence=audit_trace or {}, + evidence=audit_trace, ) diff --git a/qwed_tax/guards/tds_guard.py b/qwed_tax/guards/tds_guard.py index 9d4fa63..95d67c5 100644 --- a/qwed_tax/guards/tds_guard.py +++ b/qwed_tax/guards/tds_guard.py @@ -1,8 +1,8 @@ from decimal import Decimal from typing import Any, Dict -from qwed_tax.audit import TDS_194C, TDS_194H, TDS_194I, TDS_194J, build_trace, trace_proof_ref -from qwed_tax.diagnostics import TaxDiagnosticResult, TaxDiagnosticStatus +from qwed_tax.audit import TDS_194C, TDS_194H, TDS_194I, TDS_194J, build_trace +from qwed_tax.diagnostics import TaxDiagnosticResult from qwed_tax.numeric import decimal_text, parse_decimal_input class TDSGuard: @@ -104,15 +104,21 @@ def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: }, ) + if audit_trace is None: + raise ValueError( + "VERIFIED result requires audit_trace — " + "use UNVERIFIABLE if no evidence was established." + ) + return TaxDiagnosticResult.verified( agent_message="Tax deduction verified.", developer_fields={ - "constraint_id": audit_trace["rule_id"] if audit_trace else "TDS_VERIFIED", - "statute": audit_trace["statute"] if audit_trace else None, - "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "constraint_id": audit_trace["rule_id"], + "statute": audit_trace.get("statute"), + "jurisdiction": audit_trace.get("jurisdiction"), "audit_trace": audit_trace, "deduction": result.get("deduction"), "net_payable": result.get("net_payable"), }, - evidence=audit_trace or {}, + evidence=audit_trace, ) diff --git a/qwed_tax/jurisdictions/india/guards/gst_guard.py b/qwed_tax/jurisdictions/india/guards/gst_guard.py index 7a03f64..a88c5e3 100644 --- a/qwed_tax/jurisdictions/india/guards/gst_guard.py +++ b/qwed_tax/jurisdictions/india/guards/gst_guard.py @@ -257,17 +257,23 @@ def to_diagnostic(result: Dict[str, Any]) -> TaxDiagnosticResult: }, ) + if audit_trace is None: + raise ValueError( + "VERIFIED result requires audit_trace — " + "use UNVERIFIABLE if no evidence was established." + ) + return TaxDiagnosticResult.verified( agent_message="RCM applicability verified.", developer_fields={ - "constraint_id": audit_trace["rule_id"] if audit_trace else "RCM_VERIFIED", - "statute": audit_trace["statute"] if audit_trace else None, - "jurisdiction": audit_trace["jurisdiction"] if audit_trace else None, + "constraint_id": audit_trace["rule_id"], + "statute": audit_trace.get("statute"), + "jurisdiction": audit_trace.get("jurisdiction"), "audit_trace": audit_trace, "is_rcm": result.get("is_rcm"), "liability": result.get("liability"), }, - evidence=audit_trace or {}, + evidence=audit_trace, ) @staticmethod diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py index 6db3cd2..f4d01f6 100644 --- a/tests/test_diagnostics.py +++ b/tests/test_diagnostics.py @@ -1,5 +1,7 @@ """Tests for TaxDiagnosticResult 3-layer model (#39).""" +from dataclasses import FrozenInstanceError + import pytest from qwed_tax.audit import TDS_194J, build_trace, trace_proof_ref @@ -49,7 +51,7 @@ def test_blocked_rejects_proof_ref(self): ) def test_empty_agent_message_rejected(self): - with pytest.raises(ValueError, match="agent_message must be non-empty"): + with pytest.raises(ValueError, match="agent_message must be a non-empty string"): TaxDiagnosticResult( status=TaxDiagnosticStatus.BLOCKED, agent_message="", @@ -57,7 +59,7 @@ def test_empty_agent_message_rejected(self): def test_frozen_dataclass(self): result = TaxDiagnosticResult.blocked("blocked") - with pytest.raises(Exception): + with pytest.raises(FrozenInstanceError): result.agent_message = "mutated" def test_verified_factory_produces_proof_ref(self):