diff --git a/harvester/post_processing/batch_update_couchdb_by_collection.py b/harvester/post_processing/batch_update_couchdb_by_collection.py
index 16b5920..2875ec3 100755
--- a/harvester/post_processing/batch_update_couchdb_by_collection.py
+++ b/harvester/post_processing/batch_update_couchdb_by_collection.py
@@ -7,74 +7,103 @@
 
 PATH_DELIM = '/'
 
+
+def traverse(doc, path):
+    """Resolve a PATH_DELIM-delimited path inside doc.
+
+    Returns (parent, field): the container holding the last path segment
+    and that segment's key. A missing intermediate key raises KeyError
+    when the containers are dicts.
+    """
+    # Strip first so a leading delimiter ('/a' or '/a/b') is tolerated.
+    path = path.lstrip(PATH_DELIM)
+    if PATH_DELIM not in path:
+        return doc, path
+    parent, remainder = path.split(PATH_DELIM, 1)
+    return traverse(doc[parent], remainder)
+
 
-def setprop(obj, path, val, substring, keyErrorAsNone=False):
-    """
-    Sets the value of the key identified by interpreting
-    the path as a delimited hierarchy of keys. Enumerate
-    if object is list.
-    """
-    if '/' not in path:
-        if type(obj[path]) == list:
-            values = []
-            for t in obj[path]:
-                if substring:
-                    values.append(t.replace(substring,val))
-                else:
-                    values.append(val)
-            obj[path] = values
-            return
-        else:
-            if path not in obj:
-                if not keyErrorAsNone:
-                    raise KeyError('Path not found in object: %s' % (path))
-                else:
-                    return None
-            else:
-                if substring:
-                    obj[path] = obj[path].replace(substring,val)
-                else:
-                    obj[path] = val
-                return
-    if type(obj) == list:
-        obj = obj[0]
-    pp, pn = tuple(path.lstrip(PATH_DELIM).split(PATH_DELIM, 1))
-    if pp not in obj:
-        if not keyErrorAsNone:
-            raise KeyError('Path not found in object: %s (%s)' % (path, pp))
-        else:
-            return None
+def substring_replace(doc, path, newValue, substring):
+    """Replace substring with newValue in the field at path.
+
+    A list field has the replacement applied to each element. Looking up
+    a field that does not exist raises KeyError.
+    """
+    parent, field = traverse(doc, path)
+    current = parent[field]
+    if isinstance(current, list):
+        parent[field] = [val.replace(substring, newValue) for val in current]
+    else:
+        parent[field] = current.replace(substring, newValue)
+
 
-    return setprop(obj[pp], pn, val, substring, keyErrorAsNone)
+def replace(doc, path, newValue):
+    """Overwrite the field at path with newValue.
+
+    An existing list field becomes [newValue]; a missing field is
+    created rather than raising.
+    """
+    parent, field = traverse(doc, path)
+    if field in parent and isinstance(parent[field], list):
+        parent[field] = [newValue]
+    else:
+        parent[field] = newValue
+
 
+def append(doc, path, newValue):
+    """Append newValue to the field at path.
+
+    A missing field becomes [newValue]; an existing non-list value is
+    promoted to [old value, newValue].
+    """
+    parent, field = traverse(doc, path)
+    if field not in parent:
+        parent[field] = [newValue]
+    elif isinstance(parent[field], list):
+        parent[field].append(newValue)
+    else:
+        parent[field] = [parent[field], newValue]
+
+
+# update_by_id_list() takes a boolean parameter named `append` that shadows
+# the helper above, so it calls the helper through this alias.
+_append_value = append
+
 
-def update_by_id_list(ids, fieldName, newValue, substring, _couchdb=None):
+def update_by_id_list(ids, fieldName, newValue, substring, append=False, _couchdb=None):
     '''For a list of couchdb ids, given field name and new value, update the
     doc[fieldname] with "new value"'''
     updated = []
     num_updated = 0
-    print >> sys.stderr, "SUBSTRING 1: {}".format(substring)
+    if substring:
+        print >> sys.stderr, "SUBSTRING 1: {}".format(substring)
     for did in ids:
         doc = _couchdb.get(did)
         if not doc:
             continue
-        setprop(doc, fieldName, newValue, substring)
+        # Exactly one operation per document: substring replacement,
+        # append, or whole-field replacement.
+        if substring:
+            substring_replace(doc, fieldName, newValue, substring)
+        elif append:
+            _append_value(doc, fieldName, newValue)
+        else:
+            replace(doc, fieldName, newValue)
         _couchdb.save(doc)
         updated.append(did)
         print >> sys.stderr, "UPDATED: {0}".format(did)
         num_updated += 1
     return num_updated, updated
 
 
