5
5
6
6
"""
7
7
8
- import sys
9
8
import collections as cx
9
+ import sys
10
+
10
11
from itertools import chain
11
- from goatools .anno .init .reader_genetogo import InitAssc
12
- from goatools .anno .annoreader_base import AnnoReaderBase
13
- from goatools .anno .opts import AnnoOptions
14
12
15
- __copyright__ = "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
13
+ from .annoreader_base import AnnoReaderBase
14
+ from .init .reader_genetogo import InitAssc
15
+ from .opts import AnnoOptions
16
+
17
+ __copyright__ = (
18
+ "Copyright (C) 2016-present, DV Klopfenstein, H Tang. All rights reserved."
19
+ )
16
20
__author__ = "DV Klopfenstein"
17
21
18
22
19
23
# pylint: disable=broad-except,too-few-public-methods,line-too-long
20
24
class Gene2GoReader (AnnoReaderBase ):
21
25
"""Reads a Gene Annotation File (GAF). Returns a Python object."""
22
26
23
- exp_kws = {' taxids' , ' taxid' , ' namespaces' , ' godag' }
27
+ exp_kws = {" taxids" , " taxid" , " namespaces" , " godag" }
24
28
25
29
def __init__(self, filename=None, **kws):
    """Initialize the reader under the name "gene2go" and group annotations by taxid.

    Args:
        filename: Forwarded to the base-class initializer
            (presumably the gene2go file to read -- confirm against AnnoReaderBase).
        **kws: Forwarded unchanged to the base-class initializer.
    """
    # kws: taxids or taxid
    super().__init__("gene2go", filename, **kws)
    # Each taxid has a list of namedtuples - one for each line in the annotations
    self.taxid2asscs = self._init_taxid2asscs()
30
34
31
35
def get_ns2assc(self, taxid=None, **kws):
    """Return a dict with one id2gos mapping per namespace.

    Args:
        taxid: Passed to get_ns2ntsanno to select the annotations.
        **kws: Forwarded to _get_id2gos for every namespace.

    Returns:
        dict: namespace key (as produced by get_ns2ntsanno) -> result of
        _get_id2gos on that namespace's annotation namedtuples.
    """
    ns2nts = self.get_ns2ntsanno(taxid)
    return {ns: self._get_id2gos(nts, **kws) for ns, nts in ns2nts.items()}
34
41
35
42
def get_ns2ntsanno (self , taxid = None ):
36
43
"""Return all associations in three (one for BP MF CC) dicts, id2gos"""
@@ -63,28 +70,33 @@ def get_associations(self, taxid=None):
63
70
@staticmethod
64
71
def _warning_taxid (taxid ):
65
72
"""Warn if an unexpected taxid"""
66
- pat = ('**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). '
67
- 'Taxid MUST BE AN int, list of ints, OR bool' )
73
+ pat = (
74
+ "**WARNING: NO ASSOCIATIONS FOR taxid({TAXID}). "
75
+ "Taxid MUST BE AN int, list of ints, OR bool"
76
+ )
68
77
print (pat .format (TAXID = taxid ))
69
78
return {}
70
79
71
80
def get_id2gos_nss(self, **kws):
    """Return all associations in a dict, id2gos, regardless of namespace.

    Args:
        **kws: The "taxids" and "taxid" entries (if any) select the taxids;
            the full kws are also forwarded to _get_id2gos.

    Raises:
        AssertionError: If _get_taxids yields no taxids.
    """
    taxids = self._get_taxids(kws.get("taxids"), kws.get("taxid"))
    # NOTE: assert is skipped under `python -O`; kept so callers still see AssertionError.
    assert taxids, "NO TAXIDS FOUND"
    # Flatten the per-taxid annotation lists into one list, in taxid order.
    assc = list(chain.from_iterable(self.taxid2asscs[t] for t in taxids))
    return self._get_id2gos(assc, **kws)
77
86
78
87
def get_name(self):
    """Return "<name>_<taxid>" if exactly one taxid is stored, else "<name>_various"."""
    if len(self.taxid2asscs) == 1:
        # Iterating a dict yields its keys, so this is the single stored taxid.
        only_taxid = next(iter(self.taxid2asscs))
        return "{BASE}_{TAXID}".format(BASE=self.name, TAXID=only_taxid)
    return "{BASE}_various".format(BASE=self.name)
84
94
85
95
def get_taxid(self):
    """Return the taxid if exactly one is stored; otherwise return True, representing all taxids."""
    if len(self.taxid2asscs) == 1:
        # Iterating a dict yields its keys, so this is the single stored taxid.
        return next(iter(self.taxid2asscs))
    return True
88
100
89
101
def has_ns (self ):
90
102
"""Return True if namespace field, NS exists on annotation namedtuples"""
@@ -96,56 +108,66 @@ def prt_counts(self, prt=sys.stdout):
96
108
num_annos = sum (len (a ) for a in self .taxid2asscs .values ())
97
109
# 792,891 annotations for 3 taxids stored: 10090 7227 9606
98
110
cnts = self ._get_counts (list (chain .from_iterable (self .taxid2asscs .values ())))
99
- prt .write ('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored' .format (
100
- A = num_annos , N = num_taxids , G = cnts ['GOs' ], P = cnts ['geneids' ]))
111
+ prt .write (
112
+ "{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs, {N} taxids stored" .format (
113
+ A = num_annos , N = num_taxids , G = cnts ["GOs" ], P = cnts ["geneids" ]
114
+ )
115
+ )
101
116
if num_taxids < 5 :
102
- prt .write (': {Ts}' .format (Ts = ' ' .join (str (t ) for t in sorted (self .taxid2asscs ))))
103
- prt .write ('\n ' )
117
+ prt .write (
118
+ ": {Ts}" .format (Ts = " " .join (str (t ) for t in sorted (self .taxid2asscs )))
119
+ )
120
+ prt .write ("\n " )
104
121
# 102,430 annotations for taxid 7227
105
122
# 323,776 annotations for taxid 9606
106
123
# 366,685 annotations for taxid 10090
107
124
if num_taxids == 1 :
108
125
return
109
126
for taxid , assc in self .taxid2asscs .items ():
110
127
cnts = self ._get_counts (assc )
111
- prt .write ('{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n ' .format (
112
- A = len (assc ), T = taxid , G = cnts ['GOs' ], P = cnts ['geneids' ]))
128
+ prt .write (
129
+ "{A:8,} annotations, {P:,} proteins/genes, {G:,} GO IDs for taxid {T}\n " .format (
130
+ A = len (assc ), T = taxid , G = cnts ["GOs" ], P = cnts ["geneids" ]
131
+ )
132
+ )
113
133
114
134
@staticmethod
115
135
def _get_counts (nts ):
116
136
"""Return the count of GO IDs and genes/proteins in a set of annotation namedtuples"""
117
137
sets = cx .defaultdict (set )
118
138
for ntd in nts :
119
- sets [' geneids' ].add (ntd .DB_ID )
120
- sets [' GOs' ].add (ntd .GO_ID )
121
- return {' GOs' : len (sets [' GOs' ]), ' geneids' : len (sets [' geneids' ])}
139
+ sets [" geneids" ].add (ntd .DB_ID )
140
+ sets [" GOs" ].add (ntd .GO_ID )
141
+ return {" GOs" : len (sets [" GOs" ]), " geneids" : len (sets [" geneids" ])}
122
142
123
143
# -- taxids2asscs -------------------------------------------------------------------------
124
144
def get_taxid2asscs(self, taxids=None, **kws):
    """Build taxid2asscs[taxid]["ID2GOs"|"GO2IDs"] from the stored annotations.

    No file is read here: the data comes from self.taxid2asscs.

    Args:
        taxids: Passed to _get_taxids to choose which taxids to include.
        **kws: Passed with self.evobj to AnnoOptions; the resulting options
            are handed to reduce_annotations for every taxid.

    Returns:
        A nested defaultdict: taxid -> {"ID2GOs": get_dbid2goids(...),
        "GO2IDs": get_goid2dbids(...)} computed on the reduced annotations.
    """
    # WAS: get_annotations_taxid2dct
    taxid2asscs = cx.defaultdict(
        lambda: cx.defaultdict(lambda: cx.defaultdict(set))
    )
    options = AnnoOptions(self.evobj, **kws)
    for taxid in self._get_taxids(taxids):
        nts = self.taxid2asscs[taxid]
        assc = self.reduce_annotations(nts, options)
        a2b = taxid2asscs[taxid]
        a2b["ID2GOs"] = self.get_dbid2goids(assc)
        a2b["GO2IDs"] = self.get_goid2dbids(assc)
    return taxid2asscs
135
157
136
158
@staticmethod
137
159
def fill_taxid2asscs(taxid2asscs_usr, taxid2asscs_ret):
    """Fill user taxid2asscs for backward compatibility.

    Copies the "ID2GOs" and "GO2IDs" entries of every taxid in
    taxid2asscs_ret into taxid2asscs_usr, which is modified in place.

    Args:
        taxid2asscs_usr: Mapping to fill. taxid2asscs_usr[taxid] must already
            resolve to a mutable mapping (e.g. the outer object is a defaultdict).
        taxid2asscs_ret: Mapping taxid -> mapping holding "ID2GOs" and "GO2IDs".
    """
    for taxid, ab_ret in taxid2asscs_ret.items():
        ab_usr = taxid2asscs_usr[taxid]
        ab_usr["ID2GOs"] = ab_ret["ID2GOs"]
        ab_usr["GO2IDs"] = ab_ret["GO2IDs"]
142
164
143
165
@staticmethod
144
166
def get_id2gos_all(taxid2asscs_a2b):
    """Get associations for all stored species taxid2asscs[taxid][ID2GOs|GO2IDs].

    Args:
        taxid2asscs_a2b: Mapping taxid -> mapping that holds an "ID2GOs" dict.

    Returns:
        dict: One merged geneid -> GO IDs dict. If the same geneid occurs under
        several taxids, the entry from the taxid iterated last wins.
    """
    id2gos_all = {}
    for a2b in taxid2asscs_a2b.values():
        id2gos_all.update(a2b["ID2GOs"])
    return id2gos_all
151
173
0 commit comments