From 4fd1fd5fcc69c3befb14072a535f0de46c8c688e Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Thu, 16 Jan 2025 16:52:58 -0800 Subject: [PATCH 1/2] Updated FastQCJob to use Jinja2 template. MultiQCJob support removed. --- sequence_processing_pipeline/FastQCJob.py | 125 ++++++------------ .../templates/fastqc_job.sh | 23 ++++ .../tests/test_FastQCJob.py | 26 +--- 3 files changed, 67 insertions(+), 107 deletions(-) create mode 100644 sequence_processing_pipeline/templates/fastqc_job.sh diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index f5a5555a..4ac272cb 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -1,18 +1,19 @@ -from os import listdir, makedirs -from os.path import exists, join, basename -from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import (PipelineError, - JobFailedError) from functools import partial +from jinja2 import Environment from json import dumps import logging +from os import listdir, makedirs +from os.path import join, basename +from sequence_processing_pipeline.Job import Job, KISSLoader +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) class FastQCJob(Job): def __init__(self, run_dir, output_path, raw_fastq_files_path, processed_fastq_files_path, nprocs, nthreads, fastqc_path, modules_to_load, qiita_job_id, queue_name, node_count, - wall_time_limit, jmem, pool_size, multiqc_config_file_path, + wall_time_limit, jmem, pool_size, max_array_length, is_amplicon): super().__init__(run_dir, output_path, @@ -36,9 +37,6 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.job_script_path = join(self.output_path, f"{self.job_name}.sh") - self._file_check(multiqc_config_file_path) - self.multiqc_config_file_path = multiqc_config_file_path - self.project_names = [] self.commands, self.project_names = self._get_commands() # for lists greater than n commands, chain the extra commands, @@ -46,6 +44,11 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.commands = self._group_commands(self.commands) self.suffix = 'fastqc.html' + # for projects that use sequence_processing_pipeline as a dependency, + # jinja_env must be set to sequence_processing_pipeline's root path, + # rather than the project's root path. + self.jinja_env = Environment(loader=KISSLoader('templates')) + self._generate_job_script() def _get_commands(self): @@ -217,90 +220,38 @@ def run(self, callback=None): logging.debug(job_info) - # If project-level reports were not needed, MultiQC could simply be - # given the path to the run-directory itself and it will discover all - # of the relevant data files. Confirmed that for a one-project sample- - # sheet, this produces and equivalent report. - - for project in self.project_names: - # MultiQC doesn't like input paths that don't exist. Simply add - # all paths that do exist as input. - input_path_list = [] - p_path = partial(join, self.output_path, 'fastqc') - - for filter_type in ['bclconvert', 'trimmed_sequences', - 'filtered_sequences', 'amplicon']: - input_path_list.append(p_path(project, filter_type)) - - input_path_list.append(p_path(project, 'Reports')) - - p_path = partial(join, self.processed_fastq_files_path, project) - input_path_list.append(p_path('fastp_reports_dir', 'json')) - - # I don't usually see a json directory associated with raw data. 
- # It looks to be metadata coming directly off the machine, in the - # few instances I've seen it in /sequencing... - p_path = partial(join, self.raw_fastq_files_path, project) - input_path_list.append(p_path('json')) - - input_path_list = [x for x in input_path_list if exists(x)] - - cmd_head = ['multiqc', '-c', self.multiqc_config_file_path, - '--fullnames', '--force'] - - # --interactive graphs is set to True in MultiQC configuration - # file and hence this switch was redunant and now removed. - cmd_tail = ['-o', join(self.output_path, 'multiqc', project)] - - cmd = ' '.join(cmd_head + input_path_list + cmd_tail) - - results = self._system_call(cmd, callback=callback) - - if results['return_code'] != 0: - raise PipelineError("multiqc encountered an error") - if self._get_failed_indexes(job_info['job_id']): # raise error if list isn't empty. raise PipelineError("FastQCJob did not complete successfully.") def _generate_job_script(self): - lines = [] - - details_file_name = f'{self.job_name}.array-details' - sh_details_fp = join(self.output_path, details_file_name) + # bypass generating job script for a force-fail job, since it is + # not needed. + if self.force_job_fail: + return None - lines.append("#!/bin/bash") + template = self.jinja_env.get_template("fastqc_job.sh") job_name = f'{self.qiita_job_id}_{self.job_name}' - lines.append(f"#SBATCH --job-name {job_name}") - lines.append(f"#SBATCH -p {self.queue_name}") - lines.append(f"#SBATCH -N {self.node_count}") - lines.append(f"#SBATCH -n {self.nprocs}") - lines.append("#SBATCH --time %d" % self.wall_time_limit) - lines.append(f"#SBATCH --mem {self.jmem}") - lines.append("#SBATCH --array 1-%d%%%d" % ( - len(self.commands), self.pool_size)) - - lines.append("set -x") - lines.append("set +e") - lines.append('date') - lines.append('hostname') - lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}') - lines.append(f'cd {self.output_path}') - - if self.modules_to_load: - lines.append("module load " + ' '.join(self.modules_to_load)) - - lines.append('offset=${SLURM_ARRAY_TASK_ID}') - lines.append('step=$(( $offset - 0 ))') - lines.append(f'cmd0=$(head -n $step {sh_details_fp} | tail -n 1)') - lines.append('eval $cmd0') - - sentinel_file = f'{self.job_name}_$step.completed' - lines.append(f'echo "Cmd Completed: $cmd0" > logs/{sentinel_file}') - - with open(self.job_script_path, 'w') as f: - f.write('\n'.join(lines)) - - with open(sh_details_fp, 'w') as f: + details_file_name = f'{self.job_name}.array-details' + array_details = join(self.output_path, details_file_name) + array_params = "1-%d%%%d" % (len(self.commands), self.pool_size) + modules_to_load = ' '.join(self.modules_to_load) + + with open(self.job_script_path, mode="w", encoding="utf-8") as f: + f.write(template.render(job_name=job_name, + array_details=array_details, + queue_name=self.queue_name, + node_count=self.node_count, + nprocs=self.nprocs, + wall_time_limit=self.wall_time_limit, + mem_in_gb=self.jmem, + array_params=array_params, + output_path=self.output_path, + modules_to_load=modules_to_load)) + + # save the .details file as well + with open(array_details, 'w') as f: f.write('\n'.join(self.commands)) + + return self.job_script_path diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh new file mode 100644 index 00000000..8b797063 --- /dev/null +++ b/sequence_processing_pipeline/templates/fastqc_job.sh @@ -0,0 +1,23 @@ +#!/bin/bash +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +#SBATCH -N 
{{node_count}}
+#SBATCH -n {{nprocs}}
+#SBATCH --time {{wall_time_limit}}
+#SBATCH --mem {{mem_in_gb}}G
+#SBATCH --array {{array_params}}
+#SBATCH --constraint="intel"
+set -x
+set +e
+date
+hostname
+echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
+cd {{output_path}}
+{% if modules_to_load %}
+    module load {{modules_to_load}}
+{% endif %}
+offset=${SLURM_ARRAY_TASK_ID}
+step=$(( $offset - 0 ))
+cmd0=$(head -n $step {{array_details}} | tail -n 1)
+eval $cmd0
+echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed
\ No newline at end of file
diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py
index a2291296..93bd4785 100644
--- a/sequence_processing_pipeline/tests/test_FastQCJob.py
+++ b/sequence_processing_pipeline/tests/test_FastQCJob.py
@@ -576,20 +576,6 @@ def tearDown(self):
 
         rmtree(zero_path)
 
-    def test_config_file_not_found(self):
-        with self.assertRaises(PipelineError) as e:
-            FastQCJob(self.qc_root_path, self.output_path,
-                      self.raw_fastq_files_path.replace('/project1', ''),
-                      self.processed_fastq_files_path, 16, 16,
-                      'sequence_processing_pipeline/tests/bin/fastqc', [],
-                      self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                      ('sequence_processing_pipeline/'
-                       'not-multiqc-bclconvert-config.yaml'), 1000, False)
-
-        self.assertEqual(str(e.exception), "file 'sequence_processing_pipeline"
-                                           "/not-multiqc-bclconvert-config."
-                                           "yaml' does not exist.")
-
     def test_generate_job_scripts(self):
         job = FastQCJob(self.qc_root_path, self.output_path,
                         self.raw_fastq_files_path.replace('/project1', ''),
@@ -597,7 +583,7 @@ def test_generate_job_scripts(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         self.assertEqual(exists(join(job.output_path, 'FastQCJob.sh')), True)
         self.assertEqual(exists(join(job.output_path,
@@ -610,7 +596,7 @@ def test_audit(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         obs = job.audit(self.sample_ids)
 
@@ -1044,7 +1030,7 @@ def test_all_zero_files(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         self.assertEqual(str(e.exception), "There are no fastq files for "
                                            "FastQCJob to process in sequence"
@@ -1059,7 +1045,7 @@ def test_completed_file_generation(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         my_path = join(self.output_path, 'FastQCJob', 'logs')
 
@@ -1079,7 +1065,7 @@ def test_completed_file_generation_some_failures(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         my_path = join(self.output_path, 'FastQCJob', 'logs')
 
@@ -1115,7 +1101,7 @@ def test_error_msg_from_logs(self):
                         16, 16,
                         'sequence_processing_pipeline/tests/bin/fastqc', [],
                         self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
-                        self.config_yml, 1000, False)
+                        1000, False)
 
         self.assertFalse(job is None)
 

From b2cddc793e6332a22f9bf7f9d41a08ec4076a3ee Mon Sep 17 00:00:00 2001
From: Charles Cowart
Date: Thu, 16 Jan 2025 20:19:39 -0800
Subject: [PATCH 2/2] Added MultiQCJob

---
 sequence_processing_pipeline/FastQCJob.py  |   1 -
 sequence_processing_pipeline/MultiQCJob.py | 233 ++++++++++++++++++
 .../templates/fastqc_job.sh                |   1 -
 .../templates/multiqc_job.sh               |  22 ++
 4 files changed, 255 insertions(+), 2 deletions(-)
 create mode 100644 sequence_processing_pipeline/MultiQCJob.py
 create mode 100644 sequence_processing_pipeline/templates/multiqc_job.sh

diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py
index 4ac272cb..63b562c0 100644
--- a/sequence_processing_pipeline/FastQCJob.py
+++ b/sequence_processing_pipeline/FastQCJob.py
@@ -37,7 +37,6 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path,
 
         self.job_script_path = join(self.output_path, f"{self.job_name}.sh")
 
-        self.project_names = []
         self.commands, self.project_names = self._get_commands()
         # for lists greater than n commands, chain the extra commands,
         # distributing them evenly throughout the first n commands.
diff --git a/sequence_processing_pipeline/MultiQCJob.py b/sequence_processing_pipeline/MultiQCJob.py
new file mode 100644
index 00000000..966aeec7
--- /dev/null
+++ b/sequence_processing_pipeline/MultiQCJob.py
@@ -0,0 +1,233 @@
+from functools import partial
+from jinja2 import Environment
+from json import dumps
+import logging
+from os import listdir
+from os.path import join, basename, exists, sep, split
+from sequence_processing_pipeline.Job import Job, KISSLoader
+from sequence_processing_pipeline.PipelineError import (PipelineError,
+                                                        JobFailedError)
+from sequence_processing_pipeline.util import determine_orientation
+
+
+class MultiQCJob(Job):
+    def __init__(self, run_dir, output_path, raw_fastq_files_path,
+                 processed_fastq_files_path, nprocs, nthreads, multiqc_path,
+                 modules_to_load, qiita_job_id, queue_name, node_count,
+                 wall_time_limit, jmem, pool_size, multiqc_config_file_path,
+                 max_array_length, is_amplicon):
+        super().__init__(run_dir,
+                         output_path,
+                         'MultiQCJob',
+                         [multiqc_path],
+                         max_array_length,
+                         modules_to_load=modules_to_load)
+
+        self.nprocs = nprocs
+        self.nthreads = nthreads
+        self.multiqc_path = multiqc_path
+        self.queue_name = queue_name
+        self.node_count = node_count
+        self.wall_time_limit = wall_time_limit
+        self.jmem = jmem
+        self.qiita_job_id = qiita_job_id
+        self.pool_size = pool_size
+        self.raw_fastq_files_path = raw_fastq_files_path
+        self.processed_fastq_files_path = processed_fastq_files_path
+        self.is_amplicon = is_amplicon
+
+        self.job_script_path = join(self.output_path, f"{self.job_name}.sh")
+
+        # _get_commands() references this path, so validate and store it
+        # here, just as FastQCJob did before MultiQC support was moved.
+        self._file_check(multiqc_config_file_path)
+        self.multiqc_config_file_path = multiqc_config_file_path
+
+        # for projects that use sequence_processing_pipeline as a dependency,
+        # jinja_env must be set to sequence_processing_pipeline's root path,
+        # rather than the project's root path.
+        self.jinja_env = Environment(loader=KISSLoader('templates'))
+
+        self._generate_job_script()
+
+    def _find_projects(self):
+        find_paths = [self.processed_fastq_files_path]
+
+        if not self.is_amplicon:
+            # avoid processing the raw fastq files for amplicon runs because
+            # they are identical to files in self.processed_fastq_files_path.
+            find_paths += [self.raw_fastq_files_path]
+
+        projects = []
+
+        for fastq_files_path in find_paths:
+            for directory in listdir(fastq_files_path):
+                # confirm that this directory has data we want to show to
+                # multiqc.
+
+                # generate a list of all files in this directory.
+                files = self._find_files(join(fastq_files_path, directory))
+
+                # filter out all files that aren't fastq.gz files.
+                files = [x for x in files if x.endswith('.fastq.gz')]
+
+                for _file in files:
+                    # split path into a list of folder names and the filename,
+                    # then filter out the contents of any folders that we
+                    # don't want included in the report.
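+                    # e.g. a hypothetical path such as
+                    # project1/zero_files/sample1_R1.fastq.gz is skipped
+                    # below because 'zero_files' appears in its path.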
+                    file_path, file_name = split(_file)
+
+                    folders_present = [x for x in file_path.split(sep)
+                                       if x in ['zero_files',
+                                                'only-adapter-filtered']]
+
+                    if folders_present:
+                        # if one or more of the folders are present in _file's
+                        # path, then do not consider this file.
+                        continue
+
+                    # lastly, only consider folders that contain at least one
+                    # R1 file.
+                    if determine_orientation(file_name) != 'R1':
+                        continue
+
+                    # according to legacy behavior, if _file has met the above
+                    # criteria, then add the value of directory as a project
+                    # name.
+                    projects.append(directory)
+
+        if projects:
+            # remove duplicates
+            return list(set(projects))
+
+        raise PipelineError("There are no fastq files for MultiQCJob to "
+                            "process")
+
+    def _get_failed_indexes(self, job_id):
+        completed_files = self._find_files(self.output_path)
+        completed_files = [x for x in completed_files if
+                           x.endswith('.completed')]
+
+        completed_indexes = []
+
+        for completed_file in completed_files:
+            # remove path and .completed extension from file-name. e.g.:
+            # 'MultiQCJob_1', 'MultiQCJob_2', ..., 'MultiQCJob_n'
+            completed_file = basename(completed_file).replace('.completed', '')
+            # extract the array index from the file-name; it corresponds to
+            # the line number of the command in the .array-details file.
+            completed_indexes.append(int(completed_file.split('_')[-1]))
+
+        # a successfully completed job array should have a list of array
+        # numbers from 1 through len(self.commands).
+        all_indexes = [x for x in range(1, len(self.commands) + 1)]
+        failed_indexes = list(set(all_indexes) - set(completed_indexes))
+        failed_indexes.sort()
+
+        # generate the log-file here instead of in run() so that it can be
+        # unit-tested more easily.
+        log_fp = join(self.output_path,
+                      'logs',
+                      f'failed_indexes_{job_id}.json')
+
+        if failed_indexes:
+            with open(log_fp, 'w') as f:
+                f.write(dumps({'job_id': job_id,
+                               'failed_indexes': failed_indexes}, indent=2))
+
+        return failed_indexes
+
+    def _get_commands(self):
+        # If project-level reports were not needed, MultiQC could simply be
+        # given the path to the run-directory itself and it would discover
+        # all of the relevant data files. Confirmed that for a one-project
+        # sample-sheet, this produces an equivalent report.
+
+        array_cmds = []
+
+        for project in self._find_projects():
+            # MultiQC doesn't like input paths that don't exist. Simply add
+            # all paths that do exist as input.
+            input_path_list = []
+            p_path = partial(join, self.output_path, 'fastqc')
+
+            for filter_type in ['bclconvert', 'trimmed_sequences',
+                                'filtered_sequences', 'amplicon']:
+                input_path_list.append(p_path(project, filter_type))
+
+            input_path_list.append(p_path(project, 'Reports'))
+
+            p_path = partial(join, self.processed_fastq_files_path, project)
+            input_path_list.append(p_path('fastp_reports_dir', 'json'))
+
+            # I don't usually see a json directory associated with raw data.
+            # It looks to be metadata coming directly off the machine, in the
+            # few instances I've seen it in /sequencing...
+            p_path = partial(join, self.raw_fastq_files_path, project)
+            input_path_list.append(p_path('json'))
+
+            input_path_list = [x for x in input_path_list if exists(x)]
+
+            cmd_head = ['multiqc', '-c', self.multiqc_config_file_path,
+                        '--fullnames', '--force']
+
+            # --interactive graphs is set to True in the MultiQC
+            # configuration file, hence that switch was redundant here and
+            # has been removed.
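+            # each assembled command takes the general form (paths vary):
+            # multiqc -c <config.yaml> --fullnames --force <input paths>
+            #     -o <output_path>/multiqc/<project>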
+            cmd_tail = ['-o', join(self.output_path, 'multiqc', project)]
+
+            array_cmds.append(' '.join(cmd_head + input_path_list + cmd_tail))
+
+        # These commands are okay to execute in parallel because each command
+        # is limited to a specific project and each invocation creates its own
+        # multiqc/<project> output directory, so there will be no collisions.
+        # These commands must be executed after FastQCJob has completed,
+        # however, for FastQC report results to be included.
+        return array_cmds
+
+    def _generate_job_script(self):
+        # bypass generating job script for a force-fail job, since it is
+        # not needed.
+        if self.force_job_fail:
+            return None
+
+        template = self.jinja_env.get_template("multiqc_job.sh")
+
+        array_cmds = self._get_commands()
+
+        job_name = f'{self.qiita_job_id}_{self.job_name}'
+        details_file_name = f'{self.job_name}.array-details'
+        array_details = join(self.output_path, details_file_name)
+        array_params = "1-%d%%%d" % (len(array_cmds), self.pool_size)
+        modules_to_load = ' '.join(self.modules_to_load)
+
+        with open(self.job_script_path, mode="w", encoding="utf-8") as f:
+            f.write(template.render(job_name=job_name,
+                                    array_details=array_details,
+                                    queue_name=self.queue_name,
+                                    node_count=self.node_count,
+                                    nprocs=self.nprocs,
+                                    wall_time_limit=self.wall_time_limit,
+                                    mem_in_gb=self.jmem,
+                                    array_params=array_params,
+                                    output_path=self.output_path,
+                                    modules_to_load=modules_to_load))
+
+        # save the .array-details file as well
+        with open(array_details, 'w') as f:
+            f.write('\n'.join(array_cmds))
+
+        return self.job_script_path
+
+    def run(self, callback=None):
+        try:
+            job_info = self.submit_job(self.job_script_path,
+                                       exec_from=self.log_path,
+                                       callback=callback)
+        except JobFailedError as e:
+            # When a job has failed, parse the logs generated by this specific
+            # job to return a more descriptive message to the user.
+            info = self.parse_logs()
+            # prepend just the message component of the Error.
+            info.insert(0, str(e))
+            raise JobFailedError('\n'.join(info))
+
+        logging.debug(job_info)
+
+        if self._get_failed_indexes(job_info['job_id']):
+            # raise error if list isn't empty.
+            raise PipelineError("MultiQCJob did not complete successfully.")
diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh
index 8b797063..349ebc2f 100644
--- a/sequence_processing_pipeline/templates/fastqc_job.sh
+++ b/sequence_processing_pipeline/templates/fastqc_job.sh
@@ -6,7 +6,6 @@
 #SBATCH --time {{wall_time_limit}}
 #SBATCH --mem {{mem_in_gb}}G
 #SBATCH --array {{array_params}}
-#SBATCH --constraint="intel"
 set -x
 set +e
 date
diff --git a/sequence_processing_pipeline/templates/multiqc_job.sh b/sequence_processing_pipeline/templates/multiqc_job.sh
new file mode 100644
index 00000000..202e5aef
--- /dev/null
+++ b/sequence_processing_pipeline/templates/multiqc_job.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#SBATCH -J {{job_name}}
+#SBATCH -p {{queue_name}}
+#SBATCH -N {{node_count}}
+#SBATCH -n {{nprocs}}
+#SBATCH --time {{wall_time_limit}}
+#SBATCH --mem {{mem_in_gb}}G
+#SBATCH --array {{array_params}}
+set -x
+set +e
+date
+hostname
+echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
+cd {{output_path}}
+{% if modules_to_load %}
+    module load {{modules_to_load}}
+{% endif %}
+offset=${SLURM_ARRAY_TASK_ID}
+step=$(( $offset - 0 ))
+cmd0=$(head -n $step {{array_details}} | tail -n 1)
+eval $cmd0
+echo "Cmd Completed: $cmd0" > logs/MultiQCJob_$step.completed