diff --git a/qp_klp/Assays.py b/qp_klp/Assays.py index 7a202d55..5e9e8751 100644 --- a/qp_klp/Assays.py +++ b/qp_klp/Assays.py @@ -1,14 +1,12 @@ -from os import listdir, makedirs -from os.path import isfile +from os import listdir, makedirs, walk +from os.path import isfile, join, basename, dirname, abspath from shutil import copyfile from sequence_processing_pipeline.NuQCJob import NuQCJob from sequence_processing_pipeline.FastQCJob import FastQCJob from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob -from os.path import join import pandas as pd from json import dumps from collections import defaultdict -from os.path import basename, dirname ASSAY_NAME_NONE = "Assay" @@ -253,7 +251,7 @@ def generate_prep_file(self): seqpro_path, config['modules_to_load'], self.master_qiita_job_id, - join(self.pipeline.output_path, 'ConvertJob'), + self.reports_path, is_amplicon=True) if 'GenPrepFileJob' not in self.skip_steps: @@ -417,11 +415,6 @@ def generate_reports(self): def generate_prep_file(self): config = self.pipeline.get_software_configuration('seqpro') - if 'ConvertJob' in self.raw_fastq_files_path: - reports_dir = join(self.pipeline.output_path, 'ConvertJob') - elif 'TRIntegrateJob' in self.raw_fastq_files_path: - reports_dir = join(self.pipeline.output_path, 'SeqCountsJob') - job = GenPrepFileJob(self.pipeline.run_dir, self.raw_fastq_files_path, join(self.pipeline.output_path, 'NuQCJob'), @@ -430,7 +423,7 @@ def generate_prep_file(self): config['seqpro_path'], config['modules_to_load'], self.master_qiita_job_id, - reports_dir) + self.reports_path) if 'GenPrepFileJob' not in self.skip_steps: job.run(callback=self.job_callback) @@ -505,6 +498,103 @@ class Metagenomic(MetaOmic): METAGENOMIC_TYPE = 'Metagenomic' assay_type = ASSAY_NAME_METAGENOMIC + def execute_pipeline(self): + ''' + Executes steps of pipeline in proper sequence. + :return: None + ''' + self.pre_check() + + self.generate_special_map() + + self.update_status("Converting data", 1, 9) + + self.convert_raw_to_fastq() + + self.integrate_results() + + self.generate_sequence_counts() + + self.update_status("Performing quality control", 2, 9) + self.quality_control() + + self.update_status("Generating reports", 3, 9) + self.generate_reports() + + self.update_status("Generating preps", 4, 9) + self.generate_prep_file() + + # moved final component of genprepfilejob outside of object. + # obtain the paths to the prep-files generated by GenPrepFileJob + # w/out having to recover full state. + tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') + + self.has_replicates = False + + prep_paths = [] + self.prep_file_paths = {} + + for root, dirs, files in walk(tmp): + for _file in files: + # breakup the prep-info-file into segments + # (run-id, project_qid, other) and cleave + # the qiita-id from the project_name. + qid = _file.split('.')[1].split('_')[-1] + + if qid not in self.prep_file_paths: + self.prep_file_paths[qid] = [] + + _path = abspath(join(root, _file)) + if _path.endswith('.tsv'): + prep_paths.append(_path) + self.prep_file_paths[qid].append(_path) + + for _dir in dirs: + if _dir == '1': + # if PrepFiles contains the '1' directory, then it's a + # given that this sample-sheet contains replicates. + self.has_replicates = True + + # currently imported from Assay although it is a base method. it + # could be imported into Workflows potentially, since it is a post- + # processing step. All pairings of assay and instrument type need to + # generate prep-info files in the same format. + self.overwrite_prep_files(prep_paths) + + # for now, simply re-run any line below as if it was a new job, even + # for a restart. functionality is idempotent, except for the + # registration of new preps in Qiita. These will simply be removed + # manually. + + # post-processing steps are by default associated with the Workflow + # class, since they deal with fastq files and Qiita, and don't depend + # on assay or instrument type. + self.update_status("Generating sample information", 5, 9) + self.sifs = self.generate_sifs() + + # post-processing step. + self.update_status("Registering blanks in Qiita", 6, 9) + if self.update: + self.update_blanks_in_qiita() + + self.update_status("Loading preps into Qiita", 7, 9) + if self.update: + self.update_prep_templates() + + # before we load preps into Qiita we need to copy the fastq + # files n times for n preps and correct the file-paths each + # prep is pointing to. + self.load_preps_into_qiita() + + self.fsr.generate_report() + + self.update_status("Generating packaging commands", 8, 9) + self.generate_commands() + + self.update_status("Packaging results", 9, 9) + if self.update: + self.execute_commands() + class Metatranscriptomic(MetaOmic): METATRANSCRIPTOMIC_TYPE = 'Metatranscriptomic' diff --git a/qp_klp/Protocol.py b/qp_klp/Protocol.py index 195577d5..82e23b33 100644 --- a/qp_klp/Protocol.py +++ b/qp_klp/Protocol.py @@ -77,6 +77,14 @@ def get_config(command): if 'ConvertJob' not in self.skip_steps: job.run(callback=self.job_callback) + # if successful, set self.reports_path + self.reports_path = join(self.pipeline.output_path, + 'ConvertJob', + 'Reports', + 'Demultiplex_Stats.csv') + # TODO: Include alternative path when using bcl2fastq instead of + # bcl-convert. + # audit the results to determine which samples failed to convert # properly. Append these to the failed-samples report and also # return the list directly to the caller. @@ -157,6 +165,11 @@ def generate_sequence_counts(self): if 'SeqCountsJob' not in self.skip_steps: job.run(callback=self.job_callback) + # if successful, set self.reports_path + self.reports_path = join(self.pipeline.output_path, + 'SeqCountsJob', + 'SeqCounts.csv') + # Do not add an entry to fsr because w/respect to counting, either # all jobs are going to fail or none are going to fail. It's not # likely that we're going to fail to count sequences for only some diff --git a/qp_klp/StandardAmpliconWorkflow.py b/qp_klp/StandardAmpliconWorkflow.py index 75768526..907e1d51 100644 --- a/qp_klp/StandardAmpliconWorkflow.py +++ b/qp_klp/StandardAmpliconWorkflow.py @@ -35,7 +35,7 @@ def __init__(self, **kwargs): # NB: Amplicon workflows don't have failed samples records because # the fastq files are not demultiplexed. - self.master_qiita_job_id = None + self.master_qiita_job_id = self.kwargs['job_id'] self.lane_number = self.kwargs['lane_number'] self.is_restart = bool(self.kwargs['is_restart']) diff --git a/qp_klp/StandardMetagenomicWorkflow.py b/qp_klp/StandardMetagenomicWorkflow.py index e9d9b494..c2c17502 100644 --- a/qp_klp/StandardMetagenomicWorkflow.py +++ b/qp_klp/StandardMetagenomicWorkflow.py @@ -1,6 +1,5 @@ from .Protocol import Illumina -from os.path import join, abspath, exists -from os import walk +from os.path import join, exists from shutil import rmtree from sequence_processing_pipeline.Pipeline import Pipeline from .Assays import Metagenomic @@ -35,7 +34,7 @@ def __init__(self, **kwargs): self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], self.pipeline.sample_sheet.samples) - self.master_qiita_job_id = None + self.master_qiita_job_id = self.kwargs['job_id'] self.lane_number = self.kwargs['lane_number'] self.is_restart = bool(self.kwargs['is_restart']) @@ -67,117 +66,3 @@ def determine_steps_to_skip(self): else: # work stopped before this job could be completed. rmtree(join(out_dir, directory)) - - def execute_pipeline(self): - ''' - Executes steps of pipeline in proper sequence. - :return: None - ''' - if not self.is_restart: - self.pre_check() - - # this is performed even in the event of a restart. - self.generate_special_map() - - # even if a job is being skipped, it's being skipped because it was - # determined that it already completed successfully. Hence, - # increment the status because we are still iterating through them. - - self.update_status("Converting data", 1, 9) - if "ConvertJob" not in self.skip_steps: - # converting raw data to fastq depends heavily on the instrument - # used to generate the run_directory. Hence this method is - # supplied by the instrument mixin. - # NB: convert_raw_to_fastq() now generates fsr on its own. - self.convert_raw_to_fastq() - - self.update_status("Performing quality control", 2, 9) - if "NuQCJob" not in self.skip_steps: - # quality_control generates its own fsr now - self.quality_control(self.pipeline) - - self.update_status("Generating reports", 3, 9) - if "FastQCJob" not in self.skip_steps: - # reports are currently implemented by the assay mixin. This is - # only because metagenomic runs currently require a failed-samples - # report to be generated. This is not done for amplicon runs since - # demultiplexing occurs downstream of SPP. - results = self.generate_reports() - self.fsr_write(results, 'FastQCJob') - - self.update_status("Generating preps", 4, 9) - if "GenPrepFileJob" not in self.skip_steps: - # preps are currently associated with array mixin, but only - # because there are currently some slight differences in how - # FastQCJob gets instantiated(). This could get moved into a - # shared method, but probably still in Assay. - self.generate_prep_file() - - # moved final component of genprepfilejob outside of object. - # obtain the paths to the prep-files generated by GenPrepFileJob - # w/out having to recover full state. - tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') - - self.has_replicates = False - - prep_paths = [] - self.prep_file_paths = {} - - for root, dirs, files in walk(tmp): - for _file in files: - # breakup the prep-info-file into segments - # (run-id, project_qid, other) and cleave - # the qiita-id from the project_name. - qid = _file.split('.')[1].split('_')[-1] - - if qid not in self.prep_file_paths: - self.prep_file_paths[qid] = [] - - _path = abspath(join(root, _file)) - if _path.endswith('.tsv'): - prep_paths.append(_path) - self.prep_file_paths[qid].append(_path) - - for _dir in dirs: - if _dir == '1': - # if PrepFiles contains the '1' directory, then it's a - # given that this sample-sheet contains replicates. - self.has_replicates = True - - # currently imported from Assay although it is a base method. it - # could be imported into Workflows potentially, since it is a post- - # processing step. All pairings of assay and instrument type need to - # generate prep-info files in the same format. - self.overwrite_prep_files(prep_paths) - - # for now, simply re-run any line below as if it was a new job, even - # for a restart. functionality is idempotent, except for the - # registration of new preps in Qiita. These will simply be removed - # manually. - - # post-processing steps are by default associated with the Workflow - # class, since they deal with fastq files and Qiita, and don't depend - # on assay or instrument type. - self.update_status("Generating sample information", 5, 9) - self.sifs = self.generate_sifs() - - # post-processing step. - self.update_status("Registering blanks in Qiita", 6, 9) - if self.update: - self.update_blanks_in_qiita() - - self.update_status("Loading preps into Qiita", 7, 9) - if self.update: - self.update_prep_templates() - - # before we load preps into Qiita we need to copy the fastq - # files n times for n preps and correct the file-paths each - # prep is pointing to. - self.load_preps_into_qiita() - - self.update_status("Generating packaging commands", 8, 9) - self.generate_commands() - - self.update_status("Packaging results", 9, 9) - if self.update: - self.execute_commands() diff --git a/qp_klp/StandardMetatranscriptomicWorkflow.py b/qp_klp/StandardMetatranscriptomicWorkflow.py index 43c14c38..0c4c0c5d 100644 --- a/qp_klp/StandardMetatranscriptomicWorkflow.py +++ b/qp_klp/StandardMetatranscriptomicWorkflow.py @@ -1,6 +1,5 @@ from .Protocol import Illumina -from os.path import join, abspath, exists -from os import walk +from os.path import join, exists from shutil import rmtree from sequence_processing_pipeline.Pipeline import Pipeline from .Assays import Metatranscriptomic @@ -36,7 +35,7 @@ def __init__(self, **kwargs): self.fsr = FailedSamplesRecord(self.kwargs['output_dir'], self.pipeline.sample_sheet.samples) - self.master_qiita_job_id = None + self.master_qiita_job_id = self.kwargs['job_id'] self.lane_number = self.kwargs['lane_number'] self.is_restart = bool(self.kwargs['is_restart']) @@ -68,117 +67,3 @@ def determine_steps_to_skip(self): else: # work stopped before this job could be completed. rmtree(join(out_dir, directory)) - - def execute_pipeline(self): - ''' - Executes steps of pipeline in proper sequence. - :return: None - ''' - if not self.is_restart: - self.pre_check() - - # this is performed even in the event of a restart. - self.generate_special_map() - - # even if a job is being skipped, it's being skipped because it was - # determined that it already completed successfully. Hence, - # increment the status because we are still iterating through them. - - self.update_status("Converting data", 1, 9) - if "ConvertJob" not in self.skip_steps: - # converting raw data to fastq depends heavily on the instrument - # used to generate the run_directory. Hence this method is - # supplied by the instrument mixin. - # NB: convert_raw_to_fastq() now generates fsr on its own - results = self.convert_raw_to_fastq() - - self.update_status("Performing quality control", 2, 9) - if "NuQCJob" not in self.skip_steps: - # NB: quality_control generates its own fsr - self.quality_control(self.pipeline) - - self.update_status("Generating reports", 3, 9) - if "FastQCJob" not in self.skip_steps: - # reports are currently implemented by the assay mixin. This is - # only because metaranscriptomic runs currently require a failed- - # samples report to be generated. This is not done for amplicon - # runs since demultiplexing occurs downstream of SPP. - results = self.generate_reports() - self.fsr_write(results, 'FastQCJob') - - self.update_status("Generating preps", 4, 9) - if "GenPrepFileJob" not in self.skip_steps: - # preps are currently associated with array mixin, but only - # because there are currently some slight differences in how - # FastQCJob gets instantiated(). This could get moved into a - # shared method, but probably still in Assay. - self.generate_prep_file() - - # moved final component of genprepfilejob outside of object. - # obtain the paths to the prep-files generated by GenPrepFileJob - # w/out having to recover full state. - tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') - - self.has_replicates = False - - prep_paths = [] - self.prep_file_paths = {} - - for root, dirs, files in walk(tmp): - for _file in files: - # breakup the prep-info-file into segments - # (run-id, project_qid, other) and cleave - # the qiita-id from the project_name. - qid = _file.split('.')[1].split('_')[-1] - - if qid not in self.prep_file_paths: - self.prep_file_paths[qid] = [] - - _path = abspath(join(root, _file)) - if _path.endswith('.tsv'): - prep_paths.append(_path) - self.prep_file_paths[qid].append(_path) - - for _dir in dirs: - if _dir == '1': - # if PrepFiles contains the '1' directory, then it's a - # given that this sample-sheet contains replicates. - self.has_replicates = True - - # currently imported from Assay although it is a base method. it - # could be imported into Workflows potentially, since it is a post- - # processing step. All pairings of assay and instrument type need to - # generate prep-info files in the same format. - self.overwrite_prep_files(prep_paths) - - # for now, simply re-run any line below as if it was a new job, even - # for a restart. functionality is idempotent, except for the - # registration of new preps in Qiita. These will simply be removed - # manually. - - # post-processing steps are by default associated with the Workflow - # class, since they deal with fastq files and Qiita, and don't depend - # on assay or instrument type. - self.update_status("Generating sample information", 5, 9) - self.sifs = self.generate_sifs() - - # post-processing step. - self.update_status("Registering blanks in Qiita", 6, 9) - if self.update: - self.update_blanks_in_qiita() - - self.update_status("Loading preps into Qiita", 7, 9) - if self.update: - self.update_prep_templates() - - # before we load preps into Qiita we need to copy the fastq - # files n times for n preps and correct the file-paths each - # prep is pointing to. - self.load_preps_into_qiita() - - self.update_status("Generating packaging commands", 8, 9) - self.generate_commands() - - self.update_status("Packaging results", 9, 9) - if self.update: - self.execute_commands() diff --git a/qp_klp/TellseqMetagenomicWorkflow.py b/qp_klp/TellseqMetagenomicWorkflow.py index be353f2c..0e2aaec2 100644 --- a/qp_klp/TellseqMetagenomicWorkflow.py +++ b/qp_klp/TellseqMetagenomicWorkflow.py @@ -1,12 +1,10 @@ from .Protocol import TellSeq -from os.path import join, abspath, exists -from os import walk +from os.path import join, exists from sequence_processing_pipeline.Pipeline import Pipeline, InstrumentUtils from .Assays import Metagenomic from .Assays import ASSAY_NAME_METAGENOMIC from .Workflows import Workflow from .FailedSamplesRecord import FailedSamplesRecord -from collections import defaultdict class TellSeqMetagenomicWorkflow(Workflow, Metagenomic, TellSeq): @@ -41,7 +39,7 @@ def __init__(self, **kwargs): self.iseq_run = True if type == 'iSeq' else False - self.master_qiita_job_id = None + self.master_qiita_job_id = self.kwargs['job_id'] self.lane_number = self.kwargs['lane_number'] self.is_restart = bool(self.kwargs['is_restart']) @@ -62,7 +60,7 @@ def determine_steps_to_skip(self): out_dir = self.pipeline.output_path directories_to_check = ['TellReadJob', 'TRIntegrateJob', 'NuQCJob', - 'FastQCJob', 'GenPrepFileJob'] + 'FastQCJob', 'SeqCountsJob', 'GenPrepFileJob'] for directory in directories_to_check: if exists(join(out_dir, directory)): @@ -77,105 +75,3 @@ def determine_steps_to_skip(self): msg = "%s doesn't have job completed" % join(out_dir, directory) raise ValueError(msg) - - def execute_pipeline(self): - ''' - Executes steps of pipeline in proper sequence. - :return: None - ''' - - # perform some (re)initialization steps on (re)startup. - self.pre_check() - - # this is performed even in the event of a restart. - self.generate_special_map() - - # even if a job is being skipped, it's being skipped because it was - # determined that it already completed successfully. Hence, - # increment the status because we are still iterating through them. - - self.update_status("Converting data", 1, 9) - - # convert_raw_to_fastq() now performs its own checking of skip_steps. - # convert_raw_to_fastq() now performs its own write to fsr reports. - # This means fsr reports will be accurate even on restarts. - self.convert_raw_to_fastq() - - self.integrate_results() - - self.generate_sequence_counts() - - self.update_status("Performing quality control", 2, 9) - self.quality_control() - - self.update_status("Generating reports", 3, 9) - self.generate_reports() - - self.update_status("Generating preps", 4, 9) - self.generate_prep_file() - - # moved final component of genprepfilejob outside of object. - # obtain the paths to the prep-files generated by GenPrepFileJob - # w/out having to recover full state. - tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') - - self.has_replicates = False - - prep_paths = [] - self.prep_file_paths = defaultdict(list) - - for root, dirs, files in walk(tmp): - for _file in files: - # breakup the prep-info-file into segments - # (run-id, project_qid, other) and cleave - # the qiita-id from the project_name. - qid = _file.split('.')[1].split('_')[-1] - - if _file.endswith('.tsv'): - _path = abspath(join(root, _file)) - prep_paths.append(_path) - self.prep_file_paths[qid].append(_path) - - for _dir in dirs: - if _dir == '1': - # if PrepFiles contains the '1' directory, then it's a - # given that this sample-sheet contains replicates. - self.has_replicates = True - - # currently imported from Assay although it is a base method. it - # could be imported into Workflows potentially, since it is a post- - # processing step. All pairings of assay and instrument type need to - # generate prep-info files in the same format. - self.overwrite_prep_files(prep_paths) - - # for now, simply re-run any line below as if it was a new job, even - # for a restart. functionality is idempotent, except for the - # registration of new preps in Qiita. These will simply be removed - # manually. - - # post-processing steps are by default associated with the Workflow - # class, since they deal with fastq files and Qiita, and don't depend - # on assay or instrument type. - self.update_status("Generating sample information", 5, 9) - self.sifs = self.generate_sifs() - - # post-processing step. - self.update_status("Registering blanks in Qiita", 6, 9) - if self.update: - self.update_blanks_in_qiita() - - self.update_status("Loading preps into Qiita", 7, 9) - if self.update: - self.update_prep_templates() - - # before we load preps into Qiita we need to copy the fastq - # files n times for n preps and correct the file-paths each - # prep is pointing to. - self.load_preps_into_qiita() - - self.update_status("Generating packaging commands", 8, 9) - self.generate_commands() - - self.update_status("Packaging results", 9, 9) - if self.update: - self.execute_commands() diff --git a/qp_klp/tests/test_workflows.py b/qp_klp/tests/test_workflows.py index 37f1fcf3..759f8e02 100644 --- a/qp_klp/tests/test_workflows.py +++ b/qp_klp/tests/test_workflows.py @@ -340,7 +340,8 @@ def test_partial_metagenomic_pipeline(self): # confirming its digest. exp = ['#!/bin/bash', - '#SBATCH --job-name None_ConvertJob', + '#SBATCH --job-name 077c4da8-74eb-4184-8860-0207f53623be' + '_ConvertJob', '#SBATCH -p qiita', '#SBATCH -N 1', '#SBATCH -n 16', @@ -437,6 +438,16 @@ def test_partial_metagenomic_pipeline(self): # NuQCJob successful. + # we have specific code to test FailedSamplesReport() itself. All + # we need to do here is confirm that the workflow used has + # instantiated a valid FailedSamplesReport() object, attached it to + # wf.fsr, generate_report() can be run, and that the following two + # files were generated at the location below. + wf.fsr.generate_report() + + self.assertTrue(exists(join(self.output_dir, 'failed_samples.json'))) + self.assertTrue(exists(join(self.output_dir, 'failed_samples.html'))) + def test_partial_metatranscriptomic_pipeline(self): # Tests convert_raw_to_fastq() and quality_control() steps of # StandardMetatranscriptomicWorkflow(), which in turn exercises @@ -538,7 +549,8 @@ def test_partial_metatranscriptomic_pipeline(self): # confirming its digest. exp = [ "#!/bin/bash", - "#SBATCH --job-name None_ConvertJob", + "#SBATCH --job-name 077c4da8-74eb-4184-8860-0207f53623be" + "_ConvertJob", "#SBATCH -p qiita", "#SBATCH -N 1", "#SBATCH -n 16", @@ -714,7 +726,8 @@ def test_partial_amplicon_pipeline(self): # confirm ConvertJob.sh Slurm job script looks exactly as intended by # confirming its digest. exp = ["#!/bin/bash", - "#SBATCH --job-name None_ConvertJob", + "#SBATCH --job-name 077c4da8-74eb-4184-8860-0207f53623be" + "_ConvertJob", "#SBATCH -p qiita", "#SBATCH -N 2", "#SBATCH -n 62",