From 81b2e9838a9d246bcf7d4439c9c9ad956dc4e9cd Mon Sep 17 00:00:00 2001 From: Giang Bui Date: Wed, 22 May 2019 14:10:12 -0500 Subject: [PATCH 1/5] feat(integrity): integrity handling --- sheepdog/transactions/upload/__init__.py | 28 +++++++++++++-- sheepdog/transactions/upload/transaction.py | 38 +-------------------- 2 files changed, 27 insertions(+), 39 deletions(-) diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index 60facf19..b1078453 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -10,6 +10,10 @@ import flask import lxml +import psqlgraph +from sqlalchemy.exc import IntegrityError +from gdcdictionary import gdcdictionary + from sheepdog import auth from sheepdog import utils from sheepdog.errors import ParsingError, SchemaError, UnsupportedError, UserError @@ -32,8 +36,27 @@ def single_transaction_worker(transaction, *doc_args): transaction.flush() transaction.post_validate() transaction.commit() - except HandledIntegrityError: - pass + except IntegrityError: + transaction.session.rollback() + for entity in transaction.valid_entities: + schema = gdcdictionary.schema[entity.node.label] + node = entity.node + for keys in schema['uniqueKeys']: + props = {} + if keys == ['id']: + continue + for key in keys: + prop = schema['properties'][key].get('systemAlias') + if prop: + props[prop] = node[prop] + else: + props[key] = node[key] + if graph.nodes(type(node)).props(props).count() > 0: + entity.record_error( + '{} with {} already exists in the GDC' + .format(node.label, props), keys=props.keys() + ) + except UserError as e: transaction.record_user_error(e) raise @@ -42,6 +65,7 @@ def single_transaction_worker(transaction, *doc_args): finally: response = transaction.json code = transaction.status_code + return response, code diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 06999f63..39bf58fe 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -178,43 +178,7 @@ def flush(self): """ for entity in self.valid_entities: entity.flush_to_session() - try: - self.session.flush() - except IntegrityError as e: - # don't handle non-unique constraint errors - if "duplicate key value violates unique constraint" not in e.message: - raise - values = VALUES_REGEXP.findall(e.message) - if not values: - raise - values = [v.strip() for v in values[0].split(",")] - keys = KEYS_REGEXP.findall(e.message) - if len(keys) == len(values): - values = dict(zip(keys, values)) - entities = [] - label = None - for en in self.valid_entities: - for k, v in values.items(): - if getattr(en.node, k, None) != v: - break - else: - if label and label != en.node.label: - break - entities.append(en) - label = en.node.label - else: # pylint: disable=useless-else-on-loop - # https://github.com/PyCQA/pylint/pull/2760 - for entity in entities: - entity.record_error( - "{} with {} already exists".format( - entity.node.label, values - ), - keys=keys, - ) - if entities: - raise HandledIntegrityError() - self.record_error("{} already exists".format(values)) - raise HandledIntegrityError() + self.session.flush() @property def status_code(self): From 0e1188e9fe00fddc48935c7f19bf9eb2b79d56b7 Mon Sep 17 00:00:00 2001 From: Giang Bui Date: Wed, 22 May 2019 14:36:49 -0500 Subject: [PATCH 2/5] fix(unittest): fix unittest --- sheepdog/transactions/upload/__init__.py | 4 +- .../datadict/submission/test_endpoints.py | 40 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index b1078453..f334e918 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -51,9 +51,9 @@ def single_transaction_worker(transaction, *doc_args): props[prop] = node[prop] else: props[key] = node[key] - if graph.nodes(type(node)).props(props).count() > 0: + if transaction.db_driver.nodes(type(node)).props(props).count() > 0: entity.record_error( - '{} with {} already exists in the GDC' + '{} with {} already exists in the DB' .format(node.label, props), keys=props.keys() ) diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index cfeab2cf..c8975a90 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -16,6 +16,7 @@ from datamodelutils import models as md from flask import g from moto import mock_s3 +from sqlalchemy.exc import IntegrityError from sheepdog.errors import HandledIntegrityError from sheepdog.globals import ROLES @@ -861,6 +862,8 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): external_proxies=get_external_proxies(), db_driver=pg_driver, ) for _ in range(2)] + + response = "" with pg_driver.session_scope(can_inherit=False) as s1: with utx1: utx1.parse_doc(*doc_args) @@ -876,18 +879,37 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): with pg_driver.session_scope(session=s2): utx2.commit() - + try: with pg_driver.session_scope(session=s1): utx1.flush() - - with pg_driver.session_scope(session=s1): - utx1.post_validate() - - with pg_driver.session_scope(session=s1): - utx1.commit() - except HandledIntegrityError: - pass + except IntegrityError: + utx1.session.rollback() + from gdcdictionary import gdcdictionary + for entity in utx1.valid_entities: + schema = gdcdictionary.schema[entity.node.label] + node = entity.node + for keys in schema['uniqueKeys']: + props = {} + if keys == ['id']: + continue + for key in keys: + prop = schema['properties'][key].get('systemAlias') + if prop: + props[prop] = node[prop] + else: + props[key] = node[key] + if utx1.db_driver.nodes(type(node)).props(props).count() > 0: + entity.record_error( + '{} with {} already exists in the DB' + .format(node.label, props), keys=props.keys() + ) + response = utx1.json + + + assert response["entity_error_count"]==1 + assert response["code"]==400 + assert response['entities'][0]['errors'][0]['message'] == "experiment with {'project_id': 'CGCI-BLGSP', 'submitter_id': 'BLGSP-71-06-00019'} already exists in the DB" with pg_driver.session_scope(): assert pg_driver.nodes(md.Experiment).count() == 1 From ec63e3e879a36ecda84ac8a3108a73bf77a29026 Mon Sep 17 00:00:00 2001 From: Giang Bui Date: Wed, 22 May 2019 17:44:50 -0500 Subject: [PATCH 3/5] fix(codacy): fix codacy --- sheepdog/transactions/upload/__init__.py | 2 +- tests/integration/datadict/submission/test_endpoints.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index f334e918..c97b39bf 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -56,7 +56,7 @@ def single_transaction_worker(transaction, *doc_args): '{} with {} already exists in the DB' .format(node.label, props), keys=props.keys() ) - + except UserError as e: transaction.record_user_error(e) raise diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index c8975a90..9a7f0c4f 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -879,7 +879,7 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): with pg_driver.session_scope(session=s2): utx2.commit() - + try: with pg_driver.session_scope(session=s1): utx1.flush() From 7ed5afb5bcde1bae43401ecdd0732ff71baec7fb Mon Sep 17 00:00:00 2001 From: Giang Bui Date: Thu, 23 May 2019 18:55:08 -0500 Subject: [PATCH 4/5] fix(session): close session --- tests/integration/datadict/submission/test_endpoints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index 9a7f0c4f..f5683304 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -884,7 +884,7 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): with pg_driver.session_scope(session=s1): utx1.flush() except IntegrityError: - utx1.session.rollback() + s1.rollback() from gdcdictionary import gdcdictionary for entity in utx1.valid_entities: schema = gdcdictionary.schema[entity.node.label] From e9a145f1136572f5fb3be3a6b04d96f42848b905 Mon Sep 17 00:00:00 2001 From: Giang Bui Date: Fri, 24 May 2019 00:24:49 -0500 Subject: [PATCH 5/5] chore(code): refactor --- sheepdog/transactions/upload/__init__.py | 37 +++++++++++---------- sheepdog/transactions/upload/transaction.py | 23 +++++++++++++ 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index c97b39bf..ba378679 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -38,24 +38,25 @@ def single_transaction_worker(transaction, *doc_args): transaction.commit() except IntegrityError: transaction.session.rollback() - for entity in transaction.valid_entities: - schema = gdcdictionary.schema[entity.node.label] - node = entity.node - for keys in schema['uniqueKeys']: - props = {} - if keys == ['id']: - continue - for key in keys: - prop = schema['properties'][key].get('systemAlias') - if prop: - props[prop] = node[prop] - else: - props[key] = node[key] - if transaction.db_driver.nodes(type(node)).props(props).count() > 0: - entity.record_error( - '{} with {} already exists in the DB' - .format(node.label, props), keys=props.keys() - ) + transaction.integrity_check() + # for entity in transaction.valid_entities: + # schema = gdcdictionary.schema[entity.node.label] + # node = entity.node + # for keys in schema['uniqueKeys']: + # props = {} + # if keys == ['id']: + # continue + # for key in keys: + # prop = schema['properties'][key].get('systemAlias') + # if prop: + # props[prop] = node[prop] + # else: + # props[key] = node[key] + # if transaction.db_driver.nodes(type(node)).props(props).count() > 0: + # entity.record_error( + # '{} with {} already exists in the DB' + # .format(node.label, props), keys=props.keys() + # ) except UserError as e: transaction.record_user_error(e) diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 39bf58fe..ec4721d8 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -10,6 +10,8 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.attributes import flag_modified +from gdcdictionary import gdcdictionary + from sheepdog.auth import dbgap from sheepdog import models from sheepdog import utils @@ -160,6 +162,27 @@ def instantiate(self): for entity in self.valid_entities: entity.instantiate() + def integrity_check(self): + for entity in self.valid_entities: + schema = gdcdictionary.schema[entity.node.label] + node = entity.node + for keys in schema['uniqueKeys']: + props = {} + if keys == ['id']: + continue + for key in keys: + prop = schema['properties'][key].get('systemAlias') + if prop: + props[prop] = node[prop] + else: + props[key] = node[key] + if self.db_driver.nodes(type(node)).props(props).count() > 0: + entity.record_error( + '{} with {} already exists in the DB' + .format(node.label, props), keys=props.keys() + ) + + def create_links(self): """Construct edges between all transaction entities.""" for entity in self.valid_entities: