Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/fix potential security vulnerability #201

Merged
merged 7 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/power_grid_model_io/converters/pandapower_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from power_grid_model_io.converters.base_converter import BaseConverter
from power_grid_model_io.data_types import ExtraInfo
from power_grid_model_io.functions import get_winding
from power_grid_model_io.utils.regex import NODE_REF_RE, TRAFO3_CONNECTION_RE, TRAFO_CONNECTION_RE
from power_grid_model_io.utils.parsing import is_node_ref, parse_trafo3_connection, parse_trafo_connection

PandaPowerData = MutableMapping[str, pd.DataFrame]

Expand Down Expand Up @@ -170,7 +170,7 @@ def _fill_pgm_extra_info(self, extra_info: ExtraInfo):
extra_cols = ["i_n"]
for component_data in self.pgm_input_data.values():
for attr_name in component_data.dtype.names:
if not NODE_REF_RE.fullmatch(attr_name) and attr_name not in extra_cols:
if not is_node_ref(attr_name) and attr_name not in extra_cols:
continue
for pgm_id, node_id in component_data[["id", attr_name]]:
if pgm_id not in extra_info:
Expand Down Expand Up @@ -248,7 +248,7 @@ def _extra_info_to_pgm_input_data(self, extra_info: ExtraInfo): # pylint: disab
all_other_cols = ["i_n"]
for component, data in self.pgm_output_data.items():
input_cols = power_grid_meta_data["input"][component].dtype.names
node_cols = [col for col in input_cols if NODE_REF_RE.fullmatch(col)]
node_cols = [col for col in input_cols if is_node_ref(col)]
other_cols = [col for col in input_cols if col in all_other_cols]
if not node_cols + other_cols:
continue
Expand Down Expand Up @@ -2290,11 +2290,9 @@ def get_trafo_winding_types(self) -> pd.DataFrame:

@lru_cache
def vector_group_to_winding_types(vector_group: str) -> pd.Series:
match = TRAFO_CONNECTION_RE.fullmatch(vector_group)
if not match:
raise ValueError(f"Invalid transformer connection string: '{vector_group}'")
winding_from = get_winding(match.group(1)).value
winding_to = get_winding(match.group(2)).value
trafo_connection = parse_trafo_connection(vector_group)
winding_from = get_winding(trafo_connection["winding_from"]).value
winding_to = get_winding(trafo_connection["winding_to"]).value
return pd.Series([winding_from, winding_to])

trafo = self.pp_input_data["trafo"]
Expand All @@ -2315,12 +2313,10 @@ def get_trafo3w_winding_types(self) -> pd.DataFrame:

@lru_cache
def vector_group_to_winding_types(vector_group: str) -> pd.Series:
match = TRAFO3_CONNECTION_RE.fullmatch(vector_group)
if not match:
raise ValueError(f"Invalid transformer connection string: '{vector_group}'")
winding_1 = get_winding(match.group(1)).value
winding_2 = get_winding(match.group(2)).value
winding_3 = get_winding(match.group(4)).value
trafo_connection = parse_trafo3_connection(vector_group)
winding_1 = get_winding(trafo_connection["winding_1"]).value
winding_2 = get_winding(trafo_connection["winding_2"]).value
winding_3 = get_winding(trafo_connection["winding_3"]).value
return pd.Series([winding_1, winding_2, winding_3])

