From a032b6822273ac442877828176214ad6f904c3e1 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Thu, 30 Jan 2025 14:09:33 -0800 Subject: [PATCH 01/16] Production hotfixes --- sequence_processing_pipeline/Commands.py | 3 + .../GenPrepFileJob.py | 15 ++- sequence_processing_pipeline/NuQCJob.py | 103 +++++++++++++++++- sequence_processing_pipeline/Pipeline.py | 9 +- 4 files changed, 120 insertions(+), 10 deletions(-) diff --git a/sequence_processing_pipeline/Commands.py b/sequence_processing_pipeline/Commands.py index d8e2e166..51e8f5b5 100644 --- a/sequence_processing_pipeline/Commands.py +++ b/sequence_processing_pipeline/Commands.py @@ -119,6 +119,9 @@ def demux(id_map, fp, out_d, task, maxtask): # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT' # noqa # '@baz/1 + # NB: from 6d794a37-12cd-4f8e-95d6-72a4b8a1ec1c's only-adapter-filtered results: # noqa + # @A00953:244:HYHYWDSXY:3:1101:14082:3740 1:N:0:CCGTAAGA+TCTAACGC + fname_encoded, sid = i.split(delimiter, 1) if fname_encoded not in openfps: diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py index 129d5050..3f3a7565 100644 --- a/sequence_processing_pipeline/GenPrepFileJob.py +++ b/sequence_processing_pipeline/GenPrepFileJob.py @@ -2,7 +2,7 @@ from sequence_processing_pipeline.PipelineError import PipelineError from os import makedirs, symlink from os.path import join, exists, basename -from shutil import copytree +from shutil import copy from functools import partial from collections import defaultdict from metapool import (demux_sample_sheet, parse_prep, @@ -31,6 +31,11 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, self.commands = [] self.has_replicates = False self.replicate_count = 0 + # instead of a directory, reports_path should point to the single file + # currently needed by seqpro. This means reports_path should equal: + # /.../ConvertJob/Reports/Demultiplex_Stats.csv not + # /.../ConvertJob/Reports. + self.reports_path = reports_path # make the 'root' of your run_directory @@ -39,14 +44,18 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, # run_directory # This directory will already exist on restarts, hence avoid - # copying. + # copying. To support legacy seqpro, We will copy the single file + # seqpro needs into a clean sub-directory named 'Reports'. This can + # be fixed when seqpro is refactored. reports_dir = join(self.output_path, self.run_id, 'Reports') if exists(reports_dir): self.is_restart = True else: self.is_restart = False - copytree(self.reports_path, reports_dir) + + makedirs(reports_dir) + copy(self.reports_path, reports_dir) # extracting from either convert_job_path or qc_job_path should # produce equal results. 
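The GenPrepFileJob hunk above swaps copytree() for makedirs() plus copy() because reports_path now points at the single Demultiplex_Stats.csv file rather than the whole Reports directory. A minimal sketch of the distinction, using hypothetical paths (shutil.copytree() needs an existing directory as src and, without Python 3.8's dirs_exist_ok flag, a dst that does not yet exist; shutil.copy() takes a file and keeps its basename when dst is a directory):

from os import makedirs
from shutil import copy, copytree

# hypothetical locations, for illustration only
stats_file = "/sequencing/run_id/ConvertJob/Reports/Demultiplex_Stats.csv"
reports_dir = "/working/run_id/Reports"

# copying a directory: src must be a directory, and reports_dir must not
# already exist (Python >= 3.8 can pass dirs_exist_ok=True to relax this).
# copytree("/sequencing/run_id/ConvertJob/Reports", reports_dir)

# copying a single file: create the destination directory first, then
# copy() drops the file into it under its original basename.
makedirs(reports_dir)
copy(stats_file, reports_dir)
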
diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index 0e05b41d..3907e182 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -8,11 +8,13 @@ from shutil import move import logging from sequence_processing_pipeline.Commands import split_similar_size_bins -from sequence_processing_pipeline.util import iter_paired_files +from sequence_processing_pipeline.util import (iter_paired_files, + determine_orientation) from jinja2 import Environment from glob import glob import re from sys import executable +from gzip import open as gzip_open logging.basicConfig(level=logging.DEBUG) @@ -116,6 +118,63 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, self._validate_project_data() + def hack_helper(self): + # get list of raw compressed fastq files. only consider R1 and R2 + # reads. + + # Note that NuQCJob works across all projects in a sample-sheet. if + # there are more than one project in the sample-sheet and if one + # is a TellSeq project and one isn't then that would break an + # an assumption of this helper (one file is representative of all + # files.) However since tellseq requires a special sample-sheet, it + # can be assumed that all projects in a tellseq sample-sheet will be + # tellseq and therefore carry the TellSeq BX metadata. The inverse + # should also be true. + + fastq_paths = glob(self.root_dir + '/*/*.fastq.gz') + fastq_paths = [x for x in fastq_paths + if determine_orientation(x) in ['R1', 'R2']] + + apply_bx = None + + for fp in fastq_paths: + # open a compressed fastq file and read its first line. + with gzip_open(fp, 'r') as f: + line = f.readline() + + # convert the line to regular text and remove newline. + line = line.decode("utf-8").strip() + + # if file is empty process another file until we find + # one that isn't empty. + if line == '': + continue + + # break up sequence id line into sequence id plus possible + # metadata element(s). + line = line.split(' ') + + if len(line) == 1: + # there is no metadata. do not apply 'BX'. + apply_bx = False + break + elif len(line) == 2: + # there is some kind of additional metadata, + # but it may not be BX. + if line[-1].startswith('BX'): + apply_bx = True + break + else: + apply_bx = False + break + else: + raise ValueError("I don't know how to process '%s'" % line) + + if apply_bx is None: + raise ValueError("It seems like all raw files are empty") + + return apply_bx + def _validate_project_data(self): # Validate project settings in [Bioinformatics] for project in self.project_data: @@ -394,15 +453,26 @@ def _generate_mmi_filter_cmds(self, working_dir): cores_to_allocate = int(self.cores_per_task / 2) - if len(self.additional_fastq_tags) > 0: + # hack_helper is a hack that will scan all of the R1 and R2 files + # in self.root_dir until it finds a non-empty file to read. It will + # read the first line of the compressed fastq file and see if it + # contains optional BX metadata. If not it will return False, other + # wise True. + apply_bx = self.hack_helper() + + # the default setting. + tags = "" + t_switch = "" + + if apply_bx & len(self.additional_fastq_tags) > 0: # add tags for known metadata types that fastq files may have # been annotated with. Samtools will safely ignore tags that # are not present. + # NB: This doesn't appear to be true, actually. if there is + # a metadata element but it does not begin with 'BX', supplying + # '-T BX' will cause an error writing output to disk. 
tags = " -T %s" % ','.join(self.additional_fastq_tags) t_switch = " -y" - else: - tags = "" - t_switch = "" for count, mmi_db_path in enumerate(self.mmi_file_paths): if count == 0: @@ -499,3 +569,26 @@ def _generate_job_script(self, max_bucket_size): pmls_path=self.pmls_path)) return job_script_path + + def parse_logs(self): + log_path = join(self.output_path, 'logs') + files = sorted(glob(join(log_path, '*'))) + msgs = [] + + # assume that the only possible files in logs directory are '.out' + # files and zero, one, or many 'seqs.movi.n.txt.gz' files. + # the latter may be present because the last step of a successful + # job is to rename and move this file into its final location while + # the logs directory is the default 'working' directory for this job + # as this ensures slurm.err and slurm.out files will always be in + # a known location. + + # for now, construct lists of both of these types of files. + output_logs = [x for x in files if x.endswith('.out')] + + for some_file in output_logs: + with open(some_file, 'r') as f: + msgs += [line for line in f.readlines() + if 'error:' in line.lower()] + + return [msg.strip() for msg in msgs] diff --git a/sequence_processing_pipeline/Pipeline.py b/sequence_processing_pipeline/Pipeline.py index 9a30c2a0..ae6db85d 100644 --- a/sequence_processing_pipeline/Pipeline.py +++ b/sequence_processing_pipeline/Pipeline.py @@ -850,8 +850,13 @@ def get_project_info(self, short_names=False): if CONTAINS_REPLICATES_KEY in bi_df.columns.tolist(): # subselect rows in [Bioinformatics] based on whether they # match the project name. - df = bi_df.loc[bi_df['Sample_Project'] == - curr_project_info[proj_name_key]] + + # whether short_names or full_names are requested in the + # results, the match will always need to be against the + # full project name, which is what's expected to be in + # the Sample_Project column. + sample_project = curr_project_info[PROJECT_FULL_NAME_KEY] + df = bi_df.loc[bi_df['Sample_Project'] == sample_project] # since only one project can match by definition, convert # to dict and extract the needed value. curr_contains_reps = df.iloc[0].to_dict()[ From 8625dd6fe6ee70c82df496099a58cca152e27dd7 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Fri, 21 Feb 2025 14:29:19 -0800 Subject: [PATCH 02/16] All tests are working --- .../GenPrepFileJob.py | 32 ++++++++++++++++--- sequence_processing_pipeline/NuQCJob.py | 4 +-- .../tests/test_NuQCJob.py | 11 ++++--- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py index 3f3a7565..32545694 100644 --- a/sequence_processing_pipeline/GenPrepFileJob.py +++ b/sequence_processing_pipeline/GenPrepFileJob.py @@ -1,8 +1,8 @@ from sequence_processing_pipeline.Job import Job from sequence_processing_pipeline.PipelineError import PipelineError from os import makedirs, symlink -from os.path import join, exists, basename -from shutil import copy +from os.path import isdir, join, exists, basename +from shutil import copy, copytree from functools import partial from collections import defaultdict from metapool import (demux_sample_sheet, parse_prep, @@ -54,8 +54,32 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, else: self.is_restart = False - makedirs(reports_dir) - copy(self.reports_path, reports_dir) + """ + With copytree(src, dst), src must be an existing directory and dst + must be a path that doesn't already exist. + src cannot be a file. it must be a directory. 
+ + when using copytree to copy a directory, dst must be the path to + the directory where you want the copy PLUS the name of the copied + directory. To give dst the same directory name as src you would + then need to split() the folder name off the end of src and append + it to dst to get a proper value. copytree() DOES NOT put a copy + of src in dst. More like it copies the entire contents of src + recursively into dst, however dst cannot already exist so you + can't use it to copy the contents of a directory into an existing + directory. + + This means that if src is a file and not a directory, you will + need to use copy() to copy the file instead. It also means that + you will need to make reports_dir manually. + """ + + if isdir(self.reports_path): + copytree(self.reports_path, reports_dir) + else: + # assume self.reports_path is a file. + makedirs(reports_dir) + copy(self.reports_path, reports_dir) # extracting from either convert_job_path or qc_job_path should # produce equal results. diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index 3907e182..09b87281 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -118,7 +118,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, self._validate_project_data() - def hack_helper(self): + def contains_bx_metadata(self): # get list of raw compressed fastq files. only consider R1 and R2 # reads. @@ -458,7 +458,7 @@ def _generate_mmi_filter_cmds(self, working_dir): # read the first line of the compressed fastq file and see if it # contains optional BX metadata. If not it will return False, other # wise True. - apply_bx = self.hack_helper() + apply_bx = self.contains_bx_metadata() # the default setting. 
tags = "" diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index 8737552b..a7c8a8c7 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -10,6 +10,7 @@ from os import makedirs, remove from metapool import load_sample_sheet from os import walk +import gzip class TestNuQCJob(unittest.TestCase): @@ -73,10 +74,11 @@ def setUp(self): for id in ids: fr_fp = join(sample_path, f"{id}_R1_001.fastq.gz") rr_fp = join(sample_path, f"{id}_R2_001.fastq.gz") - with open(fr_fp, "w") as f: - f.write("This is a forward-read file.") - with open(rr_fp, "w") as f: - f.write("This is a reverse-read file.") + + with gzip.open(fr_fp, "wb") as f: + f.write(b"@my_seq_id BX:1:1101:10000:10000\n") + with gzip.open(rr_fp, "wb") as f: + f.write(b"@my_seq_id BX:1:1101:10000:10000\n") self.feist_ids = [ "JM-MEC__Staphylococcus_aureusstrain_BERTI-R08624", @@ -868,7 +870,6 @@ def tearDown(self): if exists(self.tmp_file_path): remove(self.tmp_file_path) - # for test_move_trimmed_files() if exists(self.path("NuQCJob")): shutil.rmtree(self.path("NuQCJob")) From fc4e5afc8781f6d5138c80d729f3f8e7a39a85ef Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Fri, 21 Feb 2025 15:03:44 -0800 Subject: [PATCH 03/16] Folded in MultiQCJob breakout PR (162) --- sequence_processing_pipeline/ConvertJob.py | 2 + sequence_processing_pipeline/FastQCJob.py | 128 ++++++------------ .../GenPrepFileJob.py | 2 + sequence_processing_pipeline/Job.py | 9 ++ sequence_processing_pipeline/NuQCJob.py | 7 +- sequence_processing_pipeline/SeqCountsJob.py | 4 + .../TRIntegrateJob.py | 2 + sequence_processing_pipeline/TellReadJob.py | 2 + .../tests/test_FastQCJob.py | 26 +--- .../tests/test_NuQCJob.py | 1 + 10 files changed, 74 insertions(+), 109 deletions(-) diff --git a/sequence_processing_pipeline/ConvertJob.py b/sequence_processing_pipeline/ConvertJob.py index dc9b36aa..3a521d8c 100644 --- a/sequence_processing_pipeline/ConvertJob.py +++ b/sequence_processing_pipeline/ConvertJob.py @@ -172,6 +172,8 @@ def run(self, callback=None): info.insert(0, str(e)) raise JobFailedError('\n'.join(info)) + self. 
mark_job_completed() + logging.info(f'Successful job: {job_info}') def parse_logs(self): diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index f5a5555a..a52a451a 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -1,18 +1,19 @@ -from os import listdir, makedirs -from os.path import exists, join, basename -from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import (PipelineError, - JobFailedError) from functools import partial +from jinja2 import Environment from json import dumps import logging +from os import listdir, makedirs +from os.path import join, basename +from sequence_processing_pipeline.Job import Job, KISSLoader +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) class FastQCJob(Job): def __init__(self, run_dir, output_path, raw_fastq_files_path, processed_fastq_files_path, nprocs, nthreads, fastqc_path, modules_to_load, qiita_job_id, queue_name, node_count, - wall_time_limit, jmem, pool_size, multiqc_config_file_path, + wall_time_limit, jmem, pool_size, max_array_length, is_amplicon): super().__init__(run_dir, output_path, @@ -36,16 +37,17 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.job_script_path = join(self.output_path, f"{self.job_name}.sh") - self._file_check(multiqc_config_file_path) - self.multiqc_config_file_path = multiqc_config_file_path - - self.project_names = [] self.commands, self.project_names = self._get_commands() # for lists greater than n commands, chain the extra commands, # distributing them evenly throughout the first n commands. self.commands = self._group_commands(self.commands) self.suffix = 'fastqc.html' + # for projects that use sequence_processing_pipeline as a dependency, + # jinja_env must be set to sequence_processing_pipeline's root path, + # rather than the project's root path. + self.jinja_env = Environment(loader=KISSLoader('templates')) + self._generate_job_script() def _get_commands(self): @@ -217,90 +219,40 @@ def run(self, callback=None): logging.debug(job_info) - # If project-level reports were not needed, MultiQC could simply be - # given the path to the run-directory itself and it will discover all - # of the relevant data files. Confirmed that for a one-project sample- - # sheet, this produces and equivalent report. - - for project in self.project_names: - # MultiQC doesn't like input paths that don't exist. Simply add - # all paths that do exist as input. - input_path_list = [] - p_path = partial(join, self.output_path, 'fastqc') - - for filter_type in ['bclconvert', 'trimmed_sequences', - 'filtered_sequences', 'amplicon']: - input_path_list.append(p_path(project, filter_type)) - - input_path_list.append(p_path(project, 'Reports')) - - p_path = partial(join, self.processed_fastq_files_path, project) - input_path_list.append(p_path('fastp_reports_dir', 'json')) - - # I don't usually see a json directory associated with raw data. - # It looks to be metadata coming directly off the machine, in the - # few instances I've seen it in /sequencing... - p_path = partial(join, self.raw_fastq_files_path, project) - input_path_list.append(p_path('json')) - - input_path_list = [x for x in input_path_list if exists(x)] - - cmd_head = ['multiqc', '-c', self.multiqc_config_file_path, - '--fullnames', '--force'] - - # --interactive graphs is set to True in MultiQC configuration - # file and hence this switch was redunant and now removed. 
- cmd_tail = ['-o', join(self.output_path, 'multiqc', project)] - - cmd = ' '.join(cmd_head + input_path_list + cmd_tail) - - results = self._system_call(cmd, callback=callback) - - if results['return_code'] != 0: - raise PipelineError("multiqc encountered an error") - if self._get_failed_indexes(job_info['job_id']): # raise error if list isn't empty. raise PipelineError("FastQCJob did not complete successfully.") - def _generate_job_script(self): - lines = [] + self.mark_job_completed() - details_file_name = f'{self.job_name}.array-details' - sh_details_fp = join(self.output_path, details_file_name) + def _generate_job_script(self): + # bypass generating job script for a force-fail job, since it is + # not needed. + if self.force_job_fail: + return None - lines.append("#!/bin/bash") + template = self.jinja_env.get_template("fastqc_job.sh") job_name = f'{self.qiita_job_id}_{self.job_name}' - lines.append(f"#SBATCH --job-name {job_name}") - lines.append(f"#SBATCH -p {self.queue_name}") - lines.append(f"#SBATCH -N {self.node_count}") - lines.append(f"#SBATCH -n {self.nprocs}") - lines.append("#SBATCH --time %d" % self.wall_time_limit) - lines.append(f"#SBATCH --mem {self.jmem}") - lines.append("#SBATCH --array 1-%d%%%d" % ( - len(self.commands), self.pool_size)) - - lines.append("set -x") - lines.append("set +e") - lines.append('date') - lines.append('hostname') - lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}') - lines.append(f'cd {self.output_path}') - - if self.modules_to_load: - lines.append("module load " + ' '.join(self.modules_to_load)) - - lines.append('offset=${SLURM_ARRAY_TASK_ID}') - lines.append('step=$(( $offset - 0 ))') - lines.append(f'cmd0=$(head -n $step {sh_details_fp} | tail -n 1)') - lines.append('eval $cmd0') - - sentinel_file = f'{self.job_name}_$step.completed' - lines.append(f'echo "Cmd Completed: $cmd0" > logs/{sentinel_file}') - - with open(self.job_script_path, 'w') as f: - f.write('\n'.join(lines)) - - with open(sh_details_fp, 'w') as f: + details_file_name = f'{self.job_name}.array-details' + array_details = join(self.output_path, details_file_name) + array_params = "1-%d%%%d" % (len(self.commands), self.pool_size) + modules_to_load = ' '.join(self.modules_to_load) + + with open(self.job_script_path, mode="w", encoding="utf-8") as f: + f.write(template.render(job_name=job_name, + array_details=array_details, + queue_name=self.queue_name, + node_count=self.node_count, + nprocs=self.nprocs, + wall_time_limit=self.wall_time_limit, + mem_in_gb=self.jmem, + array_params=array_params, + output_path=self.output_path, + modules_to_load=modules_to_load)) + + # save the .details file as well + with open(array_details, 'w') as f: f.write('\n'.join(self.commands)) + + return self.job_script_path diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py index 32545694..84a7b711 100644 --- a/sequence_processing_pipeline/GenPrepFileJob.py +++ b/sequence_processing_pipeline/GenPrepFileJob.py @@ -217,3 +217,5 @@ def run(self, callback=None): results[qiita_id] += cmd_results[qiita_id] self.prep_file_paths = results + + self.mark_job_completed() diff --git a/sequence_processing_pipeline/Job.py b/sequence_processing_pipeline/Job.py index 4121bd7f..d2bfc3fe 100644 --- a/sequence_processing_pipeline/Job.py +++ b/sequence_processing_pipeline/Job.py @@ -126,6 +126,15 @@ def run(self): """ raise PipelineError("Base class run() method not implemented.") + def mark_job_completed(self): + with open(join(self.output_path, 
'job_completed'), 'w') as f: + f.write("job_completed") + + def mark_post_processing_completed(self): + with open(join(self.output_path, + 'post_processing_completed'), 'w') as f: + f.write("post_processing_completed") + def parse_logs(self): # by default, look for anything to parse in the logs directory. log_path = join(self.output_path, 'logs') diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index 09b87281..b936d09b 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -326,6 +326,9 @@ def run(self, callback=None): raise JobFailedError('\n'.join(info)) job_id = job_info['job_id'] + + self.mark_job_completed() + logging.debug(f'NuQCJob {job_id} completed') for project in self.project_data: @@ -401,6 +404,8 @@ def run(self, callback=None): empty_files_directory, self.minimum_bytes) + self.mark_post_processing_completed() + def _confirm_job_completed(self): # since NuQCJob processes across all projects in a run, there isn't # a need to iterate by project_name and job_id. @@ -453,7 +458,7 @@ def _generate_mmi_filter_cmds(self, working_dir): cores_to_allocate = int(self.cores_per_task / 2) - # hack_helper is a hack that will scan all of the R1 and R2 files + # contains_bx_metadata() will scan all of the R1 and R2 files # in self.root_dir until it finds a non-empty file to read. It will # read the first line of the compressed fastq file and see if it # contains optional BX metadata. If not it will return False, other diff --git a/sequence_processing_pipeline/SeqCountsJob.py b/sequence_processing_pipeline/SeqCountsJob.py index b88fdb3a..d41d62f5 100644 --- a/sequence_processing_pipeline/SeqCountsJob.py +++ b/sequence_processing_pipeline/SeqCountsJob.py @@ -82,8 +82,12 @@ def run(self, callback=None): info.insert(0, str(e)) raise JobFailedError('\n'.join(info)) + self.mark_job_completed() + self._aggregate_counts(self.sample_sheet_path) + self.mark_post_processing_completed() + logging.debug(f'SeqCountJob {self.job_info["job_id"]} completed') def _generate_job_script(self): diff --git a/sequence_processing_pipeline/TRIntegrateJob.py b/sequence_processing_pipeline/TRIntegrateJob.py index 90494f97..71644b8c 100644 --- a/sequence_processing_pipeline/TRIntegrateJob.py +++ b/sequence_processing_pipeline/TRIntegrateJob.py @@ -107,6 +107,8 @@ def run(self, callback=None): info.insert(0, str(e)) raise JobFailedError('\n'.join(info)) + self.mark_job_completed() + logging.debug(f'TRIntegrateJob {self.job_info["job_id"]} completed') def _process_sample_sheet(self): diff --git a/sequence_processing_pipeline/TellReadJob.py b/sequence_processing_pipeline/TellReadJob.py index 53802736..251327b2 100644 --- a/sequence_processing_pipeline/TellReadJob.py +++ b/sequence_processing_pipeline/TellReadJob.py @@ -95,6 +95,8 @@ def run(self, callback=None): info = str(e) raise JobFailedError('\n'.join(info)) + self.mark_job_completed() + logging.debug(f'TellReadJob {self.job_info["job_id"]} completed') def _process_sample_sheet(self): diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py index a2291296..93bd4785 100644 --- a/sequence_processing_pipeline/tests/test_FastQCJob.py +++ b/sequence_processing_pipeline/tests/test_FastQCJob.py @@ -576,20 +576,6 @@ def tearDown(self): rmtree(zero_path) - def test_config_file_not_found(self): - with self.assertRaises(PipelineError) as e: - FastQCJob(self.qc_root_path, self.output_path, - 
self.raw_fastq_files_path.replace('/project1', ''), - self.processed_fastq_files_path, 16, 16, - 'sequence_processing_pipeline/tests/bin/fastqc', [], - self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - ('sequence_processing_pipeline/' - 'not-multiqc-bclconvert-config.yaml'), 1000, False) - - self.assertEqual(str(e.exception), "file 'sequence_processing_pipeline" - "/not-multiqc-bclconvert-config." - "yaml' does not exist.") - def test_generate_job_scripts(self): job = FastQCJob(self.qc_root_path, self.output_path, self.raw_fastq_files_path.replace('/project1', ''), @@ -597,7 +583,7 @@ def test_generate_job_scripts(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertEqual(exists(join(job.output_path, 'FastQCJob.sh')), True) self.assertEqual(exists(join(job.output_path, @@ -610,7 +596,7 @@ def test_audit(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) obs = job.audit(self.sample_ids) @@ -1044,7 +1030,7 @@ def test_all_zero_files(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertEqual(str(e.exception), "There are no fastq files for " "FastQCJob to process in sequence" @@ -1059,7 +1045,7 @@ def test_completed_file_generation(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) my_path = join(self.output_path, 'FastQCJob', 'logs') @@ -1079,7 +1065,7 @@ def test_completed_file_generation_some_failures(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) my_path = join(self.output_path, 'FastQCJob', 'logs') @@ -1115,7 +1101,7 @@ def test_error_msg_from_logs(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertFalse(job is None) diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index a7c8a8c7..c908fee5 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -870,6 +870,7 @@ def tearDown(self): if exists(self.tmp_file_path): remove(self.tmp_file_path) + # for test_move_trimmed_files() if exists(self.path("NuQCJob")): shutil.rmtree(self.path("NuQCJob")) From 0a57259de84486f8ce36e29333678d32d54417ab Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Fri, 21 Feb 2025 15:16:07 -0800 Subject: [PATCH 04/16] New files added --- sequence_processing_pipeline/MultiQCJob.py | 233 ++++++++++++++++++ .../templates/fastqc_job.sh | 22 ++ .../templates/multiqc_job.sh | 22 ++ 3 files changed, 277 insertions(+) create mode 100644 sequence_processing_pipeline/MultiQCJob.py create mode 100644 sequence_processing_pipeline/templates/fastqc_job.sh create mode 100644 sequence_processing_pipeline/templates/multiqc_job.sh diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py new file mode 100644 index 00000000..966aeec7 --- /dev/null +++ b/sequence_processing_pipeline/MultiQCJob.py @@ -0,0 +1,233 @@ +from functools import partial +from jinja2 
import Environment +from json import dumps +import logging +from os import listdir +from os.path import join, basename, exists, sep, split +from sequence_processing_pipeline.Job import Job, KISSLoader +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) +from sequence_processing_pipeline.util import determine_orientation + + +class MultiQCJob(Job): + def __init__(self, run_dir, output_path, raw_fastq_files_path, + processed_fastq_files_path, nprocs, nthreads, multiqc_path, + modules_to_load, qiita_job_id, queue_name, node_count, + wall_time_limit, jmem, pool_size, + max_array_length, is_amplicon): + super().__init__(run_dir, + output_path, + 'MultiQCJob', + [multiqc_path], + max_array_length, + modules_to_load=modules_to_load) + + self.nprocs = nprocs + self.nthreads = nthreads + self.multiqc_path = multiqc_path + self.queue_name = queue_name + self.node_count = node_count + self.wall_time_limit = wall_time_limit + self.jmem = jmem + self.qiita_job_id = qiita_job_id + self.pool_size = pool_size + self.raw_fastq_files_path = raw_fastq_files_path + self.processed_fastq_files_path = processed_fastq_files_path + self.is_amplicon = is_amplicon + + self.job_script_path = join(self.output_path, f"{self.job_name}.sh") + + # for projects that use sequence_processing_pipeline as a dependency, + # jinja_env must be set to sequence_processing_pipeline's root path, + # rather than the project's root path. + self.jinja_env = Environment(loader=KISSLoader('templates')) + + self._generate_job_script() + + def _find_projects(self): + find_paths = [self.processed_fastq_files_path] + + if not self.is_amplicon: + # avoid processing the raw fastq files for amplicon runs because + # they are identical to files in self.processed_fastq_files_path. + find_paths += [self.raw_fastq_files_path] + + projects = [] + + for fastq_files_path in find_paths: + for directory in listdir(fastq_files_path): + # confirm that this directory has data we want to show to + # multiqc. + + # generate a list of all files in this directory. + files = self._find_files(join(fastq_files_path, directory)) + + # filter out all files that aren't fastq.gz files. + files = [x for x in files if x.endswith('.fastq.gz')] + + for _file in files: + # split path into a list of folder names and the filename. + # filter out the contents of any folders that we don't + # want included in the report. + file_path, file_name = split(_file) + + folders_present = [x for x in file_path.split(sep) + if x in ['zero_files', + 'only-adapter-filtered']] + + if folders_present: + # if one or more of the folders are present in _file's + # path, then do not consider this file. + continue + + # lastly, only consider folders that contain at least one + # R1 file. + if determine_orientation(file_name) != 'R1': + continue + + # according to legacy behavior, if _file has met the above + # criteria, then add the value of directory as a project + # name. + projects.append(directory) + + if projects: + # remove duplicates + return list(set(projects)) + + raise PipelineError("There are no fastq files for MultiQCJob to " + "process") + + def _get_failed_indexes(self, job_id): + completed_files = self._find_files(self.output_path) + completed_files = [x for x in completed_files if + x.endswith('.completed')] + + completed_indexes = [] + + for completed_file in completed_files: + # remove path and .completed extension from file-name. 
e.g.: + # 'project1_0', 'project1_1', ..., 'project1_n' + completed_file = basename(completed_file).replace('.completed', '') + # extract the line number in the .detailed file corresponding to + # the command used for this job + completed_indexes.append(int(completed_file.split('_')[-1])) + + # a successfully completed job array should have a list of array + # numbers from 0 - len(self.commands). + all_indexes = [x for x in range(1, len(self.commands) + 1)] + failed_indexes = list(set(all_indexes) - set(completed_indexes)) + failed_indexes.sort() + + # generate log-file here instead of in run() where it can be + # unittested more easily. + log_fp = join(self.output_path, + 'logs', + f'failed_indexes_{job_id}.json') + + if failed_indexes: + with open(log_fp, 'w') as f: + f.write(dumps({'job_id': job_id, + 'failed_indexes': failed_indexes}, indent=2)) + + return failed_indexes + + def _get_commands(self): + # If project-level reports were not needed, MultiQC could simply be + # given the path to the run-directory itself and it will discover all + # of the relevant data files. Confirmed that for a one-project sample- + # sheet, this produces an equivalent report. + + array_cmds = [] + + for project in self._find_projects(): + # MultiQC doesn't like input paths that don't exist. Simply add + # all paths that do exist as input. + input_path_list = [] + p_path = partial(join, self.output_path, 'fastqc') + + for filter_type in ['bclconvert', 'trimmed_sequences', + 'filtered_sequences', 'amplicon']: + input_path_list.append(p_path(project, filter_type)) + + input_path_list.append(p_path(project, 'Reports')) + + p_path = partial(join, self.processed_fastq_files_path, project) + input_path_list.append(p_path('fastp_reports_dir', 'json')) + + # I don't usually see a json directory associated with raw data. + # It looks to be metadata coming directly off the machine, in the + # few instances I've seen it in /sequencing... + p_path = partial(join, self.raw_fastq_files_path, project) + input_path_list.append(p_path('json')) + + input_path_list = [x for x in input_path_list if exists(x)] + + cmd_head = ['multiqc', '-c', self.multiqc_config_file_path, + '--fullnames', '--force'] + + # --interactive graphs is set to True in MultiQC configuration + # file and hence this switch was redunant and now removed. + cmd_tail = ['-o', join(self.output_path, 'multiqc', project)] + + array_cmds.append(' '.join(cmd_head + input_path_list + cmd_tail)) + + # These commands are okay to execute in parallel because each command + # is limited to a specific project and each invocation creates its own + # multiqc/project output directory so there will not be collisions. + # These commands must be executed after FastQCJob has completed for + # FastQC report results to be included, however. + return array_cmds + + def _generate_job_script(self): + # bypass generating job script for a force-fail job, since it is + # not needed. 
+ if self.force_job_fail: + return None + + template = self.jinja_env.get_template("multiqc_job.sh") + + array_cmds = self._get_commands() + + job_name = f'{self.qiita_job_id}_{self.job_name}' + details_file_name = f'{self.job_name}.array-details' + array_details = join(self.output_path, details_file_name) + array_params = "1-%d%%%d" % (len(array_cmds), self.pool_size) + modules_to_load = ' '.join(self.modules_to_load) + + with open(self.job_script_path, mode="w", encoding="utf-8") as f: + f.write(template.render(job_name=job_name, + array_details=array_details, + queue_name=self.queue_name, + node_count=self.node_count, + nprocs=self.nprocs, + wall_time_limit=self.wall_time_limit, + mem_in_gb=self.jmem, + array_params=array_params, + output_path=self.output_path, + modules_to_load=modules_to_load)) + + # save the .details file as well + with open(array_details, 'w') as f: + f.write('\n'.join(array_cmds)) + + return self.job_script_path + + def run(self, callback=None): + try: + job_info = self.submit_job(self.job_script_path, + exec_from=self.log_path, + callback=callback) + except JobFailedError as e: + # When a job has failed, parse the logs generated by this specific + # job to return a more descriptive message to the user. + info = self.parse_logs() + # prepend just the message component of the Error. + info.insert(0, str(e)) + raise JobFailedError('\n'.join(info)) + + logging.debug(job_info) + + if self._get_failed_indexes(job_info['job_id']): + # raise error if list isn't empty. + raise PipelineError("MultiQCJob did not complete successfully.") diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh new file mode 100644 index 00000000..349ebc2f --- /dev/null +++ b/sequence_processing_pipeline/templates/fastqc_job.sh @@ -0,0 +1,22 @@ +#!/bin/bash +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +#SBATCH -N {{node_count}} +#SBATCH -n {{nprocs}} +#SBATCH --time {{wall_time_limit}} +#SBATCH --mem {{mem_in_gb}}G +#SBATCH --array {{array_params}} +set -x +set +e +date +hostname +echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID} +cd {{output_path}} +{% if modules_to_load is defined %} + module load {{modules_to_load}} +{% endif %} +offset=${SLURM_ARRAY_TASK_ID} +step=$(( $offset - 0 )) +cmd0=$(head -n $step {{array_details}} | tail -n 1) +eval $cmd0 +echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed \ No newline at end of file diff --git a/sequence_processing_pipeline/templates/multiqc_job.sh b/sequence_processing_pipeline/templates/multiqc_job.sh new file mode 100644 index 00000000..202e5aef --- /dev/null +++ b/sequence_processing_pipeline/templates/multiqc_job.sh @@ -0,0 +1,22 @@ +#!/bin/bash +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +#SBATCH -N {{node_count}} +#SBATCH -n {{nprocs}} +#SBATCH --time {{wall_time_limit}} +#SBATCH --mem {{mem_in_gb}}G +#SBATCH --array {{array_params}} +set -x +set +e +date +hostname +echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID} +cd {{output_path}} +{% if modules_to_load is defined %} + module load {{modules_to_load}} +{% endif %} +offset=${SLURM_ARRAY_TASK_ID} +step=$(( $offset - 0 )) +cmd0=$(head -n $step {{array_details}} | tail -n 1) +eval $cmd0 +echo "Cmd Completed: $cmd0" > logs/MultiQCJob_$step.completed \ No newline at end of file From c6dd95c5c5e828532f32256c722e148e5a472039 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Fri, 21 Feb 2025 18:20:28 -0800 Subject: [PATCH 05/16] Tests updated --- sequence_processing_pipeline/MultiQCJob.py | 3 +- 
.../templates/fastqc_job.sh | 2 +- .../tests/test_FastQCJob.py | 68 ++++++- .../tests/test_Job.py | 18 +- .../tests/test_MultiQCJob.py | 177 ++++++++++++++++++ .../tests/test_NuQCJob.py | 4 +- 6 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 sequence_processing_pipeline/tests/test_MultiQCJob.py diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py index 966aeec7..109d13a6 100644 --- a/sequence_processing_pipeline/MultiQCJob.py +++ b/sequence_processing_pipeline/MultiQCJob.py @@ -15,7 +15,7 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, processed_fastq_files_path, nprocs, nthreads, multiqc_path, modules_to_load, qiita_job_id, queue_name, node_count, wall_time_limit, jmem, pool_size, - max_array_length, is_amplicon): + max_array_length, multiqc_config_file_path, is_amplicon): super().__init__(run_dir, output_path, 'MultiQCJob', @@ -34,6 +34,7 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.pool_size = pool_size self.raw_fastq_files_path = raw_fastq_files_path self.processed_fastq_files_path = processed_fastq_files_path + self.multiqc_config_file_path = multiqc_config_file_path self.is_amplicon = is_amplicon self.job_script_path = join(self.output_path, f"{self.job_name}.sh") diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh index 349ebc2f..8fff906a 100644 --- a/sequence_processing_pipeline/templates/fastqc_job.sh +++ b/sequence_processing_pipeline/templates/fastqc_job.sh @@ -13,7 +13,7 @@ hostname echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID} cd {{output_path}} {% if modules_to_load is defined %} - module load {{modules_to_load}} +module load {{modules_to_load}} {% endif %} offset=${SLURM_ARRAY_TASK_ID} step=$(( $offset - 0 )) diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py index 93bd4785..89f5baed 100644 --- a/sequence_processing_pipeline/tests/test_FastQCJob.py +++ b/sequence_processing_pipeline/tests/test_FastQCJob.py @@ -581,13 +581,69 @@ def test_generate_job_scripts(self): self.raw_fastq_files_path.replace('/project1', ''), self.processed_fastq_files_path, 16, 16, - 'sequence_processing_pipeline/tests/bin/fastqc', [], - self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - 1000, False) + 'sequence_processing_pipeline/tests/bin/fastqc', + ['my_module.1.1'], self.qiita_job_id, 'queue_name', + 4, 23, '8g', 30, 1000, False) + + job_script_path = join(job.output_path, 'FastQCJob.sh') + array_details_path = join(job.output_path, 'FastQCJob.array-details') + self.assertEqual(exists(job_script_path), True) + self.assertEqual(exists(array_details_path), True) + + exp = ["#!/bin/bash", + "#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_FastQCJob", + "#SBATCH -p queue_name", "#SBATCH -N 4", "#SBATCH -n 16", + "#SBATCH --time 23", "#SBATCH --mem 8gG", + "#SBATCH --array 1-4%30", "set -x", "set +e", "date", + "hostname", "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}", + "cd sequence_processing_pipeline/tests/data/output_dir2/" + "FastQCJob", "", "module load my_module.1.1", "", + "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))", + "cmd0=$(head -n $step sequence_processing_pipeline/tests/data/" + "output_dir2/FastQCJob/FastQCJob.array-details | tail -n 1)", + "eval $cmd0", "echo \"Cmd Completed: $cmd0\" > logs/FastQCJob_" + "$step.completed"] + + with open(job_script_path, 'r') as f: + obs = f.readlines() + obs = [x.strip() for x in obs] + + for a, b 
in zip(obs, exp): + self.assertEqual(a, b) + + exp = ["fastqc --noextract -t 16 sequence_processing_pipeline/tests/da" + "ta/211021_A00000_0000_SAMPLE/Data/Fastq/project1/sample1_R1_.f" + "astq.gz sequence_processing_pipeline/tests/data/211021_A00000_" + "0000_SAMPLE/Data/Fastq/project1/sample1_R2_.fastq.gz -o sequen" + "ce_processing_pipeline/tests/data/output_dir2/FastQCJob/fastqc" + "/project1/bclconvert", + "fastqc --noextract -t 16 sequence_processing_pipeline/tests/da" + "ta/211021_A00000_0000_SAMPLE/Data/Fastq/project1/sample2_R1_.f" + "astq.gz sequence_processing_pipeline/tests/data/211021_A00000_" + "0000_SAMPLE/Data/Fastq/project1/sample2_R2_.fastq.gz -o sequen" + "ce_processing_pipeline/tests/data/output_dir2/FastQCJob/fastqc" + "/project1/bclconvert", + "fastqc --noextract -t 16 sequence_processing_pipeline/tests/da" + "ta/211021_A00000_0000_SAMPLE/sample-sequence-directory/project" + "1/filtered_sequences/sample1_R1_.trimmed.fastq.gz sequence_pro" + "cessing_pipeline/tests/data/211021_A00000_0000_SAMPLE/sample-s" + "equence-directory/project1/filtered_sequences/sample1_R2_.trim" + "med.fastq.gz -o sequence_processing_pipeline/tests/data/output" + "_dir2/FastQCJob/fastqc/project1/filtered_sequences", + "fastqc --noextract -t 16 sequence_processing_pipeline/tests/da" + "ta/211021_A00000_0000_SAMPLE/sample-sequence-directory/project" + "1/filtered_sequences/sample2_R1_.trimmed.fastq.gz sequence_pro" + "cessing_pipeline/tests/data/211021_A00000_0000_SAMPLE/sample-s" + "equence-directory/project1/filtered_sequences/sample2_R2_.trim" + "med.fastq.gz -o sequence_processing_pipeline/tests/data/output" + "_dir2/FastQCJob/fastqc/project1/filtered_sequences"] + + with open(array_details_path, 'r') as f: + obs = f.readlines() + obs = [x.strip() for x in obs] - self.assertEqual(exists(join(job.output_path, 'FastQCJob.sh')), True) - self.assertEqual(exists(join(job.output_path, - 'FastQCJob.array-details')), True) + for a, b in zip(obs, exp): + self.assertEqual(a, b) def test_audit(self): job = FastQCJob(self.qc_root_path, self.output_path, diff --git a/sequence_processing_pipeline/tests/test_Job.py b/sequence_processing_pipeline/tests/test_Job.py index 192709b6..714d6f0f 100644 --- a/sequence_processing_pipeline/tests/test_Job.py +++ b/sequence_processing_pipeline/tests/test_Job.py @@ -1,7 +1,7 @@ import unittest from sequence_processing_pipeline.Job import Job from sequence_processing_pipeline.PipelineError import PipelineError -from os.path import abspath, join, dirname, split, isdir +from os.path import abspath, join, dirname, split, isdir, exists from os import makedirs, chmod, remove from functools import partial from shutil import rmtree, copyfile @@ -288,6 +288,22 @@ def test_wait_on_job_ids(self): # query_slurm(), they should be equal. 
self.assertDictEqual(obs, results) + def test_mark_completed_commands(self): + package_root = abspath('./sequence_processing_pipeline') + self.path = partial(join, package_root, 'tests', 'data') + + job = Job(self.path('211021_A00000_0000_SAMPLE'), + self.path('my_output_dir'), '200nnn_xnnnnn_nnnn_xxxxxxxxxx', + ['ls'], 2, None) + + fp = join(job.output_path, 'job_completed') + + self.assertFalse(exists(fp)) + + job.mark_job_completed() + + self.assertTrue(exists(fp)) + if __name__ == '__main__': unittest.main() diff --git a/sequence_processing_pipeline/tests/test_MultiQCJob.py b/sequence_processing_pipeline/tests/test_MultiQCJob.py new file mode 100644 index 00000000..6c4fa41a --- /dev/null +++ b/sequence_processing_pipeline/tests/test_MultiQCJob.py @@ -0,0 +1,177 @@ +import unittest +from os.path import join, exists +from functools import partial +from sequence_processing_pipeline.MultiQCJob import MultiQCJob +from sequence_processing_pipeline.PipelineError import JobFailedError +from os import makedirs, listdir +from shutil import rmtree, move + + +class TestMultiQCJob(unittest.TestCase): + def setUp(self): + self.maxDiff = None + package_root = 'sequence_processing_pipeline' + self.path = partial(join, package_root, 'tests', 'data') + self.qiita_job_id = 'abcdabcdabcdabcdabcdabcdabcdabcd' + self.output_path = self.path('output_dir2') + self.fastqc_log_path = join(self.output_path, 'logs') + self.raw_fastq_files_path = ('sequence_processing_pipeline/tests/data' + '/211021_A00000_0000_SAMPLE/Data/Fastq/p' + 'roject1') + self.processed_fastq_files_path = ('sequence_processing_pipeline/tests' + '/data/211021_A00000_0000_SAMPLE/sa' + 'mple-sequence-directory') + self.config_yml = join(package_root, 'multiqc-bclconvert-config.yaml') + self.qc_root_path = join(self.output_path, 'QCJob') + makedirs(self.qc_root_path, exist_ok=True) + + base_path = join(self.output_path, 'MultiQCJob') + file_names = [] + makedirs(join(base_path, 'Feist_11661', 'trimmed_sequences'), + exist_ok=True) + makedirs(join(base_path, 'Gerwick_6123', 'filtered_sequences'), + exist_ok=True) + + file_names.append(join(base_path, + 'Feist_11661', + 'trimmed_sequences', + ('CDPH-SAL_Salmonella_Typhi_MDL-143_S1_L003_R1_' + '001.trimmed_fastqc.html'))) + + file_names.append(join(base_path, + 'Gerwick_6123', + 'filtered_sequences', + '3A_S169_L003_R2_001.trimmed_fastqc.html')) + + # write files out to disk + for file_name in file_names: + with open(file_name, 'w') as f2: + f2.write("This is a file.") + + # set up dummy logs + self.fastqc_log_path = join(self.output_path, "MultiQCJob", "logs") + makedirs(self.fastqc_log_path, exist_ok=True) + + log_files = { + 'slurm-9999999_35.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Generic Standin Error" + " (GSE).")], + 'slurm-9999999_17.out': ["---------------", + "Run details:", + ("hds-fe848a9e-c0e9-49d9-978d-" + "27565a314e8b 1908305 b2-018"), + "---------------", + "+ this", + "+ that", + "+ blah", + ("something error: Another Standin Error" + " (ASE).")] + } + + for log_file in log_files: + fp = join(self.fastqc_log_path, log_file) + + with open(fp, 'w') as f: + lines = log_files[log_file] + for line in lines: + f.write(f"{line}\n") + + def tearDown(self): + rmtree(self.output_path) + + zero_path = join(self.raw_fastq_files_path, 'zero_files') + + if exists(zero_path): + for f in listdir(zero_path): + source = join(zero_path, f) + 
move(source, join(self.raw_fastq_files_path, f)) + + rmtree(zero_path) + + def test_generate_job_scripts(self): + job = MultiQCJob(self.qc_root_path, self.output_path, + self.raw_fastq_files_path.replace('/project1', ''), + self.processed_fastq_files_path, + 16, 16, + 'sequence_processing_pipeline/tests/bin/multiqc', + ['multiqc.2.0'], self.qiita_job_id, 'queue_name', 4, + 23, '8g', 30, 1000, "sequence_processing_pipeline/" + "multiqc-bclconvert-config.yaml", False) + + job_script_path = join(job.output_path, 'MultiQCJob.sh') + array_details_path = join(job.output_path, 'MultiQCJob.array-details') + self.assertEqual(exists(job_script_path), True) + self.assertEqual(exists(array_details_path), True) + + exp = ["#!/bin/bash", + "#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_MultiQCJob", + "#SBATCH -p queue_name", "#SBATCH -N 4", "#SBATCH -n 16", + "#SBATCH --time 23", "#SBATCH --mem 8gG", + "#SBATCH --array 1-1%30", "set -x", "set +e", "date", + "hostname", "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}", + "cd sequence_processing_pipeline/tests/data/output_dir2/" + "MultiQCJob", "", "module load multiqc.2.0", "", + "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))", + "cmd0=$(head -n $step sequence_processing_pipeline/tests/data/" + "output_dir2/MultiQCJob/MultiQCJob.array-details | tail -n 1)", + "eval $cmd0", "echo \"Cmd Completed: $cmd0\" > logs/MultiQCJob_" + "$step.completed"] + + with open(job_script_path, 'r') as f: + obs = f.readlines() + obs = [x.strip() for x in obs] + + for a, b in zip(obs, exp): + self.assertEqual(a, b) + + exp = ["multiqc -c sequence_processing_pipeline/multiqc-bclconvert-con" + "fig.yaml --fullnames --force -o sequence_processing_pipeline/t" + "ests/data/output_dir2/MultiQCJob/multiqc/project1"] + + with open(array_details_path, 'r') as f: + obs = f.readlines() + obs = [x.strip() for x in obs] + + for a, b in zip(obs, exp): + self.assertEqual(a, b) + + def test_error_msg_from_logs(self): + job = MultiQCJob(self.qc_root_path, self.output_path, + self.raw_fastq_files_path.replace('/project1', ''), + self.processed_fastq_files_path, + 16, 16, + 'sequence_processing_pipeline/tests/bin/multiqc', + ['multiqc.2.0'], self.qiita_job_id, 'queue_name', 4, + 23, '8g', 30, 1000, "sequence_processing_pipeline/" + "multiqc-bclconvert-config.yaml", False) + + self.assertFalse(job is None) + + # an internal method to force submit_job() to raise a JobFailedError + # instead of submitting the job w/sbatch and waiting for a failed + # job w/squeue. + self.assertTrue(job._toggle_force_job_fail()) + + try: + job.run() + except JobFailedError as e: + # assert that the text of the original error message was + # preserved, while including known strings from each of the + # sample log-files. 
+ print(str(e)) + self.assertIn('This job died.', str(e)) + self.assertIn('something error: Generic Standin Error (GSE)', + str(e)) + self.assertIn('something error: Another Standin Error (ASE)', + str(e)) + + +if __name__ == '__main__': + unittest.main() diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index c908fee5..445d2258 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -76,9 +76,9 @@ def setUp(self): rr_fp = join(sample_path, f"{id}_R2_001.fastq.gz") with gzip.open(fr_fp, "wb") as f: - f.write(b"@my_seq_id BX:1:1101:10000:10000\n") + f.write(b"@my_seq_id BX:Z:TATGACACATGCGGCCCT\n") with gzip.open(rr_fp, "wb") as f: - f.write(b"@my_seq_id BX:1:1101:10000:10000\n") + f.write(b"@my_seq_id BX:Z:TATGACACATGCGGCCCT\n") self.feist_ids = [ "JM-MEC__Staphylococcus_aureusstrain_BERTI-R08624", From 9cb1d8c3198972ff36ba7237151a0142bddfee6a Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Fri, 21 Feb 2025 21:57:21 -0800 Subject: [PATCH 06/16] MultiQCJob updated --- sequence_processing_pipeline/MultiQCJob.py | 6 ++++-- .../tests/test_MultiQCJob.py | 15 +++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py index 109d13a6..63ad4779 100644 --- a/sequence_processing_pipeline/MultiQCJob.py +++ b/sequence_processing_pipeline/MultiQCJob.py @@ -14,7 +14,7 @@ class MultiQCJob(Job): def __init__(self, run_dir, output_path, raw_fastq_files_path, processed_fastq_files_path, nprocs, nthreads, multiqc_path, modules_to_load, qiita_job_id, queue_name, node_count, - wall_time_limit, jmem, pool_size, + wall_time_limit, jmem, pool_size, fastqc_root_path, max_array_length, multiqc_config_file_path, is_amplicon): super().__init__(run_dir, output_path, @@ -36,6 +36,7 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.processed_fastq_files_path = processed_fastq_files_path self.multiqc_config_file_path = multiqc_config_file_path self.is_amplicon = is_amplicon + self.fastqc_root_path = fastqc_root_path self.job_script_path = join(self.output_path, f"{self.job_name}.sh") @@ -145,7 +146,8 @@ def _get_commands(self): # MultiQC doesn't like input paths that don't exist. Simply add # all paths that do exist as input. 
         input_path_list = []
-        p_path = partial(join, self.output_path, 'fastqc')
+
+        p_path = partial(join, self.fastqc_root_path, 'fastqc')

         for filter_type in ['bclconvert', 'trimmed_sequences',
                             'filtered_sequences', 'amplicon']:
diff --git a/sequence_processing_pipeline/tests/test_MultiQCJob.py b/sequence_processing_pipeline/tests/test_MultiQCJob.py
index 6c4fa41a..a8b2fd07 100644
--- a/sequence_processing_pipeline/tests/test_MultiQCJob.py
+++ b/sequence_processing_pipeline/tests/test_MultiQCJob.py
@@ -14,7 +14,7 @@ def setUp(self):
         self.path = partial(join, package_root, 'tests', 'data')
         self.qiita_job_id = 'abcdabcdabcdabcdabcdabcdabcdabcd'
         self.output_path = self.path('output_dir2')
-        self.fastqc_log_path = join(self.output_path, 'logs')
+        self.multiqc_log_path = join(self.output_path, 'logs')
         self.raw_fastq_files_path = ('sequence_processing_pipeline/tests/data'
                                      '/211021_A00000_0000_SAMPLE/Data/Fastq/p'
                                      'roject1')
@@ -23,6 +23,7 @@ def setUp(self):
                                            'mple-sequence-directory')
         self.config_yml = join(package_root, 'multiqc-bclconvert-config.yaml')
         self.qc_root_path = join(self.output_path, 'QCJob')
+        self.fastqc_root_path = join(self.output_path, 'FastQCJob')
         makedirs(self.qc_root_path, exist_ok=True)

         base_path = join(self.output_path, 'MultiQCJob')
@@ -49,8 +50,8 @@ def setUp(self):
             f2.write("This is a file.")

         # set up dummy logs
-        self.fastqc_log_path = join(self.output_path, "MultiQCJob", "logs")
-        makedirs(self.fastqc_log_path, exist_ok=True)
+        self.multiqc_log_path = join(self.output_path, "MultiQCJob", "logs")
+        makedirs(self.multiqc_log_path, exist_ok=True)

         log_files = {
             'slurm-9999999_35.out': ["---------------",
@@ -76,7 +77,7 @@ def setUp(self):
         }

         for log_file in log_files:
-            fp = join(self.fastqc_log_path, log_file)
+            fp = join(self.multiqc_log_path, log_file)

             with open(fp, 'w') as f:
                 lines = log_files[log_file]
@@ -102,7 +103,8 @@ def test_generate_job_scripts(self):
                          16, 16,
                          'sequence_processing_pipeline/tests/bin/multiqc',
                          ['multiqc.2.0'], self.qiita_job_id, 'queue_name', 4,
-                         23, '8g', 30, 1000, "sequence_processing_pipeline/"
+                         23, '8g', 30, self.fastqc_root_path, 1000,
+                         "sequence_processing_pipeline/"
                          "multiqc-bclconvert-config.yaml", False)

         job_script_path = join(job.output_path, 'MultiQCJob.sh')
@@ -149,7 +151,8 @@ def test_error_msg_from_logs(self):
                          16, 16,
                          'sequence_processing_pipeline/tests/bin/multiqc',
                          ['multiqc.2.0'], self.qiita_job_id, 'queue_name', 4,
-                         23, '8g', 30, 1000, "sequence_processing_pipeline/"
+                         23, '8g', 30, self.fastqc_root_path, 1000,
+                         "sequence_processing_pipeline/"
                          "multiqc-bclconvert-config.yaml", False)

         self.assertFalse(job is None)

From fe5a9195777054b1e25d8ea4e1bbb6f968c88551 Mon Sep 17 00:00:00 2001
From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com>
Date: Sat, 22 Feb 2025 18:24:03 -0800
Subject: [PATCH 07/16] Update sequence_processing_pipeline/MultiQCJob.py

Co-authored-by: Daniel McDonald
---
 sequence_processing_pipeline/MultiQCJob.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 63ad4779..b1124c69 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -95,7 +95,7 @@ def _find_projects(self):

         if projects:
             # remove duplicates
-            return list(set(projects))
+            return sorted(set(projects))

         raise PipelineError("There are no fastq files for MultiQCJob to "
                             "process")

From 19ce6c090c4d2fa739df380d5c74bceb154add34 Mon Sep 17 00:00:00 2001
From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com>
Date: Sat, 22 Feb 2025 20:32:43 -0800
Subject: [PATCH 08/16] Update sequence_processing_pipeline/MultiQCJob.py

Co-authored-by: Daniel McDonald
---
 sequence_processing_pipeline/MultiQCJob.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index b1124c69..3084ec0a 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -118,8 +118,7 @@ def _get_failed_indexes(self, job_id):
         # a successfully completed job array should have a list of array
         # numbers from 0 - len(self.commands).
         all_indexes = [x for x in range(1, len(self.commands) + 1)]
-        failed_indexes = list(set(all_indexes) - set(completed_indexes))
-        failed_indexes.sort()
+        failed_indexes = sorted(set(all_indexes) - set(completed_indexes))

         # generate log-file here instead of in run() where it can be
         # unittested more easily.

From abc55b2fecac9e0e0062232a964a3020a9c64cd6 Mon Sep 17 00:00:00 2001
From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com>
Date: Sat, 22 Feb 2025 20:53:28 -0800
Subject: [PATCH 09/16] Update sequence_processing_pipeline/MultiQCJob.py

Co-authored-by: Daniel McDonald
---
 sequence_processing_pipeline/MultiQCJob.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 3084ec0a..8adcff5f 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -117,7 +117,7 @@ def _get_failed_indexes(self, job_id):

         # a successfully completed job array should have a list of array
         # numbers from 0 - len(self.commands).
-        all_indexes = [x for x in range(1, len(self.commands) + 1)]
+        all_indexes = list(range(1, len(self.commands) + 1))
         failed_indexes = sorted(set(all_indexes) - set(completed_indexes))

         # generate log-file here instead of in run() where it can be

From 16b9dc898d447e1613e463080c01f35dbd95bce5 Mon Sep 17 00:00:00 2001
From: Charles Cowart
Date: Sat, 22 Feb 2025 19:53:08 -0800
Subject: [PATCH 10/16] Updates based on feedback.

---
 sequence_processing_pipeline/FastQCJob.py  | 22 +++++++++----------
 sequence_processing_pipeline/MultiQCJob.py | 22 +++++++++----------
 .../templates/fastqc_job.sh                |  1 +
 .../templates/multiqc_job.sh               |  1 +
 .../templates/seq_counts.sbatch            |  1 +
 .../tests/data/seq_counts.sbatch           |  2 +-
 .../tests/test_FastQCJob.py                |  5 +++--
 .../tests/test_MultiQCJob.py               |  5 +++--
 8 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py
index a52a451a..36ef40e1 100644
--- a/sequence_processing_pipeline/FastQCJob.py
+++ b/sequence_processing_pipeline/FastQCJob.py
@@ -4,6 +4,7 @@
 import logging
 from os import listdir, makedirs
 from os.path import join, basename
+from re import sub
 from sequence_processing_pipeline.Job import Job, KISSLoader
 from sequence_processing_pipeline.PipelineError import (PipelineError,
                                                         JobFailedError)
@@ -172,18 +173,15 @@ def _scan_fastq_files(self, is_raw_input=False):

     def _get_failed_indexes(self, job_id):
         completed_files = self._find_files(self.output_path)
-        completed_files = [x for x in completed_files if
-                           x.endswith('.completed')]
-
-        completed_indexes = []
-
-        for completed_file in completed_files:
-            # remove path and .completed extension from file-name. e.g.:
-            # 'project1_0', 'project1_1', ..., 'project1_n'
-            completed_file = basename(completed_file).replace('.completed', '')
-            # extract the line number in the .detailed file corresponding to
-            # the command used for this job
-            completed_indexes.append(int(completed_file.split('_')[-1]))
+
+        # remove path and .completed extension from file-name. e.g.:
+        # 'project1_0', 'project1_1', ..., 'project1_n'
+        completed_files = [sub(r'\.completed$', '', basename(fp)) for fp in
+                           completed_files if fp.endswith('.completed')]
+
+        # extract the line number in the .detailed file corresponding to
+        # the command used for this job
+        completed_indexes = [int(cf.split('_')[-1]) for cf in completed_files]

         # a successfully completed job array should have a list of array
         # numbers from 0 - len(self.commands).
diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 8adcff5f..75b5b301 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -8,6 +8,7 @@
 from sequence_processing_pipeline.PipelineError import (PipelineError,
                                                         JobFailedError)
 from sequence_processing_pipeline.util import determine_orientation
+from re import sub


 class MultiQCJob(Job):
@@ -102,18 +103,15 @@ def _find_projects(self):

     def _get_failed_indexes(self, job_id):
         completed_files = self._find_files(self.output_path)
-        completed_files = [x for x in completed_files if
-                           x.endswith('.completed')]
-
-        completed_indexes = []
-
-        for completed_file in completed_files:
-            # remove path and .completed extension from file-name. e.g.:
-            # 'project1_0', 'project1_1', ..., 'project1_n'
-            completed_file = basename(completed_file).replace('.completed', '')
-            # extract the line number in the .detailed file corresponding to
-            # the command used for this job
-            completed_indexes.append(int(completed_file.split('_')[-1]))
+
+        # remove path and .completed extension from file-name. e.g.:
+        # 'project1_0', 'project1_1', ..., 'project1_n'
+        completed_files = [sub(r'\.completed$', '', basename(fp)) for fp in
+                           completed_files if fp.endswith('.completed')]
+
+        # extract the line number in the .detailed file corresponding to
+        # the command used for this job
+        completed_indexes = [int(cf.split('_')[-1]) for cf in completed_files]

         # a successfully completed job array should have a list of array
         # numbers from 0 - len(self.commands).
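Note on the refactor above: both FastQCJob._get_failed_indexes() and MultiQCJob._get_failed_indexes() now reduce to the same bookkeeping -- strip each '.completed' marker back to its trailing array index, then subtract the completed set from the full 1..N range of commands. A minimal, self-contained Python sketch of that pattern follows; the helper name failed_array_indexes() is hypothetical and not part of these patches:

    from os.path import basename
    from re import sub

    def failed_array_indexes(completed_marker_paths, n_commands):
        # marker files look like 'logs/FastQCJob_<index>.completed'; strip the
        # extension and keep the trailing integer (the 1-based array index).
        names = [sub(r'\.completed$', '', basename(p))
                 for p in completed_marker_paths if p.endswith('.completed')]
        completed = {int(name.split('_')[-1]) for name in names}

        # every command in the .array-details file should have a marker;
        # whatever is missing corresponds to a failed array task.
        return sorted(set(range(1, n_commands + 1)) - completed)

    # e.g. failed_array_indexes(['logs/FastQCJob_1.completed',
    #                            'logs/FastQCJob_3.completed'], 4) -> [2, 4]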
diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh
index 8fff906a..8423418a 100644
--- a/sequence_processing_pipeline/templates/fastqc_job.sh
+++ b/sequence_processing_pipeline/templates/fastqc_job.sh
@@ -8,6 +8,7 @@
 #SBATCH --array {{array_params}}
 set -x
 set +e
+set -o pipefail
 date
 hostname
 echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
diff --git a/sequence_processing_pipeline/templates/multiqc_job.sh b/sequence_processing_pipeline/templates/multiqc_job.sh
index 202e5aef..8975c4cb 100644
--- a/sequence_processing_pipeline/templates/multiqc_job.sh
+++ b/sequence_processing_pipeline/templates/multiqc_job.sh
@@ -8,6 +8,7 @@
 #SBATCH --array {{array_params}}
 set -x
 set +e
+set -o pipefail
 date
 hostname
 echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
diff --git a/sequence_processing_pipeline/templates/seq_counts.sbatch b/sequence_processing_pipeline/templates/seq_counts.sbatch
index 5ea12650..a472921b 100644
--- a/sequence_processing_pipeline/templates/seq_counts.sbatch
+++ b/sequence_processing_pipeline/templates/seq_counts.sbatch
@@ -12,6 +12,7 @@

 set -x
 set -e
+set -o pipefail

 mkdir -p {{output_path}}/logs
diff --git a/sequence_processing_pipeline/tests/data/seq_counts.sbatch b/sequence_processing_pipeline/tests/data/seq_counts.sbatch
index 70398dfe..17892d47 100644
--- a/sequence_processing_pipeline/tests/data/seq_counts.sbatch
+++ b/sequence_processing_pipeline/tests/data/seq_counts.sbatch
@@ -12,7 +12,7 @@

 set -x
 set -e
-
+set -o pipefail
 mkdir -p sequence_processing_pipeline/tests/2caa8226-cf69-45a3-bd40-1e90ec3d18d0/SeqCountsJob/logs

 files=($(cat sequence_processing_pipeline/tests/data/files_to_count.txt))
diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py
index 89f5baed..2d713cfe 100644
--- a/sequence_processing_pipeline/tests/test_FastQCJob.py
+++ b/sequence_processing_pipeline/tests/test_FastQCJob.py
@@ -594,8 +594,9 @@ def test_generate_job_scripts(self):
                 "#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_FastQCJob",
                 "#SBATCH -p queue_name", "#SBATCH -N 4", "#SBATCH -n 16",
                 "#SBATCH --time 23", "#SBATCH --mem 8gG",
-                "#SBATCH --array 1-4%30", "set -x", "set +e", "date",
-                "hostname", "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
+                "#SBATCH --array 1-4%30", "set -x", "set +e",
+                "set -o pipefail", "date", "hostname",
+                "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
                 "cd sequence_processing_pipeline/tests/data/output_dir2/"
                 "FastQCJob", "", "module load my_module.1.1", "",
                 "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))",
diff --git a/sequence_processing_pipeline/tests/test_MultiQCJob.py b/sequence_processing_pipeline/tests/test_MultiQCJob.py
index a8b2fd07..3e88e437 100644
--- a/sequence_processing_pipeline/tests/test_MultiQCJob.py
+++ b/sequence_processing_pipeline/tests/test_MultiQCJob.py
@@ -116,8 +116,9 @@ def test_generate_job_scripts(self):
                 "#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_MultiQCJob",
                 "#SBATCH -p queue_name", "#SBATCH -N 4", "#SBATCH -n 16",
                 "#SBATCH --time 23", "#SBATCH --mem 8gG",
-                "#SBATCH --array 1-1%30", "set -x", "set +e", "date",
-                "hostname", "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
+                "#SBATCH --array 1-1%30", "set -x", "set +e",
+                "set -o pipefail", "date", "hostname",
+                "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
                 "cd sequence_processing_pipeline/tests/data/output_dir2/"
                 "MultiQCJob", "", "module load multiqc.2.0", "",
                 "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))",

From 5c19f8d42275140ab915b8de62f0501c1f3198c3 Mon Sep 17 00:00:00 2001
From: Charles Cowart
Date: Sat, 22 Feb 2025 20:56:39 -0800
Subject: [PATCH 11/16] Updates based on feedback

---
 sequence_processing_pipeline/FastQCJob.py  | 2 +-
 sequence_processing_pipeline/MultiQCJob.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py
index 36ef40e1..39580915 100644
--- a/sequence_processing_pipeline/FastQCJob.py
+++ b/sequence_processing_pipeline/FastQCJob.py
@@ -213,7 +213,7 @@ def run(self, callback=None):
             info = self.parse_logs()
             # prepend just the message component of the Error.
             info.insert(0, str(e))
-            raise JobFailedError('\n'.join(info))
+            raise JobFailedError('\n'.join(info)) from None

         logging.debug(job_info)

diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 75b5b301..356f8d27 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -224,7 +224,7 @@ def run(self, callback=None):
             info = self.parse_logs()
             # prepend just the message component of the Error.
             info.insert(0, str(e))
-            raise JobFailedError('\n'.join(info))
+            raise JobFailedError('\n'.join(info)) from None

         logging.debug(job_info)

From 77b21de8f17f20f168b4ac47d3fcf1511a87bbd1 Mon Sep 17 00:00:00 2001
From: Charles Cowart
Date: Mon, 24 Feb 2025 19:07:06 -0800
Subject: [PATCH 12/16] Updates based on feedback

---
 .../GenPrepFileJob.py                      | 30 +-------
 sequence_processing_pipeline/MultiQCJob.py | 29 +++-----
 sequence_processing_pipeline/NuQCJob.py    | 74 +------------------
 3 files changed, 18 insertions(+), 115 deletions(-)

diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py
index 84a7b711..579baa29 100644
--- a/sequence_processing_pipeline/GenPrepFileJob.py
+++ b/sequence_processing_pipeline/GenPrepFileJob.py
@@ -31,17 +31,12 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
         self.commands = []
         self.has_replicates = False
         self.replicate_count = 0
-        # instead of a directory, reports_path should point to the single file
-        # currently needed by seqpro. This means reports_path should equal:
-        # /.../ConvertJob/Reports/Demultiplex_Stats.csv not
-        # /.../ConvertJob/Reports.
         self.reports_path = reports_path

-        # make the 'root' of your run_directory
+        # make the 'root' of the 'run_directory'. on restarts it will exist
+        # already.
         makedirs(join(self.output_path, self.run_id), exist_ok=True)

-        # copy bcl-convert's Stats-equivalent directory to the
-        # run_directory

         # This directory will already exist on restarts, hence avoid
         # copying. To support legacy seqpro, We will copy the single file
@@ -49,31 +44,12 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
         reports_dir = join(self.output_path, self.run_id, 'Reports')

+        # handle reports_path being either a directory or a file.
         if exists(reports_dir):
             self.is_restart = True
         else:
             self.is_restart = False

-        """
-        With copytree(src, dst), src must be an existing directory and dst
-        must be a path that doesn't already exist.
-        src cannot be a file. it must be a directory.
-
-        when using copytree to copy a directory, dst must be the path to
-        the directory where you want the copy PLUS the name of the copied
-        directory. To give dst the same directory name as src you would
-        then need to split() the folder name off the end of src and append
-        it to dst to get a proper value. copytree() DOES NOT put a copy
-        of src in dst. More like it copies the entire contents of src
-        recursively into dst, however dst cannot already exist so you
-        can't use it to copy the contents of a directory into an existing
-        directory.
-
-        This means that if src is a file and not a directory, you will
-        need to use copy() to copy the file instead. It also means that
-        you will need to make reports_dir manually.
-        """
-
         if isdir(self.reports_path):
             copytree(self.reports_path, reports_dir)
         else:
diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 356f8d27..54b83b3b 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -46,7 +46,10 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path,
         # rather than the project's root path.
         self.jinja_env = Environment(loader=KISSLoader('templates'))

-        self._generate_job_script()
+        # bypass generating job script for a force-fail job, since it is
+        # not needed.
+        if not self.force_job_fail:
+            self._generate_job_script()

     def _find_projects(self):
         find_paths = [self.processed_fastq_files_path]
@@ -75,11 +78,11 @@ def _find_projects(self):
                     # want included in the report.
                     file_path, file_name = split(_file)

-                    folders_present = [x for x in file_path.split(sep)
-                                       if x in ['zero_files',
-                                                'only-adapter-filtered']]
+                    ignore_this_file = [x for x in file_path.split(sep)
+                                        if x in ['zero_files',
+                                                 'only-adapter-filtered']]

-                    if folders_present:
+                    if ignore_this_file:
                         # if one or more of the folders are present in _file's
                         # path, then do not consider this file.
                         continue
@@ -113,19 +116,14 @@ def _get_failed_indexes(self, job_id):
         # the command used for this job
         completed_indexes = [int(cf.split('_')[-1]) for cf in completed_files]

-        # a successfully completed job array should have a list of array
-        # numbers from 0 - len(self.commands).
         all_indexes = list(range(1, len(self.commands) + 1))
         failed_indexes = sorted(set(all_indexes) - set(completed_indexes))

         # generate log-file here instead of in run() where it can be
         # unittested more easily.
-        log_fp = join(self.output_path,
-                      'logs',
-                      f'failed_indexes_{job_id}.json')
-
         if failed_indexes:
-            with open(log_fp, 'w') as f:
+            with open(join(self.output_path, 'logs',
+                           f'failed_indexes_{job_id}.json'), 'w') as f:
                 f.write(dumps({'job_id': job_id,
                                'failed_indexes': failed_indexes}, indent=2))
@@ -180,11 +178,6 @@ def _get_commands(self):
         return array_cmds

     def _generate_job_script(self):
-        # bypass generating job script for a force-fail job, since it is
-        # not needed.
-        if self.force_job_fail:
-            return None
-
         template = self.jinja_env.get_template("multiqc_job.sh")

         array_cmds = self._get_commands()
@@ -209,7 +202,7 @@ def _generate_job_script(self):

         # save the .details file as well
         with open(array_details, 'w') as f:
-            f.write('\n'.join(array_cmds))
+            f.write('\n'.join(array_cmds) + '\n')

         return self.job_script_path
diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py
index b936d09b..bdecf5a5 100644
--- a/sequence_processing_pipeline/NuQCJob.py
+++ b/sequence_processing_pipeline/NuQCJob.py
@@ -8,13 +8,11 @@
 from shutil import move
 import logging
 from sequence_processing_pipeline.Commands import split_similar_size_bins
-from sequence_processing_pipeline.util import (iter_paired_files,
-                                               determine_orientation)
+from sequence_processing_pipeline.util import iter_paired_files
 from jinja2 import Environment
 from glob import glob
 import re
 from sys import executable
-from gzip import open as gzip_open


 logging.basicConfig(level=logging.DEBUG)
@@ -118,63 +116,6 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,

         self._validate_project_data()

-    def contains_bx_metadata(self):
-        # get list of raw compressed fastq files. only consider R1 and R2
-        # reads.
-
-        # Note that NuQCJob works across all projects in a sample-sheet. if
-        # there are more than one project in the sample-sheet and if one
-        # is a TellSeq project and one isn't then that would break an
-        # an assumption of this helper (one file is representative of all
-        # files.) However since tellseq requires a special sample-sheet, it
-        # can be assumed that all projects in a tellseq sample-sheet will be
-        # tellseq and therefore carry the TellSeq BX metadata. The inverse
-        # should also be true.
-
-        fastq_paths = glob(self.root_dir + '/*/*.fastq.gz')
-        fastq_paths = [x for x in fastq_paths
-                       if determine_orientation(x) in ['R1', 'R2']]
-
-        apply_bx = None
-
-        for fp in fastq_paths:
-            # open a compressed fastq file and read its first line.
-            with gzip_open(fp, 'r') as f:
-                line = f.readline()
-
-            # convert the line to regular text and remove newline.
-            line = line.decode("utf-8").strip()
-
-            # if file is empty process another file until we find
-            # one that isn't empty.
-            if line == '':
-                continue
-
-            # break up sequence id line into sequence id plus possible
-            # metadata element(s).
-            line = line.split(' ')
-
-            if len(line) == 1:
-                # there is no metadata. do not apply 'BX'.
-                apply_bx = False
-                break
-            elif len(line) == 2:
-                # there is some kind of additional metadata,
-                # but it may not be BX.
-                if line[-1].startswith('BX'):
-                    apply_bx = True
-                    break
-                else:
-                    apply_bx = False
-                    break
-            else:
-                raise ValueError("I don't know how to process '%s'" % line)
-
-        if apply_bx is None:
-            raise ValueError("It seems like all raw files are empty")
-
-        return apply_bx
-
     def _validate_project_data(self):
         # Validate project settings in [Bioinformatics]
         for project in self.project_data:
@@ -458,18 +399,11 @@ def _generate_mmi_filter_cmds(self, working_dir):

         cores_to_allocate = int(self.cores_per_task / 2)

-        # contains_bx_metadata() will scan all of the R1 and R2 files
-        # in self.root_dir until it finds a non-empty file to read. It will
-        # read the first line of the compressed fastq file and see if it
-        # contains optional BX metadata. If not it will return False, other
-        # wise True.
-        apply_bx = self.contains_bx_metadata()
-
         # the default setting.
         tags = ""
         t_switch = ""

-        if apply_bx & len(self.additional_fastq_tags) > 0:
+        if self.additional_fastq_tags:
             # add tags for known metadata types that fastq files may have
             # been annotated with. Samtools will safely ignore tags that
             # are not present.
@@ -593,7 +527,7 @@ def parse_logs(self):

         for some_file in output_logs:
             with open(some_file, 'r') as f:
-                msgs += [line for line in f.readlines()
+                msgs += [line.strip() for line in f.readlines()
                          if 'error:' in line.lower()]

-        return [msg.strip() for msg in msgs]
+        return msgs

From 66c60420a5df282ba23df8ed462532b57fdcfba6 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Tue, 25 Feb 2025 06:37:11 -0700
Subject: [PATCH 13/16] address @wasade offset comment

---
 sequence_processing_pipeline/templates/fastqc_job.sh  | 5 ++---
 sequence_processing_pipeline/templates/multiqc_job.sh | 5 ++---
 sequence_processing_pipeline/tests/test_FastQCJob.py  | 2 +-
 sequence_processing_pipeline/tests/test_MultiQCJob.py | 2 +-
 4 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh
index 8423418a..2adc5bd7 100644
--- a/sequence_processing_pipeline/templates/fastqc_job.sh
+++ b/sequence_processing_pipeline/templates/fastqc_job.sh
@@ -16,8 +16,7 @@ cd {{output_path}}
 {% if modules_to_load is defined %}
 module load {{modules_to_load}}
 {% endif %}
-offset=${SLURM_ARRAY_TASK_ID}
-step=$(( $offset - 0 ))
+step=${SLURM_ARRAY_TASK_ID}
 cmd0=$(head -n $step {{array_details}} | tail -n 1)
 eval $cmd0
-echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed
\ No newline at end of file
+echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed
diff --git a/sequence_processing_pipeline/templates/multiqc_job.sh b/sequence_processing_pipeline/templates/multiqc_job.sh
index 8975c4cb..fb4a516a 100644
--- a/sequence_processing_pipeline/templates/multiqc_job.sh
+++ b/sequence_processing_pipeline/templates/multiqc_job.sh
@@ -16,8 +16,7 @@ cd {{output_path}}
 {% if modules_to_load is defined %}
 module load {{modules_to_load}}
 {% endif %}
-offset=${SLURM_ARRAY_TASK_ID}
-step=$(( $offset - 0 ))
+step=${SLURM_ARRAY_TASK_ID}
 cmd0=$(head -n $step {{array_details}} | tail -n 1)
 eval $cmd0
-echo "Cmd Completed: $cmd0" > logs/MultiQCJob_$step.completed
\ No newline at end of file
+echo "Cmd Completed: $cmd0" > logs/MultiQCJob_$step.completed
diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py
index 2d713cfe..2d3636f6 100644
--- a/sequence_processing_pipeline/tests/test_FastQCJob.py
+++ b/sequence_processing_pipeline/tests/test_FastQCJob.py
@@ -599,7 +599,7 @@ def test_generate_job_scripts(self):
                 "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
                 "cd sequence_processing_pipeline/tests/data/output_dir2/"
                 "FastQCJob", "", "module load my_module.1.1", "",
-                "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))",
+                "step=${SLURM_ARRAY_TASK_ID}",
                 "cmd0=$(head -n $step sequence_processing_pipeline/tests/data/"
                 "output_dir2/FastQCJob/FastQCJob.array-details | tail -n 1)",
                 "eval $cmd0", "echo \"Cmd Completed: $cmd0\" > logs/FastQCJob_"
diff --git a/sequence_processing_pipeline/tests/test_MultiQCJob.py b/sequence_processing_pipeline/tests/test_MultiQCJob.py
index 3e88e437..0774de0a 100644
--- a/sequence_processing_pipeline/tests/test_MultiQCJob.py
+++ b/sequence_processing_pipeline/tests/test_MultiQCJob.py
@@ -121,7 +121,7 @@ def test_generate_job_scripts(self):
                 "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}",
                 "cd sequence_processing_pipeline/tests/data/output_dir2/"
                 "MultiQCJob", "", "module load multiqc.2.0", "",
-                "offset=${SLURM_ARRAY_TASK_ID}", "step=$(( $offset - 0 ))",
+                "step=${SLURM_ARRAY_TASK_ID}",
                 "cmd0=$(head -n $step sequence_processing_pipeline/tests/data/"
                 "output_dir2/MultiQCJob/MultiQCJob.array-details | tail -n 1)",
                 "eval $cmd0", "echo \"Cmd Completed: $cmd0\" > logs/MultiQCJob_"

From 188539af002367578e00786e703f56a2d5415d04 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Wed, 5 Mar 2025 15:07:00 -0700
Subject: [PATCH 14/16] fixes after deploy 03.25

---
 sequence_processing_pipeline/FastQCJob.py | 31 +++++++++--------------
 sequence_processing_pipeline/Job.py       |  6 +++++
 sequence_processing_pipeline/NuQCJob.py   |  1 +
 3 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py
index 39580915..24147798 100644
--- a/sequence_processing_pipeline/FastQCJob.py
+++ b/sequence_processing_pipeline/FastQCJob.py
@@ -58,30 +58,23 @@ def _get_commands(self):
         """
         results = []

-        if self.is_amplicon:
-            # skip this step for amplicon runs since raw and processed are the
-            # same file.
-            project_names = []
-        else:
-            # gather the parameters for processing all relevant raw fastq
-            # files.
-            params, project_names = self._scan_fastq_files(True)
-
-            for fwd_file_path, rev_file_path, output_path in params:
-                command = ['fastqc', '--noextract', '-t', str(self.nthreads),
-                           fwd_file_path, rev_file_path, '-o', output_path]
-                results.append(' '.join(command))
-
-        # next, do the same for the trimmed/filtered fastq files.
-        params, additional_project_names = self._scan_fastq_files(False)
-
+        # gather the parameters for processing all relevant raw fastq
+        # files.
+        params, project_names = self._scan_fastq_files(True)
         for fwd_file_path, rev_file_path, output_path in params:
             command = ['fastqc', '--noextract', '-t', str(self.nthreads),
                        fwd_file_path, rev_file_path, '-o', output_path]
             results.append(' '.join(command))

-        # remove duplicate project names from the list
-        project_names = list(set(project_names + additional_project_names))
+        if not self.is_amplicon:
+            # next, do the same for the trimmed/filtered fastq files.
+            params, additional_project_names = self._scan_fastq_files(False)
+            for fwd_file_path, rev_file_path, output_path in params:
+                command = ['fastqc', '--noextract', '-t', str(self.nthreads),
+                           fwd_file_path, rev_file_path, '-o', output_path]
+                results.append(' '.join(command))
+            # remove duplicate project names from the list
+            project_names = list(set(project_names + additional_project_names))

         return results, project_names

diff --git a/sequence_processing_pipeline/Job.py b/sequence_processing_pipeline/Job.py
index d2bfc3fe..0080aa55 100644
--- a/sequence_processing_pipeline/Job.py
+++ b/sequence_processing_pipeline/Job.py
@@ -92,6 +92,8 @@ def __init__(self, root_dir, output_path, job_name, executable_paths,
         self.is_test = True if [
             x for x in stack() if 'unittest' in x.filename] else False

+        self.audit_folders = None
+
         # For each executable in the list, get its filename and use _which()
         # to see if it can be found. Directly pass an optional list of modules
         # to load before-hand, so that the binary can be found.
@@ -475,6 +477,10 @@ def audit(self, sample_ids):
         for root, dirs, files in walk(self.output_path):
             if 'zero_files' in root:
                 continue
+            if self.audit_folders is not None:
+                # let's check that any of the audit_folders is in root
+                if not [f for f in self.audit_folders if f in root]:
+                    continue
             files_found += [join(root, x) for x in files
                             if x.endswith(self.suffix)]
diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py
index bdecf5a5..3ed9c322 100644
--- a/sequence_processing_pipeline/NuQCJob.py
+++ b/sequence_processing_pipeline/NuQCJob.py
@@ -80,6 +80,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
         self.gres_value = gres_value
         self.pmls_path = pmls_path
         self.additional_fastq_tags = additional_fastq_tags
+        self.audit_folders = ['filtered_sequences']

         # for projects that use sequence_processing_pipeline as a dependency,
         # jinja_env must be set to sequence_processing_pipeline's root path,

From 6e22b1a0c0e47338ecf6cddf282643f1900c39e9 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Thu, 6 Mar 2025 08:58:37 -0700
Subject: [PATCH 15/16] self.commands -> self.array_cmds

---
 sequence_processing_pipeline/MultiQCJob.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
index 54b83b3b..31f44703 100644
--- a/sequence_processing_pipeline/MultiQCJob.py
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -116,7 +116,7 @@ def _get_failed_indexes(self, job_id):
         # the command used for this job
         completed_indexes = [int(cf.split('_')[-1]) for cf in completed_files]

-        all_indexes = list(range(1, len(self.commands) + 1))
+        all_indexes = list(range(1, len(self.array_cmds) + 1))
         failed_indexes = sorted(set(all_indexes) - set(completed_indexes))

         # generate log-file here instead of in run() where it can be
@@ -180,12 +180,12 @@ def _get_commands(self):
         return array_cmds

     def _generate_job_script(self):
         template = self.jinja_env.get_template("multiqc_job.sh")

-        array_cmds = self._get_commands()
+        self.array_cmds = self._get_commands()

         job_name = f'{self.qiita_job_id}_{self.job_name}'
         details_file_name = f'{self.job_name}.array-details'
         array_details = join(self.output_path, details_file_name)
-        array_params = "1-%d%%%d" % (len(array_cmds), self.pool_size)
+        array_params = "1-%d%%%d" % (len(self.array_cmds), self.pool_size)
         modules_to_load = ' '.join(self.modules_to_load)

         with open(self.job_script_path, mode="w", encoding="utf-8") as f:
@@ -202,7 +202,7 @@ def _generate_job_script(self):

         # save the .details file as well
         with open(array_details, 'w') as f:
-            f.write('\n'.join(array_cmds) + '\n')
+            f.write('\n'.join(self.array_cmds) + '\n')

         return self.job_script_path

From 9cd37afe1f30855581fe72257f56fd4efacb3e02 Mon Sep 17 00:00:00 2001
From: Antonio Gonzalez
Date: Thu, 6 Mar 2025 10:39:43 -0700
Subject: [PATCH 16/16] forgotten change

---
 sequence_processing_pipeline/GenPrepFileJob.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py
index 579baa29..00d6e901 100644
--- a/sequence_processing_pipeline/GenPrepFileJob.py
+++ b/sequence_processing_pipeline/GenPrepFileJob.py
@@ -52,7 +52,7 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,

         if isdir(self.reports_path):
             copytree(self.reports_path, reports_dir)
-        else:
+        elif not self.is_amplicon:
             # assume self.reports_path is a file.
             makedirs(reports_dir)
             copy(self.reports_path, reports_dir)
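Taken together, the GenPrepFileJob changes in this series let reports_path be either a full bcl-convert Reports/ directory or the single Demultiplex_Stats.csv file that seqpro needs, and amplicon runs now skip the file branch entirely. A standalone Python sketch of the directory-versus-file staging logic follows; the helper name stage_reports() is hypothetical and not part of the patches:

    from os import makedirs
    from os.path import isdir, join
    from shutil import copy, copytree

    def stage_reports(reports_path, run_dir):
        # copytree() requires that the destination does not exist yet; on a
        # restart the caller is expected to detect the existing Reports/
        # directory and skip this step.
        reports_dir = join(run_dir, 'Reports')

        if isdir(reports_path):
            # a whole Reports/ directory: copy it recursively.
            copytree(reports_path, reports_dir)
        else:
            # a single stats file (e.g. Demultiplex_Stats.csv): create the
            # destination directory first, then copy the file into it.
            makedirs(reports_dir)
            copy(reports_path, reports_dir)

        return reports_dir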