Skip to content

Commit

Permalink
Merge pull request #201 from PowerGridModel/bugfix/fix-security-vulne…
Browse files Browse the repository at this point in the history
…rability

Bugfix/fix potential security vulnerability
  • Loading branch information
mgovers authored Oct 10, 2023
2 parents eca9024 + 6076ae6 commit d3b97cb
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 220 deletions.
24 changes: 10 additions & 14 deletions src/power_grid_model_io/converters/pandapower_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from power_grid_model_io.converters.base_converter import BaseConverter
from power_grid_model_io.data_types import ExtraInfo
from power_grid_model_io.functions import get_winding
from power_grid_model_io.utils.regex import NODE_REF_RE, TRAFO3_CONNECTION_RE, TRAFO_CONNECTION_RE
from power_grid_model_io.utils.parsing import is_node_ref, parse_trafo3_connection, parse_trafo_connection

PandaPowerData = MutableMapping[str, pd.DataFrame]

Expand Down Expand Up @@ -170,7 +170,7 @@ def _fill_pgm_extra_info(self, extra_info: ExtraInfo):
extra_cols = ["i_n"]
for component_data in self.pgm_input_data.values():
for attr_name in component_data.dtype.names:
if not NODE_REF_RE.fullmatch(attr_name) and attr_name not in extra_cols:
if not is_node_ref(attr_name) and attr_name not in extra_cols:
continue
for pgm_id, node_id in component_data[["id", attr_name]]:
if pgm_id not in extra_info:
Expand Down Expand Up @@ -248,7 +248,7 @@ def _extra_info_to_pgm_input_data(self, extra_info: ExtraInfo): # pylint: disab
all_other_cols = ["i_n"]
for component, data in self.pgm_output_data.items():
input_cols = power_grid_meta_data["input"][component].dtype.names
node_cols = [col for col in input_cols if NODE_REF_RE.fullmatch(col)]
node_cols = [col for col in input_cols if is_node_ref(col)]
other_cols = [col for col in input_cols if col in all_other_cols]
if not node_cols + other_cols:
continue
Expand Down Expand Up @@ -2290,11 +2290,9 @@ def get_trafo_winding_types(self) -> pd.DataFrame:

@lru_cache
def vector_group_to_winding_types(vector_group: str) -> pd.Series:
match = TRAFO_CONNECTION_RE.fullmatch(vector_group)
if not match:
raise ValueError(f"Invalid transformer connection string: '{vector_group}'")
winding_from = get_winding(match.group(1)).value
winding_to = get_winding(match.group(2)).value
trafo_connection = parse_trafo_connection(vector_group)
winding_from = get_winding(trafo_connection["winding_from"]).value
winding_to = get_winding(trafo_connection["winding_to"]).value
return pd.Series([winding_from, winding_to])

trafo = self.pp_input_data["trafo"]
Expand All @@ -2315,12 +2313,10 @@ def get_trafo3w_winding_types(self) -> pd.DataFrame:

@lru_cache
def vector_group_to_winding_types(vector_group: str) -> pd.Series:
match = TRAFO3_CONNECTION_RE.fullmatch(vector_group)
if not match:
raise ValueError(f"Invalid transformer connection string: '{vector_group}'")
winding_1 = get_winding(match.group(1)).value
winding_2 = get_winding(match.group(2)).value
winding_3 = get_winding(match.group(4)).value
trafo_connection = parse_trafo3_connection(vector_group)
winding_1 = get_winding(trafo_connection["winding_1"]).value
winding_2 = get_winding(trafo_connection["winding_2"]).value
winding_3 = get_winding(trafo_connection["winding_3"]).value
return pd.Series([winding_1, winding_2, winding_3])

