1
1
from pathlib import Path
2
2
3
+ import Bio
3
4
import attr
4
5
import os
6
+
7
+ import networkx as nx
5
8
from Bio import SeqIO
6
9
from Bio .Seq import Seq
7
10
from Bio .SeqRecord import SeqRecord
@@ -58,13 +61,13 @@ class Prune(object):
58
61
tmpdir = attr .ib ()
59
62
mccortex_builder = attr .ib (attr .Factory (builder .Mccortex ))
60
63
min_tip_length = attr .ib (None )
61
- last_record = attr .ib (None )
64
+ records = attr .ib (attr . Factory ( list ) )
62
65
kmer_size = attr .ib (None )
63
66
64
67
def with_records(self, *records):
    """Register DNA sequences with the graph builder and remember them.

    Every sequence is passed to the Mccortex builder and is also stored,
    wrapped in a ``SeqRecord``, in ``self.records``.

    Returns ``self`` so calls can be chained.
    """
    for sequence in records:
        self.mccortex_builder.with_dna_sequence(sequence)
        wrapped = SeqRecord(Seq(sequence))
        self.records.append(wrapped)
    return self
69
72
70
73
def prune_tips_less_than (self , n ):
@@ -77,18 +80,110 @@ def with_kmer_size(self, size):
77
80
return self
78
81
79
82
def run(self):
    """Build the graph, traverse it from the stored records, then prune it.

    Steps:
      1. Build the Mccortex graph inside ``self.tmpdir``.
      2. Write ``self.records`` to ``initial_contigs.fa`` and run
         ``traverse`` with sub-graph output enabled.
      3. Run ``prune`` on the traversal output with
         ``remove_tips=self.min_tip_length``.
      4. Load every graph from the pruned output.

    Returns:
        ``expectation.graph.KmerGraphsExpectation`` wrapping the list of
        loaded graphs.

    Raises:
        AssertionError: if the traverse or the prune step reports a
            non-zero return code.
    """
    mccortex_graph = self.mccortex_builder.build(self.tmpdir)

    cortexpy_graph = self.tmpdir / 'cortexpy_graph.pickle'
    contig_fasta = self.tmpdir / 'initial_contigs.fa'
    with open(str(contig_fasta), 'w') as fh:
        SeqIO.write(self.records, fh, 'fasta')

    ctp_runner = runner.Cortexpy(SPAWN_PROCESS)
    traverse_process = ctp_runner.traverse(graph=mccortex_graph, out=cortexpy_graph,
                                           contig=contig_fasta, contig_fasta=True,
                                           subgraphs=True)
    # Fail here, with the process details, rather than later with a
    # confusing error from prune reading a missing or partial graph.
    assert traverse_process.returncode == 0, traverse_process

    pruned_graph = Path(cortexpy_graph).with_suffix('.pruned.pickle')
    completed_process = ctp_runner.prune(graph=cortexpy_graph, out=pruned_graph,
                                         remove_tips=self.min_tip_length)
    assert completed_process.returncode == 0, completed_process

    subgraphs = list(load_graph_stream(str(pruned_graph)))
    return expectation.graph.KmerGraphsExpectation(subgraphs)
