
Commit ab0d651
can now parallelize the work
1 parent 7e6f77f

9 files changed: +241 −79 lines

argumentparser.py (+31)

```python
from argparse import ArgumentParser
from argparse import RawTextHelpFormatter


parser = ArgumentParser(
    description='Filters contents of large xml data sources and produces json-ified results',
    formatter_class=RawTextHelpFormatter)

parser.add_argument('root', help='''
root XPath element name, used to interpret the filter file against
''')
parser.add_argument('filter', help='''
filter file name, containing pipe-separated filter and map definitions like so:
[XPath] | [target json attribute name] | [str|int] | [force array]
(see sample formatting file included with the source)
''')
parser.add_argument('source', help='''
source xml formatted file
''')
parser.add_argument('--destination', required=False, nargs=1, help='''
file name to store the generated json into; if omitted, will output to stdout
''')

parser.add_argument('--split', required=False, help='''
split the xml file into valid xml files containing elements at root;
leaves the split documents in a directory named: [source][timestamp];
performs the json conversion using a mrjob job, running on the split xml
''')

parser.add_argument('--split_root', required=False, help='''
root node to split the xml on, if different from the root node to use when filtering
''')
```
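
As a reference for the format the help text describes, a hypothetical filter file might contain rows like these (the XPaths and attribute names are illustrative, not the sample file shipped with the source; any non-empty fourth column switches force array on):

```
.//ClinVarAccession/@Acc | accession | str
.//Citation/ID | citation_ids | str | force_array
```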

filterprocessor.py (+22)

```python
import csv
import io
import os

from collections import namedtuple

# one parsed filter row: the XPath to evaluate, the target json attribute
# name, the builtin type to convert to, and whether to force a list result
Filter = namedtuple("Filter", "XPath jsonattr type force_array")


def load_filters(filterFileName):
    with open(filterFileName, 'rb') as csvfile:
        filters = []
        filterreader = csv.reader(csvfile, delimiter='|')
        for row in filterreader:
            result = Filter(
                XPath=row[0].strip(),
                jsonattr=row[1].strip(),
                type=row[2].strip(),
                force_array=len(row) > 3)
            filters.append(result)
        return filters
```
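
A minimal usage sketch for load_filters, assuming the filter file name used by the test scripts:

```python
from filterprocessor import load_filters

# each pipe-delimited row becomes a Filter namedtuple; the fourth
# column, when present, marks the attribute as force_array
filters = load_filters('clinvar_filter_corrected.txt')
for f in filters:
    print f.jsonattr, '<-', f.XPath, '(%s)' % f.type
```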

jsonserialize.py (+41)

```python
import json
import importlib

from lxml import etree

def fast_iter(context, func):
    # stream over iterparse events, clearing each element after use
    # so memory stays bounded on large documents
    for event, elem in context:
        func(elem)
        elem.clear()
    del context

def convert_type(value, type_):
    # assumes type_ names a builtin type such as 'str' or 'int'
    module = importlib.import_module('__builtin__')
    cls = getattr(module, type_)
    return cls(value)


def inner_json_serialize(elem, filters, outstream):
    result = {}
    for f in filters:
        xp = etree.XPath(f.XPath)
        children = xp(elem)
        attr_val = []
        for c in children:
            raw_val = c.text if (type(c) is etree._Element) else c
            attr_val.append(convert_type(raw_val, f.type))
        # collapse single matches to a scalar unless the filter forces an array
        if not f.force_array and len(children) < 2 and len(attr_val) > 0:
            attr_val = attr_val[0]
        elif len(attr_val) == 0:
            attr_val = None
        result[f.jsonattr] = attr_val
    if len(result) > 0:
        outstream.write(unicode(json.dumps(result)))
        outstream.write(u'\n')


def json_serialize(context, filters, outstream):
    fast_iter(context,
              lambda elem:
                  inner_json_serialize(elem, filters, outstream))
```
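
Wiring this up end to end looks like the sketch below, which mirrors the non-split path of xmljsonifier.py further down (file and element names are taken from the ClinVar test script and are illustrative):

```python
import io

from lxml import etree

from filterprocessor import load_filters
from jsonserialize import json_serialize

filters = load_filters('clinvar_filter_corrected.txt')
context = etree.iterparse('ClinVarFullRelease_2014-08.xml',
                          events=('end',), tag='ClinVarSet')
with io.open('output.json', 'w') as out:
    json_serialize(context, filters, out)  # one json object per line
```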

mrjobxmljsonifier.py (+45)

```python
import io
import os
import sys

from lxml import etree
from mrjob.job import MRJob
from mrjob.compat import get_jobconf_value

from jsonserialize import json_serialize
from filterprocessor import load_filters

class MrJobXMLJSONifier(MRJob):
    def mapper(self, _, line):
        # each input line is the path of one split xml file; filter it
        # and write the json-ified result alongside it
        filters = load_filters(get_jobconf_value("settings.filter"))
        context = etree.iterparse(line, events=('end',), tag=get_jobconf_value("settings.root"))
        result_file = line + ".mapped"
        with io.open(result_file, 'w') as file:
            json_serialize(context, filters, file)
        yield ("key", result_file)

    def reducer(self, key, file_iterator):
        # concatenate the mapped files into the destination, or to stdout
        files = list(file_iterator)

        result_file = get_jobconf_value("settings.destination")
        if result_file is not None:
            with open(result_file, "wb") as outfile:
                for f in files:
                    with open(f, "rb") as infile:
                        outfile.write(infile.read())
                    os.remove(f)
            yield key, result_file
        else:
            for f in files:
                with open(f, "rb") as infile:
                    sys.stdout.write(infile.read())


    def steps(self):
        return [self.mr(mapper=self.mapper, reducer=self.reducer)]


if __name__ == "__main__":
    MrJobXMLJSONifier.run()
```
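
Because the job reads its settings from jobconf rather than argv, it should also be runnable standalone, roughly as sketched below (paths are illustrative; absolute paths matter because mrjob's local runner executes tasks in scratch directories, which is also why xmljsonifier.py writes absolute paths into splitfiles.tmp):

```bash
python mrjobxmljsonifier.py -r local \
    --jobconf settings.root=ReferenceClinVarAssertion \
    --jobconf settings.filter="`pwd`/clinvar_filter_corrected.txt" \
    --jobconf settings.destination="`pwd`/output.json" \
    splitfiles.tmp
```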

requirements.txt (+1)

```diff
 lxml==3.4.0
+mrjob==0.4.2
 xmlutils==1.1
```

test/test_ClinVarFullRelease_2014_corrected.sh (+4 −4)

```diff
 #!/bin/bash -v
 
-#curl "ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2014-08.xml.gz" -o ClinVarFullRelease_2014-08.xml.gz
-#gunzip ClinVarFullRelease_2014-08.xml.gz
-
 STARTTIME=$(date +%s)
-python ../xmljsonifier.py "ClinVarSet" clinvar_filter_corrected.txt ClinVarFullRelease_2014-08.xml --destination=output.xml
+curl "ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2014-08.xml.gz" -o ClinVarFullRelease_2014-08.xml.gz
+gunzip ClinVarFullRelease_2014-08.xml.gz
+
+python ../xmljsonifier.py "ClinVarSet" clinvar_filter_corrected.txt ClinVarFullRelease_2014-08.xml --destination=output.json
 ENDTIME=$(date +%s)
 echo "$(($ENDTIME - $STARTTIME)) seconds to complete jsonifying..."
```

+12

```bash
#!/bin/bash -v

#curl "ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2014-08.xml.gz" -o ClinVarFullRelease_2014-08.xml.gz
#gunzip ClinVarFullRelease_2014-08.xml.gz

STARTTIME=$(date +%s)
python ../xmljsonifier.py "ReferenceClinVarAssertion" "`pwd`/clinvar_filter_corrected.txt" ClinVarFullRelease_2014-08.xml --destination="`pwd`/output.json" --split=true --split_root="ClinVarSet"
ENDTIME=$(date +%s)
echo "$(($ENDTIME - $STARTTIME)) seconds to complete jsonifying..."

grep -o -w "<ReferenceClinVarAssertion" ClinVarFullRelease_2014-08.xml | wc -w
wc -l output.json
```

xmlSplit.py (+45)

```python
import io
import os
import time

from lxml import etree
from lxml.etree import tostring

def create_and_set_output_dir(name):
    # split documents land in a directory named [source][timestamp]
    output_directory = name + time.strftime("%Y%m%d-%H%M%S")
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    return output_directory

def fast_iter(context, batch_size, output_directory, func):
    batch = 0
    count = 0

    for event, elem in context:
        if count == 0:
            # starting a new batch file: write the wrapping root element
            with io.open(os.path.join(output_directory, str(batch)), 'a') as outstream:
                outstream.write(u'<split_root>\n')
        func(elem, batch)
        elem.clear()
        count = count + 1
        if count == batch_size:
            with io.open(os.path.join(output_directory, str(batch)), 'a') as outstream:
                outstream.write(u'\n</split_root>')
            batch = batch + 1
            count = 0
    if count > 0:
        # close the final, partially filled batch
        with io.open(os.path.join(output_directory, str(batch)), 'a') as outstream:
            outstream.write(u'\n</split_root>')
    del context

def dump_to_file(elem, output_directory, batch):
    with io.open(os.path.join(output_directory, str(batch)), 'a') as outstream:
        outstream.write(unicode(tostring(elem, with_tail=False)))

def xmlSplit(xmlFileName, root, batch_size):
    output_directory = create_and_set_output_dir(xmlFileName)
    context = etree.iterparse(xmlFileName, events=('end',), tag=root)
    fast_iter(context, batch_size, output_directory,
              lambda elem, count:
                  dump_to_file(elem, output_directory, count))
    return output_directory
```
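
A quick sketch of the splitter on its own, using names from the ClinVar test script (xmljsonifier.py below calls it with a batch size of 5000):

```python
from xmlSplit import xmlSplit

# writes batches of up to 5000 <ClinVarSet> elements into files named
# 0, 1, 2, ... under [source][timestamp], each wrapped in <split_root>
# so every batch parses as a standalone xml document
outdir = xmlSplit('ClinVarFullRelease_2014-08.xml', 'ClinVarSet', 5000)
print outdir
```

Re-opening each batch file in append mode keeps at most one file handle open at a time, at the cost of an open/close call per element.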

xmljsonifier.py (+40 −75)

```diff
 import sys
-import csv
 import json
+import os
 import io
 
-from argparse import ArgumentParser
-from argparse import RawTextHelpFormatter
-from copy import deepcopy
-from collections import namedtuple
 from lxml import etree
 
-Filter = namedtuple("Filter", "XPath jsonattr type force_array")
+from xmlSplit import xmlSplit
+from jsonserialize import json_serialize
+from argumentparser import parser
+from filterprocessor import load_filters
+from mrjobxmljsonifier import MrJobXMLJSONifier
 
-parser = ArgumentParser(description='''Filters contents of large xml data sources and produces json-ified results''', formatter_class=RawTextHelpFormatter)
 
-parser.add_argument('root', help='''
-root XPath element name, used to interpret the filter file against
-''')
-parser.add_argument('filter', help='''
-filter file name, containing column separated filter and map definitions like so:
-[XPath], [target json attribute name], [type], [force array]
-(see sample formatting file included with the source)
-''')
-parser.add_argument('source', help='''
-source xml formatted file
-''')
-parser.add_argument('--destination', required=False, nargs=1, help='''
-file name to store the generated json into; if ommited, will output to stdout
-''')
+cargs = parser.parse_args()
 
 
-args = parser.parse_args()
+if cargs.split is not None:
+    root_elem = cargs.root if cargs.split_root is None else cargs.split_root
+
+    splitfilesdirectory = xmlSplit(cargs.source, root_elem, 5000)
+    with open("splitfiles.tmp", "w") as split_file_list:
+        for path, subdirs, files in os.walk(splitfilesdirectory):
+            for filename in files:
+                f = os.path.join(path, filename)
+                split_file_list.write(os.path.join(os.getcwd(), str(f)) + os.linesep)
+    mr_jsonifier = MrJobXMLJSONifier(
+        args=['-r', 'local',
+              '--jobconf', 'settings.root=' + cargs.root,
+              '--jobconf', 'settings.filter=' + cargs.filter,
+              '--jobconf', 'settings.destination=' + cargs.destination[0],
+              'splitfiles.tmp'])
+    with mr_jsonifier.make_runner() as runner:
+        runner.run()
+    os.remove("splitfiles.tmp")
+    sys.exit(0)
+
 
 # read filter definitions
-filters = []
-with open(args.filter, 'rb') as csvfile:
-    filterreader = csv.reader(csvfile, delimiter='|')
-    for row in filterreader:
-        result = Filter(
-            XPath=row[0].strip(),
-            jsonattr=row[1].strip(),
-            type=row[2].strip(),
-            force_array=len(row) > 3)
-        filters.append(result)
-
-
-def fast_iter(context, func):
-    for event, elem in context:
-        func(elem)
-        elem.clear()
-    del context
+filters = load_filters(cargs.filter)
 
 #attempts to guess at (and convert into) a builtin type based on a string
-def convert_type(value, type_):
-    import importlib
-    #assuming built in type
-    module = importlib.import_module('__builtin__')
-    cls = getattr(module, type_)
-    return cls(value)
-
-def json_serialize(elem, outstream):
-    result = {}
-    for f in filters:
-        xp = etree.XPath(f.XPath)
-        children = xp(elem)
-        attr_val = []
-        for c in children:
-            raw_val = c.text if (type(c) is etree._Element) else c
-            attr_val.append(convert_type(raw_val, f.type))
-        if not f.force_array and len(children) < 2 and len(attr_val) > 0:
-            attr_val = attr_val[0]
-        elif len(attr_val) == 0:
-            attr_val = None
-        result[f.jsonattr] = attr_val
-    if result is not None and len(result) > 0:
-        outstream.write(unicode(json.dumps(result)))
-        outstream.write(u'\n') #easier on the eyes
-
-
-context = etree.iterparse(args.source, events=('end',), tag=args.root)
-
-if args.destination is not None:
-    with io.open(args.destination[0], 'w') as file:
-        fast_iter(context,
-                  lambda elem:
-                      json_serialize(elem, file))
+
+context = etree.iterparse(cargs.source, events=('end',), tag=cargs.root)
+
+if cargs.destination is not None:
+    with io.open(cargs.destination[0], 'w') as file:
+        json_serialize(context, filters, file)
 else:
-    fast_iter(context,
-              lambda elem:
-                  json_serialize(elem, sys.stdout))
+    json_serialize(context, filters, sys.stdout)
```