trafo3w = self.pp_input_data["trafo3w"]
Expand Down
125 changes: 34 additions & 91 deletions src/power_grid_model_io/functions/phase_to_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
"""

import math
from typing import Tuple

import structlog
from power_grid_model import WindingType

from power_grid_model_io.functions import get_winding
from power_grid_model_io.utils.regex import PVS_EFFICIENCY_TYPE_RE, TRAFO3_CONNECTION_RE, TRAFO_CONNECTION_RE
from power_grid_model_io.utils.parsing import parse_pvs_efficiency_type, parse_trafo3_connection, parse_trafo_connection

_LOG = structlog.get_logger(__file__)

Expand Down Expand Up @@ -76,68 +75,36 @@ def power_wind_speed( # pylint: disable=too-many-arguments
return 0.0


def get_winding_from(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
winding_from, _, _ = _split_connection_string(conn_str)
return get_winding(winding=winding_from, neutral_grounding=neutral_grounding)


def get_winding_to(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, winding_to, _ = _split_connection_string(conn_str)
return get_winding(winding=winding_to, neutral_grounding=neutral_grounding)


def get_winding_1(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
winding_1, _, _, _, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_1, neutral_grounding=neutral_grounding)


def get_winding_2(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, winding_2, _, _, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_2, neutral_grounding=neutral_grounding)
def _get_winding(trafo_connection_parser, winding_ref: str):
def _get_winding_impl(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
return get_winding(trafo_connection_parser(conn_str)[winding_ref], neutral_grounding=neutral_grounding)

return _get_winding_impl

def get_winding_3(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, _, _, winding_3, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_3, neutral_grounding=neutral_grounding)

def _get_clock(trafo_connection_parser, clock_ref: str):
def _get_clock_impl(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
return int(trafo_connection_parser(conn_str)[clock_ref])

def get_clock(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, clock = _split_connection_string(conn_str)
return clock
return _get_clock_impl


def get_clock_12(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, clock_12, _, _ = _split_connection_string_3w(conn_str)
return clock_12
get_winding_from = _get_winding(parse_trafo_connection, "winding_from")
get_winding_to = _get_winding(parse_trafo_connection, "winding_to")
get_winding_1 = _get_winding(parse_trafo3_connection, "winding_1")
get_winding_2 = _get_winding(parse_trafo3_connection, "winding_2")
get_winding_3 = _get_winding(parse_trafo3_connection, "winding_3")


def get_clock_13(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, _, _, clock_13 = _split_connection_string_3w(conn_str)
return clock_13
get_clock = _get_clock(parse_trafo_connection, "clock")
get_clock_12 = _get_clock(parse_trafo3_connection, "clock_12")
get_clock_13 = _get_clock(parse_trafo3_connection, "clock_13")


def reactive_power_to_susceptance(q: float, u_nom: float) -> float:
Expand All @@ -147,43 +114,19 @@ def reactive_power_to_susceptance(q: float, u_nom: float) -> float:
return q / u_nom / u_nom


def _split_connection_string(conn_str: str) -> Tuple[str, str, int]:
"""
Helper function to split the conn_str into three parts:
* winding_from
* winding_to
* clock
"""
match = TRAFO_CONNECTION_RE.fullmatch(conn_str)
if not match:
raise ValueError(f"Invalid transformer connection string: '{conn_str}'")
return match.group(1), match.group(2), int(match.group(3))


def _split_connection_string_3w(conn_str: str) -> Tuple[str, str, int, str, int]:
"""
Helper function to split the conn_str into three parts:
* winding_1
* winding_2
* clock 12
* winding_3
* clock 13
"""
match = TRAFO3_CONNECTION_RE.fullmatch(conn_str)
if not match:
raise ValueError(f"Invalid three winding transformer connection string: '{conn_str}'")
return match.group(1), match.group(2), int(match.group(3)), match.group(4), int(match.group(5))


def pvs_power_adjustment(p: float, efficiency_type: str) -> float:
"""
Adjust power of PV for the default efficiency type of 97% or 95%. Defaults to 100 % for other custom types
"""
match = PVS_EFFICIENCY_TYPE_RE.search(efficiency_type)
if match is not None:
_LOG.warning("PV approximation applied for efficiency type", efficiency_type=efficiency_type)
if match.group(1) == "97":
return p * 0.97
if match.group(1) == "95":
return p * 0.95
try:
pvs_efficiency_type = parse_pvs_efficiency_type(efficiency_type)
except ValueError:
return p

_LOG.warning("PV approximation applied for efficiency type", efficiency_type=efficiency_type)
if pvs_efficiency_type == "97":
return p * 0.97
if pvs_efficiency_type == "95":
return p * 0.95

return p
167 changes: 167 additions & 0 deletions src/power_grid_model_io/utils/parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0
"""
General regular expressions
"""

import re
from typing import Dict

_TRAFO_CONNECTION_RE = re.compile(r"^(Y|YN|D|Z|ZN)(y|yn|d|z|zn)(\d|1[0-2])?$")


def parse_trafo_connection(string: str) -> Dict[str, str]:
r"""Parse a trafo connection string.
Matches the following regular expression to the winding_from and winding_to codes.
Optionally checks the clock number:
^ Start of the string
(Y|YN|D|Z|ZN) From winding type
(y|yn|d|z|zn) To winding type
(\d|1[0-2])? Optional clock number (0-12)
$ End of the string
Args:
string (str): The input string.
Raises:
ValueError: If the input is not a trafo connection string.
Returns:
Dict[str, str]: The parameters of the trafo connection.
"""
match = _TRAFO_CONNECTION_RE.fullmatch(string)
if not match:
raise ValueError(f"Invalid transformer connection string: '{string}'")

return {"winding_from": match.group(1), "winding_to": match.group(2), "clock": match.group(3)}


_TRAFO3_CONNECTION_RE = re.compile(r"^(Y|YN|D|Z|ZN)(y|yn|d|z|zn)(\d|1[0-2])?(y|yn|d|z|zn)(\d|1[0-2])?$")


def parse_trafo3_connection(string: str) -> Dict[str, str]:
r"""Parse a trafo connection string.
Matches the following regular expression to the winding_1, winding_2 and winding_3 codes.
Optionally checks the clock numbers:
^ Start of the string
(Y|YN|D|Z|ZN) First winding type
(y|yn|d|z|zn) Second winding type
(\d|1[0-2]) Clock number (0-12)
(y|yn|d|z|zn) Third winding type
(\d|1[0-2]) Clock number (0-12)
$ End of the string
Args:
string (str): The input string.
Raises:
ValueError: If the input is not a trafo connection string.
Returns:
Dict[str, str]: The parameters of the trafo connection.
"""
match = _TRAFO3_CONNECTION_RE.fullmatch(string)
if not match:
raise ValueError(f"Invalid three winding transformer connection string: '{string}'")

return {
"winding_1": match.group(1),
"winding_2": match.group(2),
"winding_3": match.group(4),
"clock_12": match.group(3),
"clock_13": match.group(5),
}


def parse_node_ref(string: str) -> Dict[str, str]:
"""Parse a node reference string.
Matches if the input is the word 'node' with an optional prefix or suffix. E.g.:
- node
- from_node
- node_1
Args:
string (str): The input string.
Raises:
ValueError: If the input string is not a node reference.
Returns:
Optional[Dict[str, str]]: The prefix and suffix (may be empty).
"""

def _raise():
raise ValueError(f"Invalid node reference string: '{string}'")

if "node" not in string:
_raise()

prefix_and_suffix = string.split("node")
if len(prefix_and_suffix) != 2:
_raise()

prefix, suffix = prefix_and_suffix
if prefix and not prefix.endswith("_"):
_raise()
if suffix and not suffix.startswith("_"):
_raise()

return {"prefix": prefix, "suffix": suffix}


def is_node_ref(string: str) -> bool:
"""Return True if the string represents a node reference, else False.
Like parse_node_ref, but without exceptions and result data.
Args:
string (str): The input string.
Returns:
bool: True if the string represents a node reference, else False.
"""
try:
parse_node_ref(string)
return True
except ValueError:
return False


_PVS_EFFICIENCY_TYPE_RE = re.compile(r"[ ,.]1 pu: (95|97) %")


def parse_pvs_efficiency_type(string: str) -> str:
r"""Parse a PVS efficiency type string.
Matches the following regex to the efficiency type percentage at 1 pu.
1 pu After 1 pu '1 pu:'
(95|97) 95 or 97 % type
% before '%'
E.g.:
- 0,1 pu: 93 %; 1 pu: 97 %
- 0,1..1 pu: 95 %
Args:
string (str): The input string.
Raises:
ValueError: If the input string is not a PVS efficiency type.
Returns:
Optional[str]: The efficiency type percentage string at 1 pu.
"""
match = _PVS_EFFICIENCY_TYPE_RE.search(string)
if not match:
raise ValueError(f"Invalid PVS efficiency type string: '{string}'")

return match.group(1)
Loading

0 comments on commit d3b97cb

Please sign in to comment.