101
+
102
+
103
@attr.s(slots=True)
class Traverse(object):
    """Runner for traverse acceptance tests.

    Configure with the chainable ``with_*`` methods, then call ``run()``.
    """
    # Directory in which all intermediate and output files are created.
    tmpdir = attr.ib()
    # Builder that accumulates sequences and produces the Mccortex graph.
    mccortex_builder = attr.ib(attr.Factory(builder.Mccortex))
    # Explicit initial contigs; when unset/empty, the added records are used.
    traversal_contigs = attr.ib(None)
    # Every sequence passed to ``with_records``, in insertion order.
    added_records = attr.ib(attr.Factory(list))
    # Forwarded to ``traverse`` as its ``subgraphs`` argument.
    output_subgraphs = attr.ib(False)
    # Path of the traversal output; assigned by ``run()``.
    cortexpy_graph = attr.ib(init=False)

    def with_records(self, *records):
        """Add sequences to the graph builder and remember them.

        Returns ``self`` so calls can be chained.
        """
        for rec in records:
            self.mccortex_builder.with_dna_sequence(rec)
            self.added_records.append(rec)
        return self

    def with_initial_contigs(self, *contigs):
        """Use ``contigs`` as the initial contigs instead of the added records.

        Returns ``self`` so calls can be chained, like the other ``with_*``
        methods.
        """
        self.traversal_contigs = contigs
        return self

    def with_subgraph_output(self):
        """Request sub-graph output from the traversal; returns ``self``."""
        self.output_subgraphs = True
        return self

    def with_kmer_size(self, size):
        """Set the k-mer size on the graph builder; returns ``self``."""
        self.mccortex_builder.with_kmer_size(size)
        return self

    def run(self):
        """Build the graph, run ``traverse`` and load the resulting graphs.

        The initial contigs (explicit ones if given, otherwise every added
        record) are written to ``cortexpy_initial_contigs.fa`` in
        ``self.tmpdir`` and passed to ``traverse``.  The output path is
        stored in ``self.cortexpy_graph``.

        Returns:
            ``expectation.graph.KmerGraphsExpectation`` wrapping the list of
            graphs read from the traversal output.

        Raises:
            AssertionError: if ``traverse`` reports a non-zero return code.
        """
        mccortex_graph = self.mccortex_builder.build(self.tmpdir)
        contig_fasta = self.tmpdir / 'cortexpy_initial_contigs.fa'
        if self.traversal_contigs:
            initial_contigs = self.traversal_contigs
        else:
            initial_contigs = self.added_records
        with open(str(contig_fasta), 'w') as fh:
            SeqIO.write([SeqRecord(Seq(s)) for s in initial_contigs], fh, 'fasta')

        self.cortexpy_graph = self.tmpdir / 'cortexpy_graph.pickle'
        ctp_runner = runner.Cortexpy(SPAWN_PROCESS)
        completed_process = ctp_runner.traverse(graph=mccortex_graph,
                                                out=self.cortexpy_graph,
                                                contig=contig_fasta,
                                                contig_fasta=True,
                                                subgraphs=self.output_subgraphs)

        # Check the exit status before touching the output file so a failed
        # traversal is reported with its process details instead of a
        # file-not-found or unpickling error.
        assert completed_process.returncode == 0, completed_process
        subgraphs = list(load_graph_stream(self.cortexpy_graph))

        return expectation.graph.KmerGraphsExpectation(subgraphs)
152
+
153
+
154
@attr.s(slots=True)
class ViewTraversal(object):
    """Runner for acceptance tests that view the result of a traversal.

    Wraps a ``Traverse`` driver: the ``with_*`` methods forward to it, and
    ``run()`` executes the traversal and then ``view`` on its output.
    """
    # Directory handed to the wrapped ``Traverse`` driver.
    tmpdir = attr.ib()
    # The wrapped driver; created after attrs initialisation.
    traverse_driver = attr.ib(init=False)

    def __attrs_post_init__(self):
        driver = Traverse(self.tmpdir)
        self.traverse_driver = driver

    def with_records(self, *records):
        """Forward the sequences to the traverse driver; returns ``self``."""
        driver = self.traverse_driver
        driver.with_records(*records)
        return self

    def with_subgraph_output(self):
        """Enable sub-graph output on the traverse driver; returns ``self``."""
        driver = self.traverse_driver
        driver.with_subgraph_output()
        return self

    def with_kmer_size(self, size):
        """Forward the k-mer size to the traverse driver; returns ``self``."""
        driver = self.traverse_driver
        driver.with_kmer_size(size)
        return self

    def run(self):
        """Run the traversal, then ``view`` its output graph.

        Returns ``expectation.Fasta`` built from the stdout of the ``view``
        process; asserts that ``view`` exited with return code 0.
        """
        driver = self.traverse_driver
        driver.run()
        view_runner = runner.Cortexpy(SPAWN_PROCESS)
        view_process = view_runner.view(cortexpy_graph=driver.cortexpy_graph)
        assert view_process.returncode == 0, view_process
        return expectation.Fasta(view_process.stdout)
181
+
182
+
183
def load_graph_stream(path):
    """Yield each object pickled back-to-back in the file at ``path``.

    The file is opened in binary mode and objects are unpickled one after
    another until the end of the file is reached.  An empty file yields
    nothing.

    Args:
        path: location of the pickle stream; converted with ``str()``
            before opening, so ``pathlib.Path`` objects are accepted.

    Yields:
        The unpickled objects, in the order they were written.
    """
    # ``nx.read_gpickle`` was a thin wrapper around ``pickle.load`` and was
    # removed in NetworkX 3.0; calling ``pickle.load`` directly gives the
    # same result on every NetworkX version.
    import pickle

    # NOTE: unpickling executes arbitrary code; only use on files produced
    # by trusted tools (here, the test run's own output).
    with open(str(path), 'rb') as fh:
        while True:
            try:
                yield pickle.load(fh)
            except EOFError:
                # End of stream: no more pickled objects.
                break
0 commit comments