forked from cf-convention/cf-convention.github.io
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added tools (python, bash) for processing XML files (cf-convention#470)
Regarding cf-convention#469: Just to test the workflow the current XSD link in XML files points to my repo.
- Loading branch information
1 parent
444f308
commit 5d5dd8a
Showing
7 changed files
with
603 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import re | ||
import argparse | ||
import numpy as np | ||
|
||
|
||
def do_the_work(file_name):
    """Compact an error listing so each distinct error appears once.

    The input file alternates between path lines (".../names/<V>/src/...",
    giving the version number V) and per-line error messages.  Each unique
    error message (with its "Line N : " prefix stripped) is printed once,
    prefixed by the comma-separated list of versions it occurred in, ordered
    by the first version each error appeared in.

    Parameters
    ----------
    file_name : str
        Name of the input error file.
    """
    with open(file_name, "r") as fh:
        in_text = fh.readlines()

    err_dict = {}
    version = None
    for line in in_text:
        line = line.strip()
        if not line:
            continue
        if line.startswith("/home"):
            # Path line: extract the version number between "names/" and "/src".
            match = re.search(r"(?<=names/)\d{1,2}(?=/src)", line)
            version = match.group()
        else:
            if version is None:
                # BUG FIX: an error line before any path line previously
                # raised UnboundLocalError; skip it instead.
                continue
            # Strip the per-line location prefix so identical errors collate.
            line = re.sub(r"Line \d+? : ", "", line)
            err_dict.setdefault(line, []).append(version)

    # Group the compacted lines by the first version each error appeared in.
    out_dict = {}
    for line, version_list in err_dict.items():
        v0 = int(version_list[0])
        version_string = ", ".join(version_list)
        out_dict.setdefault(v0, []).append(f"{version_string} | {line}")

    # BUG FIX / generalization: the original iterated range(1, 84), silently
    # dropping errors first seen in version 84 or later.
    for v0 in sorted(out_dict):
        for text in out_dict[v0]:
            print(text)
|
||
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="compact_errors",
        description=("\nCompact error lists to show in which version each error occur.")
    )
    # BUG FIX: -f had no default, so omitting it crashed with open(None).
    # required=True makes argparse emit a clear usage error instead.
    parser.add_argument("-f", "--file_name", type=str, required=True,
                        help="Name of input error file")
    args = parser.parse_args()

    do_the_work(args.file_name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
import os | ||
import re | ||
import argparse | ||
import urllib.request | ||
from io import BytesIO | ||
|
||
from cfunits import Units as cfUnits | ||
from cf_units import Unit as uuUnits | ||
from lxml import etree | ||
|
||
|
||
|
||
# Hard-coded local checkout root; BASE_PATH points at the per-version
# standard-name source directories (<BASE_PATH><version>/src/...).
# NOTE(review): machine-specific path — adjust for other environments.
PATH0 = "/home/a001257/CODE/"
BASE_PATH = PATH0 + "cf-conventions/cf-convention.github.io/Data/cf-standard-names/"
|
||
def parse_xml(xml_raw):
    """Parse raw XML bytes into an lxml tree.

    On a syntax error, print a separator plus the first 1000 bytes of the
    input (to help locate the problem) and re-raise.
    """
    try:
        return etree.parse(BytesIO(xml_raw))
    except etree.XMLSyntaxError:
        print(f"{':'*100}\n{xml_raw[:1000]}")
        raise
|
||
|
||
def get_schema(xml_tree):
    """Download and compile the XSD schema referenced by the document root.

    The schema URI is taken from the first attribute value of the root
    element (presumably xsi:schemaLocation or similar — TODO confirm).
    Returns an etree.XMLSchema ready for validation.
    """
    root = xml_tree.getroot()
    xsd_uri = root.values()[0]
    # BUG FIX: the original never closed the HTTP response; use the
    # context-manager form so the connection is released deterministically.
    with urllib.request.urlopen(xsd_uri) as link:
        xsd_raw = link.read()
    xsd_tree = parse_xml(xsd_raw)
    schema = etree.XMLSchema(xsd_tree)
    return schema
|
||
|
||
def find_xml_errors(xml_tree, schema, xml_raw):
    """Validate xml_tree against schema and print one line per problem.

    Missing <description>/<canonical_units> child elements are reported with
    the offending entry's standard name (extracted from the raw source line);
    every other schema violation is printed verbatim with its line number.
    """
    try:
        schema.assertValid(xml_tree)
        print(" ---- Valid and Well-formed")
    except etree.DocumentInvalid:
        xml_list = xml_raw.split(b"\n")
        for error in schema.error_log:
            for element in ["description", "canonical_units"]:
                if f"( {element}" in error.message:
                    # The offending source line looks like <entry id="...">;
                    # the first quoted token is the standard name.
                    # BUG FIX: decode("utf-8)") was a typo — "utf-8)" is not
                    # a valid codec name and raised LookupError.
                    std_name = xml_list[error.line - 1].split(b'"')[1].decode("utf-8")
                    print(f"Line {error.line} : Standard name entry for '{std_name}' has no <{element}>")
                    break
            else:
                print(f"Line {error.line} : {error.message}")
|
||
|
||
def check_units(can_units, std_name):
    """Report suspicious canonical-units strings for one standard name.

    Checks, in order: rejected outright by cfunits; contains a spurious
    space before a negative exponent (" -"); uses "/" notation; or is a
    CF special unit that cf_units (udunits) cannot parse.  Each finding
    is printed; nothing is returned.
    """
    uu = cfUnits(can_units)
    if not uu.isvalid:
        print(f"Canonical units '{can_units}' is not accepted by CF-UNITS for '{std_name}'")
    else:
        try:
            uu = uuUnits(can_units)
            if " -" in can_units:
                try:
                    uu = uuUnits(can_units.replace(" -", "-"))
                    print(f"Canonical units '{can_units}' has a spurious space for '{std_name}'")
                except ValueError:
                    print(f"Canonical unit '{can_units}' is really weird for '{std_name}'")
            elif "/" in can_units:
                print(f"Canonical units '{can_units}' used '/' for '{std_name}'")
        except ValueError:
            # BUG FIX: the closing quote after {can_units} was missing.
            print(f"Canonical unit '{can_units}' is a special CF unit for '{std_name}'")
|
||
|
||
def find_missing_and_duplicates(xml_raw, old_entry_list, old_alias_list):
    """Cross-check one version's entries/aliases against the previous version.

    Parameters
    ----------
    xml_raw : bytes
        Raw standard-name-table XML.
    old_entry_list, old_alias_list : list[str]
        Entry and alias names accumulated from earlier versions.

    Prints one line per problem (name both defined and aliased, alias of an
    undefined name, alias to a non-existing name, discontinued name) and
    returns (new_entry_list, new_alias_list).
    """

    def _extract_entries(raw):
        # One <entry id="...">...</entry> per standard name; canonical units
        # (when present) are passed straight to check_units.
        entry_list = []
        for entry in re.finditer(rb'<entry id=\".+?\">.+?</entry>', raw, re.S):
            e = re.search(rb'(?<=\").+?(?=\")', entry.group())
            std_name = e.group().decode("utf-8")
            entry_list.append(std_name)
            can_units = re.search(rb'(?<=_units>).+?(?=</canonical)', entry.group())
            if can_units:
                check_units(can_units.group().decode("utf-8"), std_name)
        return entry_list

    def _extract_aliases(raw):
        # Maps alias id -> target entry_id.
        alias_dict = {}
        for alias in re.finditer(rb'<alias id=.+?</alias>', raw, re.S):
            alias_from = re.search(rb'(?<=\").+?(?=\")', alias.group())
            alias_to = re.search(rb'(?<=entry_id>).+?(?=</entry_id)', alias.group())
            alias_dict[alias_from.group().decode("utf-8")] = alias_to.group().decode("utf-8")
        return alias_dict

    new_entry_list = _extract_entries(xml_raw)
    alias_dict = _extract_aliases(xml_raw)
    new_alias_list = sorted(alias_dict.keys())
    for alias_from, alias_to in alias_dict.items():
        if alias_from in new_entry_list:
            print(f"Both defining and aliasing standard name '{alias_from}' into '{alias_to}'")
        elif (alias_from not in old_entry_list) and (alias_from not in old_alias_list):
            print(f"Aliasing the undefined standard name '{alias_from}' into '{alias_to}'")
        elif alias_to not in new_entry_list:
            # BUG FIX: message previously read "into into the non-existing".
            print(f"Aliasing standard name '{alias_from}' into the non-existing '{alias_to}'")
    # Idiom fix: a plain loop instead of a side-effect list comprehension.
    discontinued = set(old_entry_list) - (set(new_entry_list) | set(new_alias_list))
    for s in sorted(discontinued):
        print(f"Standard name '{s}' is discontinued")
    return new_entry_list, new_alias_list
|
||
|
||
def do_the_work(version, severity, entry_list, alias_list):
    """Run the requested checks on one version's standard-name table.

    severity selects the checks: 0 = XML schema validation only,
    1 = CF content checks only, 2 = both.  Returns the (possibly
    updated) entry and alias lists for the next iteration.
    """
    table_path = f"{BASE_PATH}{version}/src/cf-standard-name-table.xml"
    with open(table_path, "rb") as handle:
        raw_bytes = handle.read()
    print(table_path)

    tree = parse_xml(raw_bytes)
    schema = get_schema(tree)
    run_xml_checks = severity != 1
    run_cf_checks = bool(severity)
    if run_xml_checks:
        find_xml_errors(tree, schema, raw_bytes)
    if run_cf_checks:
        entry_list, alias_list = find_missing_and_duplicates(
            raw_bytes, entry_list, alias_list)
    return entry_list, alias_list
|
||
|
||
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="LIST_ERRORS",
        description=("\nList (most) XML errors in standard name files.")
    )
    parser.add_argument("-v", "--version", type=int, default=0,
                        help="Check a specific version (default is 0 (='all').")
    parser.add_argument("-s", "--severity", type=int, default=0,
                        help="Level of error checks (0=xml (default), 1=CF, 2=both.")
    args = parser.parse_args()
    severity = args.severity

    # CF checks (severity > 0) need the cumulative entry/alias lists, so they
    # must walk all versions up to the requested one.
    if args.version == 0:
        version_list = range(1, 100)
    elif severity > 0:
        version_list = range(1, args.version + 1)
    else:
        version_list = [args.version]

    entry_list = []
    alias_list = []
    for version in version_list:
        try:
            if version != 38:  # 38 is skipped — presumably no such release; TODO confirm
                print("\n")
                entry_list, alias_list = do_the_work(
                    version, args.severity, entry_list, alias_list
                )
        # BUG FIX: a bare "except:" also swallowed KeyboardInterrupt and
        # SystemExit; Exception keeps the best-effort "stop at the first
        # missing version" behaviour without hiding those.
        except Exception:
            break
    print()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
