diff --git a/rao/crac/builder.py b/rao/crac/builder.py index ec747cb..10685d5 100644 --- a/rao/crac/builder.py +++ b/rao/crac/builder.py @@ -1,9 +1,13 @@ +from math import isnan +import re + from loguru import logger import pandas as pd import triplets from rao.crac import models import json from common.decorators import performance_counter +from rao.crac.context import CracWorkaroundContext class CracBuilder: @@ -12,12 +16,13 @@ class CracBuilder: This class is a placeholder and can be extended with specific pre-processing methods. """ - def __init__(self, data: pd.DataFrame, network: pd.DataFrame | None): + def __init__(self, data: pd.DataFrame, network: pd.DataFrame | None, workaround: CracWorkaroundContext | None = None): logger.info(f"CRAC builder initialized") self.data = data self.network = network self.limits = None self._crac = None + self.workaround = workaround or CracWorkaroundContext() # TODO [TEMPORARY] exclude boundary set boundary_files = self.network[(self.network.KEY == 'label') & (self.network.VALUE.str.contains("ENTSOE"))] @@ -34,6 +39,331 @@ def crac(self): def crac_pprint(self): return print(json.dumps(self.crac, indent=2)) + def apply_workarounds(self): + if not self.workaround.has_3w_replacement(): + return + logger.info("[WORKAROUND] Applying 3w transformer replacement workaround to CRAC file") + + # --------------------------------------------------------------------- + # 3W -> 3x2W transformer workaround helpers + # --------------------------------------------------------------------- + + @staticmethod + def _normalize_grid_element_id(eid: str | None) -> str | None: + """Normalize grid element ID to avoid double leading underscores. + + The source data (especially contingencies) may contain IDs with two leading + underscores ("__..."). CRAC typically uses single-underscore IDs ("_..."). + + - If the ID starts with one or more underscores, collapse them to a single one. + - If the ID does not start with an underscore, keep it unchanged. + """ + if eid is None: + return None + s = str(eid) + if s.startswith("_"): + return s.lstrip("_") + return s + + def _infer_hv_mv_nominal_kv(self, base_equipment_id: str) -> tuple[float, float]: + """Infer HV and MV nominal voltages (kV) for a 3W autotransformer. + + We only have the original 3W transformer in the binary object model, so we + approximate nominal voltages from SvVoltage values present in `self.limits`. + + Returns: + (hv_kv, mv_kv) + + Fallbacks: + - hv_kv defaults to 330 kV + - mv_kv defaults to 115 kV + """ + default_hv = 330.0 + default_mv = 115.0 + + try: + if self.limits is None or "SvVoltage.v" not in self.limits.columns or "ID_Equipment" not in self.limits.columns: + return default_hv, default_mv + + base_id = str(base_equipment_id).lstrip("_") + vs = self.limits.loc[self.limits["ID_Equipment"] == base_id, "SvVoltage.v"] + if vs is None or vs.empty: + return default_hv, default_mv + + vals = pd.to_numeric(vs, errors="coerce").dropna() + if vals.empty: + return default_hv, default_mv + + # SvVoltage.v is assumed to be in kV (see MW approximation in get_limits()) + rounded = vals.round(0) + levels = sorted({float(v) for v in rounded.tolist() if v is not None and float(v) > 1.0}) + if not levels: + return default_hv, default_mv + + hv = max(levels) if levels else default_hv + # For 3W, the MV is typically the 2nd highest voltage level. + mv = levels[-2] if len(levels) >= 2 else default_mv + + # Sanity checks + if hv <= 0: + hv = default_hv + if mv <= 0 or mv >= hv: + # If we cannot reliably infer, fall back to default MV + mv = default_mv + + return hv, mv + except Exception: + return default_hv, default_mv + + def _get_base_to_legs_map(self, include_leg3: bool = True) -> dict[str, list[str]]: + """Build mapping from replaced 3W transformer base ID to its 2W leg equipment IDs. + + The mapping is derived from workaround.replaced_3w_trafos index, which contains the + leg equipment IDs with suffix '-LegX'. The base ID is matched by stripping any leading + underscores and removing the '-Leg...' suffix. + + Args: + include_leg3: if False, returns only Leg1 and Leg2 (Leg3 is LV side and not monitored). + """ + if not self.workaround or not self.workaround.has_3w_replacement(): + return {} + + replaced_3w_trafos = getattr(self.workaround, "replaced_3w_trafos", None) + if replaced_3w_trafos is None or getattr(replaced_3w_trafos, "empty", True): + return {} + + df = replaced_3w_trafos.copy() + # Ensure index is string-like + df.index = df.index.astype(str) + + if not include_leg3: + df = df[df.index.str.contains(r"-Leg[12]$", case=False, regex=True)] + if df.empty: + return {} + + df = df.assign( + base_id=lambda x: (x.index.to_series().str.split("-Leg", n=1).str[0]).str.lstrip("_") + ) + + return ( + df.groupby("base_id") + .apply(lambda g: g.index.tolist()) + .to_dict() + ) + + def get_limits_for_replaced_3w_trafos(self, limits: dict, kind: str | None = None) -> dict: + """Extend a limits dictionary with entries for replaced 3W transformer legs (Leg1 & Leg2). + + Limits in the binary object model are only available on the original 3W transformer equipment ID + (base ID, no '-Leg' suffix). When the 3W is replaced with 3x2W, FlowCNECs are created on the legs. + This function copies the already-retrieved base limit value to the corresponding Leg1/Leg2 IDs. + + Args: + limits: dict keyed by equipment ID (grid element ID) -> limit value. + kind: Optional hint about what the limit represents. + - "current": values are in ampere; Leg2 is scaled by HV/MV nominal voltage ratio. + - any other value / None: no scaling is applied. + + Notes: + - Leg3 is intentionally NOT mapped (LV side). + - Leading underscore variants are added (no underscore, single '_', double '__') to be robust. + - Existing non-missing leg limits are not overwritten. + """ + + def _is_missing(v) -> bool: + if v is None: + return True + try: + return bool(pd.isna(v)) + except Exception: + return False + + def _variants(eid: str) -> list[str]: + base = str(eid).lstrip("_") + return [base, "_" + base, "__" + base] + + leg_re = re.compile(r"-Leg(?P[0-9]+)$", re.IGNORECASE) + + def _leg_num(leg_id: str) -> int | None: + m = leg_re.search(str(leg_id)) + if not m: + return None + try: + return int(m.group("num")) + except Exception: + return None + + if not limits: + return limits + + base_to_legs = self._get_base_to_legs_map(include_leg3=False) + if not base_to_legs: + return limits + + # Work on a copy to avoid side-effects surprises + out = dict(limits) + + mapped = 0 + for base_id, legs in base_to_legs.items(): + # Find base value under any underscore-variant key + base_value = None + for key in _variants(base_id): + v = out.get(key) + if not _is_missing(v): + base_value = v + break + + if _is_missing(base_value): + continue + + # Ensure base aliases exist as well + for key in _variants(base_id): + if _is_missing(out.get(key)): + out[key] = base_value + + # If this is a current limit, scale Leg2 by HV/MV ratio. + # Base current limit is assumed to be on the HV side (Leg1). + scale_leg2 = 1.0 + if kind == "current": + hv_kv, mv_kv = self._infer_hv_mv_nominal_kv(base_id) + if mv_kv and mv_kv > 0: + scale_leg2 = hv_kv / mv_kv + + # Map base value to Leg1 & Leg2 (with underscore variants) + for leg_id in legs: + leg_clean = str(leg_id).lstrip("_") + leg_num = _leg_num(leg_clean) + + leg_value = base_value + if kind == "current" and leg_num == 2: + # Only scale if we have a numeric value + try: + leg_value = float(base_value) * float(scale_leg2) + except Exception: + leg_value = base_value + + for key in _variants(leg_clean): + existing = out.get(key) + if _is_missing(existing): + out[key] = leg_value + mapped += 1 + elif kind == "current" and leg_num == 2: + # If earlier logic copied Leg1 current limit to Leg2 unscaled, correct it. + # We only overwrite if the existing value equals the base HV value. + try: + if float(existing) == float(base_value) and float(existing) != float(leg_value): + out[key] = leg_value + mapped += 1 + except Exception: + pass + + if mapped: + logger.info(f"[WORKAROUND] Mapped {mapped} limit entries from replaced 3W transformers to 2W legs (Leg1/Leg2)") + + return out + + def flowcnecs_3w_workaround(self): + """Replace FlowCNECs on replaced 3W transformers with FlowCNECs on their 2W legs. + + - Replaces a FlowCNEC whose networkElementId matches a replaced 3W base ID + with two FlowCNECs: one for Leg1 and one for Leg2. + - Leg3 is omitted from FlowCNECs entirely. + - New IDs follow pattern: '-leg{n}-preventive/curative'. + """ + if not self.workaround or not self.workaround.has_3w_replacement(): + return + + base_to_legs = self._get_base_to_legs_map(include_leg3=False) + if not base_to_legs: + return + + flow_cnecs = list(getattr(self._crac, "flowCnecs", []) or []) + if not flow_cnecs: + return + + leg_re = re.compile(r"-Leg(?P[0-9]+)$", re.IGNORECASE) + + def _extract_leg_num(leg_id: str, fallback: int) -> int: + m = leg_re.search(str(leg_id)) + if not m: + return fallback + try: + return int(m.group("num")) + except Exception: + return fallback + + def _inject_leg_into_id(flowcnec_id: str, leg_num: int) -> str: + if flowcnec_id.endswith("-preventive"): + base = flowcnec_id[: -len("-preventive")] + return f"{base}-leg{leg_num}-preventive" + if flowcnec_id.endswith("-curative"): + base = flowcnec_id[: -len("-curative")] + return f"{base}-leg{leg_num}-curative" + return f"{flowcnec_id}-leg{leg_num}" + + new_flow_cnecs = [] + replaced_count = 0 + created_count = 0 + + for cnec in flow_cnecs: + elem_id = getattr(cnec, "networkElementId", "") or "" + elem_norm = str(elem_id).lstrip("_") + + # Drop any Leg3 CNECs related to replaced 3W transformers (LV side not monitored) + if "-Leg" in elem_norm: + base_norm = elem_norm.split("-Leg", 1)[0] + if base_norm in base_to_legs and elem_norm.lower().endswith("-leg3"): + continue + + if elem_norm not in base_to_legs: + new_flow_cnecs.append(cnec) + continue + + # Replace base 3W FlowCNEC with Leg1 & Leg2 FlowCNECs + replaced_count += 1 + legs = base_to_legs[elem_norm] + for idx, leg_id in enumerate(legs, start=1): + leg_num = _extract_leg_num(leg_id, fallback=idx) + # Normalize to a single leading underscore if the source contains underscores + leg_network_id = self._normalize_grid_element_id(str(leg_id)) + + updates = { + "networkElementId": leg_network_id, + "id": _inject_leg_into_id(getattr(cnec, "id", ""), leg_num=leg_num), + } + new_flow_cnecs.append(cnec.model_copy(update=updates)) + created_count += 1 + + setattr(self._crac, "flowCnecs", new_flow_cnecs) + + if replaced_count: + logger.info( + f"[WORKAROUND] FlowCNECs: replaced {replaced_count} 3W-monitored FlowCNECs " + f"with {created_count} leg-specific FlowCNECs (Leg1/Leg2 only)" + ) + + def perform_cnec_consistency_check(self): + + # Find the flowCnec thresholds + flow_cnecs = list(getattr(self._crac, "flowCnecs", [])) + kept = [] + + for cnec in flow_cnecs: + cnec_name = getattr(cnec, "name", None) + + thresholds = getattr(cnec, "thresholds", []) or [] + if not isinstance(thresholds, list): + thresholds = [thresholds] + + removed = any((getattr(th, "min", None) == 0) and (getattr(th, "max", None) == 0) for th in thresholds) + + if removed: + logger.warning(f"CNEC {cnec_name} removed from CRAC file due to missing limits") + else: + kept.append(cnec) + + setattr(self._crac, "flowCnecs", kept) + def get_limits(self): if self.network is None: @@ -130,6 +460,15 @@ def _get_limit_fallback_to_patl(instance: object, primary: dict, fallback: dict) patl_apparent_power_limits = patl_limits["ApparentPowerLimit.value"].min().to_dict() tatl_apparent_power_limits = tatl_limits["ApparentPowerLimit.value"].min().to_dict() + # 3W -> 3x2W workaround: map base 3W trafo limits to Leg1 & Leg2 IDs + if self.workaround and self.workaround.has_3w_replacement(): + patl_current_limits = self.get_limits_for_replaced_3w_trafos(patl_current_limits, kind="current") + tatl_current_limits = self.get_limits_for_replaced_3w_trafos(tatl_current_limits, kind="current") + patl_active_power_limits = self.get_limits_for_replaced_3w_trafos(patl_active_power_limits) + tatl_active_power_limits = self.get_limits_for_replaced_3w_trafos(tatl_active_power_limits) + patl_apparent_power_limits = self.get_limits_for_replaced_3w_trafos(patl_apparent_power_limits) + tatl_apparent_power_limits = self.get_limits_for_replaced_3w_trafos(tatl_apparent_power_limits) + for monitored_element in self._crac.flowCnecs: # TODO figure out optimization that same CNEC on preventive and curative instance would be updated @@ -209,6 +548,47 @@ def process_contingencies(self, specific_contingencies: list | None = None): self._crac.contingencies.append(contingency) logger.debug(f"Added contingency of type {contingency_type}: {name}") + def contingencies_3w_workaround(self): + """ + If replace 3w trafo with 3x 2w trafo workaround is enabled, re-build contingencies to ensure that all the replaced 3w transformer legs are part of the contingency element(s) + Returns: Replaced contingency (if a 3w trafo contingency was passed) with correct one that has all the replaced 3w trafo legs grid element IDs included + """ + if not self.workaround.has_3w_replacement(): + return + + # Map: base_3w_id -> list of 2w leg ids (include Leg3 for contingencies) + base_to_legs = self._get_base_to_legs_map(include_leg3=True) + + for contingency in self._crac.contingencies: + + original_ids = contingency.networkElementsIds or [] + if not original_ids: + continue + + new_ids: list[str] = [] + replaced = False + + for elem_id in original_ids: + elem_str = str(elem_id) + elem_norm = elem_str.lstrip("_") + + # If the contingency already references a leg id ('...-LegX'), use its base id + base_norm = elem_norm.split("-Leg", 1)[0] + + if base_norm in base_to_legs: + # Replace the 3W element (or any single leg) with *all* its 2W legs + new_ids.extend(base_to_legs[base_norm]) + replaced = True + else: + new_ids.append(elem_str) + + # Always normalize leading underscores (fix '__' -> '_') and deduplicate while preserving order + new_ids = [self._normalize_grid_element_id(x) for x in new_ids if x] + contingency.networkElementsIds = list(dict.fromkeys(new_ids)) + + if replaced: + logger.info(f"[WORKAROUND] Contingency {contingency.name} 3W elements replaced with 2W elements") + def process_cnecs(self): """ We want to always monitor all assessed elements, so we create CNECs for each assessed element. @@ -379,11 +759,27 @@ def build_crac(self, contingency_ids: list | None = None): # Initialize CRAC object self._crac = models.Crac() # TODO can be replaced with separate function also need to include some general parameters + # Apply workaround-specific flags to CRAC building process + self.apply_workarounds() + # Process contingencies, CNECs and remedial actions self.process_contingencies(specific_contingencies=contingency_ids) + if self.workaround: + logger.info("[WORKAROUND] Applying 3w transformer replacement workaround to contingencies") + self.contingencies_3w_workaround() + self.process_cnecs() + if self.workaround: + logger.info("[WORKAROUND] Applying 3w transformer replacement workaround to FlowCNECs") + self.flowcnecs_3w_workaround() self.process_remedial_actions() + + # TODO need to also build 3w workaround RAs + # if self.workaround: + # logger.info("[WORKAROUND] Applying 3w transformer replacement workaround to Remedial actions") + self.update_limits_from_network() + self.perform_cnec_consistency_check() return self.crac diff --git a/rao/crac/context.py b/rao/crac/context.py new file mode 100644 index 0000000..b797e73 --- /dev/null +++ b/rao/crac/context.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass +import pandas as pd + +@dataclass +class CracWorkaroundContext: + enable_3w_trafo_replacement: bool = False + replaced_3w_trafos: pd.DataFrame | None = None + + def has_3w_replacement(self) -> bool: + return( + self.enable_3w_trafo_replacement + and self.replaced_3w_trafos is not None + and not self.replaced_3w_trafos.empty + ) + diff --git a/rao/crac/models.py b/rao/crac/models.py index d4c548f..4373ea1 100644 --- a/rao/crac/models.py +++ b/rao/crac/models.py @@ -119,14 +119,14 @@ class Config: @field_serializer("flowCnecs", mode='plain') def exclude_3w_transformer_from_flow_cnecs(self, values: List[FlowCnec]) -> List[FlowCnec]: # TODO TEMPORARY FILTER - remove after September release - logger.warning(f"[TEMPORARY] Excluding 3W transformers from serialized CNECs for operator: ELERING") + # logger.warning(f"[TEMPORARY] Excluding 3W transformers from serialized CNECs for operator: ELERING") logger.warning(f"[TEMPORARY] Excluding transformers from serialized CNECs for operator: PSE") result = [] for cnec in values: - if "AT" in cnec.name and "10X1001A1001A39W" in cnec.operator: - logger.warning(f"3W transformer CNEC excluded: {cnec.name} [{cnec.instant}]") - continue - elif "10XPL-TSO------P" in cnec.operator and cnec.thresholds[0].unit == "apparent": + # if "AT" in cnec.name and "10X1001A1001A39W" in cnec.operator: + # logger.warning(f"3W transformer CNEC excluded: {cnec.name} [{cnec.instant}]") + # continue + if "10XPL-TSO------P" in cnec.operator: logger.warning(f"Poland area CNEC excluded due to unsupported apparent power limits: {cnec.name} [{cnec.instant}]") continue else: diff --git a/rao/handlers.py b/rao/handlers.py index 728ace5..72c565b 100644 --- a/rao/handlers.py +++ b/rao/handlers.py @@ -14,6 +14,7 @@ from common.config_parser import parse_app_properties from common.decorators import performance_counter from rao.crac.builder import CracBuilder +from rao.crac.context import CracWorkaroundContext from rao.parameters.manager import RaoSettingsManager from rao.parameters.manager import LoadflowSettingsManager from rao.optimizer import Optimizer @@ -241,6 +242,21 @@ def handle(self, message: bytes, properties: object, **kwargs): buffer=network_object, parameters=lf_settings_manager.config['CGMES_IMPORT_PARAMETERS']) + # TODO Temporary: Replace 3w transformers with 3 x 2w transformers in the network + three_w_trafos = self.network.get_3_windings_transformers() + # Replace only 3w transformers that are XNEs + three_w_trafos_to_replace = three_w_trafos.index[three_w_trafos["rated_u1"] >= 330].tolist() + pypowsybl.network.replace_3_windings_transformers_with_3_2_windings_transformers(self.network, three_w_trafos_to_replace) + logger.info("[TEMPORARY] Replaced 3w transformers with 3 x 2w transformers in the network") + two_w_trafos = self.network.get_2_windings_transformers() + replaced_ids = set(three_w_trafos_to_replace) + replaced_3w_trafos = two_w_trafos[two_w_trafos.index.str.split("-Leg", n=1).str[0].isin(replaced_ids)] + + workaround_ctx = CracWorkaroundContext( + enable_3w_trafo_replacement=True, + replaced_3w_trafos=replaced_3w_trafos + ) + # Solve initial loadflow on retrieved model logger.info(f"Solve initial loadflow analysis") lf_result = pypowsybl.loadflow.run_ac( @@ -266,7 +282,7 @@ def handle(self, message: bytes, properties: object, **kwargs): # Create CRAC service logger.info(f"Loading network to triplets for CRAC service") network_triplets = pd.read_RDF(network_object) - crac_service = CracBuilder(data=input_files_data, network=network_triplets) + crac_service = CracBuilder(data=input_files_data, network=network_triplets, workaround=workaround_ctx) crac_service.get_limits() # get limits from model and store in CRAC service object # Group by contingency id @@ -280,7 +296,7 @@ def handle(self, message: bytes, properties: object, **kwargs): self.crac = crac_service.build_crac(contingency_ids=[mrid]) # For debugging - with open("test-crac.json", "w") as f: + with open("crac-3w-test_v2.json", "w") as f: json.dump(self.crac, f, ensure_ascii=False, indent=4) # Store built CRAC files in S3 storage @@ -362,9 +378,9 @@ def handle(self, message: bytes, properties: object, **kwargs): "sender": "TSOX", "senderApplication": "APPX", "service": "INPUT-DATA", - "scenario-time": datetime(2025, 7, 22, 5, 30), + "scenario-time": datetime(2026, 2, 13, 8, 30), "time-horizon": "ID", - "content-reference": "EMFOS/RMM/RMM_1D_001_20250722T0530Z_BA_ce84d8cf-6ae2-4237-9ab9-34838dcff6b8.zip", + "content-reference": "EMFOS/RMM/RMM_09_001_20260213T0730Z_BA_6318372a-1952-494a-92c9-61d541483cac.zip", } properties = BasicProperties( content_type='application/octet-stream', @@ -374,7 +390,7 @@ def handle(self, message: bytes, properties: object, **kwargs): timestamp=1747208205, headers=headers, ) - with open(r"C:\Users\martynas.karobcikas\Documents\Python projects\RAO\test-data\SAR_20250609T1230_1D_1.xml", "rb") as file: + with open(r"C:\Users\lukas.navickas\Documents\test_data_rao\SAR_20260213T0830_ID_1.xml", "rb") as file: file_bytes = file.read() # Create instance