forked from pmorrell/SNPMeta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSNPMeta.py
177 lines (166 loc) · 6.96 KB
/
SNPMeta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python
"""
Annotates SNPs using sequences from a specified organism in NCBI's nonredundant
nucleotide sequence database. Produces a dbSNP submission report, or a tab-
delimited report. Requires Biopython.
"""
# Import standard library modules
import sys
import os
import time
# Make sure we are running Python 3
if sys.version_info.major < 3:
sys.exit('This script requires Python 3')
# Try to import the Biopython library
try:
import Bio
from Bio.Blast import NCBIXML
from Bio import SeqIO
except ImportError:
print('This script requires the BioPython library to be installed.')
exit(1)
# Import package components
from snpmeta.LookupTables.tabular_header import table_header
from snpmeta.Environment import setup_env
from snpmeta.ArgumentHandling import parse_args
from snpmeta.ArgumentHandling import validate_args
from snpmeta.SNPAnnotation.snp_annotation import SNPAnnotation
from snpmeta.SNPAnnotation.blast_search import BlastSearch
from snpmeta.SNPAnnotation.genbank import GenBankHandler
def main():
"""Main function."""
# First, parse arguments
parser, args = parse_args.parse_args()
# And then validate
error_code = validate_args.validate_args(args)
if error_code:
if error_code == 1:
parser.print_help()
sys.stderr.write('\nError! Some required inputs were not given.\n')
exit(1)
elif error_code == 2:
sys.stderr.write('Email entered is invalid.\n')
exit(1)
elif error_code == 3:
sys.stderr.write('Contextual sequence length entered is invalid.\n')
exit(1)
elif error_code == 4:
sys.stderr.write(
'Directory entered is not readable or does not exist.\n')
exit(1)
elif error_code == 5:
sys.stderr.write(
'FASTA file entered is an invalid FASTA file.\n')
exit(1)
# Start writing the output file
if args.outfmt == 'tabular':
args.output.write('\t'.join(table_header) + '\n')
# Done checking arguments. Start annotating SNPs.
for s in setup_env.build_targets(args.fasta_file, args.dir, args.no_blast):
# If there is no blast, then we read the FASTA instead
if args.no_blast:
q_seq = s.replace('.xml', '.fasta')
q_seq = SeqIO.read(os.path.join(args.dir, q_seq), 'fasta')
else:
q_seq = s
sys.stderr.write(
'Annotating ' + q_seq.id + '\n')
anno = SNPAnnotation(q_seq, args)
sys.stderr.write(
' Contextual sequence length: ' + str(anno.context) + '\n')
# If BLAST has already been run, we skip this step
if args.no_blast:
blastresults = open(os.path.join(args.dir, s), 'r')
else:
# Start a BlastSearch class
blast = BlastSearch(
args.program,
args.evalue,
args.max_hits,
args.database,
args.entrez_query)
sys.stderr.write(
' Running BLAST with the following parameters: ... \n'
' Program: ' + args.program + '\n'
' E-value: ' + str(args.evalue) + '\n'
' Max hits: ' + str(args.max_hits) + '\n')
# Decide the command to run BLAST, web or local?
blast.build_commandline(q_seq)
# Run the search, return a handle to the results
blastresults = blast.run_blast(q_seq)
# Move on to next element if we could not get BLAST results
if not blastresults:
anno.print_annotation(args.output, args.outfmt)
sys.stderr.write(
' No BLAST hits! Try less stringent criteria.\n')
continue
sys.stderr.write(' Done!\n')
# Start a GenBankHandler class to extract the GenBank record info
genbank = GenBankHandler(args.email, blastresults)
sys.stderr.write(
' Fetching GenBank records ... ')
# Fetch the relevant records
gb_success = genbank.fetch_gb_records()
if not gb_success:
sys.stderr.write('\n')
anno.print_annotation(args.output, args.outfmt)
continue
sys.stderr.write('Done!\n')
# And extract the regions and annotations
sys.stderr.write(
' Extracting annotation information from GenBank records\n')
gb_annotations = genbank.extract_annotations(args.target_organism)
# Set the organism name, gene name, and related locus info from the
# annotations.
if gb_annotations['records']:
anno.genbank_id = gb_annotations['records']
if gb_annotations['regions']:
anno.organism = gb_annotations['regions'][1]
if gb_annotations['genes']:
# Take just the first entry, since it will be the best hit
anno.gene_short_name = gb_annotations['genes'][0]
if gb_annotations['related']:
anno.alt_gene = gb_annotations['related'][0]
anno.alt_org = gb_annotations['related'][1]
# And align the GenBank record to the SNP query sequence, using the
# hit frames from the extracted genbank information
sys.stderr.write(
' Aligning GenBank record and query sequence ... ')
if gb_annotations['regions']:
genbank.align_genbank(q_seq, gb_annotations['regions'][3])
else:
sys.stderr.write(
'\nNo annotations found! Try less stringent criteria.\n')
anno.print_annotation(args.output, args.outfmt)
continue
sys.stderr.write('Done!\n')
# Then use the alignment to calculate the aligned position of the
# query SNP. This method is back in the SNPAnnotation class.
anno.get_aligned_pos(genbank.needle_out.name)
# Calculate whether or not the SNP is coding
func_class = anno.functional_class(gb_annotations['regions'][2])
# If it is coding, get the amino acid states
if func_class == 'Coding':
anno.translate_codons(genbank.genbank_seq.name)
elif func_class == 'Intronic':
anno.notes = 'Intronic'
elif func_class == 'Intergenic':
# Intergenic SNPs are done annotating
pass
elif func_class == 'Gap':
# As are SNPs that don't align nicely to the GenBank record
pass
# Print the annotation information in the desired format
anno.print_annotation(args.output, args.outfmt)
# Cleanup our temporary files
if not args.no_blast:
os.remove(blast.blastin.name)
os.remove(blast.blastout.name)
os.remove(genbank.genbank_seq.name)
os.remove(genbank.needle_out.name)
# Sleep for 1 second to avoid making requests too quickly
time.sleep(1)
# Close the output handle
args.output.close()
return
main()