trafo3w = self.pp_input_data["trafo3w"]
Expand Down
125 changes: 34 additions & 91 deletions src/power_grid_model_io/functions/phase_to_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
"""

import math
from typing import Tuple

import structlog
from power_grid_model import WindingType

from power_grid_model_io.functions import get_winding
from power_grid_model_io.utils.regex import PVS_EFFICIENCY_TYPE_RE, TRAFO3_CONNECTION_RE, TRAFO_CONNECTION_RE
from power_grid_model_io.utils.parsing import parse_pvs_efficiency_type, parse_trafo3_connection, parse_trafo_connection

_LOG = structlog.get_logger(__file__)

Expand Down Expand Up @@ -76,68 +75,36 @@ def power_wind_speed( # pylint: disable=too-many-arguments
return 0.0


def get_winding_from(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
winding_from, _, _ = _split_connection_string(conn_str)
return get_winding(winding=winding_from, neutral_grounding=neutral_grounding)


def get_winding_to(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, winding_to, _ = _split_connection_string(conn_str)
return get_winding(winding=winding_to, neutral_grounding=neutral_grounding)


def get_winding_1(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
winding_1, _, _, _, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_1, neutral_grounding=neutral_grounding)


def get_winding_2(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, winding_2, _, _, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_2, neutral_grounding=neutral_grounding)
def _get_winding(trafo_connection_parser, winding_ref: str):
def _get_winding_impl(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
return get_winding(trafo_connection_parser(conn_str)[winding_ref], neutral_grounding=neutral_grounding)

return _get_winding_impl

def get_winding_3(conn_str: str, neutral_grounding: bool = True) -> WindingType:
"""
Get the winding type, based on a textual encoding of the conn_str
"""
_, _, _, winding_3, _ = _split_connection_string_3w(conn_str)
return get_winding(winding=winding_3, neutral_grounding=neutral_grounding)

def _get_clock(trafo_connection_parser, clock_ref: str):
def _get_clock_impl(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
return int(trafo_connection_parser(conn_str)[clock_ref])

def get_clock(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, clock = _split_connection_string(conn_str)
return clock
return _get_clock_impl


def get_clock_12(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, clock_12, _, _ = _split_connection_string_3w(conn_str)
return clock_12
get_winding_from = _get_winding(parse_trafo_connection, "winding_from")
get_winding_to = _get_winding(parse_trafo_connection, "winding_to")
get_winding_1 = _get_winding(parse_trafo3_connection, "winding_1")
get_winding_2 = _get_winding(parse_trafo3_connection, "winding_2")
get_winding_3 = _get_winding(parse_trafo3_connection, "winding_3")


def get_clock_13(conn_str: str) -> int:
"""
Extract the clock part of the conn_str
"""
_, _, _, _, clock_13 = _split_connection_string_3w(conn_str)
return clock_13
get_clock = _get_clock(parse_trafo_connection, "clock")
get_clock_12 = _get_clock(parse_trafo3_connection, "clock_12")
get_clock_13 = _get_clock(parse_trafo3_connection, "clock_13")


def reactive_power_to_susceptance(q: float, u_nom: float) -> float:
Expand All @@ -147,43 +114,19 @@ def reactive_power_to_susceptance(q: float, u_nom: float) -> float:
return q / u_nom / u_nom


def _split_connection_string(conn_str: str) -> Tuple[str, str, int]:
"""
Helper function to split the conn_str into three parts:
* winding_from
* winding_to
* clock
"""
match = TRAFO_CONNECTION_RE.fullmatch(conn_str)
if not match:
raise ValueError(f"Invalid transformer connection string: '{conn_str}'")
return match.group(1), match.group(2), int(match.group(3))


def _split_connection_string_3w(conn_str: str) -> Tuple[str, str, int, str, int]:
"""
Helper function to split the conn_str into three parts:
* winding_1
* winding_2
* clock 12
* winding_3
* clock 13
"""
match = TRAFO3_CONNECTION_RE.fullmatch(conn_str)
if not match:
raise ValueError(f"Invalid three winding transformer connection string: '{conn_str}'")
return match.group(1), match.group(2), int(match.group(3)), match.group(4), int(match.group(5))


def pvs_power_adjustment(p: float, efficiency_type: str) -> float:
"""
Adjust power of PV for the default efficiency type of 97% or 95%. Defaults to 100 % for other custom types
"""
match = PVS_EFFICIENCY_TYPE_RE.search(efficiency_type)
if match is not None:
_LOG.warning("PV approximation applied for efficiency type", efficiency_type=efficiency_type)
if match.group(1) == "97":
return p * 0.97
if match.group(1) == "95":
return p * 0.95
try:
pvs_efficiency_type = parse_pvs_efficiency_type(efficiency_type)
except ValueError:
return p

_LOG.warning("PV approximation applied for efficiency type", efficiency_type=efficiency_type)
if pvs_efficiency_type == "97":
return p * 0.97
if pvs_efficiency_type == "95":
return p * 0.95

return p
167 changes: 167 additions & 0 deletions src/power_grid_model_io/utils/parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# SPDX-FileCopyrightText: 2022 Contributors to the Power Grid Model project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0
"""
General regular expressions
"""

import re
from typing import Dict

_TRAFO_CONNECTION_RE = re.compile(r"^(Y|YN|D|Z|ZN)(y|yn|d|z|zn)(\d|1[0-2])?$")


def parse_trafo_connection(string: str) -> Dict[str, str]:
r"""Parse a trafo connection string.

