Skip to content

Commit

Permalink
Merge pull request #261 from PowerGridModel/feature/vision-excel-cnvt…
Browse files Browse the repository at this point in the history
…r-duplicated-column

Feature / Ambiguity handling Vision exported Excels
  • Loading branch information
Jerry-Jinfeng-Guo authored Jul 5, 2024
2 parents 4004d24 + 4a18cfe commit 15c5574
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 14 deletions.
14 changes: 8 additions & 6 deletions src/power_grid_model_io/data_stores/excel_file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,16 @@ def _handle_duplicate_columns(self, data: pd.DataFrame, sheet_name: str) -> pd.D
if to_rename:
columns = data.columns.values.copy()
for col_idx, new_name in to_rename.items():
new_name = new_name[0] if isinstance(new_name, tuple) else new_name
full_new_name = (new_name, columns[col_idx][1])
self._log.warning(
"Column is renamed",
sheet_name=sheet_name,
col_name=columns[col_idx],
new_name=new_name,
new_name=full_new_name,
col_idx=col_idx,
)
columns[col_idx] = new_name
columns[col_idx] = full_new_name

if data.columns.nlevels == 1:
data.columns = pd.Index(columns)
Expand Down Expand Up @@ -218,7 +220,7 @@ def _check_duplicate_values(self, sheet_name: str, data: pd.DataFrame) -> Dict[i
if isinstance(col_name, tuple):
to_rename[dup_idx] = (f"{col_name[0]}_{counter}",) + col_name[1:]
else:
to_rename[dup_idx] = f"{col_name[0]}_{counter}"
to_rename[dup_idx] = f"{col_name}_{counter}"

return to_rename

Expand Down Expand Up @@ -253,7 +255,7 @@ def _group_columns_by_index(data: pd.DataFrame) -> Dict[Union[str, Tuple[str, ..
grouped: Dict[Union[str, Tuple[str, ...]], Set[int]] = {}
columns = data.columns.values
for col_idx, col_name in enumerate(columns):
if col_name not in grouped:
grouped[col_name] = set()
grouped[col_name].add(col_idx)
if col_name[0] not in grouped:
grouped[col_name[0]] = set()
grouped[col_name[0]].add(col_idx)
return grouped
2 changes: 1 addition & 1 deletion src/power_grid_model_io/functions/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def exclude_empty(row: pd.Series, col: str) -> bool:
"""
filter out empty
"""
if col not in row:
if col not in row:
raise ValueError(f"The column: '{col}' cannot be found for the filter")
result = has_value(row[col])
if isinstance(result, pd.Series):
Expand Down
165 changes: 165 additions & 0 deletions src/power_grid_model_io/utils/excel_ambiguity_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: Contributors to the Power Grid Model project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0
"""
This module provides the ExcelAmbiguityChecker class, which is designed to identify and report ambiguous column names
within the sheets of an Excel (.xlsx) file. It parses the Excel file, extracts the names of columns from a specified
row across all sheets, and checks for any duplicates within those names to flag them as ambiguous.
Usage:
checker = ExcelAmbiguityChecker(file_path='path/to/excel/file.xlsx', column_name_in_row=0)
has_ambiguity, ambiguous_columns = checker.check_ambiguity()
if has_ambiguity:
print("Ambiguous column names found:", ambiguous_columns)
else:
print("No ambiguous column names found.")
Requirements:
- Python 3.9 or higher (PGM library dependencies)
- xml.etree.ElementTree for parsing XML structures within the Excel file.
- zipfile to handle the Excel file as a ZIP archive for parsing.
"""
import os
import xml.etree.ElementTree as ET
import zipfile
from collections import Counter
from typing import Dict, List, Optional, Tuple

# Default namespace of SpreadsheetML parts; the empty prefix makes ElementTree apply it to unprefixed path tags.
XML_NAME_SPACE = {"": "http://schemas.openxmlformats.org/spreadsheetml/2006/main"}  # NOSONAR
# Archive members read by the checker.
WORK_BOOK = "xl/workbook.xml"
WORK_BOOK_RELS = "xl/_rels/workbook.xml.rels"
SHARED_STR_PATH = "xl/sharedStrings.xml"
# ElementPath expressions / attribute names used while walking the XML parts.
FIND_T = ".//t"
FIND_C = ".//c"
FIND_V = ".//v"
NAME = "name"
FIND_ROW = ".//row"
FIND_SHEET = ".//sheet"
FIND_TYPE = "t"
TYPE_STR = "s"
FIND_SI = ".//si"
TYPE_INLINE_STR = "inlineStr"
FIND_INLINE_STR = "is"
# Text of a string item lives either in a direct <t> child or in the <t> children of rich-text runs <r>.
# Phonetic runs (<rPh>) are deliberately not listed: they are reading hints, not part of the cell text.
TEXT_RUN_PATHS = ("t", "r/t")
# Fully qualified names needed to map a <sheet> of the workbook to its worksheet part.
RELATIONSHIP_TAG = "{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"  # NOSONAR
RELATIONSHIP_ID_ATTR = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id"  # NOSONAR


class ExcelAmbiguityChecker:
    """
    A class to check for ambiguous (duplicated) column names within the sheets of an Excel (.xlsx) file.

    The file is read as a ZIP archive and its XML parts are parsed directly; only the header row of each sheet
    is inspected.

    Attributes:
        _valid_file (bool): True if the path ends with ".xlsx" and points to an existing file.
        _file_path (str): The path to the Excel file to be checked.
        _col_name_in_row (int): Position (0-based) of the header row among the rows stored in a sheet.
        sheets (dict): Sheet names as keys and lists of column names as values. Always present; empty when the
            file is not valid.
    """

    def __init__(self, file_path, column_name_in_row=0) -> None:
        """
        Initializes the ExcelAmbiguityChecker and, if the file is valid, parses it immediately.

        Parameters:
            file_path (str or os.PathLike): The path to the Excel file.
            column_name_in_row (int): The row index (0-based) where column names are expected. Default is 0.

        Raises:
            zipfile.BadZipFile: If the file has the .xlsx extension but is not a ZIP archive.
            KeyError: If a mandatory part (e.g. the workbook or a worksheet) is missing from the archive.
        """
        file_path = os.fspath(file_path)  # accept pathlib.Path as well as str
        self._valid_file = file_path.endswith(".xlsx") and os.path.isfile(file_path)
        self._file_path = file_path
        self._col_name_in_row = column_name_in_row
        # Initialised unconditionally so that list_sheets() also works for an invalid file.
        self.sheets: Dict[str, List[str]] = {}
        if self._valid_file:
            self._parse_excel_file()

    @staticmethod
    def _join_text_runs(element) -> Optional[str]:
        """
        Concatenates the text runs of a string item (<si>) or inline string (<is>) element.

        Parameters:
            element (xml.etree.ElementTree.Element): The element holding the text runs.

        Returns:
            Optional[str]: The complete text, or None if the element holds no text at all.
        """
        parts = [
            text_node.text or ""
            for path in TEXT_RUN_PATHS
            for text_node in element.findall(path, namespaces=XML_NAME_SPACE)
        ]
        return "".join(parts) or None

    @staticmethod
    def _read_sheet_targets(zip_file) -> Dict[str, str]:
        """
        Reads the workbook relationships, i.e. which archive member belongs to which relationship id.

        Parameters:
            zip_file (zipfile.ZipFile): The opened Excel ZIP file.

        Returns:
            Dict[str, str]: Relationship ids as keys and archive member names as values. Empty if the
            relationships part does not exist.
        """
        if WORK_BOOK_RELS not in zip_file.namelist():
            return {}
        targets: Dict[str, str] = {}
        for relation in ET.fromstring(zip_file.read(WORK_BOOK_RELS)).iter(RELATIONSHIP_TAG):
            rel_id, target = relation.get("Id"), relation.get("Target")
            if rel_id and target:
                # Targets are either absolute within the package ("/xl/...") or relative to the "xl" folder.
                targets[rel_id] = target[1:] if target.startswith("/") else f"xl/{target}"
        return targets

    def _parse_zip(self, zip_file) -> List[Optional[str]]:
        """
        Parses the shared strings XML file within the Excel ZIP archive to extract all shared strings.

        One entry is produced per string item (<si>), so the list index equals the index stored in a cell,
        also when an item consists of several rich-text runs.

        Parameters:
            zip_file (zipfile.ZipFile): The opened Excel ZIP file.

        Returns:
            list: A list of shared strings used in the Excel file; empty if the file has no shared string table.
        """
        if SHARED_STR_PATH not in zip_file.namelist():
            return []  # a workbook without any text cell has no shared string table
        with zip_file.open(SHARED_STR_PATH) as f:
            root = ET.parse(f).getroot()
        return [self._join_text_runs(item) for item in root.findall(FIND_SI, namespaces=XML_NAME_SPACE)]

    def _get_column_names_from_row(self, row, shared_strings) -> List[Optional[str]]:
        """
        Extracts column names from a specified row using shared strings for strings stored in the shared string table.

        Parameters:
            row (xml.etree.ElementTree.Element): The XML element representing the row.
            shared_strings (list): A list of shared strings extracted from the Excel file.

        Returns:
            list: One entry per cell in the row; None for cells without a value.

        Raises:
            IndexError: If a cell refers to a shared string that does not exist.
            ValueError: If a shared string reference is not an integer.
        """
        column_names: List[Optional[str]] = []
        for cell in row.findall(FIND_C, namespaces=XML_NAME_SPACE):
            cell_type = cell.get(FIND_TYPE)
            if cell_type == TYPE_INLINE_STR:
                # Inline strings carry their text in an <is> child instead of a <v> value.
                inline = cell.find(FIND_INLINE_STR, namespaces=XML_NAME_SPACE)
                column_names.append(None if inline is None else self._join_text_runs(inline))
                continue
            value = cell.find(FIND_V, namespaces=XML_NAME_SPACE)
            if value is None or value.text is None:
                column_names.append(None)
            elif cell_type == TYPE_STR:
                # The value of a shared string cell is the index into the shared string table.
                column_names.append(shared_strings[int(value.text)])
            else:
                column_names.append(value.text)
        return column_names

    def _parse_excel_file(self) -> None:
        """
        Parses the Excel file to extract sheet names and their corresponding column names.

        Sheets that do not contain the configured header row are skipped.
        """
        with zipfile.ZipFile(self._file_path) as z:
            members = set(z.namelist())
            shared_strings = self._parse_zip(z)
            sheet_targets = self._read_sheet_targets(z)
            workbook = ET.fromstring(z.read(WORK_BOOK))

            for index, sheet in enumerate(workbook.findall(FIND_SHEET, namespaces=XML_NAME_SPACE), start=1):
                sheet_name = str(sheet.get(NAME))
                # Worksheet parts are not necessarily numbered in workbook order (e.g. after deleting or
                # reordering sheets), so resolve the part via the relationships first and fall back to the
                # positional naming convention only if that is not possible.
                sheet_file_path = sheet_targets.get(sheet.get(RELATIONSHIP_ID_ATTR, ""))
                if sheet_file_path not in members:
                    sheet_file_path = f"xl/worksheets/sheet{index}.xml"

                with z.open(sheet_file_path) as f:
                    rows = ET.parse(f).findall(FIND_ROW, namespaces=XML_NAME_SPACE)
                try:
                    header_row = rows[self._col_name_in_row]
                except IndexError:
                    continue  # the sheet has no header row at the requested position
                column_names = self._get_column_names_from_row(header_row, shared_strings)
                self.sheets[sheet_name] = [name for name in column_names if name is not None]

    def list_sheets(self) -> List[str]:
        """
        Get the list of all sheet names in the Excel file.

        Returns:
            List[str]: list of all sheet names for which a header row was found
        """
        return list(self.sheets)

    def check_ambiguity(self) -> Tuple[bool, Dict[str, List[str]]]:
        """
        Check if there is ambiguity in column names, i.e. a column name that occurs more than once within a sheet.

        Returns:
            Tuple[bool, Dict[str, List[str]]]: A tuple containing a boolean indicating if any ambiguity was found,
            and a dictionary with sheet names as keys and lists of ambiguous column names as values.
        """
        res: Dict[str, List[str]] = {}
        if not self._valid_file:
            return False, res
        for sheet_name, column_names in self.sheets.items():
            duplicates = [name for name, count in Counter(column_names).items() if count > 1]
            if duplicates:
                res[sheet_name] = duplicates
        return bool(res), res


# Example usage: report the duplicated column names of a workbook in the current working directory.
if __name__ == "__main__":
    excel_file_checker = ExcelAmbiguityChecker("excel_ambiguity_check_data.xlsx")
    # The result used to be discarded, which made the example a silent no-op.
    has_ambiguity, ambiguous_columns = excel_file_checker.check_ambiguity()
    if has_ambiguity:
        print("Ambiguous column names found:", ambiguous_columns)
    else:
        print("No ambiguous column names found.")
Binary file added tests/data/vision/excel_ambiguity_check_data.xlsx
Binary file not shown.
3 changes: 3 additions & 0 deletions tests/data/vision/excel_ambiguity_check_data.xlsx.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SPDX-FileCopyrightText: Contributors to the Power Grid Model project <[email protected]>

SPDX-License-Identifier: MPL-2.0
8 changes: 8 additions & 0 deletions tests/unit/converters/test_vision_excel_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest

from power_grid_model_io.converters.vision_excel_converter import DEFAULT_MAPPING_FILE, VisionExcelConverter
from power_grid_model_io.utils.excel_ambiguity_checker import ExcelAmbiguityChecker


@pytest.fixture
Expand Down Expand Up @@ -139,3 +140,10 @@ def test_get_appliance_id(converter: VisionExcelConverter):

with pytest.raises(KeyError):
converter.get_appliance_id(table="Sources", node_number=1, sub_number=3)


def test_ambiguity_in_vision_excel():
    """The Vision test workbook contains duplicated column names, which the checker must report."""
    # Arrange
    ambiguous_test_file = Path(__file__).parents[2] / "data" / "vision" / "excel_ambiguity_check_data.xlsx"
    # The checker reports "no ambiguity" for a missing file, so make a missing fixture fail explicitly.
    assert ambiguous_test_file.exists()

    # Act
    excel_file_checker = ExcelAmbiguityChecker(file_path=ambiguous_test_file.as_posix())
    has_ambiguity, ambiguous_columns = excel_file_checker.check_ambiguity()

    # Assert
    assert has_ambiguity is True
    assert ambiguous_columns
15 changes: 8 additions & 7 deletions tests/unit/data_stores/test_excel_file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def test_handle_duplicate_columns(mock_check_duplicate_values: MagicMock):
[102, 202, 302, 111, 202, 102],
[103, 203, 303, 103, 203, 103],
],
columns=["A", "B", "C", "A", "B", "A"],
columns=[("A", ""), ("B", ""), ("C", ""), ("A", ""), ("B", ""), ("A", "KW")],
)
store = ExcelFileStore()
mock_check_duplicate_values.return_value = {3: "A_2", 4: "B_2", 5: "A_3"}
Expand All @@ -319,17 +319,18 @@ def test_handle_duplicate_columns(mock_check_duplicate_values: MagicMock):

# Assert
assert len(cap_log) == 3
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="A", new_name="A_2", col_idx=3)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="B", new_name="B_2", col_idx=4)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name="A", new_name="A_3", col_idx=5)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("A", ""), new_name=("A_2", ""), col_idx=3)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("B", ""), new_name=("B_2", ""), col_idx=4)
assert_log_exists(cap_log, "warning", "Column is renamed", col_name=("A", "KW"), new_name=("A_3", "KW"), col_idx=5)

