diff --git a/silnlp/common/check_books.py b/silnlp/common/check_books.py
index b069c5bf..485bdb01 100644
--- a/silnlp/common/check_books.py
+++ b/silnlp/common/check_books.py
@@ -50,7 +50,8 @@ def parse_book(project_dir: str, book: str):
 
     settings = FileParatextProjectSettingsParser(project_dir).parse()
     book_path = Path(project_dir) / settings.get_book_file_name(book)
-
+    LOGGER.info(f"Attempting to parse {book} from {book_path}.")
+
     if not book_path.is_file():
         raise RuntimeError(f"Can't find file {book_path} for book {book}")
 
diff --git a/silnlp/common/combine_scores_save.py b/silnlp/common/combine_scores_save.py
new file mode 100644
index 00000000..098846a5
--- /dev/null
+++ b/silnlp/common/combine_scores_save.py
@@ -0,0 +1,116 @@
+import argparse
+import csv
+import sys
+from collections import defaultdict
+from pathlib import Path
+
+import pandas as pd
+
+from ..common.environment import SIL_NLP_ENV
+
+
+def check_for_lock_file(folder: Path, filename: str, file_type: str) -> None:
+    """Check for a lock file and, if one is found, ask the user to close the open file, then exit."""
+
+    if file_type[0] == ".":
+        file_type = file_type[1:]
+
+    if file_type.lower() == "csv":
+        # LibreOffice Calc lock files look like ".~lock.<name>.csv#".
+        lockfile = folder / f".~lock.{filename}.{file_type}#"
+    elif file_type.lower() == "xlsx":
+        # Excel lock files look like "~$<name>.xlsx".
+        lockfile = folder / f"~${filename}.{file_type}"
+    else:
+        raise ValueError(f"Unknown file type: {file_type}")
+
+    if lockfile.is_file():
+        print(f"Found lock file: {lockfile}")
+        print(f"Please close {filename}.{file_type} in folder {folder} OR delete the lock file and try again.")
+        sys.exit()
+
+
+def aggregate_csv(folder_path: Path):
+    # Dictionary to store rows by header type
+    data_by_header = defaultdict(list)
+
+    # Iterate over all CSV files in the folder and its subfolders
+    for csv_file in folder_path.rglob("*/scores-*.csv"):
+        series = csv_file.parts[-3]  # Extract series folder name
+        experiment = csv_file.parts[-2]  # Extract experiment folder name
+        steps = csv_file.stem.split("-")[-1]  # Extract steps from file name
+
+        # Read the CSV file and add new columns
+        with open(csv_file, "r") as f:
+            reader = csv.reader(f)
+            rows = list(reader)
+            header = tuple(rows[0])  # Use tuple to make it hashable
+
+            # Add columns to the beginning of each row
+            if header not in data_by_header:
+                data_by_header[header].append(["Series", "Experiment", "Steps"] + list(header))
+            for row in rows[1:]:
+                data_by_header[header].append([series, experiment, steps] + row)
+
+    return data_by_header
+
+
+def write_to_csv(data_by_header, folder, output_filename):
+    output_file = folder / f"{output_filename}.csv"
+    with open(output_file, "w", newline="") as f:
+        writer = csv.writer(f)
+        for header, rows in data_by_header.items():
+            writer.writerows(rows)
+            writer.writerow([])  # Add a blank row to separate different types
+        # Write the folder path to the last line of the CSV file
+        writer.writerow([folder])
+    print(f"Wrote scores to {output_file}")
+
+
+def write_to_excel(data_by_header, folder, output_filename):
+    output_file = folder / f"{output_filename}.xlsx"
+    with pd.ExcelWriter(output_file) as writer:
+        for i, (header, rows) in enumerate(data_by_header.items()):
+            # Create a DataFrame for the current header
+            df = pd.DataFrame(rows[1:], columns=rows[0])
+            # Convert columns to appropriate data types
+            df = df.apply(pd.to_numeric, errors="ignore")
+            # Generate a unique sheet name
+            sheet_name = f"Table_{i + 1}"
+            # Write the DataFrame to the Excel file
+            df.to_excel(writer, sheet_name=sheet_name, index=False)
+    print(f"Wrote scores to {output_file}")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Aggregate CSV files in a folder.")
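+    # Note: the folder may be given as an absolute path or as a folder name
+    # that is resolved relative to SIL_NLP_ENV.mt_experiments_dir (see below).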
+ parser.add_argument("folder", type=Path, help="Path to the folder containing CSV files.") + parser.add_argument( + "--output_filename", + type=str, + default="scores", + help="Filename suffix without the '.csv' or '.xlsx'. \ + The folder name is added as a prefix to make it easier to distinguish scores files in search results.", + ) + args = parser.parse_args() + + folder = Path(args.folder) + + csv_filename = f"{folder}_{args.output_filename}" + excel_filename = f"{folder}_{args.output_filename}" + + if not folder.is_dir(): + folder = Path(SIL_NLP_ENV.mt_experiments_dir) / args.folder + + # Check for lock files and ask the user to close them. + check_for_lock_file(folder, csv_filename, "csv") + check_for_lock_file(folder, excel_filename, "xlsx") + + data = aggregate_csv(folder) + + # Write the aggregated data to a new CSV file + write_to_csv(data, folder, csv_filename) + + # Write the aggregated data to an Excel file + write_to_excel(data, folder, excel_filename) + + +if __name__ == "__main__": + main() diff --git a/silnlp/common/find_by_iso.py b/silnlp/common/find_by_iso.py index a47effa8..a91177ef 100644 --- a/silnlp/common/find_by_iso.py +++ b/silnlp/common/find_by_iso.py @@ -1,12 +1,15 @@ import argparse import json import logging -from pathlib import Path -from typing import Dict, List, Set, Tuple, Union import sys +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Set, Tuple + +import regex as re from .environment import SIL_NLP_ENV -from .iso_info import NLLB_ISO_SET, ALT_ISO +from .iso_info import ALT_ISO, NLLB_ISO_SET IsoCode = str IsoCodeList = List[IsoCode] @@ -14,6 +17,7 @@ LANGUAGE_FAMILY_FILE = SIL_NLP_ENV.assets_dir / "languageFamilies.json" + def load_language_data(file_path: Path) -> Tuple[Dict, Dict, Dict]: try: with open(file_path, "r", encoding="utf-8") as file: @@ -54,8 +58,6 @@ def find_related_isocodes( for iso_code in iso_codes: if iso_code in language_data: lang_info = language_data[iso_code] -# logger.info(f"{iso_code}: {lang_info['Name']}, {lang_info['Country']}, {lang_info['Family']}") - iso_set.update(country_data.get(lang_info["Country"], [])) iso_set.update(family_data.get(lang_info["Family"], [])) @@ -64,10 +66,10 @@ def find_related_isocodes( def get_files_by_iso(isocodes: IsoCodeList, scripture_dir: Path) -> List[Path]: return [ - file for file in scripture_dir.glob('*.txt') - if any(file.stem.startswith(isocode + '-') for isocode in isocodes) + file for file in scripture_dir.glob("*.txt") if any(file.stem.startswith(isocode + "-") for isocode in isocodes) ] + def split_files_by_projects(files: List[Path], projects_dir: Path) -> Tuple[Dict[Path, Path], Dict[Path, Path]]: existing_projects = {} missing_projects = {} @@ -85,24 +87,64 @@ def split_files_by_projects(files: List[Path], projects_dir: Path) -> Tuple[Dict def get_equivalent_isocodes(iso_codes: List[str]) -> Set[str]: return {code for iso_code in iso_codes for code in (iso_code, ALT_ISO.get_alternative(iso_code)) if code} + +def filter_files(files: List[Path], excluded_patterns: List[str]) -> List[Path]: + filtered = [] + + today = datetime.now() + today_pattern = re.compile(f"{today.strftime('_%Y_%m_%d')}|{today.strftime('_%d_%m_%Y')}") + date_pattern = re.compile(r"_\d{4}_\d{1,2}_\d{1,2}|_\d{1,2}_\d{1,2}_\d{4}") + + for file in files: + parts = file.stem.split("-", 1) + if len(parts) != 2: + continue + iso, name = parts + if today_pattern.search(name): + filtered.append(file) + continue + if date_pattern.search(name): + continue + if len(iso) not in (2, 
+            continue
+        if any(pattern.lower() in name.lower() for pattern in excluded_patterns):
+            continue
+        # Skip very small files.
+        if file.is_file() and file.stat().st_size < 100_000:
+            continue
+        filtered.append(file)
+    return filtered
+
+
 def main():
     parser = argparse.ArgumentParser(description="Find related ISO language codes.")
     parser.add_argument("iso_codes", nargs="+", help="ISO codes to find related languages for")
-    parser.add_argument("--scripture-dir", type=Path, default=Path(SIL_NLP_ENV.mt_scripture_dir), help="Directory containing scripture files")
-    parser.add_argument("--all-related", action='store_true', help="List all related scriptures without filtering to those that are part of NLLB")
-    parser.add_argument("--no-related", action='store_true', help="Only list scriptures in the specified languages and not in related languages")
+    parser.add_argument(
+        "--scripture-dir",
+        type=Path,
+        default=Path(SIL_NLP_ENV.mt_scripture_dir),
+        help="Directory containing scripture files",
+    )
+    parser.add_argument(
+        "--all-related",
+        action="store_true",
+        help="List all related scriptures without filtering to those that are part of NLLB",
+    )
+    parser.add_argument(
+        "--no-related",
+        action="store_true",
+        help="Only list scriptures in the specified languages and not in related languages",
+    )
     parser.add_argument("--output", type=Path, help="Output to the specified file.")
     args = parser.parse_args()
 
     # Create a custom logger
     logger = logging.getLogger(__name__)
-    #logger.basicConfig() # Set the global logging level
-    logger.setLevel(logging.INFO)
-
-    formatter = logging.Formatter('%(message)s')
+    logger.setLevel(logging.INFO)
+
+    formatter = logging.Formatter("%(message)s")
 
     if args.output:
         # Create handler for the file output.
@@ -115,7 +157,6 @@ def main():
         console_handler.setFormatter(formatter)
         logger.addHandler(console_handler)
 
-
     language_data, country_data, family_data = load_language_data(LANGUAGE_FAMILY_FILE)
     projects_dir = SIL_NLP_ENV.pt_projects_dir
     scripture_dir = Path(args.scripture_dir)
@@ -123,22 +164,22 @@ def main():
     if not language_data:
         logging.error("Failed to load language data.")
         return
-
+
     # Get equivalent ISO codes for input
     iso_codes = get_equivalent_isocodes(args.iso_codes)
-
+
     if args.no_related:
-
+
         # Option 2: No files in related languages, only equivalent ISO codes
         codes_to_find = list(iso_codes)
         logger.info(f"\nConsidering only the specified iso codes and their equivalents. {codes_to_find}")
-
+
     else:
         # Find related ISO codes
         codes_to_find = find_related_isocodes(list(iso_codes), language_data, country_data, family_data)
         logger.info(f"\nFound {len(codes_to_find)} related languages:\n{codes_to_find}.")
 
-        if not args.all_related:
+        if not args.all_related:
             # Option 3 (default): Filter to NLLB languages
             codes_to_find = [iso for iso in codes_to_find if iso in NLLB_ISO_SET]
             logger.info(f"\nFound {len(codes_to_find)} specified or related languages in NLLB:\n{codes_to_find}")
@@ -148,10 +189,29 @@ def main():
 
     # Get all possible 2 and 3 letter codes for the related languages
     all_possible_codes = get_equivalent_isocodes(codes_to_find)
-
+
     # Find files matching the codes
     files = get_files_by_iso(all_possible_codes, scripture_dir)
-    existing_projects, missing_projects = split_files_by_projects(files, projects_dir)
+
+    # Filter out AI drafts, XRI files, single-book extracts, and other non-standard files.
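+    # These patterns are matched case-insensitively against the part of the
+    # file stem after the "<iso>-" prefix (see filter_files above).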
+    excluded_patterns = [
+        "XRI",
+        "600M",
+        "3.3B",
+        "1.3B",
+        "words",
+        "name",
+        "clean",
+        "transcription",
+        "matthew",
+        "mark",
+        "mrk",
+        "luk",
+    ]
+    filtered_files = filter_files(files, excluded_patterns)
+    logger.info(f"Found {len(files)} files; {len(files) - len(filtered_files)} were filtered out.")
+
+    existing_projects, missing_projects = split_files_by_projects(filtered_files, projects_dir)
 
     # Display results
     if existing_projects:
@@ -163,12 +223,13 @@ def main():
         logger.info(f"\nThese {len(missing_projects)} files don't have a corresponding project folder:")
         for file, _ in missing_projects.items():
             logger.info(f"{file.stem}")
-    logger.info(f"\nAll the files:")
-    for file in files:
+    logger.info("\nFiltered files:")
+    for file in filtered_files:
         logger.info(f" - {file.stem}")
 
     if not files:
         logger.info("\nCouldn't find any Scripture files in these languages.")
 
+
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
diff --git a/silnlp/common/usfm_utils.py b/silnlp/common/usfm_utils.py
deleted file mode 100644
index 7064b66c..00000000
--- a/silnlp/common/usfm_utils.py
+++ /dev/null
@@ -1,66 +0,0 @@
-from pathlib import Path
-
-from machine.corpora import FileParatextProjectSettingsParser, UsfmFileText, UsfmTokenizer, UsfmTokenType
-
-# Marker "type" is as defined by the UsfmTokenType given to tokens by the UsfmTokenizer,
-# which mostly aligns with a marker's StyleType in the USFM stylesheet
-CHARACTER_TYPE_EMBEDS = ["fig", "fm", "jmp", "rq", "va", "vp", "xt", "xtSee", "xtSeeAlso"]
-PARAGRAPH_TYPE_EMBEDS = ["lit", "r", "rem"]
-NON_NOTE_TYPE_EMBEDS = CHARACTER_TYPE_EMBEDS + PARAGRAPH_TYPE_EMBEDS
-
-
-def main() -> None:
-    """
-    Print out all paragraph and character markers for a book
-    To use set book, fpath, and out_path. fpath should be a path to a book in a Paratext project
-    """
-
-    book = "MAT"
-    fpath = Path("")
-    out_path = Path("")
-    sentences_file = Path("")
-
-    settings = FileParatextProjectSettingsParser(fpath.parent).parse()
-    file_text = UsfmFileText(
-        settings.stylesheet,
-        settings.encoding,
-        book,
-        fpath,
-        settings.versification,
-        include_markers=True,
-        include_all_text=True,
-        project=settings.name,
-    )
-
-    vrefs = []
-    usfm_markers = []
-    usfm_tokenizer = UsfmTokenizer(settings.stylesheet)
-    with sentences_file.open("w", encoding=settings.encoding) as f:
-        for sent in file_text:
-            f.write(f"{sent}\n")
-            if len(sent.ref.path) > 0 and sent.ref.path[-1].name in PARAGRAPH_TYPE_EMBEDS:
-                continue
-
-            vrefs.append(sent.ref)
-            usfm_markers.append([])
-            usfm_toks = usfm_tokenizer.tokenize(sent.text.strip())
-
-            ignore_scope = None
-            for tok in usfm_toks:
-                if ignore_scope is not None:
-                    if tok.type == UsfmTokenType.END and tok.marker[:-1] == ignore_scope.marker:
-                        ignore_scope = None
-                elif tok.type == UsfmTokenType.NOTE or (
-                    tok.type == UsfmTokenType.CHARACTER and tok.marker in CHARACTER_TYPE_EMBEDS
-                ):
-                    ignore_scope = tok
-                elif tok.type in [UsfmTokenType.PARAGRAPH, UsfmTokenType.CHARACTER, UsfmTokenType.END]:
-                    usfm_markers[-1].append(tok.marker)
-
-    with out_path.open("w", encoding=settings.encoding) as f:
-        for ref, markers in zip(vrefs, usfm_markers):
-            f.write(f"{ref} {markers}\n")
-
-
-if __name__ == "__main__":
-    main()
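Reviewer note: combine_scores_save.py keys rows by their CSV header tuple so that scores files with different columns end up in separate tables (and, later, separate Excel sheets). A minimal, self-contained sketch of that idea, using illustrative data only:

    from collections import defaultdict

    # Two hypothetical scores files, already read into (header, rows) pairs.
    file_a = (("BLEU", "chrF3"), [["35.2", "54.1"]])
    file_b = (("BLEU", "chrF3", "spBLEU"), [["28.9", "49.7", "30.2"]])

    data_by_header = defaultdict(list)
    for header, rows in (file_a, file_b):
        # Headers are tuples, so they are hashable and can key the dict.
        for row in rows:
            data_by_header[header].append(row)

    # Each distinct header becomes its own table.
    for header, rows in data_by_header.items():
        print(header, rows)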