Matches the following regular expression to the winding_from and winding_to codes.
Optionally checks the clock number:

^ Start of the string
(Y|YN|D|Z|ZN) From winding type
(y|yn|d|z|zn) To winding type
(\d|1[0-2])? Optional clock number (0-12)
$ End of the string

Args:
string (str): The input string.

Raises:
ValueError: If the input is not a trafo connection string.

Returns:
Dict[str, str]: The parameters of the trafo connection.
"""
match = _TRAFO_CONNECTION_RE.fullmatch(string)
if not match:
raise ValueError(f"Invalid transformer connection string: '{string}'")

return {"winding_from": match.group(1), "winding_to": match.group(2), "clock": match.group(3)}


_TRAFO3_CONNECTION_RE = re.compile(r"^(Y|YN|D|Z|ZN)(y|yn|d|z|zn)(\d|1[0-2])?(y|yn|d|z|zn)(\d|1[0-2])?$")


def parse_trafo3_connection(string: str) -> Dict[str, str]:
r"""Parse a trafo connection string.

Matches the following regular expression to the winding_1, winding_2 and winding_3 codes.
Optionally checks the clock numbers:

^ Start of the string
(Y|YN|D|Z|ZN) First winding type
(y|yn|d|z|zn) Second winding type
(\d|1[0-2]) Clock number (0-12)
(y|yn|d|z|zn) Third winding type
(\d|1[0-2]) Clock number (0-12)
$ End of the string

Args:
string (str): The input string.

Raises:
ValueError: If the input is not a trafo connection string.

Returns:
Dict[str, str]: The parameters of the trafo connection.
"""
match = _TRAFO3_CONNECTION_RE.fullmatch(string)
if not match:
raise ValueError(f"Invalid three winding transformer connection string: '{string}'")

return {
"winding_1": match.group(1),
"winding_2": match.group(2),
"winding_3": match.group(4),
"clock_12": match.group(3),
"clock_13": match.group(5),
}


def parse_node_ref(string: str) -> Dict[str, str]:
"""Parse a node reference string.

Matches if the input is the word 'node' with an optional prefix or suffix. E.g.:

- node
- from_node
- node_1

Args:
string (str): The input string.

Raises:
ValueError: If the input string is not a node reference.

Returns:
Optional[Dict[str, str]]: The prefix and suffix (may be empty).
"""

def _raise():
raise ValueError(f"Invalid node reference string: '{string}'")

if "node" not in string:
_raise()

prefix_and_suffix = string.split("node")
if len(prefix_and_suffix) != 2:
_raise()

prefix, suffix = prefix_and_suffix
if prefix and not prefix.endswith("_"):
_raise()
if suffix and not suffix.startswith("_"):
_raise()

return {"prefix": prefix, "suffix": suffix}


def is_node_ref(string: str) -> bool:
"""Return True if the string represents a node reference, else False.

Like parse_node_ref, but without exceptions and result data.

Args:
string (str): The input string.

Returns:
bool: True if the string represents a node reference, else False.
"""
try:
parse_node_ref(string)
return True
except ValueError:
return False


_PVS_EFFICIENCY_TYPE_RE = re.compile(r"[ ,.]1 pu: (95|97) %")


def parse_pvs_efficiency_type(string: str) -> str:
r"""Parse a PVS efficiency type string.

Matches the following regex to the efficiency type percentage at 1 pu.

1 pu After 1 pu '1 pu:'
(95|97) 95 or 97 % type
% before '%'

E.g.:

- 0,1 pu: 93 %; 1 pu: 97 %
- 0,1..1 pu: 95 %

Args:
string (str): The input string.

Raises:
ValueError: If the input string is not a PVS efficiency type.

Returns:
Optional[str]: The efficiency type percentage string at 1 pu.
"""
match = _PVS_EFFICIENCY_TYPE_RE.search(string)
if not match:
raise ValueError(f"Invalid PVS efficiency type string: '{string}'")

return match.group(1)
Loading