|
11 | 11 | from shutil import copy2 |
12 | 12 | from subprocess import run |
13 | 13 |
|
| 14 | +import pandas as pd |
14 | 15 | import yaml |
| 16 | +from biom import load_table |
15 | 17 | from jinja2 import Environment |
| 18 | +from pysyndna import ( |
| 19 | + fit_linear_regression_models_for_qiita, |
| 20 | +) |
16 | 21 | from qiita_client import ArtifactInfo |
17 | 22 |
|
18 | 23 | from .util import KISSLoader, find_base_path |
@@ -424,3 +429,147 @@ def generate_minimap2_processing(qclient, job_id, out_dir, parameters, url): |
424 | 429 | minimap2_merge_script = _write_slurm(f"{out_dir}/merge", m2mt, **params) |
425 | 430 |
|
426 | 431 | return minimap2_script, minimap2_merge_script |
| 432 | + |
| 433 | + |
def syndna_processing(qclient, job_id, parameters, out_dir):
    """generates output for syndna processing.

    Parameters
    ----------
    qclient : tgp.qiita_client.QiitaClient
        Qiita server client.
    job_id : str
        Job id.
    parameters : dict
        Parameters for this job.
    out_dir : str
        Output directory.

    Returns
    -------
    bool, list, str
        Results tuple for Qiita: (success, artifacts, error message).
    """
    qclient.update_job_step(job_id, "Commands finished")

    errors = []
    ainfo = []
    fp_biom = f"{out_dir}/syndna.biom"
    # do we need to store alignments?
    # fp_alng = f'{out_dir}/sams/final/alignment.tar'

    if exists(fp_biom):  # and exists(fp_alng):
        # if we got to this point a preparation file should exist in
        # the output folder
        prep = pd.read_csv(f"{out_dir}/prep_info.tsv", index_col=None, sep="\t")
        output = fit_linear_regression_models_for_qiita(
            prep, load_table(fp_biom), int(parameters["min_sample_counts"])
        )
        # saving results to disk
        lin_regress_results_fp = f"{out_dir}/lin_regress_by_sample_id.yaml"
        fit_syndna_models_log_fp = f"{out_dir}/fit_syndna_models_log.txt"
        with open(lin_regress_results_fp, "w") as fp:
            fp.write(output["lin_regress_by_sample_id"])
        with open(fit_syndna_models_log_fp, "w") as fp:
            fp.write(output["fit_syndna_models_log"])
        ainfo = [
            ArtifactInfo(
                "SynDNA hits",
                "BIOM",
                [
                    (fp_biom, "biom"),
                    # rm if fp_alng is not needed
                    # (fp_alng, "log"),
                    (lin_regress_results_fp, "log"),
                    (fit_syndna_models_log_fp, "log"),
                ],
            )
        ]
    else:
        errors.append(
            'Missing files from the "SynDNA hits"; please '
            "contact [email protected] for more information"
        )

    if errors:
        # fail early: no point collecting the filtered reads when the
        # biom artifact is missing -- Qiita will surface the error text
        return False, ainfo, "\n".join(errors)

    # collect the per-sample fastqs that had their SynDNA reads removed
    reads = [
        (f, "raw_forward_seqs")
        for f in glob(f"{out_dir}/filtered/*.fastq.gz")
    ]
    ainfo.append(ArtifactInfo("reads without SynDNA", "per_sample_FASTQ", reads))

    return True, ainfo, ""
| 506 | + |
| 507 | + |
def generate_syndna_processing(qclient, job_id, out_dir, parameters, url):
    """generates slurm scripts for syndna processing.

    Parameters
    ----------
    qclient : tgp.qiita_client.QiitaClient
        Qiita server client.
    job_id : str
        Job id.
    out_dir : str
        Output directory.
    parameters : dict
        Parameters for this job.
    url : str
        URL to send the response, finish the job.

    Returns
    -------
    str, str
        Returns the two filepaths of the slurm scripts
    """
    resources = RESOURCES[
        "Remove SynDNA plasmid, insert, & GCF_000184185 reads (minimap2)"
    ]
    # parameters shared by both slurm templates below
    main_parameters = {
        "conda_environment": CONDA_ENVIRONMENT,
        "output": out_dir,
        "qjid": job_id,
    }

    qclient.update_job_step(
        job_id, "Step 1 of 4: Collecting info and generating submission"
    )

    artifact_id = parameters["artifact"]

    # njobs sizes the slurm array: one task per entry in the sample list
    njobs = generate_sample_list(qclient, artifact_id, out_dir)

    qclient.update_job_step(
        job_id,
        "Step 2 of 4: Creating submission templates",
    )

    # main step: minimap2 array job, throttled to max_tasks concurrent tasks
    m2t = JGT("syndna.sbatch")
    step_resources = resources["syndna"]
    params = main_parameters | {
        "job_name": f"sd_{job_id}",
        "node_count": step_resources["node_count"],
        "nprocs": step_resources["nprocs"],
        "wall_time_limit": step_resources["wall_time_limit"],
        "mem_in_gb": step_resources["mem_in_gb"],
        "array_params": f"1-{njobs}%{step_resources['max_tasks']}",
    }
    minimap2_script = _write_slurm(f"{out_dir}/minimap2", m2t, **params)

    # finish step: single merge job that reports back to Qiita via `url`
    m2mt = JGT("syndna_finish.sbatch")
    step_resources = resources["finish"]
    params = main_parameters | {
        "job_name": f"me_{job_id}",
        "node_count": step_resources["node_count"],
        "nprocs": step_resources["nprocs"],
        "wall_time_limit": step_resources["wall_time_limit"],
        "mem_in_gb": step_resources["mem_in_gb"],
        "url": url,
    }
    minimap2_merge_script = _write_slurm(f"{out_dir}/merge", m2mt, **params)

    return minimap2_script, minimap2_merge_script
0 commit comments