diff --git a/.dockstore.yml b/.dockstore.yml
index ee7b190ef3..6e7ccdf10d 100644
--- a/.dockstore.yml
+++ b/.dockstore.yml
@@ -75,10 +75,6 @@ workflows:
     subclass: WDL
     primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl
-  - name: ImputationBeaglePreChunk
-    subclass: WDL
-    primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.wdl
-
   - name: LiftoverVcfs
     subclass: WDL
     primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/LiftoverVcfs.wdl
diff --git a/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl
index 2b6387d677..ecd073524f 100644
--- a/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl
+++ b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl
@@ -1,6 +1,7 @@
 version 1.0
 
 import "../../../../tasks/broad/ImputationTasks.wdl" as tasks
+import "../../../../tasks/broad/ImputationBeagleTasks.wdl" as beagleTasks
 
 workflow ImputationBeagle {
 
@@ -84,7 +85,7 @@ workflow ImputationBeagle {
          gatk_docker = gatk_docker
      }
 
-      call tasks.CountVariantsInChunksBeagle {
+      call beagleTasks.CountVariantsInChunks {
        input:
          vcf = GenerateChunk.output_vcf,
          vcf_index = GenerateChunk.output_vcf_index,
@@ -92,50 +93,30 @@ workflow ImputationBeagle {
-      call tasks.CheckChunksBeagle {
+      call beagleTasks.CheckChunks {
        input:
-          var_in_original = CountVariantsInChunksBeagle.var_in_original,
-          var_also_in_reference = CountVariantsInChunksBeagle.var_also_in_reference
-      }
-
-      # create chunk without overlaps to get sites to impute
-      call tasks.SubsetVcfToRegion {
-        input:
-          vcf = CreateVcfIndex.vcf,
-          vcf_index = CreateVcfIndex.vcf_index,
-          output_basename = "input_samples_subset_to_chunk",
-          contig = referencePanelContig.contig,
-          start = start,
-          end = end,
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.SetIDs as SetIdsVcfToImpute {
-        input:
-          vcf = SubsetVcfToRegion.output_vcf,
-          output_basename = "input_samples_with_variant_ids"
+          var_in_original = CountVariantsInChunks.var_in_original,
+          var_also_in_reference = CountVariantsInChunks.var_also_in_reference
      }
    }
 
    Array[File] chunkedVcfsWithOverlapsForImputation = GenerateChunk.output_vcf
-    Array[File] chunkedVcfsWithoutOverlapsForSiteIds = SetIdsVcfToImpute.output_vcf
-    Array[File] chunkedVcfIndexesWithoutOverlapsForSiteIds = SetIdsVcfToImpute.output_vcf_index
 
    call tasks.StoreChunksInfo as StoreContigLevelChunksInfo {
      input:
        chroms = chunk_contig,
        starts = start,
        ends = end,
-        vars_in_array = CountVariantsInChunksBeagle.var_in_original,
-        vars_in_panel = CountVariantsInChunksBeagle.var_also_in_reference,
-        valids = CheckChunksBeagle.valid,
+        vars_in_array = CountVariantsInChunks.var_in_original,
+        vars_in_panel = CountVariantsInChunks.var_also_in_reference,
+        valids = CheckChunks.valid,
        basename = output_basename
    }
 
    # if any chunk for any chromosome fails CheckChunks, then we will not run any task in the next scatter,
    # namely phasing and imputing, which would be the most costly to throw away
    Int n_failed_chunks_int = select_first([error_count_override, read_int(StoreContigLevelChunksInfo.n_failed_chunks)])
-    call tasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks {
+    call beagleTasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks {
      input:
        errorCount = n_failed_chunks_int,
        message = "contig " + referencePanelContig.contig + " had " + n_failed_chunks_int + " failing chunks"
@@ -144,13 +125,6 @@ workflow ImputationBeagle {
 
    scatter (i in range(num_chunks)) {
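+      # each iteration phases and imputes one chunk; the FailQCNChunks gate above
+      # ensures this costly work starts only when every chunk passed CheckChunks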
      String chunk_basename_imputed = referencePanelContig.contig + "_chunk_" + i + "_imputed"
 
-      call tasks.ExtractIDs as ExtractIdsVcfToImpute {
-        input:
-          vcf = chunkedVcfsWithoutOverlapsForSiteIds[i],
-          output_basename = "imputed_sites",
-          for_dependency = FailQCNChunks.done # these shenanigans can be replaced with `after` in wdl 1.1
-      }
-
      # the max number of cpus you can ask for is 96, so at a max of 10k samples we can only ask for 9 cpus per sample.
      # these values are based on trying to optimize for pre-emptibility using a 400k sample reference panel
      # and up to a 10k sample input vcf
@@ -158,7 +132,7 @@
      Int beagle_phase_memory_in_gb = if (CountSamples.nSamples <= 1000) then 22 else ceil(beagle_cpu * 1.5)
      Int beagle_impute_memory_in_gb = if (CountSamples.nSamples <= 1000) then 30 else ceil(beagle_cpu * 4.3)
 
-      call tasks.PhaseBeagle {
+      call beagleTasks.Phase {
        input:
          dataset_vcf = chunkedVcfsWithOverlapsForImputation[i],
          ref_panel_bref3 = referencePanelContig.bref3,
@@ -171,9 +145,9 @@
          memory_mb = beagle_phase_memory_in_gb * 1024
      }
 
-      call tasks.ImputeBeagle {
+      call beagleTasks.Impute {
        input:
-          dataset_vcf = PhaseBeagle.vcf,
+          dataset_vcf = Phase.vcf,
          ref_panel_bref3 = referencePanelContig.bref3,
          chrom = referencePanelContig.contig,
          basename = chunk_basename_imputed,
@@ -186,7 +160,7 @@
 
      call tasks.CreateVcfIndex as IndexImputedBeagle {
        input:
-          vcf_input = ImputeBeagle.vcf,
+          vcf_input = Impute.vcf,
          gatk_docker = gatk_docker
      }
 
@@ -214,50 +188,9 @@
          output_basename = chunk_basename_imputed,
          gatk_docker = gatk_docker
      }
-
-      call tasks.SetIDs {
-        input:
-          vcf = RemoveSymbolicAlleles.output_vcf,
-          output_basename = chunk_basename_imputed
-      }
-
-      call tasks.ExtractIDs {
-        input:
-          vcf = SetIDs.output_vcf,
-          output_basename = "imputed_sites"
-      }
-
-      call tasks.FindSitesUniqueToFileTwoOnly {
-        input:
-          file1 = select_first([ExtractIDs.ids, write_lines([])]),
-          file2 = ExtractIdsVcfToImpute.ids,
-          ubuntu_docker = ubuntu_docker
-      }
-
-      call tasks.SelectVariantsByIds {
-        input:
-          vcf = chunkedVcfsWithoutOverlapsForSiteIds[i],
-          vcf_index = chunkedVcfIndexesWithoutOverlapsForSiteIds[i],
-          ids = FindSitesUniqueToFileTwoOnly.missing_sites,
-          basename = "imputed_sites_to_recover",
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.RemoveAnnotations {
-        input:
-          vcf = SelectVariantsByIds.output_vcf,
-          basename = "imputed_sites_to_recover_annotations_removed"
-      }
-
-      call tasks.InterleaveVariants {
-        input:
-          vcfs = select_all([RemoveAnnotations.output_vcf, SetIDs.output_vcf]),
-          basename = output_basename, # TODO consider using a contig/chunk labeled basename
-          gatk_docker = gatk_docker
-      }
    }
 
-    Array[File] chromosome_vcfs = select_all(InterleaveVariants.output_vcf)
+    Array[File] chromosome_vcfs = select_all(RemoveSymbolicAlleles.output_vcf)
  }
 
  call tasks.GatherVcfs {
@@ -272,9 +205,9 @@
      chroms = flatten(chunk_contig),
      starts = flatten(start),
      ends = flatten(end),
-      vars_in_array = flatten(CountVariantsInChunksBeagle.var_in_original),
-      vars_in_panel = flatten(CountVariantsInChunksBeagle.var_also_in_reference),
-      valids = flatten(CheckChunksBeagle.valid),
+      vars_in_array = flatten(CountVariantsInChunks.var_in_original),
+      vars_in_panel = flatten(CountVariantsInChunks.var_also_in_reference),
+      valids = flatten(CheckChunks.valid),
      basename = output_basename
  }
diff --git a/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.changelog.md b/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.changelog.md
deleted file mode 100644
index 336273806b..0000000000
--- a/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.changelog.md
+++ /dev/null
@@ -1,4 +0,0 @@
-# 0.0.1
-2024-11-13 (Date of Last Commit)
-
-* Pipeline still in developmental state
diff --git a/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.wdl b/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.wdl
deleted file mode 100644
index a35cf241f6..0000000000
--- a/pipelines/broad/arrays/imputation_beagle/ImputationBeaglePreChunk.wdl
+++ /dev/null
@@ -1,247 +0,0 @@
-version 1.0
-
-import "../../../../tasks/broad/ImputationTasks.wdl" as tasks
-
-workflow ImputationBeagle {
-
-  String pipeline_version = "0.0.1"
-
-  input {
-    Int chunkLength = 25000000
-    Int chunkOverlaps = 5000000 # this is the padding that will be added to the beginning and end of each chunk to reduce edge effects
-
-    File multi_sample_vcf
-
-    File ref_dict # for reheadering / adding contig lengths in the header of the ouptut VCF, and calculating contig lengths
-    Array[String] contigs
-    String reference_panel_path_prefix # path + file prefix to the bucket where the reference panel files are stored for all contigs
-    String genetic_maps_path # path to the bucket where genetic maps are stored for all contigs
-    String output_basename # the basename for intermediate and output files
-
-    # file extensions used to find reference panel files
-    String bed_suffix = ".bed"
-    String bref3_suffix = ".bref3"
-
-    String gatk_docker = "terrapublic.azurecr.io/gatk:4.5-squashed" # "broadinstitute/gatk-nightly:2024-06-06-4.5.0.0-36-g2a420e483-NIGHTLY-SNAPSHOT"
-
-    Int? error_count_override
-  }
-
-  call tasks.CountSamples {
-    input:
-      vcf = multi_sample_vcf,
-  }
-
-  call tasks.PreSplitVcf {
-    input:
-      contigs = contigs,
-      vcf = multi_sample_vcf,
-      gatk_docker = gatk_docker
-  }
-
-  scatter (contig_index in range(length(contigs))) {
-    # these are specific to hg38 - contig is format 'chr1'
-    String reference_basename = reference_panel_path_prefix + "." + contigs[contig_index]
-    String genetic_map_filename = genetic_maps_path + "plink." + contigs[contig_index] + ".GRCh38.withchr.map"
-
-    ReferencePanelContig referencePanelContig = {
-      "bed": reference_basename + bed_suffix,
-      "bref3": reference_basename + bref3_suffix,
-      "contig": contigs[contig_index],
-      "genetic_map": genetic_map_filename
-    }
-
-    call tasks.CalculateChromosomeLength {
-      input:
-        ref_dict = ref_dict,
-        chrom = referencePanelContig.contig
-    }
-
-    call tasks.PreChunkVcf {
-      input:
-        chromosome_length=CalculateChromosomeLength.chrom_length,
-        chunk_length = chunkLength,
-        chunk_overlap = chunkOverlaps,
-        chrom = contigs[contig_index],
-        vcf = PreSplitVcf.chr_split_vcfs[contig_index],
-        vcf_index = PreSplitVcf.chr_split_vcf_indices[contig_index],
-        gatk_docker = gatk_docker
-    }
-
-    scatter (i in range(length(PreChunkVcf.generate_chunk_vcfs))) {
-      String chunk_contig = referencePanelContig.contig
-
-      Int start = PreChunkVcf.starts[i]
-      Int end = PreChunkVcf.ends[i]
-
-      call tasks.CountVariantsInChunksBeagle {
-        input:
-          vcf = PreChunkVcf.generate_chunk_vcfs[i],
-          vcf_index = PreChunkVcf.generate_chunk_vcf_indices[i],
-          panel_bed_file = referencePanelContig.bed,
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.CheckChunksBeagle {
-        input:
-          var_in_original = CountVariantsInChunksBeagle.var_in_original,
-          var_also_in_reference = CountVariantsInChunksBeagle.var_also_in_reference
-      }
-
-      call tasks.SetIDs as SetIdsVcfToImpute {
-        input:
-          vcf = PreChunkVcf.subset_vcfs[i],
-          output_basename = "input_samples_with_variant_ids"
-      }
-    }
-
-    call tasks.StoreChunksInfo as StoreContigLevelChunksInfo {
-      input:
-        chroms = chunk_contig,
-        starts = start,
-        ends = end,
-        vars_in_array = CountVariantsInChunksBeagle.var_in_original,
-        vars_in_panel = CountVariantsInChunksBeagle.var_also_in_reference,
-        valids = CheckChunksBeagle.valid,
-        basename = output_basename
-    }
-
-    # if any chunk for any chromosome fail CheckChunks, then we will not impute run any task in the next scatter,
-    # namely phasing and imputing which would be the most costly to throw away
-    Int n_failed_chunks_int = select_first([error_count_override, read_int(StoreContigLevelChunksInfo.n_failed_chunks)])
-    call tasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks {
-      input:
-        errorCount = n_failed_chunks_int,
-        message = "contig " + referencePanelContig.contig + " had " + n_failed_chunks_int + " failing chunks"
-    }
-
-    scatter (i in range(length(PreChunkVcf.generate_chunk_vcfs))) {
-
-      String chunk_basename = referencePanelContig.contig + "_chunk_" + i
-
-      Int start2 = PreChunkVcf.starts[i]
-      Int end2 = PreChunkVcf.ends[i]
-
-      call tasks.ExtractIDs as ExtractIdsVcfToImpute {
-        input:
-          vcf = SetIdsVcfToImpute.output_vcf[i],
-          output_basename = "imputed_sites",
-          for_dependency = FailQCNChunks.done # these shenanigans can be replaced with `after` in wdl 1.1
-      }
-
-      call tasks.PhaseAndImputeBeagle {
-        input:
-          dataset_vcf = PreChunkVcf.generate_chunk_vcfs[i],
-          ref_panel_bref3 = referencePanelContig.bref3,
-          chrom = referencePanelContig.contig,
-          basename = chunk_basename,
-          genetic_map_file = referencePanelContig.genetic_map,
-          start = start2,
-          end = end2
-      }
-
-      call tasks.UpdateHeader {
-        input:
-          vcf = PhaseAndImputeBeagle.vcf,
-          vcf_index = PhaseAndImputeBeagle.vcf_index,
-          ref_dict = ref_dict,
-          basename = chunk_basename + "_imputed",
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.SeparateMultiallelics {
-        input:
-          original_vcf = UpdateHeader.output_vcf,
-          original_vcf_index = UpdateHeader.output_vcf_index,
-          output_basename = chunk_basename + "_imputed"
-      }
-
-      call tasks.RemoveSymbolicAlleles {
-        input:
-          original_vcf = SeparateMultiallelics.output_vcf,
-          original_vcf_index = SeparateMultiallelics.output_vcf_index,
-          output_basename = chunk_basename + "_imputed",
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.SetIDs {
-        input:
-          vcf = RemoveSymbolicAlleles.output_vcf,
-          output_basename = chunk_basename + "_imputed"
-      }
-
-      call tasks.ExtractIDs {
-        input:
-          vcf = SetIDs.output_vcf,
-          output_basename = "imputed_sites",
-          for_dependency = true
-      }
-
-      call tasks.FindSitesUniqueToFileTwoOnly {
-        input:
-          file1 = ExtractIDs.ids,
-          file2 = ExtractIdsVcfToImpute.ids
-      }
-
-      call tasks.SelectVariantsByIds {
-        input:
-          vcf = SetIdsVcfToImpute.output_vcf[i],
-          vcf_index = SetIdsVcfToImpute.output_vcf_index[i],
-          ids = FindSitesUniqueToFileTwoOnly.missing_sites,
-          basename = "imputed_sites_to_recover",
-          gatk_docker = gatk_docker
-      }
-
-      call tasks.RemoveAnnotations {
-        input:
-          vcf = SelectVariantsByIds.output_vcf,
-          basename = "imputed_sites_to_recover_annotations_removed"
-      }
-
-      call tasks.InterleaveVariants {
-        input:
-          vcfs = [RemoveAnnotations.output_vcf, SetIDs.output_vcf],
-          basename = output_basename,
-          gatk_docker = gatk_docker
-      }
-    }
-
-    Array[File] chromosome_vcfs = InterleaveVariants.output_vcf
-  }
-
-  call tasks.GatherVcfs {
-    input:
-      input_vcfs = flatten(chromosome_vcfs),
-      output_vcf_basename = output_basename + ".imputed",
-      gatk_docker = gatk_docker
-  }
-
-  call tasks.StoreChunksInfo {
-    input:
-      chroms = flatten(chunk_contig),
-      starts = flatten(start),
-      ends = flatten(end),
-      vars_in_array = flatten(CountVariantsInChunksBeagle.var_in_original),
-      vars_in_panel = flatten(CountVariantsInChunksBeagle.var_also_in_reference),
-      valids = flatten(CheckChunksBeagle.valid),
-      basename = output_basename
-  }
-
-  output {
-    File imputed_multi_sample_vcf = GatherVcfs.output_vcf
-    File imputed_multi_sample_vcf_index = GatherVcfs.output_vcf_index
-    File chunks_info = StoreChunksInfo.chunks_info
-  }
-
-  meta {
-    allowNestedInputs: true
-  }
-
-}
-
-struct ReferencePanelContig {
-  File bed
-  File bref3
-  String contig
-  File genetic_map
-}
diff --git a/tasks/broad/ImputationBeagleTasks.wdl b/tasks/broad/ImputationBeagleTasks.wdl
new file mode 100644
index 0000000000..d62f5a4e8c
--- /dev/null
+++ b/tasks/broad/ImputationBeagleTasks.wdl
@@ -0,0 +1,181 @@
+version 1.0
+
+task CountVariantsInChunks {
+  input {
+    File vcf
+    File vcf_index
+    File panel_bed_file
+
+    String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0"
+    Int cpu = 1
+    Int memory_mb = 16000
+    Int disk_size_gb = 2 * ceil(size([vcf, vcf_index, panel_bed_file], "GiB")) + 10
+  }
+  Int command_mem = memory_mb - 1500
+  Int max_heap = memory_mb - 1000
+
+  command <<<
+    set -e -o pipefail
+
+    ln -sf ~{vcf} input.vcf.gz
+    ln -sf ~{vcf_index} input.vcf.gz.tbi
+
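+    # count all variants in the chunk, then count how many of them fall inside
+    # the reference panel's bed intervals; CheckChunks compares the two counts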
+    gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V input.vcf.gz | tail -n 1 > var_in_original
+    bedtools intersect -a ~{vcf} -b ~{panel_bed_file} | wc -l > var_also_in_reference
+  >>>
+
+  output {
+    Int var_in_original = read_int("var_in_original")
+    Int var_also_in_reference = read_int("var_also_in_reference")
+  }
+  runtime {
+    docker: gatk_docker
+    disks: "local-disk ${disk_size_gb} HDD"
+    memory: "${memory_mb} MiB"
+    cpu: cpu
+    preemptible: 3
+  }
+}
+
+task CheckChunks {
+  input {
+    Int var_in_original
+    Int var_also_in_reference
+
+    String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889"
+    Int cpu = 1
+    Int memory_mb = 4000
+  }
+  command <<<
+    set -e -o pipefail
+
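+    # a chunk is valid when more than half of its variants are also present in
+    # the reference panel (var_also_in_reference * 2 > var_in_original) and more
+    # than 3 of its variants overlap the panel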
+    if [ $(( ~{var_also_in_reference} * 2 - ~{var_in_original})) -gt 0 ] && [ ~{var_also_in_reference} -gt 3 ]; then
+      echo true > valid_file.txt
+    else
+      echo false > valid_file.txt
+    fi
+  >>>
+  output {
+    Boolean valid = read_boolean("valid_file.txt")
+  }
+  runtime {
+    docker: bcftools_docker
+    disks: "local-disk 10 HDD"
+    memory: "${memory_mb} MiB"
+    cpu: cpu
+    preemptible: 3
+  }
+}
+
+task Phase {
+  input {
+    File dataset_vcf
+    File ref_panel_bref3
+    File genetic_map_file
+    String basename
+    String chrom # not needed if ref file has been chunked and you are using the entire chunk
+    Int start # not needed if ref file has been chunked and you are using the entire chunk
+    Int end # not needed if ref file has been chunked and you are using the entire chunk
+
+    String beagle_docker = "us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301"
+    Int cpu = 8 # This parameter is used as the nthreads input to Beagle, which is part of how we make it deterministic. Changing this value may change the output generated by the tool
+    Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed
+    Int xmx_mb = memory_mb - 5000 # I suggest setting this parameter to be 85-90% of the memory_mb parameter
+    Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted
+  }
+
+  command <<<
+    set -e -o pipefail
+
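+    # run Beagle in phasing-only mode (impute=false) over this chunk's window;
+    # the fixed seed and nthreads are part of keeping the output deterministic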
+    java -ea -Xmx~{xmx_mb}m \
+      -jar /usr/gitc/beagle.01Mar24.d36.jar \
+      gt=~{dataset_vcf} \
+      ref=~{ref_panel_bref3} \
+      map=~{genetic_map_file} \
+      out=phased_~{basename} \
+      chrom=~{chrom}:~{start}-~{end} \
+      impute=false \
+      nthreads=~{cpu} \
+      seed=-99999
+
+  >>>
+  output {
+    File vcf = "phased_~{basename}.vcf.gz"
+    File log = "phased_~{basename}.log"
+  }
+  runtime {
+    docker: beagle_docker
+    disks: "local-disk ${disk_size_gb} HDD"
+    memory: "${memory_mb} MiB"
+    cpu: cpu
+    preemptible: 3
+  }
+}
+
+task Impute {
+  input {
+    File dataset_vcf
+    File ref_panel_bref3
+    File genetic_map_file
+    String basename
+    String chrom # not needed if ref file has been chunked and you are using the entire chunk
+    Int start # not needed if ref file has been chunked and you are using the entire chunk
+    Int end # not needed if ref file has been chunked and you are using the entire chunk
+
+    String beagle_docker = "us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301"
+    Int cpu = 8 # This parameter is used as the nthreads input to Beagle, which is part of how we make it deterministic. Changing this value may change the output generated by the tool
+    Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed
+    Int xmx_mb = memory_mb - 5000 # I suggest setting this parameter to be 85-90% of the memory_mb parameter
+    Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted
+  }
+
+  command <<<
+    set -e -o pipefail
+
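+    # identical invocation to Phase except impute=true, so genotypes are imputed
+    # from the reference panel over this chunk's window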
"us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301" - Int cpu = 8 # This parameter is used as the nthreads input to Beagle which is part of how we make it determinstic. Changing this value may change the output generated by the tool - Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed - Int xmx_mb = memory_mb - 5000 # I suggest setting this parameter to be 85-90% of the memory_mb parameter - Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted - } - - command <<< - set -e -o pipefail - - java -ea -Xmx~{xmx_mb}m \ - -jar /usr/gitc/beagle.01Mar24.d36.jar \ - gt=~{dataset_vcf} \ - ref=~{ref_panel_bref3} \ - map=~{genetic_map_file} \ - out=phased_~{basename} \ - chrom=~{chrom}:~{start}-~{end} \ - impute=false \ - nthreads=~{cpu} \ - seed=-99999 - - >>> - output { - File vcf = "phased_~{basename}.vcf.gz" - File log = "phased_~{basename}.log" - } - runtime { - docker: beagle_docker - disks: "local-disk ${disk_size_gb} HDD" - memory: "${memory_mb} MiB" - cpu: cpu - preemptible: 3 - } -} - -task ImputeBeagle { - input { - File dataset_vcf - File ref_panel_bref3 - File genetic_map_file - String basename - String chrom # not needed if ref file has been chunked and you are using the entire chunk - Int start # not needed if ref file has been chunked and you are using the entire chunk - Int end # not needed if ref file has been chunked and you are using the entire chunk - - String beagle_docker = "us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301" - Int cpu = 8 # This parameter is used as the nthreads input to Beagle which is part of how we make it determinstic. 
+  output {
+    Boolean done = true
+  }
+}
diff --git a/tasks/broad/ImputationTasks.wdl b/tasks/broad/ImputationTasks.wdl
index 6fbf87334b..f6bd7ad113 100644
--- a/tasks/broad/ImputationTasks.wdl
+++ b/tasks/broad/ImputationTasks.wdl
@@ -301,163 +301,6 @@ task Minimac4 {
  }
 }
 
-task CountVariantsInChunksBeagle {
-  input {
-    File vcf
-    File vcf_index
-    File panel_bed_file
-
-    String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0"
-    Int cpu = 1
-    Int memory_mb = 16000
-    Int disk_size_gb = 2 * ceil(size([vcf, vcf_index, panel_bed_file], "GiB")) + 10
-  }
-  Int command_mem = memory_mb - 1500
-  Int max_heap = memory_mb - 1000
-
-  command <<<
-    set -e -o pipefail
-
-    ln -sf ~{vcf} input.vcf.gz
-    ln -sf ~{vcf_index} input.vcf.gz.tbi
-
-    gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V input.vcf.gz | tail -n 1 > var_in_original
-    bedtools intersect -a ~{vcf} -b ~{panel_bed_file} | wc -l > var_also_in_reference
-  >>>
-
-  output {
-    Int var_in_original = read_int("var_in_original")
-    Int var_also_in_reference = read_int("var_also_in_reference")
-  }
-  runtime {
-    docker: gatk_docker
-    disks: "local-disk ${disk_size_gb} HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-}
-
-task CheckChunksBeagle {
-  input {
-    Int var_in_original
-    Int var_also_in_reference
-
-    String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889"
-    Int cpu = 1
-    Int memory_mb = 4000
-  }
-  command <<<
-    set -e -o pipefail
-
-    if [ $(( ~{var_also_in_reference} * 2 - ~{var_in_original})) -gt 0 ] && [ ~{var_also_in_reference} -gt 3 ]; then
-      echo true > valid_file.txt
-    else
-      echo false > valid_file.txt
-    fi
-  >>>
-  output {
-    Boolean valid = read_boolean("valid_file.txt")
-  }
-  runtime {
-    docker: bcftools_docker
-    disks: "local-disk 10 HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-}
-
-task PhaseBeagle {
-  input {
-    File dataset_vcf
-    File ref_panel_bref3
-    File genetic_map_file
-    String basename
-    String chrom # not needed if ref file has been chunked and you are using the entire chunk
-    Int start # not needed if ref file has been chunked and you are using the entire chunk
-    Int end # not needed if ref file has been chunked and you are using the entire chunk
-
-    String beagle_docker = "us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301"
-    Int cpu = 8 # This parameter is used as the nthreads input to Beagle which is part of how we make it determinstic. Changing this value may change the output generated by the tool
-    Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed
-    Int xmx_mb = memory_mb - 5000 # I suggest setting this parameter to be 85-90% of the memory_mb parameter
-    Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted
-  }
-
-  command <<<
-    set -e -o pipefail
-
-    java -ea -Xmx~{xmx_mb}m \
-      -jar /usr/gitc/beagle.01Mar24.d36.jar \
-      gt=~{dataset_vcf} \
-      ref=~{ref_panel_bref3} \
-      map=~{genetic_map_file} \
-      out=phased_~{basename} \
-      chrom=~{chrom}:~{start}-~{end} \
-      impute=false \
-      nthreads=~{cpu} \
-      seed=-99999
-
-  >>>
-  output {
-    File vcf = "phased_~{basename}.vcf.gz"
-    File log = "phased_~{basename}.log"
-  }
-  runtime {
-    docker: beagle_docker
-    disks: "local-disk ${disk_size_gb} HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-}
-
-task ImputeBeagle {
-  input {
-    File dataset_vcf
-    File ref_panel_bref3
-    File genetic_map_file
-    String basename
-    String chrom # not needed if ref file has been chunked and you are using the entire chunk
-    Int start # not needed if ref file has been chunked and you are using the entire chunk
-    Int end # not needed if ref file has been chunked and you are using the entire chunk
-
-    String beagle_docker = "us-central1-docker.pkg.dev/morgan-fieldeng-gcp/imputation-beagle-development/imputation-beagle:0.0.1-01Mar24.d36-wip-temp-20240301"
-    Int cpu = 8 # This parameter is used as the nthreads input to Beagle which is part of how we make it determinstic. Changing this value may change the output generated by the tool
-    Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed
-    Int xmx_mb = memory_mb - 5000 # I suggest setting this parameter to be 85-90% of the memory_mb parameter
-    Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted
-  }
-
-  command <<<
-    set -e -o pipefail
-
-    java -ea -Xmx~{xmx_mb}m \
-      -jar /usr/gitc/beagle.01Mar24.d36.jar \
-      gt=~{dataset_vcf} \
-      ref=~{ref_panel_bref3} \
-      map=~{genetic_map_file} \
-      out=imputed_~{basename} \
-      chrom=~{chrom}:~{start}-~{end} \
-      impute=true \
-      nthreads=~{cpu} \
-      seed=-99999
-
-  >>>
-  output {
-    File vcf = "imputed_~{basename}.vcf.gz"
-    File log = "imputed_~{basename}.log"
-  }
-  runtime {
-    docker: beagle_docker
-    disks: "local-disk ${disk_size_gb} HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-}
-
 task GatherVcfs {
   input {
     Array[File] input_vcfs
@@ -1164,185 +1007,3 @@
     File vcf_index = "~{vcf_basename}.tbi"
   }
 }
-
-task PreSplitVcf {
-  input {
-    Array[String] contigs
-    File vcf
-
-    Int disk_size_gb = ceil(3*size(vcf, "GiB")) + 10
-    Int cpu = 1
-    Int memory_mb = 6000
-    String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0"
-  }
-  Int command_mem = memory_mb - 1500
-  Int max_heap = memory_mb - 1000
-
-  command {
-    set -e -o pipefail
-
-    gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
-      IndexFeatureFile -I ~{vcf}
-
-    mkdir split_vcfs
-
-    CONTIG_FILE=~{write_lines(contigs)}
-    i=0
-
-    while read -r line;
-    do
-
-      SPLIT=$(printf "%03d" $i)
-      echo "SPLIT: $SPLIT"
-
-      gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
-        SelectVariants \
-        -V ~{vcf} \
-        -L $line \
-        -O split_vcfs/split_chr_$SPLIT.vcf.gz
-
-      i=$(($i + 1))
-
-    done < $CONTIG_FILE
-  }
-  runtime {
-    docker: gatk_docker
-    disks: "local-disk ${disk_size_gb} HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-  output {
-    Array[File] chr_split_vcfs = glob("split_vcfs/*.vcf.gz")
-    Array[File] chr_split_vcf_indices = glob("split_vcfs/*.vcf.gz.tbi")
-  }
-}
-
-task PreChunkVcf {
-  input {
-    Int chromosome_length
-    Int chunk_length
-    Int chunk_overlap
-    String chrom
-    File vcf
-    File vcf_index
-    Boolean exclude_filtered = false
-
-    Int disk_size_gb = ceil(4*size(vcf, "GiB")) + 10
-    Int cpu = 1
-    Int memory_mb = 6000
-    String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0"
-  }
-  Int command_mem = memory_mb - 1500
-  Int max_heap = memory_mb - 1000
-
-  command {
-    set -e -o pipefail
-
-    ln -sf ~{vcf} input.vcf.gz
-    ln -sf ~{vcf_index} input.vcf.gz.tbi
-
-    mkdir generate_chunk
-    mkdir subset_vcf
-
-    CHROM_LENGTH=~{chromosome_length}
-    CHUNK_LENGTH=~{chunk_length}
-    CHUNK_OVERLAPS=~{chunk_overlap}
-    i=0
-    LOOP_DRIVER=$(( $i * $CHUNK_LENGTH + 1 ))
-
-    while [ $LOOP_DRIVER -lt $CHROM_LENGTH ]
-    do
-      START=$(( $i * $CHUNK_LENGTH + 1 ))
-      START_OVERLAP_CHECK=$(($START - $CHUNK_OVERLAPS))
-      if [ $START_OVERLAP_CHECK -lt 1 ]; then
-        START_WITH_OVERLAPS=$START
-      else
-        START_WITH_OVERLAPS=$(($START - $CHUNK_OVERLAPS))
-      fi
-      echo "START: $START"
-      echo "START WITH OVERLAPS: $START_WITH_OVERLAPS"
-
-      END_CHECK=$(( ($i + 1) * $CHUNK_LENGTH ))
-      if [ $END_CHECK -gt $CHROM_LENGTH ]; then
-        END=$CHROM_LENGTH
-      else
-        END=$(( ($i + 1) * $CHUNK_LENGTH ))
-      fi
-
-      END_OVERLAP_CHECK=$(( $END + $CHUNK_OVERLAPS ))
-      if [ $END_OVERLAP_CHECK -gt $CHROM_LENGTH ]; then
-        END_WITH_OVERLAPS=$CHROM_LENGTH
-      else
-        END_WITH_OVERLAPS=$(( $END + $CHUNK_OVERLAPS ))
-      fi
-      echo "END: $END"
-      echo "END WITH OVERLAPS: $END_WITH_OVERLAPS"
-
-      CHUNK=$(printf "%03d" $i)
-      echo "CHUNK: $CHUNK"
-
-      gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
-        SelectVariants \
-        -V input.vcf.gz \
-        --select-type-to-include SNP \
-        --max-nocall-fraction 0.1 \
-        -xl-select-type SYMBOLIC \
-        --select-type-to-exclude MIXED \
-        --restrict-alleles-to BIALLELIC \
-        -L ~{chrom}:$START_WITH_OVERLAPS-$END_WITH_OVERLAPS \
-        -O generate_chunk/~{chrom}_generate_chunk_$CHUNK.vcf.gz \
-        --exclude-filtered true
-
-      gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
-        SelectVariants \
-        -V input.vcf.gz \
-        -L ~{chrom}:$START-$END \
-        -select "POS >= $START" ~{if exclude_filtered then "--exclude-filtered" else ""} \
-        -O subset_vcf/~{chrom}_subset_chunk_$CHUNK.vcf.gz
-
-      echo $START >> start.txt
-      echo $END >> end.txt
-
-      i=$(($i + 1))
-      LOOP_DRIVER=$(( $i * $CHUNK_LENGTH + 1 ))
-    done
-  }
-  runtime {
-    docker: gatk_docker
-    disks: "local-disk ${disk_size_gb} HDD"
-    memory: "${memory_mb} MiB"
-    cpu: cpu
-    preemptible: 3
-  }
-  output {
-    Array[File] generate_chunk_vcfs = glob("generate_chunk/*.vcf.gz")
-    Array[File] generate_chunk_vcf_indices = glob("generate_chunk/*.vcf.gz.tbi")
-    Array[File] subset_vcfs = glob("subset_vcf/*.vcf.gz")
-    Array[String] starts = read_lines("start.txt")
-    Array[String] ends = read_lines("end.txt")
-  }
-}
-
-task ErrorWithMessageIfErrorCountNotZero {
-  input {
-    Int errorCount
-    String message
-  }
-  command <<<
-    if [[ ~{errorCount} -gt 0 ]]; then
-      >&2 echo "Error: ~{message}"
-      exit 1
-    else
-      exit 0
-    fi
-  >>>
-
-  runtime {
-    docker: "ubuntu.azurecr.io/ubuntu:20.04"
-    preemptible: 3
-  }
-  output {
-    Boolean done = true
-  }
-}