diff --git a/src/ursa/tools/cmm_ortools_solver.py b/src/ursa/tools/cmm_ortools_solver.py new file mode 100644 index 00000000..2495ba0d --- /dev/null +++ b/src/ursa/tools/cmm_ortools_solver.py @@ -0,0 +1,237 @@ +"""OR-Tools MIP solver for CMM supply-chain allocation. + +Falls back gracefully when ``ortools`` is not installed — callers should +check :func:`ortools_available` or catch the ``None`` return from +:func:`solve_with_ortools`. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Lazy OR-Tools import +# --------------------------------------------------------------------------- +try: + from ortools.linear_solver import pywraplp # type: ignore[import-untyped] + + _HAS_ORTOOLS = True +except ImportError: + _HAS_ORTOOLS = False + +_EPS = 1e-9 + + +def ortools_available() -> bool: + """Return ``True`` when the OR-Tools package is importable.""" + return _HAS_ORTOOLS + + +# --------------------------------------------------------------------------- +# Result container +# --------------------------------------------------------------------------- +@dataclass(frozen=True) +class ORToolsResult: + """Holds the raw solution extracted from the MIP.""" + + allocation: dict[tuple[str, str], float] + unmet: dict[str, float] + solver_status: str + objective_value: float = 0.0 + supplier_max: dict[str, float] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# MIP formulation +# --------------------------------------------------------------------------- +def solve_with_ortools( + *, + markets: list[str], + supplier_names: list[str], + supplier_capacities: dict[str, float], + supplier_unit_costs: dict[str, float], + supplier_risk_scores: dict[str, float], + demand: dict[str, float], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + unmet_penalty: float, + max_supplier_share: float, + composition_targets: dict[str, float], + composition_tolerance: float, + composition_profiles: dict[str, dict[str, float]], +) -> ORToolsResult | None: + """Build and solve the CMM allocation MIP. + + Returns an :class:`ORToolsResult` on success, or ``None`` if OR-Tools is + unavailable or the solver encounters an unexpected error. + """ + if not _HAS_ORTOOLS: + return None + + try: + return _build_and_solve( + markets=markets, + supplier_names=supplier_names, + supplier_capacities=supplier_capacities, + supplier_unit_costs=supplier_unit_costs, + supplier_risk_scores=supplier_risk_scores, + demand=demand, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + max_supplier_share=max_supplier_share, + composition_targets=composition_targets, + composition_tolerance=composition_tolerance, + composition_profiles=composition_profiles, + ) + except Exception: + logger.exception("OR-Tools solver failed unexpectedly") + return None + + +def _build_and_solve( + *, + markets: list[str], + supplier_names: list[str], + supplier_capacities: dict[str, float], + supplier_unit_costs: dict[str, float], + supplier_risk_scores: dict[str, float], + demand: dict[str, float], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + unmet_penalty: float, + max_supplier_share: float, + composition_targets: dict[str, float], + composition_tolerance: float, + composition_profiles: dict[str, dict[str, float]], +) -> ORToolsResult | None: + solver = pywraplp.Solver.CreateSolver("CBC") + if solver is None: + logger.warning("CBC solver not available in OR-Tools installation") + return None + + total_demand = sum(demand.values()) + + # Supplier share caps + supplier_max: dict[str, float] = {} + for s in supplier_names: + share_cap = max_supplier_share * total_demand + supplier_max[s] = min(supplier_capacities[s], share_cap) + + # --- Decision variables ------------------------------------------------- + # x[s, m] — continuous allocation flow + x: dict[tuple[str, str], Any] = {} + for s in supplier_names: + for m in markets: + x[s, m] = solver.NumVar(0.0, solver.infinity(), f"x_{s}_{m}") + + # y[s] — binary supplier activation + y: dict[str, Any] = {} + for s in supplier_names: + y[s] = solver.IntVar(0, 1, f"y_{s}") + + # u[m] — continuous unmet demand + u: dict[str, Any] = {} + for m in markets: + u[m] = solver.NumVar(0.0, solver.infinity(), f"u_{m}") + + # --- Constraints -------------------------------------------------------- + # (1) Demand: sum_s x[s,m] + u[m] = demand[m] + for m in markets: + ct = solver.Constraint(demand[m], demand[m], f"demand_{m}") + for s in supplier_names: + ct.SetCoefficient(x[s, m], 1.0) + ct.SetCoefficient(u[m], 1.0) + + # (2) Capacity: sum_m x[s,m] <= capacity[s] + for s in supplier_names: + ct = solver.Constraint(0.0, supplier_capacities[s], f"cap_{s}") + for m in markets: + ct.SetCoefficient(x[s, m], 1.0) + + # (3) Activation (big-M linking): x[s,m] <= capacity[s] * y[s] + for s in supplier_names: + big_m = supplier_capacities[s] + for m in markets: + ct = solver.Constraint(-solver.infinity(), 0.0, f"act_{s}_{m}") + ct.SetCoefficient(x[s, m], 1.0) + ct.SetCoefficient(y[s], -big_m) + + # (4) Max share: sum_m x[s,m] <= max_share * total_demand + for s in supplier_names: + ct = solver.Constraint(0.0, supplier_max[s], f"share_{s}") + for m in markets: + ct.SetCoefficient(x[s, m], 1.0) + + # (5) Composition constraints (linearized) + for comp, target in composition_targets.items(): + # Upper bound: sum_{s,m} x[s,m] * (profile[s,c] - target - tol) <= 0 + ct_upper = solver.Constraint( + -solver.infinity(), 0.0, f"comp_upper_{comp}" + ) + for s in supplier_names: + profile_val = composition_profiles.get(s, {}).get(comp, 0.0) + coeff = profile_val - target - composition_tolerance + for m in markets: + ct_upper.SetCoefficient(x[s, m], coeff) + + # Lower bound: sum_{s,m} x[s,m] * (target - tol - profile[s,c]) <= 0 + ct_lower = solver.Constraint( + -solver.infinity(), 0.0, f"comp_lower_{comp}" + ) + for s in supplier_names: + profile_val = composition_profiles.get(s, {}).get(comp, 0.0) + coeff = target - composition_tolerance - profile_val + for m in markets: + ct_lower.SetCoefficient(x[s, m], coeff) + + # --- Objective ---------------------------------------------------------- + objective = solver.Objective() + for s in supplier_names: + for m in markets: + unit_cost = supplier_unit_costs[s] + ship = shipping_cost.get(s, {}).get(m, 0.0) + risk = risk_weight * supplier_risk_scores[s] + objective.SetCoefficient(x[s, m], unit_cost + ship + risk) + for m in markets: + objective.SetCoefficient(u[m], unmet_penalty) + objective.SetMinimization() + + # --- Solve -------------------------------------------------------------- + solver.SetTimeLimit(60_000) # 60 seconds + status = solver.Solve() + + if status not in (pywraplp.Solver.OPTIMAL, pywraplp.Solver.FEASIBLE): + logger.warning("OR-Tools solver status %s — no feasible solution", status) + return None + + # --- Extract solution --------------------------------------------------- + allocation: dict[tuple[str, str], float] = {} + for s in supplier_names: + for m in markets: + val = x[s, m].solution_value() + if val > _EPS: + allocation[s, m] = val + + unmet: dict[str, float] = {} + for m in markets: + val = u[m].solution_value() + unmet[m] = max(0.0, val) + + solver_status: str + if status == pywraplp.Solver.OPTIMAL: + solver_status = "optimal" + else: + solver_status = "optimal_mip_feasible" + + return ORToolsResult( + allocation=allocation, + unmet=unmet, + solver_status=solver_status, + objective_value=solver.Objective().Value(), + supplier_max=supplier_max, + ) diff --git a/src/ursa/tools/cmm_supply_chain_optimization_tool.py b/src/ursa/tools/cmm_supply_chain_optimization_tool.py new file mode 100644 index 00000000..588622dc --- /dev/null +++ b/src/ursa/tools/cmm_supply_chain_optimization_tool.py @@ -0,0 +1,751 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any + +from langchain_core.tools import tool + +logger = logging.getLogger(__name__) + +_EPS = 1e-9 + + +@dataclass(frozen=True) +class _Supplier: + name: str + capacity: float + unit_cost: float + risk_score: float + + +def _to_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return float(default) + + +def _to_fraction(value: Any, default: float = 0.0) -> float: + return max(0.0, min(1.0, _to_float(value, default))) + + +def _normalize_component_name(name: Any) -> str: + return str(name).strip().upper() + + +def _sum_allocated_by_supplier( + allocation: dict[tuple[str, str], float], + suppliers: list[_Supplier], +) -> dict[str, float]: + totals = {supplier.name: 0.0 for supplier in suppliers} + for (supplier_name, _market), amount in allocation.items(): + totals[supplier_name] = totals.get(supplier_name, 0.0) + amount + return totals + + +def _compute_costs( + *, + allocation: dict[tuple[str, str], float], + suppliers: list[_Supplier], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + unmet: dict[str, float], + unmet_penalty: float, +) -> dict[str, float]: + suppliers_by_name = {supplier.name: supplier for supplier in suppliers} + procurement_cost = 0.0 + shipping_total = 0.0 + risk_total = 0.0 + + for (supplier_name, market), amount in allocation.items(): + supplier = suppliers_by_name[supplier_name] + procurement_cost += amount * supplier.unit_cost + shipping_total += amount * shipping_cost.get(supplier_name, {}).get( + market, 0.0 + ) + risk_total += amount * (risk_weight * supplier.risk_score) + + unmet_cost = sum(unmet.values()) * unmet_penalty + return { + "procurement": procurement_cost, + "shipping": shipping_total, + "risk_penalty": risk_total, + "unmet_penalty": unmet_cost, + } + + +def _compute_composition_metrics( + *, + totals_by_supplier: dict[str, float], + composition_targets: dict[str, float], + composition_profiles: dict[str, dict[str, float]], + composition_tolerance: float, +) -> dict[str, Any] | None: + if not composition_targets: + return None + + total_allocated = sum(totals_by_supplier.values()) + actual: dict[str, float] = {} + residuals: dict[str, float] = {} + + for component, target in composition_targets.items(): + if total_allocated <= _EPS: + actual_value = 0.0 + else: + weighted = 0.0 + for supplier_name, amount in totals_by_supplier.items(): + profile = composition_profiles.get(supplier_name, {}) + weighted += amount * profile.get(component, 0.0) + actual_value = weighted / total_allocated + actual[component] = actual_value + residuals[component] = actual_value - target + + feasible = all( + abs(residual) <= composition_tolerance + _EPS + for residual in residuals.values() + ) + + return { + "targets": composition_targets, + "actual": actual, + "residuals": residuals, + "tolerance": composition_tolerance, + "feasible": feasible, + } + + +def _shift_between_suppliers( + *, + allocation: dict[tuple[str, str], float], + supplier_remaining: dict[str, float], + donor: str, + receiver: str, + markets: list[str], + max_shift: float, +) -> float: + if max_shift <= _EPS: + return 0.0 + + shifted_total = 0.0 + for market in markets: + if shifted_total >= max_shift - _EPS: + break + + donor_key = (donor, market) + receiver_key = (receiver, market) + donor_amount = allocation.get(donor_key, 0.0) + receiver_cap = supplier_remaining.get(receiver, 0.0) + + shift = min( + max_shift - shifted_total, + donor_amount, + receiver_cap, + ) + if shift <= _EPS: + continue + + new_donor_amount = donor_amount - shift + if new_donor_amount <= _EPS: + allocation.pop(donor_key, None) + else: + allocation[donor_key] = new_donor_amount + + allocation[receiver_key] = allocation.get(receiver_key, 0.0) + shift + supplier_remaining[receiver] = receiver_cap - shift + supplier_remaining[donor] = supplier_remaining.get(donor, 0.0) + shift + shifted_total += shift + + return shifted_total + + +def _rebalance_for_composition( + *, + allocation: dict[tuple[str, str], float], + supplier_remaining: dict[str, float], + suppliers: list[_Supplier], + markets: list[str], + composition_targets: dict[str, float], + composition_profiles: dict[str, dict[str, float]], + composition_tolerance: float, + max_iterations: int = 200, +) -> None: + if not composition_targets: + return + + for _ in range(max_iterations): + totals = _sum_allocated_by_supplier(allocation, suppliers) + metrics = _compute_composition_metrics( + totals_by_supplier=totals, + composition_targets=composition_targets, + composition_profiles=composition_profiles, + composition_tolerance=composition_tolerance, + ) + if metrics is None or metrics["feasible"]: + return + + total_allocated = sum(totals.values()) + if total_allocated <= _EPS: + return + + residuals = metrics["residuals"] + pending_components = [ + (component, float(residual)) + for component, residual in residuals.items() + if abs(float(residual)) > composition_tolerance + _EPS + ] + pending_components.sort(key=lambda item: abs(item[1]), reverse=True) + + progressed = False + for component, residual in pending_components: + too_high = residual > 0.0 + if too_high: + donor_candidates = sorted( + suppliers, + key=lambda supplier: ( + -composition_profiles[supplier.name].get(component, 0.0), + supplier.name, + ), + ) + receiver_candidates = sorted( + suppliers, + key=lambda supplier: ( + composition_profiles[supplier.name].get(component, 0.0), + supplier.name, + ), + ) + else: + donor_candidates = sorted( + suppliers, + key=lambda supplier: ( + composition_profiles[supplier.name].get(component, 0.0), + supplier.name, + ), + ) + receiver_candidates = sorted( + suppliers, + key=lambda supplier: ( + -composition_profiles[supplier.name].get(component, 0.0), + supplier.name, + ), + ) + + for donor in donor_candidates: + donor_name = donor.name + donor_amount = totals.get(donor_name, 0.0) + if donor_amount <= _EPS: + continue + + donor_profile = composition_profiles[donor_name].get(component, 0.0) + for receiver in receiver_candidates: + receiver_name = receiver.name + if receiver_name == donor_name: + continue + if supplier_remaining.get(receiver_name, 0.0) <= _EPS: + continue + + receiver_profile = composition_profiles[receiver_name].get( + component, 0.0 + ) + profile_delta = ( + donor_profile - receiver_profile + if too_high + else receiver_profile - donor_profile + ) + if profile_delta <= _EPS: + continue + + needed_shift = ( + (abs(residual) - composition_tolerance) + * total_allocated + / profile_delta + ) + if needed_shift <= _EPS: + continue + + shifted = _shift_between_suppliers( + allocation=allocation, + supplier_remaining=supplier_remaining, + donor=donor_name, + receiver=receiver_name, + markets=markets, + max_shift=needed_shift, + ) + if shifted > _EPS: + progressed = True + break + if progressed: + break + if progressed: + break + + if not progressed: + return + + +def _normalize_input(payload: dict[str, Any]) -> tuple[ + str, + list[str], + list[_Supplier], + dict[str, float], + dict[str, dict[str, float]], + float, + float, + float, + dict[str, float], + float, + dict[str, dict[str, float]], +]: + commodity = str(payload.get("commodity", "CMM")) + + demand_raw = payload.get("demand", {}) + if not isinstance(demand_raw, dict) or not demand_raw: + raise ValueError("optimization_input.demand must be a non-empty mapping") + markets = sorted(str(k) for k in demand_raw.keys()) + demand = {str(k): _to_float(v, 0.0) for k, v in demand_raw.items()} + + suppliers_raw = payload.get("suppliers", []) + if not isinstance(suppliers_raw, list) or not suppliers_raw: + raise ValueError("optimization_input.suppliers must be a non-empty list") + + composition_targets_raw = payload.get("composition_targets", {}) + composition_targets: dict[str, float] = {} + if isinstance(composition_targets_raw, dict): + for component, target in composition_targets_raw.items(): + name = _normalize_component_name(component) + if not name: + continue + composition_targets[name] = _to_fraction(target, 0.0) + composition_tolerance = _to_fraction( + payload.get("composition_tolerance"), + 0.0, + ) + + suppliers: list[_Supplier] = [] + composition_profiles: dict[str, dict[str, float]] = {} + for idx, item in enumerate(suppliers_raw): + if not isinstance(item, dict): + raise ValueError("each supplier must be a mapping") + name = str(item.get("name") or f"supplier_{idx + 1}") + suppliers.append( + _Supplier( + name=name, + capacity=max(0.0, _to_float(item.get("capacity"), 0.0)), + unit_cost=max(0.0, _to_float(item.get("unit_cost"), 0.0)), + risk_score=max(0.0, _to_float(item.get("risk_score"), 0.0)), + ) + ) + + raw_profile = item.get("composition_profile", {}) + profile: dict[str, float] = {} + if isinstance(raw_profile, dict): + for component, fraction in raw_profile.items(): + comp_name = _normalize_component_name(component) + if not comp_name: + continue + profile[comp_name] = _to_fraction(fraction, 0.0) + composition_profiles[name] = profile + + suppliers = sorted(suppliers, key=lambda supplier: supplier.name) + + components = set(composition_targets.keys()) + for profile in composition_profiles.values(): + components.update(profile.keys()) + + normalized_profiles: dict[str, dict[str, float]] = {} + for supplier in suppliers: + profile = composition_profiles.get(supplier.name, {}) + normalized_profiles[supplier.name] = { + component: profile.get(component, 0.0) + for component in sorted(components) + } + + shipping_raw = payload.get("shipping_cost", {}) + shipping_cost: dict[str, dict[str, float]] = {} + if isinstance(shipping_raw, dict): + for supplier_name, market_costs in shipping_raw.items(): + if isinstance(market_costs, dict): + shipping_cost[str(supplier_name)] = { + str(market): _to_float(cost, 0.0) + for market, cost in market_costs.items() + } + + risk_weight = max(0.0, _to_float(payload.get("risk_weight"), 0.0)) + unmet_penalty = max(1.0, _to_float(payload.get("unmet_demand_penalty"), 10000.0)) + max_supplier_share = _to_fraction(payload.get("max_supplier_share"), 1.0) + + return ( + commodity, + markets, + suppliers, + demand, + shipping_cost, + risk_weight, + unmet_penalty, + max_supplier_share, + composition_targets, + composition_tolerance, + normalized_profiles, + ) + + +def _greedy_fallback( + *, + markets: list[str], + suppliers: list[_Supplier], + demand: dict[str, float], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + max_supplier_share: float, +) -> tuple[ + dict[tuple[str, str], float], + dict[str, float], + dict[str, float], + dict[str, float], +]: + total_demand = sum(demand.values()) + supplier_max: dict[str, float] = {} + for supplier in suppliers: + share_cap = max_supplier_share * total_demand + supplier_max[supplier.name] = min(supplier.capacity, share_cap) + + supplier_remaining = dict(supplier_max) + demand_remaining = {market: float(demand[market]) for market in markets} + allocation: dict[tuple[str, str], float] = {} + + for market in markets: + candidates: list[tuple[float, _Supplier]] = [] + for supplier in suppliers: + landed_cost = ( + supplier.unit_cost + + shipping_cost.get(supplier.name, {}).get(market, 0.0) + + risk_weight * supplier.risk_score + ) + candidates.append((landed_cost, supplier)) + candidates.sort(key=lambda item: (item[0], item[1].name)) + + for _landed_cost, supplier in candidates: + if demand_remaining[market] <= _EPS: + break + available = supplier_remaining[supplier.name] + if available <= _EPS: + continue + flow = min(available, demand_remaining[market]) + if flow <= _EPS: + continue + allocation[(supplier.name, market)] = flow + supplier_remaining[supplier.name] -= flow + demand_remaining[market] -= flow + + unmet = {market: max(0.0, demand_remaining[market]) for market in markets} + return allocation, unmet, supplier_max, supplier_remaining + + +def _build_output( + *, + commodity: str, + markets: list[str], + suppliers: list[_Supplier], + demand: dict[str, float], + allocation: dict[tuple[str, str], float], + unmet: dict[str, float], + supplier_max: dict[str, float], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + unmet_penalty: float, + composition_targets: dict[str, float], + composition_profiles: dict[str, dict[str, float]], + composition_tolerance: float, + status_label: str, +) -> dict[str, Any]: + """Build the canonical output dict from a solved allocation. + + ``status_label`` is the feasible-case label (e.g. ``"optimal"`` or + ``"optimal_greedy"``). Infeasibility statuses are derived from the + actual solution values and override ``status_label`` when appropriate. + """ + totals_by_supplier = _sum_allocated_by_supplier(allocation, suppliers) + costs = _compute_costs( + allocation=allocation, + suppliers=suppliers, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet=unmet, + unmet_penalty=unmet_penalty, + ) + + allocation_items: list[dict[str, Any]] = [] + suppliers_by_name = {supplier.name: supplier for supplier in suppliers} + for (supplier_name, market), amount in sorted( + allocation.items(), key=lambda item: (item[0][0], item[0][1]) + ): + supplier = suppliers_by_name[supplier_name] + unit_total = ( + supplier.unit_cost + + shipping_cost.get(supplier_name, {}).get(market, 0.0) + + risk_weight * supplier.risk_score + ) + allocation_items.append( + { + "supplier": supplier_name, + "market": market, + "amount": round(amount, 6), + "unit_total_cost": round(unit_total, 6), + } + ) + + demand_residual: dict[str, float] = {} + for market in markets: + allocated = sum( + amount + for (supplier_name, mkt), amount in allocation.items() + if mkt == market and supplier_name + ) + demand_residual[market] = round(allocated + unmet[market] - demand[market], 9) + + supplier_capacity_residual: dict[str, float] = {} + supplier_share_residual: dict[str, float] = {} + for supplier in suppliers: + used = totals_by_supplier[supplier.name] + supplier_capacity_residual[supplier.name] = round( + supplier.capacity - used, + 9, + ) + supplier_share_residual[supplier.name] = round( + supplier_max[supplier.name] - used, + 9, + ) + + unmet_total = sum(unmet.values()) + objective_value = sum(costs.values()) + + composition = _compute_composition_metrics( + totals_by_supplier=totals_by_supplier, + composition_targets=composition_targets, + composition_profiles=composition_profiles, + composition_tolerance=composition_tolerance, + ) + composition_feasible = True + composition_residuals: dict[str, float] = {} + composition_binding: list[str] = [] + if composition is not None: + composition_feasible = bool(composition["feasible"]) + composition_residuals = { + component: round(float(residual), 9) + for component, residual in composition["residuals"].items() + } + composition_binding = [ + component + for component, residual in composition["residuals"].items() + if abs(float(residual)) >= composition_tolerance - _EPS + ] + + feasible = unmet_total <= _EPS and composition_feasible + if unmet_total > _EPS and not composition_feasible: + status = "infeasible_unmet_and_composition" + elif unmet_total > _EPS: + status = "infeasible_unmet_demand" + elif not composition_feasible: + status = "infeasible_composition_constraints" + else: + status = status_label + + active_capacity = [ + supplier.name + for supplier in suppliers + if abs(supplier_capacity_residual[supplier.name]) <= 1e-6 + ] + bottleneck_markets = [market for market in markets if unmet[market] > _EPS] + allocated_total = sum(totals_by_supplier.values()) + avg_unit = objective_value / allocated_total if allocated_total else 0.0 + + composition_output = None + if composition is not None: + composition_output = { + "targets": { + component: round(float(target), 9) + for component, target in composition["targets"].items() + }, + "actual": { + component: round(float(actual), 9) + for component, actual in composition["actual"].items() + }, + "residuals": composition_residuals, + "tolerance": round(float(composition["tolerance"]), 9), + "feasible": composition_feasible, + } + + return { + "commodity": commodity, + "status": status, + "feasible": feasible, + "objective_value": round(objective_value, 6), + "objective_breakdown": {k: round(v, 6) for k, v in costs.items()}, + "allocations": allocation_items, + "unmet_demand": {k: round(v, 6) for k, v in unmet.items()}, + "constraint_residuals": { + "demand_balance": demand_residual, + "supplier_capacity": supplier_capacity_residual, + "supplier_share_cap": supplier_share_residual, + "composition": composition_residuals, + }, + "composition": composition_output, + "sensitivity_summary": { + "active_capacity_constraints": active_capacity, + "bottleneck_markets": bottleneck_markets, + "average_unit_cost": round(avg_unit, 6), + "unmet_demand_total": round(unmet_total, 6), + "composition_binding_components": sorted(composition_binding), + "composition_feasible": composition_feasible, + }, + } + + +def _try_ortools( + *, + markets: list[str], + suppliers: list[_Supplier], + demand: dict[str, float], + shipping_cost: dict[str, dict[str, float]], + risk_weight: float, + unmet_penalty: float, + max_supplier_share: float, + composition_targets: dict[str, float], + composition_tolerance: float, + composition_profiles: dict[str, dict[str, float]], +) -> tuple[dict[tuple[str, str], float], dict[str, float], str, dict[str, float]] | None: + """Attempt to solve via OR-Tools MIP. Returns ``None`` on any failure.""" + try: + from ursa.tools.cmm_ortools_solver import solve_with_ortools + except Exception: + return None + + result = solve_with_ortools( + markets=markets, + supplier_names=[s.name for s in suppliers], + supplier_capacities={s.name: s.capacity for s in suppliers}, + supplier_unit_costs={s.name: s.unit_cost for s in suppliers}, + supplier_risk_scores={s.name: s.risk_score for s in suppliers}, + demand=demand, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + max_supplier_share=max_supplier_share, + composition_targets=composition_targets, + composition_tolerance=composition_tolerance, + composition_profiles=composition_profiles, + ) + if result is None: + return None + + return result.allocation, result.unmet, result.solver_status, result.supplier_max + + +def solve_cmm_supply_chain_optimization( + optimization_input: dict[str, Any], +) -> dict[str, Any]: + ( + commodity, + markets, + suppliers, + demand, + shipping_cost, + risk_weight, + unmet_penalty, + max_supplier_share, + composition_targets, + composition_tolerance, + composition_profiles, + ) = _normalize_input(optimization_input) + + # --- Try OR-Tools MIP first --------------------------------------------- + mip_result = _try_ortools( + markets=markets, + suppliers=suppliers, + demand=demand, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + max_supplier_share=max_supplier_share, + composition_targets=composition_targets, + composition_tolerance=composition_tolerance, + composition_profiles=composition_profiles, + ) + if mip_result is not None: + allocation, unmet, solver_status, supplier_max = mip_result + logger.info("OR-Tools solver succeeded with status=%s", solver_status) + return _build_output( + commodity=commodity, + markets=markets, + suppliers=suppliers, + demand=demand, + allocation=allocation, + unmet=unmet, + supplier_max=supplier_max, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + composition_targets=composition_targets, + composition_profiles=composition_profiles, + composition_tolerance=composition_tolerance, + status_label=solver_status, + ) + + # --- Greedy fallback ---------------------------------------------------- + logger.info("Falling back to greedy solver") + allocation, unmet, supplier_max, supplier_remaining = _greedy_fallback( + markets=markets, + suppliers=suppliers, + demand=demand, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + max_supplier_share=max_supplier_share, + ) + + _rebalance_for_composition( + allocation=allocation, + supplier_remaining=supplier_remaining, + suppliers=suppliers, + markets=markets, + composition_targets=composition_targets, + composition_profiles=composition_profiles, + composition_tolerance=composition_tolerance, + ) + + return _build_output( + commodity=commodity, + markets=markets, + suppliers=suppliers, + demand=demand, + allocation=allocation, + unmet=unmet, + supplier_max=supplier_max, + shipping_cost=shipping_cost, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + composition_targets=composition_targets, + composition_profiles=composition_profiles, + composition_tolerance=composition_tolerance, + status_label="optimal_greedy", + ) + + +@tool +def run_cmm_supply_chain_optimization( + optimization_input: dict[str, Any], +) -> dict[str, Any]: + """Run a deterministic CMM supply allocation optimization. + + Expected optimization_input schema: + - commodity: str + - demand: mapping market -> required quantity + - suppliers: list of {name, capacity, unit_cost, risk_score} + - optional per-supplier composition_profile: mapping component -> fraction + - shipping_cost: optional mapping supplier -> market -> cost + - risk_weight: optional float + - unmet_demand_penalty: optional float + - max_supplier_share: optional float in [0, 1] + - composition_targets: optional mapping component -> target fraction + - composition_tolerance: optional fraction tolerance in [0, 1] + """ + return solve_cmm_supply_chain_optimization(optimization_input) diff --git a/tests/tools/test_cmm_ortools_solver.py b/tests/tools/test_cmm_ortools_solver.py new file mode 100644 index 00000000..4a443560 --- /dev/null +++ b/tests/tools/test_cmm_ortools_solver.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from ursa.tools.cmm_ortools_solver import ORToolsResult, ortools_available, solve_with_ortools + +_skip_no_ortools = pytest.mark.skipif( + not ortools_available(), reason="ortools not installed" +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _base_kwargs( + *, + suppliers: list[dict[str, object]] | None = None, + demand: dict[str, float] | None = None, + composition_targets: dict[str, float] | None = None, + composition_tolerance: float = 0.0, + composition_profiles: dict[str, dict[str, float]] | None = None, + max_supplier_share: float = 1.0, +) -> dict[str, object]: + """Build a minimal keyword-argument dict for ``solve_with_ortools``.""" + if suppliers is None: + suppliers = [ + {"name": "S1", "capacity": 100.0, "unit_cost": 5.0, "risk_score": 0.1}, + {"name": "S2", "capacity": 80.0, "unit_cost": 7.0, "risk_score": 0.2}, + ] + if demand is None: + demand = {"NA": 60.0, "EU": 40.0} + if composition_targets is None: + composition_targets = {} + if composition_profiles is None: + composition_profiles = {s["name"]: {} for s in suppliers} + + names = [str(s["name"]) for s in suppliers] + return dict( + markets=sorted(demand.keys()), + supplier_names=names, + supplier_capacities={str(s["name"]): float(s["capacity"]) for s in suppliers}, + supplier_unit_costs={str(s["name"]): float(s["unit_cost"]) for s in suppliers}, + supplier_risk_scores={str(s["name"]): float(s["risk_score"]) for s in suppliers}, + demand=demand, + shipping_cost={n: {m: 1.0 for m in demand} for n in names}, + risk_weight=1.0, + unmet_penalty=10_000.0, + max_supplier_share=max_supplier_share, + composition_targets=composition_targets, + composition_tolerance=composition_tolerance, + composition_profiles=composition_profiles, + ) + + +# --------------------------------------------------------------------------- +# 1. Basic feasible — 2 suppliers, 2 markets, no composition +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_basic_feasible() -> None: + kw = _base_kwargs() + result = solve_with_ortools(**kw) + + assert result is not None + assert result.solver_status in ("optimal", "optimal_mip_feasible") + + # All demand met + total_unmet = sum(result.unmet.values()) + assert total_unmet < 1e-6 + + # Allocations cover demand + alloc_by_market: dict[str, float] = {} + for (_, m), amt in result.allocation.items(): + alloc_by_market[m] = alloc_by_market.get(m, 0.0) + amt + for m, d in kw["demand"].items(): + assert abs(alloc_by_market.get(m, 0.0) - d) < 1e-6 + + +# --------------------------------------------------------------------------- +# 2. Composition feasible — la_rich + y_rich must blend 50/50 to hit targets +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_composition_feasible() -> None: + suppliers = [ + {"name": "la_rich", "capacity": 100.0, "unit_cost": 1.0, "risk_score": 0.0}, + {"name": "y_rich", "capacity": 100.0, "unit_cost": 5.0, "risk_score": 0.0}, + ] + profiles = { + "la_rich": {"LA": 0.10, "Y": 0.00}, + "y_rich": {"LA": 0.00, "Y": 0.10}, + } + kw = _base_kwargs( + suppliers=suppliers, + demand={"US": 100.0}, + composition_targets={"LA": 0.05, "Y": 0.05}, + composition_tolerance=0.001, + composition_profiles=profiles, + ) + result = solve_with_ortools(**kw) + + assert result is not None + assert result.solver_status in ("optimal", "optimal_mip_feasible") + assert sum(result.unmet.values()) < 1e-6 + + # Both suppliers must be used ~50 each + totals: dict[str, float] = {} + for (s, _), amt in result.allocation.items(): + totals[s] = totals.get(s, 0.0) + amt + assert abs(totals.get("la_rich", 0.0) - 50.0) < 2.0 + assert abs(totals.get("y_rich", 0.0) - 50.0) < 2.0 + + +# --------------------------------------------------------------------------- +# 3. Infeasible capacity — total capacity < demand → unmet > 0 +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_infeasible_capacity() -> None: + suppliers = [ + {"name": "S1", "capacity": 30.0, "unit_cost": 5.0, "risk_score": 0.1}, + {"name": "S2", "capacity": 20.0, "unit_cost": 7.0, "risk_score": 0.2}, + ] + kw = _base_kwargs(suppliers=suppliers, demand={"NA": 100.0}) + result = solve_with_ortools(**kw) + + assert result is not None + assert sum(result.unmet.values()) > 0.0 + + +# --------------------------------------------------------------------------- +# 4. Infeasible composition — suppliers can't achieve target +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_infeasible_composition() -> None: + suppliers = [ + {"name": "A", "capacity": 100.0, "unit_cost": 1.0, "risk_score": 0.0}, + {"name": "B", "capacity": 100.0, "unit_cost": 1.0, "risk_score": 0.0}, + ] + profiles = { + "A": {"LA": 0.02, "Y": 0.01}, + "B": {"LA": 0.03, "Y": 0.02}, + } + kw = _base_kwargs( + suppliers=suppliers, + demand={"US": 100.0}, + composition_targets={"LA": 0.10, "Y": 0.10}, + composition_tolerance=0.001, + composition_profiles=profiles, + ) + result = solve_with_ortools(**kw) + + # The composition constraints make a zero-unmet solution infeasible, + # so either solver returns None (infeasible) or it uses unmet demand to + # relax the blend constraints. + if result is not None: + assert sum(result.unmet.values()) > 0.0 + + +# --------------------------------------------------------------------------- +# 5. MIP objective ≤ greedy objective +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_mip_leq_greedy() -> None: + from ursa.tools.cmm_supply_chain_optimization_tool import ( + _greedy_fallback, + _Supplier, + _compute_costs, + ) + + suppliers_raw = [ + {"name": "S1", "capacity": 100.0, "unit_cost": 5.0, "risk_score": 0.1}, + {"name": "S2", "capacity": 80.0, "unit_cost": 7.0, "risk_score": 0.2}, + {"name": "S3", "capacity": 60.0, "unit_cost": 6.0, "risk_score": 0.05}, + ] + demand = {"NA": 60.0, "EU": 40.0, "APAC": 30.0} + shipping = { + "S1": {"NA": 1.0, "EU": 3.0, "APAC": 4.0}, + "S2": {"NA": 2.0, "EU": 1.0, "APAC": 3.0}, + "S3": {"NA": 2.5, "EU": 2.0, "APAC": 1.0}, + } + risk_weight = 2.0 + unmet_penalty = 10_000.0 + max_share = 0.6 + + # --- OR-Tools --- + names = [s["name"] for s in suppliers_raw] + mip_result = solve_with_ortools( + markets=sorted(demand.keys()), + supplier_names=names, + supplier_capacities={s["name"]: s["capacity"] for s in suppliers_raw}, + supplier_unit_costs={s["name"]: s["unit_cost"] for s in suppliers_raw}, + supplier_risk_scores={s["name"]: s["risk_score"] for s in suppliers_raw}, + demand=demand, + shipping_cost=shipping, + risk_weight=risk_weight, + unmet_penalty=unmet_penalty, + max_supplier_share=max_share, + composition_targets={}, + composition_tolerance=0.0, + composition_profiles={n: {} for n in names}, + ) + assert mip_result is not None + + # --- Greedy --- + supplier_objs = sorted( + [ + _Supplier(name=s["name"], capacity=s["capacity"], + unit_cost=s["unit_cost"], risk_score=s["risk_score"]) + for s in suppliers_raw + ], + key=lambda x: x.name, + ) + alloc_g, unmet_g, _, _ = _greedy_fallback( + markets=sorted(demand.keys()), + suppliers=supplier_objs, + demand=demand, + shipping_cost=shipping, + risk_weight=risk_weight, + max_supplier_share=max_share, + ) + greedy_costs = _compute_costs( + allocation=alloc_g, + suppliers=supplier_objs, + shipping_cost=shipping, + risk_weight=risk_weight, + unmet=unmet_g, + unmet_penalty=unmet_penalty, + ) + greedy_obj = sum(greedy_costs.values()) + + assert mip_result.objective_value <= greedy_obj + 1e-6 + + +# --------------------------------------------------------------------------- +# 6. Max share enforced — one cheap supplier capped by share limit +# --------------------------------------------------------------------------- +@_skip_no_ortools +def test_max_share_enforced() -> None: + suppliers = [ + {"name": "cheap", "capacity": 200.0, "unit_cost": 1.0, "risk_score": 0.0}, + {"name": "pricey", "capacity": 200.0, "unit_cost": 10.0, "risk_score": 0.0}, + ] + demand = {"US": 100.0} + kw = _base_kwargs( + suppliers=suppliers, + demand=demand, + max_supplier_share=0.5, + ) + result = solve_with_ortools(**kw) + + assert result is not None + total_demand = sum(demand.values()) + share_cap = 0.5 * total_demand + + for s_name in ["cheap", "pricey"]: + total_s = sum( + amt for (s, _), amt in result.allocation.items() if s == s_name + ) + assert total_s <= share_cap + 1e-6 + + +# --------------------------------------------------------------------------- +# 7. Graceful None when unavailable +# --------------------------------------------------------------------------- +def test_graceful_none_when_unavailable() -> None: + kw = _base_kwargs() + with patch("ursa.tools.cmm_ortools_solver._HAS_ORTOOLS", False): + result = solve_with_ortools(**kw) + assert result is None diff --git a/tests/tools/test_cmm_supply_chain_optimization_tool.py b/tests/tools/test_cmm_supply_chain_optimization_tool.py new file mode 100644 index 00000000..3f6713cc --- /dev/null +++ b/tests/tools/test_cmm_supply_chain_optimization_tool.py @@ -0,0 +1,150 @@ +from ursa.tools.cmm_supply_chain_optimization_tool import ( + solve_cmm_supply_chain_optimization, +) + + +def _base_input(): + return { + "commodity": "CO", + "demand": {"NA": 100, "EU": 80}, + "suppliers": [ + { + "name": "US_mine", + "capacity": 120, + "unit_cost": 8.0, + "risk_score": 0.2, + }, + { + "name": "Allied_import", + "capacity": 90, + "unit_cost": 9.2, + "risk_score": 0.1, + }, + ], + "shipping_cost": { + "US_mine": {"NA": 1.0, "EU": 2.0}, + "Allied_import": {"NA": 1.4, "EU": 1.2}, + }, + "risk_weight": 2.0, + "max_supplier_share": 0.8, + } + + +def test_cmm_optimization_output_schema_and_determinism(): + payload = _base_input() + result_a = solve_cmm_supply_chain_optimization(payload) + result_b = solve_cmm_supply_chain_optimization(payload) + + assert result_a == result_b + assert "objective_value" in result_a + assert "allocations" in result_a + assert "constraint_residuals" in result_a + assert "sensitivity_summary" in result_a + assert isinstance(result_a["feasible"], bool) + + +def test_cmm_optimization_reports_infeasible_with_unmet_demand(): + payload = _base_input() + payload["suppliers"] = [ + { + "name": "low_capacity_1", + "capacity": 50, + "unit_cost": 8.0, + "risk_score": 0.2, + }, + { + "name": "low_capacity_2", + "capacity": 30, + "unit_cost": 9.2, + "risk_score": 0.1, + }, + ] + + result = solve_cmm_supply_chain_optimization(payload) + + assert result["feasible"] is False + assert result["status"] == "infeasible_unmet_demand" + assert sum(result["unmet_demand"].values()) > 0 + + +def test_cmm_optimization_enforces_composition_targets_when_possible(): + payload = { + "commodity": "ND2FE14B_LA5_Y5", + "demand": {"US": 100}, + "suppliers": [ + { + "name": "la_rich", + "capacity": 100, + "unit_cost": 1.0, + "risk_score": 0.1, + "composition_profile": {"LA": 0.10, "Y": 0.00}, + }, + { + "name": "y_rich", + "capacity": 100, + "unit_cost": 5.0, + "risk_score": 0.1, + "composition_profile": {"LA": 0.00, "Y": 0.10}, + }, + ], + "shipping_cost": { + "la_rich": {"US": 0.0}, + "y_rich": {"US": 0.0}, + }, + "risk_weight": 0.0, + "max_supplier_share": 1.0, + "composition_targets": {"LA": 0.05, "Y": 0.05}, + "composition_tolerance": 0.001, + } + + result = solve_cmm_supply_chain_optimization(payload) + + assert result["feasible"] is True + assert result["status"] in ("optimal", "optimal_greedy", "optimal_mip_feasible") + composition = result["composition"] + assert composition is not None + assert composition["feasible"] is True + assert abs(composition["actual"]["LA"] - 0.05) <= 0.001 + assert abs(composition["actual"]["Y"] - 0.05) <= 0.0011 + + +def test_cmm_optimization_reports_infeasible_composition_constraints(): + payload = { + "commodity": "ND2FE14B_LA5_Y5", + "demand": {"US": 100}, + "suppliers": [ + { + "name": "supplier_a", + "capacity": 100, + "unit_cost": 1.0, + "risk_score": 0.1, + "composition_profile": {"LA": 0.02, "Y": 0.01}, + }, + { + "name": "supplier_b", + "capacity": 100, + "unit_cost": 1.1, + "risk_score": 0.1, + "composition_profile": {"LA": 0.03, "Y": 0.02}, + }, + ], + "shipping_cost": { + "supplier_a": {"US": 0.0}, + "supplier_b": {"US": 0.0}, + }, + "risk_weight": 0.0, + "max_supplier_share": 1.0, + "composition_targets": {"LA": 0.05, "Y": 0.05}, + "composition_tolerance": 0.001, + } + + result = solve_cmm_supply_chain_optimization(payload) + + assert result["status"] in ( + "infeasible_composition_constraints", + "infeasible_unmet_and_composition", + ) + assert result["feasible"] is False + composition = result["composition"] + assert composition is not None + assert composition["feasible"] is False