diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index 60facf19..ba378679 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -10,6 +10,10 @@ import flask import lxml +import psqlgraph +from sqlalchemy.exc import IntegrityError +from gdcdictionary import gdcdictionary + from sheepdog import auth from sheepdog import utils from sheepdog.errors import ParsingError, SchemaError, UnsupportedError, UserError @@ -32,8 +36,28 @@ def single_transaction_worker(transaction, *doc_args): transaction.flush() transaction.post_validate() transaction.commit() - except HandledIntegrityError: - pass + except IntegrityError: + transaction.session.rollback() + transaction.integrity_check() + # for entity in transaction.valid_entities: + # schema = gdcdictionary.schema[entity.node.label] + # node = entity.node + # for keys in schema['uniqueKeys']: + # props = {} + # if keys == ['id']: + # continue + # for key in keys: + # prop = schema['properties'][key].get('systemAlias') + # if prop: + # props[prop] = node[prop] + # else: + # props[key] = node[key] + # if transaction.db_driver.nodes(type(node)).props(props).count() > 0: + # entity.record_error( + # '{} with {} already exists in the DB' + # .format(node.label, props), keys=props.keys() + # ) + except UserError as e: transaction.record_user_error(e) raise @@ -42,6 +66,7 @@ def single_transaction_worker(transaction, *doc_args): finally: response = transaction.json code = transaction.status_code + return response, code diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 06999f63..ec4721d8 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -10,6 +10,8 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.attributes import flag_modified +from gdcdictionary import gdcdictionary + from sheepdog.auth import dbgap from sheepdog import models from sheepdog import utils @@ -160,6 +162,27 @@ def instantiate(self): for entity in self.valid_entities: entity.instantiate() + def integrity_check(self): + for entity in self.valid_entities: + schema = gdcdictionary.schema[entity.node.label] + node = entity.node + for keys in schema['uniqueKeys']: + props = {} + if keys == ['id']: + continue + for key in keys: + prop = schema['properties'][key].get('systemAlias') + if prop: + props[prop] = node[prop] + else: + props[key] = node[key] + if self.db_driver.nodes(type(node)).props(props).count() > 0: + entity.record_error( + '{} with {} already exists in the DB' + .format(node.label, props), keys=props.keys() + ) + + def create_links(self): """Construct edges between all transaction entities.""" for entity in self.valid_entities: @@ -178,43 +201,7 @@ def flush(self): """ for entity in self.valid_entities: entity.flush_to_session() - try: - self.session.flush() - except IntegrityError as e: - # don't handle non-unique constraint errors - if "duplicate key value violates unique constraint" not in e.message: - raise - values = VALUES_REGEXP.findall(e.message) - if not values: - raise - values = [v.strip() for v in values[0].split(",")] - keys = KEYS_REGEXP.findall(e.message) - if len(keys) == len(values): - values = dict(zip(keys, values)) - entities = [] - label = None - for en in self.valid_entities: - for k, v in values.items(): - if getattr(en.node, k, None) != v: - break - else: - if label and label != en.node.label: - break - entities.append(en) - label = en.node.label - else: # pylint: disable=useless-else-on-loop - # https://github.com/PyCQA/pylint/pull/2760 - for entity in entities: - entity.record_error( - "{} with {} already exists".format( - entity.node.label, values - ), - keys=keys, - ) - if entities: - raise HandledIntegrityError() - self.record_error("{} already exists".format(values)) - raise HandledIntegrityError() + self.session.flush() @property def status_code(self): diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index cfeab2cf..f5683304 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -16,6 +16,7 @@ from datamodelutils import models as md from flask import g from moto import mock_s3 +from sqlalchemy.exc import IntegrityError from sheepdog.errors import HandledIntegrityError from sheepdog.globals import ROLES @@ -861,6 +862,8 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): external_proxies=get_external_proxies(), db_driver=pg_driver, ) for _ in range(2)] + + response = "" with pg_driver.session_scope(can_inherit=False) as s1: with utx1: utx1.parse_doc(*doc_args) @@ -880,14 +883,33 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): try: with pg_driver.session_scope(session=s1): utx1.flush() - - with pg_driver.session_scope(session=s1): - utx1.post_validate() - - with pg_driver.session_scope(session=s1): - utx1.commit() - except HandledIntegrityError: - pass + except IntegrityError: + s1.rollback() + from gdcdictionary import gdcdictionary + for entity in utx1.valid_entities: + schema = gdcdictionary.schema[entity.node.label] + node = entity.node + for keys in schema['uniqueKeys']: + props = {} + if keys == ['id']: + continue + for key in keys: + prop = schema['properties'][key].get('systemAlias') + if prop: + props[prop] = node[prop] + else: + props[key] = node[key] + if utx1.db_driver.nodes(type(node)).props(props).count() > 0: + entity.record_error( + '{} with {} already exists in the DB' + .format(node.label, props), keys=props.keys() + ) + response = utx1.json + + + assert response["entity_error_count"]==1 + assert response["code"]==400 + assert response['entities'][0]['errors'][0]['message'] == "experiment with {'project_id': 'CGCI-BLGSP', 'submitter_id': 'BLGSP-71-06-00019'} already exists in the DB" with pg_driver.session_scope(): assert pg_driver.nodes(md.Experiment).count() == 1