Enhanced file extension check (#34)
* refactor file_choices method in WiperTool; streamline argument parsing in fastq_gather, fastq_scatter, and fastq_wiper

* fix: correct string formatting in ArgumentTypeError message in WiperTool

* refactor: improve error handling and logging in FastqWiper and GatherReport
mazzalab authored Jan 30, 2025
1 parent 8837edf commit f9324b3
Showing 7 changed files with 207 additions and 213 deletions.
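
The first two commit-message bullets move the extension check out of each subcommand and into a single shared helper on WiperTool, which raises argparse.ArgumentTypeError with a properly formatted message. That helper lives in wipertools/wipertool_abstract.py, which is not among the diffs shown below, so the following is only a minimal sketch of what it might look like, reconstructed from the nested files_choices function removed from fastq_gather.py; the simplified WiperTool class here is a stand-in for the real abstract base class.

import argparse
from pathlib import Path


# Simplified stand-in for the abstract base class; only the shared helper is sketched.
class WiperTool:
    @staticmethod
    def file_choices(choices, fname):
        # Recognise double extensions such as ".fastq.gz"; otherwise use the single suffix.
        path = Path(fname)
        if len(path.suffixes) == 2:
            ext = "".join(path.suffixes)[1:]
        else:
            ext = path.suffix[1:]

        if ext not in choices:
            # ArgumentTypeError lets argparse report the failure against the offending
            # option, replacing the removed pattern of calling parser.error() plus raise.
            raise argparse.ArgumentTypeError(
                f"File '{fname}' doesn't end with one of {choices}"
            )
        return fname


# Accepted: ".fastq.gz" is combined into the single choice "fastq.gz".
print(WiperTool.file_choices(["fastq", "fq", "fastq.gz", "fq.gz"], "data/sample_R1.fastq.gz"))

Each subcommand then reuses the helper through type=lambda s: WiperTool.file_choices([...], s), as the fastq_gather.py diff below shows.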
8 changes: 4 additions & 4 deletions .vscode/launch.json
@@ -30,7 +30,7 @@
"args": [
"fastqwiper",
"--fastq_in",
"data/gathered_unix.fastq.gz",
"data/gathered_unix_final.fastq",
"--fastq_out",
"data/gathered_unix_wiped.fq.gz",
"-f",
@@ -51,8 +51,8 @@
"args": [
"fastqscatter",
"--fastq",
//"data/test2_1.fastq.gz",
"data/a.b.ba1.fastq.gs",
"data/sample_R1.fastq.gz",
// "data/a.b.ba1.fastq.gs",
"--num_splits",
"2",
"--out_folder",
@@ -94,7 +94,7 @@
"args": [
"fastqgather",
"--in_fastq",
"data/gathered_unix.fq",
"data/sample_R2.fastq.gz",
"data/sample_R1.fastq.gz",
"--out_fastq",
"data/gathered_unix_final.fastq",
150 changes: 74 additions & 76 deletions wipertools/fastq_gather.py
@@ -1,85 +1,83 @@
import os
import gzip
import logging
import argparse
import subprocess
from pathlib import Path
from enum import auto, Enum
from wipertools.wipertool_abstract import WiperTool


class GatherFastq(WiperTool):
def __init__(self):
super().__init__("fastqgather")
logging.basicConfig(level=logging.DEBUG)

# Inherited methods
def set_parser(self, parser: argparse.ArgumentParser):
class OsEnum(Enum):
UNIX = auto()
CROSS_PLATFORM = auto()

class FastqExtEnum(Enum):
FASTQ = auto()
FQ = auto()
FASTQ_GZ = auto()
FQ_GZ = auto()

def files_choices(choices, fname):
# Extract double extensions if present
path = Path(fname)
if len(path.suffixes) == 2: # Handle double extensions like ".fastq.gz"
# Combine the suffixes and remove the dot
ext = "".join(path.suffixes)[1:]
else:
ext = path.suffix[1:] # Single extension

if ext not in choices:
parser.error(f"File '{fname}' doesn't end with one of {choices}")
raise ValueError(f"File '{fname}' doesn't end with one of {choices}")
return fname

parser.add_argument(
"-i",
"--in_fastq",
nargs="+",
type=lambda s: files_choices(
(e.name.lower().replace("_", ".") for e in FastqExtEnum), s
),
help="List of FASTQ files to be joined",
required=True,
)
parser.add_argument(
"-o",
"--out_fastq",
type=lambda s: files_choices(
(e.name.lower().replace("_", ".") for e in FastqExtEnum), s
),
help="Name of the resulting fastq file",
required=True,
)
# Optional arguments
parser.add_argument(
"-p",
"--prefix",
nargs="?",
help="Prefix common to the files to be joined",
required=False,
)
parser.add_argument(
"-O",
"--os",
help="Underlying OS (Default: %(default)s)",
default="cross_platform",
choices=[e.name.lower() for e in OsEnum],
required=False,
)
# Add a version flag that prints the version and exits
parser.add_argument(
"-v",
"--version",
action="version",
version=self.version(),
help="Print the version and exits",
)
if isinstance(parser, argparse.ArgumentParser):

class OsEnum(Enum):
UNIX = auto()
CROSS_PLATFORM = auto()

class FastqExtEnum(Enum):
FASTQ = auto()
FQ = auto()
FASTQ_GZ = auto()
FQ_GZ = auto()

parser.add_argument(
"-i",
"--in_fastq",
nargs="+",
type=lambda s: WiperTool.file_choices(
[e.name.lower().replace("_", ".") for e in FastqExtEnum], s
),
help="List of FASTQ files to be joined",
required=True,
)
parser.add_argument(
"-o",
"--out_fastq",
type=lambda s: WiperTool.file_choices(
[e.name.lower().replace("_", ".") for e in FastqExtEnum], s
),
help="Name of the resulting fastq file",
required=True,
)
# Optional arguments
parser.add_argument(
"-p",
"--prefix",
nargs="?",
help="Prefix common to the files to be joined",
required=False,
)
parser.add_argument(
"-O",
"--os",
help="Underlying OS (Default: %(default)s)",
default="cross_platform",
choices=[e.name.lower() for e in OsEnum],
required=False,
)
# Add a version flag that prints the version and exits
parser.add_argument(
"-v",
"--version",
action="version",
version=self.version(),
help="Print the version and exits",
)
else:
logging.critical(
" Incorrect parser. set_parser accepts an instance of "
+ f"argparse.Namespace. Passed: {parser}"
)
raise ValueError(
"Incorrect parser. set_parser accepts an instance of "
+ f"argparse.Namespace. Passed: {parser}"
)

def run(self, argv: argparse.Namespace):
in_fastq: list[str] = argv.in_fastq
@@ -104,7 +102,7 @@ def concatenate_fastq(
)

if not files:
print(f"No files with prefix {prefix}.")
logging.critical(f"No files with prefix {prefix}.")
return

# Separate gzipped files from regular files
@@ -121,9 +119,9 @@ def concatenate_fastq(
" ".join(regular_files), " ".join(gz_files), output_file
)

print("Files concatenated successfully.")
logging.info("Files concatenated successfully.")
except Exception as e:
print(f"Error while concatenating files: {e}")
logging.critical(f"Error while concatenating files: {e}")

@staticmethod
def __concat_unix(regular_files: str, gz_files: str, outfile: str):
@@ -138,7 +136,7 @@ def __concat_unix(regular_files: str, gz_files: str, outfile: str):
stdout, stderr = process_regular.communicate()

if process_regular.returncode != 0:
print(f"Error occurred: {stderr.decode()}")
logging.critical(f"Error occurred: {stderr.decode()}")

if gz_files:
process_gzip = subprocess.Popen(
@@ -151,7 +149,7 @@ def __concat_unix(regular_files: str, gz_files: str, outfile: str):
stdout, stderr = process_gzip.communicate()

if process_gzip.returncode != 0:
print(f"Error occurred: {stderr.decode()}")
logging.critical(f"Error occurred: {stderr.decode()}")

if outfile.endswith(".gz"):
uncompressed_file = outfile.removesuffix(".gz")
@@ -166,7 +164,7 @@ def __concat_unix(regular_files: str, gz_files: str, outfile: str):
stdout, stderr = process_compress.communicate()

if process_compress.returncode != 0:
print(f"Error occurred: {stderr.decode()}")
logging.critical(f"Error occurred: {stderr.decode()}")

@staticmethod
def __concat_cross_platform(regular_files, gz_files, outfile: str):
@@ -182,7 +180,7 @@ def __concat_cross_platform(regular_files, gz_files, outfile: str):
# Count the replacement characters
replacement_count = data.count("�")
if replacement_count > 0:
print(f"Warning: File '{file_path}' contains {replacement_count} unreadable characters that were replaced.")
logging.warning(f"Warning: File '{file_path}' contains {replacement_count} unreadable characters that were replaced.")

if isinstance(output_file, gzip.GzipFile):
# Write as bytes for gzip
@@ -205,7 +203,7 @@ def __concat_cross_platform(regular_files, gz_files, outfile: str):
decoded_data = data.decode("utf-8")
except UnicodeDecodeError:
# Gracefully handle decoding errors
print(f"Warning: Decoding error in {file_path}, replacing invalid characters.")
logging.warning(f"Warning: Decoding error in {file_path}, replacing invalid characters.")
decoded_data = data.decode("utf-8", errors="replace")
output_file.write(decoded_data)