# -*- coding: utf-8 -*-
|
||
import re | ||
from datetime import datetime, UTC | ||
from pathlib import Path | ||
|
||
# Hard-coded local checkout root; BASE_PATH points at the per-version
# standard-name source directories.  NOTE(review): machine-specific path.
MY_PATH = "/home/a001257/CODE/"
BASE_PATH = MY_PATH + "cf-conventions/cf-convention.github.io/Data/cf-standard-names/"
# NEW_XSD = b"../../schema-files/cf-standard-name-table-2.0.xsd"
# Temporary: points at the author's fork to test the workflow (see the
# commit message re cf-convention#469); the relative path above is the
# intended final value.
NEW_XSD = (b"https://raw.githubusercontent.com/larsbarring/cf-convention.github.io/"
           b"test-all-issue-457/Data/schema-files/cf-standard-name-table-2.0.xsd")
|
||
def fix_v1_datetime(xml_raw):
    """Insert the missing <last_modified> element into the version 1 table.

    The element is spliced in right after the </version_number> line.
    Returns the (possibly modified) bytes.
    """
    txt1 = b">1</version_number>\n"
    txt2 = txt1 + b" <last_modified>2002-04-02T12:00:00Z</last_modified>\n"
    # CONSISTENCY FIX: guard like fix_v71_datetime does, so "ADDED" is only
    # printed when the marker text is actually present and replaced.
    if txt1 in xml_raw:
        xml_raw = xml_raw.replace(txt1, txt2)
        print("ADDED : DATETIME in version 1")
    return xml_raw