-def update_couch_docs_by_collection(cid, fieldName, newValue, substring):
+def update_couch_docs_by_collection(cid, fieldName, newValue, substring, append=False):
     print >> sys.stderr, "UPDATING DOCS FOR COLLECTION: {}".format(cid)
     _couchdb = get_couchdb()
     rows = CouchDBCollectionFilter(collection_key=cid, couchdb_obj=_couchdb)
     ids = [row['id'] for row in rows]
     num_updated, updated_docs = update_by_id_list(
-        ids, fieldName, newValue, substring, _couchdb=_couchdb)
+        ids, fieldName, newValue, substring, append, _couchdb=_couchdb)
     subject = format_results_subject(cid, 'Updated documents from CouchDB {env} ')
     publish_to_harvesting(
         subject, 'Updated {} documents from CouchDB collection CID: {}'.format(
             num_updated, cid))
     return num_updated, updated_docs
diff --git a/scripts/queue_batch_append_couchdb.py b/scripts/queue_batch_append_couchdb.py
new file mode 100644
index 0000000..169606b
--- /dev/null
+++ b/scripts/queue_batch_append_couchdb.py
@@ -0,0 +1,131 @@
+#! /bin/env python
+# -*- coding: utf-8 -*-
+import sys
+import logbook
+from harvester.config import config as config_harvest
+from redis import Redis
+from rq import Queue
+
+JOB_TIMEOUT = 28800  # 8 hrs
+
+
+def def_args():
+    """Build the argument parser for the batch-append command line."""
+    import argparse
+    parser = argparse.ArgumentParser(
+        description='Batch append field with new value for a given couchdb collection'
+    )
+    parser.add_argument('user_email', type=str, help='user email')
+    parser.add_argument('rq_queue', type=str, help='RQ queue to put job in')
+    parser.add_argument('cid', type=str, help='Collection ID')
+    parser.add_argument(
+        'fieldName', type=str, help='Field to update. Cannot be _id or _rev')
+    parser.add_argument(
+        'newValue', type=str, help='New value to append to field')
+    return parser
+
+
+def queue_update_couchdb_field(redis_host,
+                               redis_port,
+                               redis_password,
+                               redis_timeout,
+                               rq_queue,
+                               collection_key,
+                               fieldName,
+                               newValue,
+                               substring=None,
+                               timeout=JOB_TIMEOUT):
+    """Enqueue an RQ job that appends newValue to fieldName in a collection.
+
+    substring is ignored: the job is always enqueued with substring None
+    and append True. It defaults to None because main() does not pass it.
+    Returns the enqueued job.
+    """
+    rQ = Queue(
+        rq_queue,
+        connection=Redis(
+            host=redis_host,
+            port=redis_port,
+            password=redis_password,
+            socket_connect_timeout=redis_timeout))
+    job = rQ.enqueue_call(
+        func='harvester.post_processing.batch_update_couchdb_by_collection.update_couch_docs_by_collection',
+        args=(collection_key, fieldName, newValue, None, True),
+        timeout=timeout)
+    return job
+
+
+def main(collection_keys,
+         fieldName,
+         newValue,
+         log_handler=None,
+         config_file='akara.ini',
+         rq_queue=None,
+         **kwargs):
+    """Queue one append job per collection key in collection_keys.
+
+    collection_keys is a ';'-separated string of collection IDs. Extra
+    keyword arguments are forwarded to queue_update_couchdb_field().
+    """
+    config = config_harvest(config_file=config_file)
+
+    if not log_handler:
+        log_handler = logbook.StderrHandler(level='DEBUG')
+    log_handler.push_application()
+    try:
+        for collection_key in collection_keys.split(';'):
+            queue_update_couchdb_field(
+                config['redis_host'],
+                config['redis_port'],
+                config['redis_password'],
+                config['redis_connect_timeout'],
+                rq_queue=rq_queue,
+                fieldName=fieldName,
+                newValue=newValue,
+                collection_key=collection_key,
+                **kwargs)
+    finally:
+        # Always unwind the handler, even if queueing raised.
+        log_handler.pop_application()
+
+
+if __name__ == '__main__':
+    parser = def_args()
+    args = parser.parse_args(sys.argv[1:])
+    if not args.cid or not args.fieldName or not args.newValue:
+        parser.print_help()
+        sys.exit(27)
+    if args.fieldName == '_id' or args.fieldName == '_rev':
+        parser.print_help()
+        sys.exit(27)
+    kwargs = {}
+    main(
+        args.cid,
+        args.fieldName,
+        args.newValue,
+        rq_queue=args.rq_queue,
+        **kwargs)
+
+# Copyright © 2016, Regents of the University of California
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# - Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+# - Neither the name of the University of California nor the names of its
+# contributors may be used to endorse or promote products derived from this
+# software without specific prior written permission.
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.