From 63fc8978d2c9b273d74ae155e11f4dcca67b153d Mon Sep 17 00:00:00 2001 From: Ruggero Turra Date: Thu, 16 Nov 2023 00:39:32 +0100 Subject: [PATCH 1/4] Add thread safety to database updates in check_biblio.py --- check_biblio.py | 47 ++++++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/check_biblio.py b/check_biblio.py index 495c1d9..fe2fa45 100755 --- a/check_biblio.py +++ b/check_biblio.py @@ -9,6 +9,7 @@ import sqlite3 import difflib from typing import Optional +import threading import bibtexparser @@ -18,18 +19,25 @@ class DataBase: def __init__(self, filename: str): self.filename = filename - self.con = sqlite3.connect(filename) + self.con = sqlite3.connect(filename, check_same_thread=False) self.cur = self.con.cursor() self.cur.execute("CREATE TABLE IF NOT EXISTS entries(key, original, final)") + self.lock = threading.Lock() + self.nupdates = 0 def update(self, key: str, original: str, final: str): - self.cur.execute("INSERT INTO entries VALUES (?, ?, ?)", (key, original.strip(), final.strip())) + with self.lock: + self.cur.execute("INSERT INTO entries VALUES (?, ?, ?)", (key, original.strip(), final.strip())) + self.nupdates += 1 + if self.nupdates % 20 == 0: + self.con.commit() def query(self, original: str): try: query = "SELECT * FROM entries WHERE original=?" - res = self.cur.execute(query, (original,)) - r = res.fetchone() + with self.lock: + res = self.cur.execute(query, (original,)) + r = res.fetchone() except sqlite3.OperationalError as ex: print(f"problem executing query {query} with {original}") print("error: %s" % ex) @@ -213,7 +221,7 @@ def check_latex_entry(key: str, tex: str, use_bibtex: bool = False) -> Optional[ return error - +lock = threading.Lock() def run_entry(entry, db, fix_unicode, substitutions): raw_original = entry.raw.strip() raw_proposed = raw_original @@ -221,25 +229,29 @@ def run_entry(entry, db, fix_unicode, substitutions): from_cache = db.query(raw_original) if from_cache is not None: if raw_original != from_cache: - substitutions.append((raw_original, from_cache)) + with lock: + substitutions.append((raw_original, from_cache)) return if fix_unicode: raw_proposed = replace_unicode(raw_original) if raw_proposed != raw_original: - print(f"unicode found in {entry.key}, fixing") + with lock: + print(f"unicode found in {entry.key}, fixing") while True: error = check_latex_entry(entry.key, raw_proposed, args.use_bibtex) if error is None: break - print(f"problem running item {entry.key}") - raw_proposed = modify_item(raw_proposed, error).strip() + with lock: + print(f"problem running item {entry.key}") + raw_proposed = modify_item(raw_proposed, error).strip() if raw_original != raw_proposed: - print(diff_strings(raw_original, raw_proposed)) - substitutions.append((raw_original, raw_proposed)) + with lock: + print(diff_strings(raw_original, raw_proposed)) + substitutions.append((raw_original, raw_proposed)) db.update(entry.key, raw_original, raw_proposed) @@ -267,13 +279,10 @@ def run_entry(entry, db, fix_unicode, substitutions): print("found %d entries" % len(biblio_parsed.entries)) nentries = len(biblio_parsed.entries) - # import multiprocessing - # import functools - # p = multiprocessing.Pool(4) - # p.map(functools.partial(run_entry, cur=cur, fix_unicode=args.fix_unicode, substitutions=substitutions), biblio_parsed.entries) - for ientry, entry in enumerate(biblio_parsed.entries, 1): - print("checking key %s %d/%d" % (entry.key, ientry, nentries)) - run_entry(entry, db, args.fix_unicode, substitutions) + from multiprocessing.pool import ThreadPool + import functools + p = ThreadPool(4) + p.map(functools.partial(run_entry, db=db, fix_unicode=args.fix_unicode, substitutions=substitutions), biblio_parsed.entries) finally: @@ -284,7 +293,7 @@ def run_entry(entry, db, fix_unicode, substitutions): print("BIG PROBLEM: old == new") print(diff_strings(old, new)) if old not in biblio: - print("BIG PROBLEM: old not in biblio") + print("BIG PROBLEM: old not in biblio: %s" % old) biblio = biblio.replace(old, new) new_biblio_fn = args.bibtex.replace(".bib", "_new.bib") with open(new_biblio_fn, "w", encoding="utf-8") as f: From dac2568865a573b3a08f403e4dd496b75db71a48 Mon Sep 17 00:00:00 2001 From: Ruggero Turra Date: Thu, 16 Nov 2023 16:21:11 +0100 Subject: [PATCH 2/4] move to threadpoolexecutor --- check_biblio.py | 107 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 35 deletions(-) diff --git a/check_biblio.py b/check_biblio.py index fe2fa45..26d8971 100755 --- a/check_biblio.py +++ b/check_biblio.py @@ -8,49 +8,66 @@ import argparse import sqlite3 import difflib -from typing import Optional +from typing import Optional, List, Tuple import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +import functools import bibtexparser +print_lock = threading.Lock() + class DataBase: """A simple database to store the original and final strings""" def __init__(self, filename: str): self.filename = filename - self.con = sqlite3.connect(filename, check_same_thread=False) - self.cur = self.con.cursor() - self.cur.execute("CREATE TABLE IF NOT EXISTS entries(key, original, final)") - self.lock = threading.Lock() - self.nupdates = 0 + self._con = sqlite3.connect(filename, check_same_thread=False) + self._cur = self._con.cursor() + self._cur.execute("CREATE TABLE IF NOT EXISTS entries(key, original, final)") + self._lock = threading.Lock() + self._nupdates = 0 + self.substitutions: List[Tuple[str, str]] = [] def update(self, key: str, original: str, final: str): - with self.lock: - self.cur.execute("INSERT INTO entries VALUES (?, ?, ?)", (key, original.strip(), final.strip())) - self.nupdates += 1 - if self.nupdates % 20 == 0: - self.con.commit() + with self._lock: + self._cur.execute( + "INSERT INTO entries VALUES (?, ?, ?)", + (key, original.strip(), final.strip()), + ) + self._nupdates += 1 + if self._nupdates % 20 == 0: + self._con.commit() + if original != final: + self.substitutions.append((original, final)) def query(self, original: str): try: query = "SELECT * FROM entries WHERE original=?" - with self.lock: - res = self.cur.execute(query, (original,)) + with self._lock: + res = self._cur.execute(query, (original,)) r = res.fetchone() except sqlite3.OperationalError as ex: print(f"problem executing query {query} with {original}") print("error: %s" % ex) raise ex if r: - return r[2] - else: - return None + proposed = r[2] + if original != proposed: + with self._lock: + self.substitutions.append((original, proposed)) + return proposed + return None + + def commit(self): + with self._lock: + self._con.commit() def __del__(self): print("closing database") - self.con.commit() - self.con.close() + self._con.commit() + self._con.close() def diff_strings(a: str, b: str) -> str: @@ -75,6 +92,8 @@ def diff_strings(a: str, b: str) -> str: regex_unicode = re.compile("[^\x00-\x7F]") + + def help_unicode(item: str) -> Optional[str]: m = regex_unicode.search(item) if m: @@ -85,6 +104,7 @@ def help_unicode(item: str) -> Optional[str]: + "*****UNICODE******" + item[m.end() :] ) + return None def replace_unicode(item: str) -> str: @@ -100,7 +120,8 @@ def replace_unicode(item: str) -> str: def replace_chars(match): char = match.group(0) - print('unicode found, replacing "%s" with "%s"' % (char, chars[char])) + with print_lock: + print('unicode found, replacing "%s" with "%s"' % (char, chars[char])) return chars[char] return re.sub("(" + "|".join(list(chars.keys())) + ")", replace_chars, item) @@ -136,7 +157,7 @@ def modify_item(item: str, error: str) -> str: tmp_filename = next(tempfile._get_candidate_names()) with open(tmp_filename, "w", encoding="utf-8") as f: - preamble = "do not delete these lines\n" "error found:\n" + preamble = "do not delete these lines\n" + "error found:\n" preamble += error r = help_unicode(item) if r is not None: @@ -221,22 +242,26 @@ def check_latex_entry(key: str, tex: str, use_bibtex: bool = False) -> Optional[ return error -lock = threading.Lock() -def run_entry(entry, db, fix_unicode, substitutions): + +global_counter = 0 + + +def run_entry(entry, nentries, db, fix_unicode): + with print_lock: + global global_counter + global_counter += 1 + print(f"checking {entry.key} {global_counter}/{nentries}") raw_original = entry.raw.strip() raw_proposed = raw_original from_cache = db.query(raw_original) if from_cache is not None: - if raw_original != from_cache: - with lock: - substitutions.append((raw_original, from_cache)) return if fix_unicode: raw_proposed = replace_unicode(raw_original) if raw_proposed != raw_original: - with lock: + with print_lock: print(f"unicode found in {entry.key}, fixing") while True: @@ -244,14 +269,13 @@ def run_entry(entry, db, fix_unicode, substitutions): if error is None: break - with lock: + with print_lock: print(f"problem running item {entry.key}") raw_proposed = modify_item(raw_proposed, error).strip() if raw_original != raw_proposed: - with lock: + with print_lock: print(diff_strings(raw_original, raw_proposed)) - substitutions.append((raw_original, raw_proposed)) db.update(entry.key, raw_original, raw_proposed) @@ -263,13 +287,13 @@ def run_entry(entry, db, fix_unicode, substitutions): ) parser.add_argument("bibtex", default="https://inspirehep.net/") parser.add_argument("--fix-unicode", action="store_true") + parser.add_argument("--nthreads", type=int, default=5) parser.add_argument( "--use-bibtex", action="store_true", help="use bibtex instead of biblatex" ) args = parser.parse_args() try: - substitutions = [] biblio_parsed = bibtexparser.parse_file(args.bibtex) db = DataBase("db.sqlite") @@ -279,14 +303,27 @@ def run_entry(entry, db, fix_unicode, substitutions): print("found %d entries" % len(biblio_parsed.entries)) nentries = len(biblio_parsed.entries) - from multiprocessing.pool import ThreadPool - import functools - p = ThreadPool(4) - p.map(functools.partial(run_entry, db=db, fix_unicode=args.fix_unicode, substitutions=substitutions), biblio_parsed.entries) - finally: + with ThreadPoolExecutor(max_workers=args.nthreads) as p: + partial_function = functools.partial( + run_entry, nentries=nentries, db=db, fix_unicode=args.fix_unicode + ) + futures = { + p.submit(partial_function, entry): entry.key + for entry in biblio_parsed.entries + } + for future in as_completed(futures): + key = futures[future] + try: + future.result() + except Exception as ex: + print(f"problem with entry: {key}") + raise ex + print(f"finished {key}") + finally: biblio = open(args.bibtex, encoding="utf-8").read() + substitutions = db.substitutions print(f"applying {len(substitutions)} substitutions") for old, new in substitutions: if old == new: From c97ffbc8935cb3edc4783acc89b70a450a42c901 Mon Sep 17 00:00:00 2001 From: Ruggero Turra Date: Thu, 16 Nov 2023 17:00:50 +0100 Subject: [PATCH 3/4] Add progress bar to check_biblio.py --- check_biblio.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/check_biblio.py b/check_biblio.py index 26d8971..bef0022 100755 --- a/check_biblio.py +++ b/check_biblio.py @@ -12,6 +12,7 @@ import threading from concurrent.futures import ThreadPoolExecutor, as_completed import functools +import tqdm import bibtexparser @@ -250,7 +251,7 @@ def run_entry(entry, nentries, db, fix_unicode): with print_lock: global global_counter global_counter += 1 - print(f"checking {entry.key} {global_counter}/{nentries}") + #print(f"checking {entry.key} {global_counter}/{nentries}") raw_original = entry.raw.strip() raw_proposed = raw_original @@ -305,22 +306,25 @@ def run_entry(entry, nentries, db, fix_unicode): nentries = len(biblio_parsed.entries) with ThreadPoolExecutor(max_workers=args.nthreads) as p: - partial_function = functools.partial( - run_entry, nentries=nentries, db=db, fix_unicode=args.fix_unicode - ) - futures = { - p.submit(partial_function, entry): entry.key - for entry in biblio_parsed.entries - } - for future in as_completed(futures): - key = futures[future] - try: - future.result() - except Exception as ex: - print(f"problem with entry: {key}") - raise ex - print(f"finished {key}") - + with tqdm.tqdm(total=nentries) as pbar: + pbar.set_description("checking entries") + pbar.set_postfix_str(f"nthreads={args.nthreads}") + partial_function = functools.partial( + run_entry, nentries=nentries, db=db, fix_unicode=args.fix_unicode + ) + futures = {} + for entry in biblio_parsed.entries: + future = p.submit(partial_function, entry) + future.add_done_callback(lambda _: pbar.update()) + futures[future] = entry.key + for future in as_completed(futures): + key = futures[future] + try: + future.result() + except Exception as ex: + print(f"problem with entry: {key}") + raise ex + pbar.write(f"finished {key}") finally: biblio = open(args.bibtex, encoding="utf-8").read() substitutions = db.substitutions From 18dda1fdbb8917488d79ee8b0dbdc1a15275030d Mon Sep 17 00:00:00 2001 From: Ruggero Turra Date: Thu, 16 Nov 2023 18:44:04 +0100 Subject: [PATCH 4/4] Add logging and fix editor command prompt --- check_biblio.py | 53 ++++++++++++++++++++++++------------------------ requirements.txt | 3 ++- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/check_biblio.py b/check_biblio.py index bef0022..72486ee 100755 --- a/check_biblio.py +++ b/check_biblio.py @@ -13,6 +13,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import functools import tqdm +import logging import bibtexparser @@ -121,8 +122,7 @@ def replace_unicode(item: str) -> str: def replace_chars(match): char = match.group(0) - with print_lock: - print('unicode found, replacing "%s" with "%s"' % (char, chars[char])) + logging.debug('unicode found, replacing "%s" with "%s"', char, chars[char]) return chars[char] return re.sub("(" + "|".join(list(chars.keys())) + ")", replace_chars, item) @@ -147,13 +147,6 @@ def find_error_latex(filename: str) -> str: def modify_item(item: str, error: str) -> str: editor_command = os.environ.get("EDITOR") - if not editor_command: - print("you haven't defined a default EDITOR, (e.g. export EDITOR=emacs)") - editor_command = input( - "enter the command to open an editor (e.g. emacs/atom -w/...): " - ) - os.environ["EDITOR"] = editor_command - editor_command = editor_command.strip() tmp_filename = next(tempfile._get_candidate_names()) @@ -244,26 +237,19 @@ def check_latex_entry(key: str, tex: str, use_bibtex: bool = False) -> Optional[ return error -global_counter = 0 - - -def run_entry(entry, nentries, db, fix_unicode): - with print_lock: - global global_counter - global_counter += 1 - #print(f"checking {entry.key} {global_counter}/{nentries}") +def run_entry(entry, db, fix_unicode) -> None: raw_original = entry.raw.strip() - raw_proposed = raw_original from_cache = db.query(raw_original) if from_cache is not None: return + raw_proposed = raw_original + if fix_unicode: raw_proposed = replace_unicode(raw_original) if raw_proposed != raw_original: - with print_lock: - print(f"unicode found in {entry.key}, fixing") + logging.debug("unicode found in %s, fixing", entry.key) while True: error = check_latex_entry(entry.key, raw_proposed, args.use_bibtex) @@ -294,6 +280,14 @@ def run_entry(entry, nentries, db, fix_unicode): ) args = parser.parse_args() + editor_command = os.environ.get("EDITOR") + if not editor_command: + print("you haven't defined a default EDITOR, (e.g. export EDITOR=emacs)") + editor_command = input( + "enter the command to open an editor (e.g. emacs/atom -w/...): " + ) + os.environ["EDITOR"] = editor_command.strip() + try: biblio_parsed = bibtexparser.parse_file(args.bibtex) db = DataBase("db.sqlite") @@ -309,22 +303,29 @@ def run_entry(entry, nentries, db, fix_unicode): with tqdm.tqdm(total=nentries) as pbar: pbar.set_description("checking entries") pbar.set_postfix_str(f"nthreads={args.nthreads}") - partial_function = functools.partial( - run_entry, nentries=nentries, db=db, fix_unicode=args.fix_unicode - ) + + def partial_function(entry): + with print_lock: + pbar.set_description(entry.key) + run_entry(entry, db, args.fix_unicode) + with print_lock: + pbar.update() + futures = {} + for entry in biblio_parsed.entries: future = p.submit(partial_function, entry) - future.add_done_callback(lambda _: pbar.update()) futures[future] = entry.key for future in as_completed(futures): key = futures[future] try: future.result() except Exception as ex: - print(f"problem with entry: {key}") + with print_lock: + pbar.write(f"problem with entry: {key}") raise ex - pbar.write(f"finished {key}") + with print_lock: + pbar.set_description(key) finally: biblio = open(args.bibtex, encoding="utf-8").read() substitutions = db.substitutions diff --git a/requirements.txt b/requirements.txt index 1c9a5eb..82e4713 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -bibtexparser==2.0.0b4 \ No newline at end of file +bibtexparser==2.0.0b4 +tqdm