|
||
|
||
def fix_v71_datetime(xml_raw):
    """Repair the malformed timestamp (missing seconds) in the v71 table."""
    bad = b"2020-02-04T12:00Z"
    good = b"2020-02-04T12:00:00Z"
    if bad in xml_raw:
        print("FIXED : DATETIME in version 71")
        xml_raw = xml_raw.replace(bad, good)
    return xml_raw
|
||
def fix_v12_duplicate_entry(xml_raw):
    """Remove the first of the duplicated
    'sea_surface_height_above_reference_ellipsoid' entries in the v12 table.
    """
    # BUG FIX: the original greedy '.+' combined with re.S matched through
    # the LAST '</entry>' in the file, deleting far more than the duplicate
    # the print statement claims; the non-greedy '.+?' stops at the
    # duplicate's own closing tag.  count/flags passed by keyword (passing
    # them positionally is deprecated).
    pat = rb'\n *<entry id="sea_surface_height_above_reference_ellipsoid">.+?</entry> *(?=\n)'
    xml_raw = re.sub(pat, b"", xml_raw, count=1, flags=re.S)
    print("FIXED : Removed first duplicate of 'sea_surface_height_above_reference_ellipsoid'")
    return xml_raw
|
||
|
||
def add_modified_date(xml_raw): | ||
time_stamp = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ").encode("utf-8") | ||
modified = b"last_modified" | ||
modified_start = b"<" + modified + b">" | ||
modified_end = modified_start.replace(b"<", b"</") | ||
modified_element = modified_start + time_stamp + modified_end | ||
inst_text = b"<institution>" | ||
n = len( inst_text) | ||
inst = re.search((b"\n( *)" + inst_text), xml_raw) | ||
spaces = inst.group()[1: -n] | ||
position = inst.span()[0] | ||
xml_raw = xml_raw[:position] + b"\n" + spaces + modified_element + xml_raw[position:] | ||
print("ADDED : MODIFIED DATE") | ||
return xml_raw | ||
|
||
|
||
def do_the_work(version):
    """Normalise one version's standard-name XML file in place.

    A pristine copy of the original is kept next to it as *__SAVED; reruns
    read from that copy, so the fixes are never applied on top of each other.
    The fixes are: point the XSD reference at NEW_XSD, apply the
    version-specific repairs (v1, v12, v71), rename 'last_modified' to
    'first_published_date', and insert a fresh <last_modified> stamp.
    """
    xml_original = f"{BASE_PATH}{version}/src/cf-standard-name-table.xml"
    xml_saved = xml_original.replace("-table", "-table__SAVED")

    if Path(xml_saved).is_file():
        # A saved pristine copy already exists: start from it.
        with open(xml_saved, "rb") as fh:
            xml_raw = fh.read()
        print(f"READING SAVED ORIGINAL FILE: {xml_original}")
    else:
        # First run for this version: read the original, then save the
        # pristine copy before touching anything.
        with open(xml_original, "rb") as fh:
            xml_raw = fh.read()
        with open(xml_saved, "wb") as fh:
            fh.write(xml_raw)
        print(f"READING AND SAVING ORIGINAL FILE: {xml_original}")

    old_xsd_names = (b"CFStandardNameTable-1.0.xsd",
                     b"CFStandardNameTable-1.1.xsd",
                     b"cf-standard-name-table-1.1.xsd")
    for old_xsd in old_xsd_names:
        if old_xsd in xml_raw:
            xml_raw = xml_raw.replace(old_xsd, NEW_XSD)
            print(f"CHANGED : XSD FILE NAME {old_xsd.decode('utf-8')} --> {NEW_XSD.decode('utf-8')}")

    version_fixes = {
        1: fix_v1_datetime,
        12: fix_v12_duplicate_entry,
        71: fix_v71_datetime,
    }
    if version in version_fixes:
        xml_raw = version_fixes[version](xml_raw)

    xml_raw = xml_raw.replace(b"last_modified", b"first_published_date")
    print("CHANGED : 'last_modified' --> 'first_published_date'")

    xml_raw = add_modified_date(xml_raw)

    with open(xml_original, "wb") as fh:
        fh.write(xml_raw)
|
||
|
||
if __name__ == "__main__":
    # Walk version directories until one is missing (best-effort stop).
    for version in range(1, 100):
        try:
            if version != 38:  # 38 is skipped — presumably no such release; TODO confirm
                print("\n")
                do_the_work(version)
        # BUG FIX: a bare "except:" also swallowed KeyboardInterrupt and
        # SystemExit; Exception preserves the intended stop-at-first-missing
        # behaviour without hiding those.
        except Exception:
            break
Oops, something went wrong.