expected = pd.DataFrame(
[ # A B C A_2 B_2 A_3
# KW
[101, 201, 301, 101, 201, 101],
[102, 202, 302, 111, 202, 102],
[103, 203, 303, 103, 203, 103],
],
columns=["A", "B", "C", "A_2", "B_2", "A_3"],
columns=[("A", ""), ("B", ""), ("C", ""), ("A_2", ""), ("B_2", ""), ("A_3", "KW")],
)
pd.testing.assert_frame_equal(actual, expected)

Expand Down Expand Up @@ -455,10 +456,10 @@ def test_group_columns_by_index():

def test_group_columns_by_index__multi():
# Arrange
data = pd.DataFrame(columns=pd.MultiIndex.from_tuples([("A", 1), ("B", 2), ("C", 3), ("A", 1), ("B", 2), ("A", 1)]))
data = pd.DataFrame(columns=pd.MultiIndex.from_tuples([("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6)]))

# Act
grouped = ExcelFileStore._group_columns_by_index(data=data)

# Assert
assert grouped == {("A", 1): {0, 3, 5}, ("B", 2): {1, 4}, ("C", 3): {2}}
assert grouped == {"A": {0, 3, 5}, "B": {1, 4}, "C": {2}}

0 comments on commit 15c5574

Please sign in to comment.