production hotfixes #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 8 commits, Mar 4, 2025
112 changes: 101 additions & 11 deletions qp_klp/Assays.py
@@ -1,14 +1,12 @@
-from os import listdir, makedirs
-from os.path import isfile
+from os import listdir, makedirs, walk
+from os.path import isfile, join, basename, dirname, abspath
 from shutil import copyfile
 from sequence_processing_pipeline.NuQCJob import NuQCJob
 from sequence_processing_pipeline.FastQCJob import FastQCJob
 from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
-from os.path import join
 import pandas as pd
 from json import dumps
 from collections import defaultdict
-from os.path import basename, dirname


 ASSAY_NAME_NONE = "Assay"
@@ -253,7 +251,7 @@ def generate_prep_file(self):
                              seqpro_path,
                              config['modules_to_load'],
                              self.master_qiita_job_id,
-                             join(self.pipeline.output_path, 'ConvertJob'),
+                             self.reports_path,
                              is_amplicon=True)

         if 'GenPrepFileJob' not in self.skip_steps:
@@ -417,11 +415,6 @@ def generate_reports(self):
     def generate_prep_file(self):
         config = self.pipeline.get_software_configuration('seqpro')

-        if 'ConvertJob' in self.raw_fastq_files_path:
-            reports_dir = join(self.pipeline.output_path, 'ConvertJob')
-        elif 'TRIntegrateJob' in self.raw_fastq_files_path:
-            reports_dir = join(self.pipeline.output_path, 'SeqCountsJob')
-
         job = GenPrepFileJob(self.pipeline.run_dir,
                              self.raw_fastq_files_path,
                              join(self.pipeline.output_path, 'NuQCJob'),
@@ -430,7 +423,7 @@
                              config['seqpro_path'],
                              config['modules_to_load'],
                              self.master_qiita_job_id,
-                             reports_dir)
+                             self.reports_path)

         if 'GenPrepFileJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
@@ -505,6 +498,103 @@ class Metagenomic(MetaOmic):
     METAGENOMIC_TYPE = 'Metagenomic'
     assay_type = ASSAY_NAME_METAGENOMIC

+    def execute_pipeline(self):
+        '''
+        Executes steps of pipeline in proper sequence.
+        :return: None
+        '''
+        self.pre_check()
+
+        self.generate_special_map()
+
+        self.update_status("Converting data", 1, 9)
+
+        self.convert_raw_to_fastq()
+
+        self.integrate_results()
+
+        self.generate_sequence_counts()
+
+        self.update_status("Performing quality control", 2, 9)
+        self.quality_control()
+
+        self.update_status("Generating reports", 3, 9)
+        self.generate_reports()
+
+        self.update_status("Generating preps", 4, 9)
+        self.generate_prep_file()
+
+        # the final component of GenPrepFileJob was moved outside of the
+        # object. Obtain the paths to the prep-files generated by
+        # GenPrepFileJob w/out having to recover full state.
+        tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
+
+        self.has_replicates = False
+
+        prep_paths = []
+        self.prep_file_paths = {}
+
+        for root, dirs, files in walk(tmp):
+            for _file in files:
+                # break up the prep-info-file name into segments
+                # (run-id, project_qid, other) and cleave
+                # the qiita-id from the project_name.
+                qid = _file.split('.')[1].split('_')[-1]
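+                # e.g., a hypothetical filename such as
+                # 'RUNID.Project_12345.tsv' splits to 'Project_12345',
+                # whose trailing segment yields qid '12345'.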
+
+                if qid not in self.prep_file_paths:
+                    self.prep_file_paths[qid] = []
+
+                _path = abspath(join(root, _file))
+                if _path.endswith('.tsv'):
+                    prep_paths.append(_path)
+                    self.prep_file_paths[qid].append(_path)
+
+            for _dir in dirs:
+                if _dir == '1':
+                    # if PrepFiles contains the '1' directory, then it's a
+                    # given that this sample-sheet contains replicates.
+                    self.has_replicates = True
+
+        # currently imported from Assay although it is a base method. it
+        # could be imported into Workflows potentially, since it is a post-
+        # processing step. All pairings of assay and instrument type need to
+        # generate prep-info files in the same format.
+        self.overwrite_prep_files(prep_paths)
+
+        # for now, simply re-run any line below as if it was a new job, even
+        # for a restart. functionality is idempotent, except for the
+        # registration of new preps in Qiita. These will simply be removed
+        # manually.
+
+        # post-processing steps are by default associated with the Workflow
+        # class, since they deal with fastq files and Qiita, and don't depend
+        # on assay or instrument type.
+        self.update_status("Generating sample information", 5, 9)
+        self.sifs = self.generate_sifs()
+
+        # post-processing step.
+        self.update_status("Registering blanks in Qiita", 6, 9)
+        if self.update:
+            self.update_blanks_in_qiita()
+
+        self.update_status("Loading preps into Qiita", 7, 9)
+        if self.update:
+            self.update_prep_templates()
+
+        # before we load preps into Qiita we need to copy the fastq
+        # files n times for n preps and correct the file-paths each
+        # prep is pointing to.
+        self.load_preps_into_qiita()
+
+        self.fsr.generate_report()
+
+        self.update_status("Generating packaging commands", 8, 9)
+        self.generate_commands()
+
+        self.update_status("Packaging results", 9, 9)
+        if self.update:
+            self.execute_commands()
+

 class Metatranscriptomic(MetaOmic):
     METATRANSCRIPTOMIC_TYPE = 'Metatranscriptomic'
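Note on the prep-file discovery loop added above: Assays.py already imports defaultdict, so the per-qid initialization could be folded away. A minimal standalone sketch of the same walk, under that assumption (the function name and the filename in the comment are illustrative, not part of this PR):

    from collections import defaultdict
    from os import walk
    from os.path import abspath, join

    def collect_prep_paths(prep_files_dir):
        # map qiita-id -> prep-file paths, mirroring the loop in Assays.py
        prep_file_paths = defaultdict(list)
        has_replicates = False
        for root, dirs, files in walk(prep_files_dir):
            for _file in files:
                # e.g., a hypothetical 'RUNID.Project_12345.tsv' -> qid '12345'
                qid = _file.split('.')[1].split('_')[-1]
                _path = abspath(join(root, _file))
                if _path.endswith('.tsv'):
                    prep_file_paths[qid].append(_path)
            # a directory named '1' under PrepFiles implies replicates
            has_replicates = has_replicates or '1' in dirs
        return dict(prep_file_paths), has_replicates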
13 changes: 13 additions & 0 deletions qp_klp/Protocol.py
@@ -77,6 +77,14 @@ def get_config(command):
         if 'ConvertJob' not in self.skip_steps:
             job.run(callback=self.job_callback)

+        # if successful, set self.reports_path
+        self.reports_path = join(self.pipeline.output_path,
+                                 'ConvertJob',
+                                 'Reports',
+                                 'Demultiplex_Stats.csv')
+        # TODO: Include alternative path when using bcl2fastq instead of
+        # bcl-convert.
+
         # audit the results to determine which samples failed to convert
         # properly. Append these to the failed-samples report and also
         # return the list directly to the caller.
@@ -157,6 +165,11 @@ def generate_sequence_counts(self):
         if 'SeqCountsJob' not in self.skip_steps:
             job.run(callback=self.job_callback)

+        # if successful, set self.reports_path
+        self.reports_path = join(self.pipeline.output_path,
+                                 'SeqCountsJob',
+                                 'SeqCounts.csv')
+
         # Do not add an entry to fsr because w/respect to counting, either
         # all jobs are going to fail or none are going to fail. It's not
         # likely that we're going to fail to count sequences for only some
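The TODO in convert_raw_to_fastq() notes that bcl2fastq lays its reports out differently from bcl-convert: bcl2fastq writes its demultiplexing summary to Stats/Stats.json rather than Reports/Demultiplex_Stats.csv. A hedged sketch of how the path could be resolved once that TODO is addressed (the helper name is hypothetical, not part of this PR):

    from os.path import exists, join

    def _resolve_convert_reports(output_path):
        # bcl-convert layout (what this PR assumes)
        csv = join(output_path, 'ConvertJob', 'Reports',
                   'Demultiplex_Stats.csv')
        if exists(csv):
            return csv
        # bcl2fastq layout (the TODO's alternative)
        stats = join(output_path, 'ConvertJob', 'Stats', 'Stats.json')
        if exists(stats):
            return stats
        raise FileNotFoundError('no demultiplexing report found under '
                                + output_path)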
2 changes: 1 addition & 1 deletion qp_klp/StandardAmpliconWorkflow.py
@@ -35,7 +35,7 @@ def __init__(self, **kwargs):
         # NB: Amplicon workflows don't have failed samples records because
         # the fastq files are not demultiplexed.

-        self.master_qiita_job_id = None
+        self.master_qiita_job_id = self.kwargs['job_id']

         self.lane_number = self.kwargs['lane_number']
         self.is_restart = bool(self.kwargs['is_restart'])
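Both workflow constructors now take the Qiita job id from kwargs instead of leaving it None, so jobs that consume self.master_qiita_job_id (e.g., GenPrepFileJob above) receive a real id. If stricter input checking were wanted, a guard along these lines could sit in __init__ (illustrative only; the required tuple is an assumption, not from this PR):

    required = ('job_id', 'lane_number', 'is_restart', 'output_dir')
    missing = [k for k in required if k not in self.kwargs]
    if missing:
        raise ValueError(f'missing required kwargs: {missing}')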
119 changes: 2 additions & 117 deletions qp_klp/StandardMetagenomicWorkflow.py
@@ -1,6 +1,5 @@
 from .Protocol import Illumina
-from os.path import join, abspath, exists
-from os import walk
+from os.path import join, exists
 from shutil import rmtree
 from sequence_processing_pipeline.Pipeline import Pipeline
 from .Assays import Metagenomic
@@ -35,7 +34,7 @@ def __init__(self, **kwargs):
         self.fsr = FailedSamplesRecord(self.kwargs['output_dir'],
                                        self.pipeline.sample_sheet.samples)

-        self.master_qiita_job_id = None
+        self.master_qiita_job_id = self.kwargs['job_id']

         self.lane_number = self.kwargs['lane_number']
         self.is_restart = bool(self.kwargs['is_restart'])
@@ -67,117 +66,3 @@ def determine_steps_to_skip(self):
             else:
                 # work stopped before this job could be completed.
                 rmtree(join(out_dir, directory))
-
-    def execute_pipeline(self):
-        '''
-        Executes steps of pipeline in proper sequence.
-        :return: None
-        '''
-        if not self.is_restart:
-            self.pre_check()
-
-        # this is performed even in the event of a restart.
-        self.generate_special_map()
-
-        # even if a job is being skipped, it's being skipped because it was
-        # determined that it already completed successfully. Hence,
-        # increment the status because we are still iterating through them.
-
-        self.update_status("Converting data", 1, 9)
-        if "ConvertJob" not in self.skip_steps:
-            # converting raw data to fastq depends heavily on the instrument
-            # used to generate the run_directory. Hence this method is
-            # supplied by the instrument mixin.
-            # NB: convert_raw_to_fastq() now generates fsr on its own.
-            self.convert_raw_to_fastq()
-
-        self.update_status("Performing quality control", 2, 9)
-        if "NuQCJob" not in self.skip_steps:
-            # quality_control generates its own fsr now
-            self.quality_control(self.pipeline)
-
-        self.update_status("Generating reports", 3, 9)
-        if "FastQCJob" not in self.skip_steps:
-            # reports are currently implemented by the assay mixin. This is
-            # only because metagenomic runs currently require a failed-samples
-            # report to be generated. This is not done for amplicon runs since
-            # demultiplexing occurs downstream of SPP.
-            results = self.generate_reports()
-            self.fsr_write(results, 'FastQCJob')
-
-        self.update_status("Generating preps", 4, 9)
-        if "GenPrepFileJob" not in self.skip_steps:
-            # preps are currently associated with array mixin, but only
-            # because there are currently some slight differences in how
-            # FastQCJob gets instantiated(). This could get moved into a
-            # shared method, but probably still in Assay.
-            self.generate_prep_file()
-
-        # moved final component of genprepfilejob outside of object.
-        # obtain the paths to the prep-files generated by GenPrepFileJob
-        # w/out having to recover full state.
-        tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
-
-        self.has_replicates = False
-
-        prep_paths = []
-        self.prep_file_paths = {}
-
-        for root, dirs, files in walk(tmp):
-            for _file in files:
-                # breakup the prep-info-file into segments
-                # (run-id, project_qid, other) and cleave
-                # the qiita-id from the project_name.
-                qid = _file.split('.')[1].split('_')[-1]
-
-                if qid not in self.prep_file_paths:
-                    self.prep_file_paths[qid] = []
-
-                _path = abspath(join(root, _file))
-                if _path.endswith('.tsv'):
-                    prep_paths.append(_path)
-                    self.prep_file_paths[qid].append(_path)
-
-            for _dir in dirs:
-                if _dir == '1':
-                    # if PrepFiles contains the '1' directory, then it's a
-                    # given that this sample-sheet contains replicates.
-                    self.has_replicates = True
-
-        # currently imported from Assay although it is a base method. it
-        # could be imported into Workflows potentially, since it is a post-
-        # processing step. All pairings of assay and instrument type need to
-        # generate prep-info files in the same format.
-        self.overwrite_prep_files(prep_paths)
-
-        # for now, simply re-run any line below as if it was a new job, even
-        # for a restart. functionality is idempotent, except for the
-        # registration of new preps in Qiita. These will simply be removed
-        # manually.
-
-        # post-processing steps are by default associated with the Workflow
-        # class, since they deal with fastq files and Qiita, and don't depend
-        # on assay or instrument type.
-        self.update_status("Generating sample information", 5, 9)
-        self.sifs = self.generate_sifs()
-
-        # post-processing step.
-        self.update_status("Registering blanks in Qiita", 6, 9)
-        if self.update:
-            self.update_blanks_in_qiita()
-
-        self.update_status("Loading preps into Qiita", 7, 9)
-        if self.update:
-            self.update_prep_templates()
-
-        # before we load preps into Qiita we need to copy the fastq
-        # files n times for n preps and correct the file-paths each
-        # prep is pointing to.
-        self.load_preps_into_qiita()
-
-        self.update_status("Generating packaging commands", 8, 9)
-        self.generate_commands()
-
-        self.update_status("Packaging results", 9, 9)
-        if self.update:
-            self.execute_commands()
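Net effect of this file's diff: execute_pipeline now comes from the Metagenomic assay mixin (see Assays.py above) rather than from the workflow class itself, so callers are unchanged. A self-contained toy sketch of that resolution pattern, with illustrative class names rather than the real ones:

    class Assay:
        def execute_pipeline(self):
            # the shared implementation lives on the assay mixin
            return f'{type(self).__name__} ran the pipeline'

    class Workflow:
        pass

    class MetagenomicWorkflow(Workflow, Assay):
        # no execute_pipeline here; Python's MRO finds Assay's copy
        pass

    print(MetagenomicWorkflow().execute_pipeline())
    # -> 'MetagenomicWorkflow ran the pipeline'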