Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature / Ambiguity handling Vision exported Excels #261

Merged
merged 14 commits into from
Jul 5, 2024
11 changes: 6 additions & 5 deletions src/power_grid_model_io/data_stores/excel_file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def _handle_duplicate_columns(self, data: pd.DataFrame, sheet_name: str) -> pd.D
new_name=new_name,
col_idx=col_idx,
)
columns[col_idx] = new_name
new_name = new_name[0] if isinstance(new_name, tuple) else new_name
columns[col_idx] = (new_name, columns[col_idx][1])
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved

if data.columns.nlevels == 1:
data.columns = pd.Index(columns)
Expand Down Expand Up @@ -218,7 +219,7 @@ def _check_duplicate_values(self, sheet_name: str, data: pd.DataFrame) -> Dict[i
if isinstance(col_name, tuple):
to_rename[dup_idx] = (f"{col_name[0]}_{counter}",) + col_name[1:]
else:
to_rename[dup_idx] = f"{col_name[0]}_{counter}"
to_rename[dup_idx] = f"{col_name}_{counter}"

return to_rename

Expand Down Expand Up @@ -253,7 +254,7 @@ def _group_columns_by_index(data: pd.DataFrame) -> Dict[Union[str, Tuple[str, ..
grouped: Dict[Union[str, Tuple[str, ...]], Set[int]] = {}
columns = data.columns.values
for col_idx, col_name in enumerate(columns):
if col_name not in grouped:
grouped[col_name] = set()
grouped[col_name].add(col_idx)
if col_name[0] not in grouped:
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
grouped[col_name[0]] = set()
grouped[col_name[0]].add(col_idx)
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
return grouped
2 changes: 1 addition & 1 deletion src/power_grid_model_io/functions/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def exclude_empty(row: pd.Series, col: str) -> bool:
"""
filter out empty
"""
if col not in row:
if col not in row:
raise ValueError(f"The column: '{col}' cannot be found for the filter")
result = has_value(row[col])
if isinstance(result, pd.Series):
Expand Down
155 changes: 155 additions & 0 deletions src/power_grid_model_io/utils/excel_ambiguity_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# SPDX-FileCopyrightText: Contributors to the Power Grid Model project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0
"""
This module provides the ExcelAmbiguityChecker class, which is designed to identify and report ambiguous column names
within the sheets of an Excel (.xlsx) file. It parses the Excel file, extracts the names of columns from a specified
row across all sheets, and checks for any duplicates within those names to flag them as ambiguous.

Usage:
checker = ExcelAmbiguityChecker(file_path='path/to/excel/file.xlsx', column_name_in_row=0)
has_ambiguity, ambiguous_columns = checker.check_ambiguity()
if has_ambiguity:
print("Ambiguous column names found:", ambiguous_columns)
else:
print("No ambiguous column names found.")

Requirements:
- Python 3.6 or higher
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
- xml.etree.ElementTree for parsing XML structures within the Excel file.
- zipfile to handle the Excel file as a ZIP archive for parsing.
"""
import os
import xml.etree.ElementTree as ET
import zipfile
from collections import Counter
from typing import Dict, List, Optional, Tuple

XML_NAME_SPACE = {"": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"} # NOSONAR


class ExcelAmbiguityChecker:
"""
A class to check for ambiguous column names within the sheets of an Excel (.xlsx) file.

Attributes:
_file_path (str): The path to the Excel file to be checked.
_col_name_in_row (int): The row index (0-based) where column names are expected. Default is 0.
sheets (dict): A dictionary storing sheet names as keys and lists of column names as values.

Methods:
__init__(self, file_path, column_name_in_row=0): Initializes the ExcelAmbiguityChecker instance.
_parse_zip(self, zip_file): Parses the shared strings XML file within the Excel ZIP archive.
_get_column_names_from_row(self, row, shared_strings): Extracts column names from a specified row.
_parse_excel_file(self): Parses the Excel file to extract sheet names and their corresponding column names.
"""

def __init__(self, file_path, column_name_in_row=0) -> None:
"""
Initializes the ExcelAmbiguityChecker with the path to an Excel file and the row index for column names.

Parameters:
file_path (str): The path to the Excel file.
column_name_in_row (int): The row index (0-based) where column names are expected. Default is 0.
"""
self._valid_file = file_path.endswith(".xlsx") and os.path.exists(file_path)
if self._valid_file:
self._file_path = file_path
self._col_name_in_row = column_name_in_row
self.sheets: Dict[str, List[str]] = {}
self._parse_excel_file()

def _parse_zip(self, zip_file) -> List[Optional[str]]:
"""
Parses the shared strings XML file within the Excel ZIP archive to extract all shared strings.

Parameters:
zip_file (zipfile.ZipFile): The opened Excel ZIP file.

Returns:
list: A list of shared strings used in the Excel file.
"""
shared_strings_path = "xl/sharedStrings.xml"
shared_strings = []
with zip_file.open(shared_strings_path) as f:
tree = ET.parse(f)
for si in tree.findall(".//t", namespaces=XML_NAME_SPACE):
shared_strings.append(si.text)
return shared_strings

def _get_column_names_from_row(self, row, shared_strings) -> List[Optional[str]]:
"""
Extracts column names from a specified row using shared strings for strings stored in the shared string table.

Parameters:
row (xml.etree.ElementTree.Element): The XML element representing the row.
shared_strings (list): A list of shared strings extracted from the Excel file.

Returns:
list: A list of column names found in the row.
"""
column_names = []
for c in row.findall(".//c", namespaces=XML_NAME_SPACE):
cell_type = c.get("t")
value = c.find(".//v", namespaces=XML_NAME_SPACE)
if cell_type == "s" and value is not None:
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
column_names.append(shared_strings[int(value.text)])
elif value is not None:
column_names.append(value.text)
else:
column_names.append(None)
return column_names

def _parse_excel_file(self) -> None:
"""
Parses the Excel file to extract sheet names and their corresponding column names.
"""
with zipfile.ZipFile(self._file_path) as z:
shared_strings = self._parse_zip(z)
workbook_xml = z.read("xl/workbook.xml")
xml_tree = ET.fromstring(workbook_xml)
sheets = xml_tree.findall(".//sheet", namespaces=XML_NAME_SPACE)

for index, sheet in enumerate(sheets, start=1):
sheet_name = str(sheet.get("name"))
sheet_file_path = f"xl/worksheets/sheet{index}.xml"

with z.open(sheet_file_path) as f:
sheet_tree = ET.parse(f)
rows = sheet_tree.findall(".//row", namespaces=XML_NAME_SPACE)
if rows:
column_names = self._get_column_names_from_row(rows[self._col_name_in_row], shared_strings)
self.sheets[sheet_name] = [name for name in column_names if name is not None]

def list_sheets(self) -> List[str]:
"""
Get the list of all sheet names in the Excel file.

Returns:
List[str]: list of all sheet names
"""
return list(self.sheets.keys())

def check_ambiguity(self) -> Tuple[bool, Dict[str, List[str]]]:
"""
Check if there is ambiguity in column names across sheets.

Returns:
Tuple[bool, Dict[str, List[str]]]: A tuple containing a boolean indicating if any ambiguity was found,
and a dictionary with sheet names as keys and lists of ambiguous column names as values.
"""
res: Dict[str, List[str]] = {}
if not self._valid_file:
return False, res
for sheet_name, column_names in self.sheets.items():
column_name_counts = Counter(column_names)
duplicates = [name for name, count in column_name_counts.items() if count > 1]
if duplicates:
res[sheet_name] = duplicates
return bool(res), res


# Example usage
if __name__ == "__main__":
excel_file_checker = ExcelAmbiguityChecker("data.xlsx")
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
excel_file_checker.check_ambiguity()
Binary file added tests/data/vision/data.xlsx
Binary file not shown.
3 changes: 3 additions & 0 deletions tests/data/vision/data.xlsx.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SPDX-FileCopyrightText: Contributors to the Power Grid Model project <[email protected]>

SPDX-License-Identifier: MPL-2.0
8 changes: 8 additions & 0 deletions tests/unit/converters/test_vision_excel_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest

from power_grid_model_io.converters.vision_excel_converter import DEFAULT_MAPPING_FILE, VisionExcelConverter
from power_grid_model_io.utils.excel_ambiguity_checker import ExcelAmbiguityChecker


@pytest.fixture
Expand Down Expand Up @@ -139,3 +140,10 @@ def test_get_appliance_id(converter: VisionExcelConverter):

with pytest.raises(KeyError):
converter.get_appliance_id(table="Sources", node_number=1, sub_number=3)


def test_ambiguity_in_vision_excel():
test_file = Path(__file__).parents[2] / "data" / "vision" / "data.xlsx"
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved
converter = ExcelAmbiguityChecker(file_path=test_file.as_posix())
res, _ = converter.check_ambiguity()
assert res == True
14 changes: 7 additions & 7 deletions tests/unit/data_stores/test_excel_file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def test_handle_duplicate_columns(mock_check_duplicate_values: MagicMock):
[102, 202, 302, 111, 202, 102],
[103, 203, 303, 103, 203, 103],
],
columns=["A", "B", "C", "A", "B", "A"],
columns=[("A", ""), ("B", ""), ("C", ""), ("A", ""), ("B", ""), ("A", "KW")],
)
store = ExcelFileStore()
mock_check_duplicate_values.return_value = {3: "A_2", 4: "B_2", 5: "A_3"}
Expand All @@ -319,17 +319,17 @@ def test_handle_duplicate_columns(mock_check_duplicate_values: MagicMock):

# Assert
assert len(cap_log) == 3
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="A", new_name="A_2", col_idx=3)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="B", new_name="B_2", col_idx=4)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="A", new_name="A_3", col_idx=5)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("A", ""), new_name="A_2", col_idx=3)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("B", ""), new_name="B_2", col_idx=4)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("A", "KW"), new_name="A_3", col_idx=5)
Jerry-Jinfeng-Guo marked this conversation as resolved.
Show resolved Hide resolved

expected = pd.DataFrame(
[ # A B C A_2 B_2 A_3
[101, 201, 301, 101, 201, 101],
[102, 202, 302, 111, 202, 102],
[103, 203, 303, 103, 203, 103],
],
columns=["A", "B", "C", "A_2", "B_2", "A_3"],
columns=[("A", ""), ("B", ""), ("C", ""), ("A_2", ""), ("B_2", ""), ("A_3", "KW")],
)
pd.testing.assert_frame_equal(actual, expected)

Expand Down Expand Up @@ -455,10 +455,10 @@ def test_group_columns_by_index():

def test_group_columns_by_index__multi():
# Arrange
data = pd.DataFrame(columns=pd.MultiIndex.from_tuples([("A", 1), ("B", 2), ("C", 3), ("A", 1), ("B", 2), ("A", 1)]))
data = pd.DataFrame(columns=pd.MultiIndex.from_tuples([("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6)]))

# Act
grouped = ExcelFileStore._group_columns_by_index(data=data)

# Assert
assert grouped == {("A", 1): {0, 3, 5}, ("B", 2): {1, 4}, ("C", 3): {2}}
assert grouped == {"A": {0, 3, 5}, "B": {1, 4}, "C": {2}}