Commit 2966100

Author: drjwbaker
Commit message: first open repo push
0 parents · commit 2966100

File tree

84 files changed: +2531 −0 lines changed


LICENSE

+22
@@ -0,0 +1,22 @@
The MIT License (MIT)

Copyright (c) 2015 University College London

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

README.md

+21
@@ -0,0 +1,21 @@
# Enabling Complex Analysis of Large Scale Digital Collections

This repository contains code, data, and other outputs from the first phase of '[Enabling Complex Analysis of Large Scale Digital Collections](http://figshare.com/articles/Enabling_Complex_Analysis_of_Large_Scale_Digital_Collections/1319482)', a project funded by the Jisc Research Data Spring.

The core project team are:

- PI Melissa Terras (UCL)
- CI James Baker (British Library)
- CI David Beavan (UCL)
- CI James Hetherington (UCL)
- CI Martin Zaltz Austwick (UCL)

Associated researchers (without whose research questions none of this could have happened!) are:

- Oliver Duke-Williams (UCL)
- Will Finley (Sheffield)
- Helen O'Neill (UCL)
- Anne Welsh (UCL)

All code, data, and other outputs are available for use and reuse under an [MIT Licence](http://opensource.org/licenses/MIT).

For more info on the project see the [UCL DH](http://blogs.ucl.ac.uk/dh/2015/05/07/bluclobber-or-enabling-complex-analysis-of-large-scale-digital-collections/) and [British Library Digital Scholarship](http://britishlibrary.typepad.co.uk/digital-scholarship/) blogs.

bluclobber/harness/__init__.py

Whitespace-only changes.

bluclobber/harness/decomposer.py

+41
@@ -0,0 +1,41 @@
import logging


class Decomposer(object):
    """Split an iterable into per-rank chunks, optionally subsampled."""
    def __init__(self, iterable, communicator=None, rank=None, size=None, subsample=1, offsets=None):
        logger = logging.getLogger('performance')
        self.logger = logger
        if not size:
            if not communicator:
                logger.debug("Assuming default rank and size")
                rank = 0
                size = 1
            else:
                logger.debug("Rank and size from MPI communicator")
                rank = communicator.rank
                size = communicator.size
        self.iterable = iterable
        if not offsets:
            self.count = len(iterable) / size  # integer division (Python 2)
            if rank == size - 1:
                # the last rank picks up the items left over by integer division
                self.remainder = len(iterable) % size
            else:
                self.remainder = 0
            self.start = self.count * rank
            self.end = self.count * (rank + 1) + self.remainder
        self.step = subsample
        self.step_offset = self.start % self.step
        logger.debug("Splitting " + str(len(iterable)) + " items into " + str(size) + " chunks of " + str(self.count))
        logger.debug("This is chunk " + str(rank) + " from " + str(self.start) + " to " + str(self.end))

    def __str__(self):
        return "Decomposer of len " + str(len(self)) + " from " + str(self.start) + " to " + str(self.end) + " in steps " + str(self.step)

    def __getitem__(self, index):
        if index >= len(self):
            raise IndexError
        new_index = self.start + index * self.step + self.step_offset
        return self.iterable[new_index]

    def __len__(self):
        return (self.end - 1) / self.step - (self.start - 1) / self.step
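
A quick sketch of Decomposer in serial use (not part of the commit; assumes a plain list, and Python 2 as elsewhere in the codebase):

from bluclobber.harness.decomposer import Decomposer

items = list(range(10))
chunk = Decomposer(items, subsample=2)  # no communicator given, so rank 0 of 1
print len(chunk)          # 5: every second item of the single chunk
print chunk[0], chunk[2]  # items[0] and items[4], i.e. 0 and 4

With an MPI communicator supplied, each rank would instead see its own contiguous slice of items.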

bluclobber/harness/mapreduce.py

+102
@@ -0,0 +1,102 @@
from functools import reduce
from mpi4py import MPI
import logging
from collections import defaultdict

from decomposer import Decomposer


class MapReduce(object):
    def __init__(self, mapper, reducer, communicator=None, subsample=1, shuffler=None, prepartitioned=False):
        self.unsafe_mapper = mapper
        self.unsafe_reducer = reducer
        self.unsafe_shuffler = shuffler
        self.subsample = subsample
        self.communicator = communicator
        self.prepartitioned = prepartitioned
        self.logger = logging.getLogger('performance')

        # safe reduce: treat None (a failed map) as the identity
        def safeReducer(a, b):
            if a is None:
                return b
            if b is None:
                return a
            return self.unsafe_reducer(a, b)
        self.reducer = safeReducer

        # safe map: log and swallow exceptions, yielding None instead
        def safeMap(arg):
            self.logger.debug("Entered mapper")
            try:
                result = self.unsafe_mapper(arg)
                self.logger.debug("Exiting mapper")
                return result
            except Exception as e:
                self.logger.warn("Problem with map")
                self.logger.warn(str(e))
                return None
        self.mapper = safeMap

        if shuffler:
            def safeShuffler(arg, count):
                try:
                    return self.unsafe_shuffler(arg, count)
                except Exception as e:
                    self.logger.warn("Problem with shuffle")
                    self.logger.warn(str(e))
                    return None
            self.shuffler = safeShuffler
        else:
            self.shuffler = None

    def execute(self, data):
        if self.communicator and self.communicator.size > 1:
            return self.parallel(data)
        else:
            return self.serial(data)

    def serial(self, data):
        try:
            count = len(data)
        except (TypeError, AttributeError):
            count = None  # unsized data; count is currently unused
        subsampled_data = Decomposer(data, subsample=self.subsample)
        quantities = map(self.mapper, subsampled_data)
        result = reduce(self.reducer, quantities)
        return result

    def parallel(self, data):
        perfLogger = logging.getLogger('performance')
        # local map
        if self.prepartitioned:
            partition = Decomposer(data, subsample=self.subsample)
        else:
            partition = Decomposer(data, self.communicator, subsample=self.subsample)
        perfLogger.info("Built iterator")
        quantities = map(self.mapper, partition)
        perfLogger.info("Mapped")
        local_result = reduce(self.reducer, quantities)
        perfLogger.info("Local reduce")

        # reduce under MPI
        def reduce_arrays(x, y, dtype):
            # the signature for a user-defined op takes a datatype, which we can ignore
            return self.reducer(x, y)
        reducer_mpi = MPI.Op.Create(reduce_arrays, True)
        perfLogger.debug("Local result: " + str(local_result)[0:60])

        if self.shuffler:
            # partition keys across ranks, then reduce each partition to its own root
            perfLogger.info("Shuffling")
            shuffled = defaultdict(dict)
            if local_result:
                for key in local_result:
                    shuffled[self.shuffler(key, self.communicator.size)][key] = local_result[key]
            for root in range(self.communicator.size):
                perfLogger.info("Reducing to rank " + str(root))
                temp = self.communicator.reduce(shuffled[root], op=reducer_mpi, root=root)
                if self.communicator.rank == root:
                    result = temp
        else:
            result = self.communicator.reduce(local_result, op=reducer_mpi, root=0)
            result = self.communicator.bcast(result, root=0)
        perfLogger.info("Global reduce")

        reducer_mpi.Free()
        return result
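
A hedged usage sketch (names here are illustrative, not from the commit): a word count over a list of strings, which takes the serial path because no communicator is passed. In the project itself, MapReduce is normally driven through the query harness below.

from bluclobber.harness.mapreduce import MapReduce

def mapper(text):
    counts = {}
    for word in text.split():
        counts[word] = counts.get(word, 0) + 1
    return counts

def reducer(a, b):
    # merge two partial counts; safeReducer has already handled Nones
    for key in b:
        a[key] = a.get(key, 0) + b[key]
    return a

engine = MapReduce(mapper, reducer)     # no communicator: execute() runs serial()
print engine.execute(["a b a", "b c"])  # {'a': 2, 'b': 2, 'c': 1}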

bluclobber/harness/query.py

+68
@@ -0,0 +1,68 @@
from ..model.corpus import Corpus
from ..model.dataset import DataSet
from mpi4py import MPI
import sys
from argparse import ArgumentParser
import logging
import yaml
from utils import *

shuffler = None
reporter = None
parser = Corpus


def main():
    args = clparser(sys.argv[1:])
    execfile(args.query_path, globals())  # must define 'mapper' and 'reducer';
    # may define 'shuffler' and 'reporter'
    perfLogger = logging.getLogger('performance')
    communicator = MPI.COMM_WORLD
    perfLogger.setLevel(getattr(logging, args.loglevel.upper()))
    stdout = logging.StreamHandler()
    stdout.setFormatter(logging.Formatter(str(communicator.rank) + '/' + str(communicator.size) +
                                          ' %(levelname)s: %(asctime)s %(message)s'))
    perfLogger.addHandler(stdout)
    result = query(mapper, reducer, args.corpus_path, args.downsample, args.bybook,
                   parser=parser, shuffler=shuffler, reporter=reporter)
    if result:
        if args.outpath:
            # one output file per rank, so parallel runs don't clobber each other
            outpath = args.outpath + '_' + str(MPI.COMM_WORLD.rank) + '.yml'
            with open(outpath, 'w') as result_file:
                result_file.write(yaml.safe_dump(result))
            perfLogger.info("Written result")
        else:
            print result


def clparser(commandline):
    clparser = ArgumentParser(description="Analyse a corpus")
    clparser.add_argument('query_path', type=str, help='path to python file describing query')
    clparser.add_argument('corpus_path', type=str, help='path to folder containing zipped corpus')
    clparser.add_argument('--downsample', type=int, metavar='N', default=1, help='optionally, use only every Nth zipfile')
    clparser.add_argument('--bybook', action="store_true", default=False)
    clparser.add_argument('--outpath', default=None, type=str, help='output path to yaml dump result')
    clparser.add_argument('--loglevel', default='info', type=str, help='log level (debug, info, warn, error)')
    args = clparser.parse_args(commandline)
    return args


def query(mapper, reducer, corpus_path, downsample=1, bybook=False, parser=parser, shuffler=None, reporter=None):
    communicator = MPI.COMM_WORLD
    perfLogger = logging.getLogger('performance')
    if parser == Corpus:
        corpus = Corpus(corpus_path, communicator)
        perfLogger.info("Constructed")
        result = corpus.analyse(mapper, reducer, downsample, bybook, shuffler=shuffler)
    else:
        corpus = DataSet(parser, corpus_path, communicator)
        result = corpus.analyse_by_file(mapper, reducer, downsample, shuffler=shuffler)
    perfLogger.info("Finished analysis")
    if (not shuffler) and communicator.rank != 0:
        result = None
    if reporter and result:
        result = reporter(result)
    perfLogger.info("Finished postprocessing")
    return result


if __name__ == "__main__":
    main()
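
Because the query file is exec'd into the harness's globals, it only has to define mapper and reducer (and may define shuffler and reporter). A hypothetical minimal query file, counting books per year (the year attribute is an assumption about the book model, not confirmed by this commit):

# years.py -- an illustrative query file; 'book.year' is assumed
def mapper(book):
    # one partial result per book: a single-entry histogram
    return {book.year: 1}

def reducer(a, b):
    for key in b:
        a[key] = a.get(key, 0) + b[key]
    return a

It might then be run with something like mpirun -np 4 python -m bluclobber.harness.query years.py /path/to/corpus --outpath results (the invocation is assumed, not shown in this commit).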

bluclobber/harness/repartition.py

+98
@@ -0,0 +1,98 @@
import os
import zipfile
import sys
from argparse import ArgumentParser
from itertools import islice
import shutil
import tempfile
import subprocess
import logging
from mpi4py import MPI
from ..model.corpus import Corpus
from decomposer import Decomposer


def main():
    perfLogger = logging.getLogger('performance')
    communicator = MPI.COMM_WORLD
    perfLogger.setLevel(logging.DEBUG)
    stdout = logging.StreamHandler()
    stdout.setFormatter(logging.Formatter(str(communicator.rank) + '/' + str(communicator.size) +
                                          ' %(levelname)s: %(asctime)s %(message)s'))
    perfLogger.addHandler(stdout)
    args = parser(sys.argv[1:])
    # verify out_path exists, or create it if not
    try:
        os.makedirs(args.out_path)
    except os.error:
        pass  # folder already exists; Python 3's exist_ok would be nicer
    # restripe if Lustre striping given
    if args.stripe:
        subprocess.check_call(['lfs', 'setstripe', '--count', str(args.stripe), args.out_path])
    if '.zip' in args.in_path:
        repartition_from_metazip(args.in_path, args.out_path, args.split)
    else:
        repartition(args.in_path, args.out_path, args.split, args.downsample)


def parser(commandline):
    parser = ArgumentParser(description="Repartition a corpus")
    parser.add_argument('in_path', type=str, help='path to corpus to repartition')
    parser.add_argument('out_path', type=str, help='path to folder to contain repartitioned corpus')
    parser.add_argument('--downsample', type=int, metavar='N', default=1, help='optionally, use only every Nth book')
    parser.add_argument('--split', type=int, metavar='N', default=64, help='repartition to N zipfiles')
    parser.add_argument('--stripe', type=int, metavar='N', default=None, help='Lustre striping for output')
    args = parser.parse_args(commandline)
    return args


def repartition(in_path, out_path, split, downsample=1, filter=lambda x: True):
    perfLogger = logging.getLogger('performance')
    corpus = Corpus(in_path, communicator=MPI.COMM_WORLD)
    this_processor_out = Decomposer(range(split), communicator=MPI.COMM_WORLD)
    processor_paths = Decomposer(corpus.paths, communicator=MPI.COMM_WORLD)
    processor_corpus = Corpus(processor_paths)
    for chunk_index, chunk in enumerate(this_processor_out):
        perfLogger.info("Starting output zip " + str(chunk))
        books = Decomposer(processor_corpus, rank=chunk_index, size=len(this_processor_out), subsample=downsample)
        perfLogger.debug("Will handle " + str(len(books)) + " books.")
        with zipfile.ZipFile(os.path.join(out_path, 'chunk' + str(chunk) + '.zip'), 'w', allowZip64=True) as outzip:
            for book in books:
                book.load()
                if not filter(book):
                    continue
                info = book.zip_info()
                # transfer from small zip to bigger zip
                outzip.writestr(info, book.archive.zip.read(info))
                for page_code in book.page_codes:
                    info = book.page_zip_info(page_code)
                    outzip.writestr(info, book.archive.zip.read(info))
        perfLogger.info("Completed output zip " + str(chunk))
    MPI.COMM_WORLD.Barrier()


def repartition_from_metazip(in_zip, out_path, split):
    tmpdir = tempfile.mkdtemp()
    this_processor = Decomposer(range(split))
    with zipfile.ZipFile(in_zip) as metazip:
        for chunk in this_processor:
            # open a zip for writing
            with zipfile.ZipFile(os.path.join(out_path, 'chunk' + str(chunk) + '.zip'), 'w', allowZip64=True) as outzip:
                this_chunk = list(islice(metazip.infolist(), chunk, None, split))
                for archive in this_chunk:
                    # extract the smaller zip to disk; it should be possible to do
                    # this in memory, but zipfile doesn't like reading a nested zip
                    # from a file-like object
                    metazip.extract(archive, tmpdir)
                    small = os.path.join(tmpdir, archive.filename)
                    try:
                        with zipfile.ZipFile(small) as inzip:
                            # transfer from small zip to bigger zip
                            for info in inzip.infolist():
                                outzip.writestr(info, inzip.read(info))
                    except zipfile.BadZipfile:
                        print "Bad file:", archive.filename
                    os.remove(small)
    shutil.rmtree(tmpdir)
    MPI.COMM_WORLD.Barrier()
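
A hedged programmatic sketch of the directory-based path (the paths are illustrative, and the filter shown assumes the book objects expose the page_codes attribute used above):

from bluclobber.harness.repartition import repartition

# gather many small per-book zips into 64 larger chunk zips,
# keeping only books that loaded with at least one page
repartition('/data/corpus_zips', '/data/chunks', split=64, downsample=1,
            filter=lambda book: len(book.page_codes) > 0)

Run under MPI, each rank writes its own share of the 64 output zips; the Barrier at the end keeps ranks in step before the job exits.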
