diff --git a/api/remote_ispyb_connector.py b/api/remote_ispyb_connector.py index 398f3473..c27bb5a8 100644 --- a/api/remote_ispyb_connector.py +++ b/api/remote_ispyb_connector.py @@ -11,9 +11,21 @@ ISPyBNoResultException, ISPyBRetrieveFailed, ) +from pymysql.err import OperationalError logger: logging.Logger = logging.getLogger(__name__) +# Timeout to allow the pymysql.connect() method to connect to the DB. +# The default, if not specified, is 10 seconds. +PYMYSQL_CONNECT_TIMEOUT_S = 3 +PYMYSQL_READ_TIMEOUT_S = 3 +PYMYSQL_WRITE_TIMEOUT_S = 10 +# MySQL DB connection attempts. +# An attempt to cope with intermittent OperationalError exceptions +# that are seen to occur at "busy times". See m2ms-1403. +PYMYSQL_OE_RECONNECT_ATTEMPTS = 5 +PYMYSQL_EXCEPTION_RECONNECT_DELAY_S = 1 + class SSHConnector(Connector): def __init__( @@ -53,6 +65,7 @@ def __init__( 'db_pass': pw, 'db_name': db, } + logger.debug("Creating remote connector: %s", creds) self.remote_connect(**creds) logger.debug( "Started remote ssh_host=%s ssh_user=%s local_bind_port=%s", @@ -61,6 +74,7 @@ def __init__( self.server.local_bind_port, ) else: + logger.debug("Creating connector") self.connect( user=user, pw=pw, @@ -83,9 +97,9 @@ def remote_connect( db_pass, db_name, ): - sshtunnel.SSH_TIMEOUT = 10.0 - sshtunnel.TUNNEL_TIMEOUT = 10.0 - sshtunnel.DEFAULT_LOGLEVEL = logging.CRITICAL + sshtunnel.SSH_TIMEOUT = 5.0 + sshtunnel.TUNNEL_TIMEOUT = 5.0 + sshtunnel.DEFAULT_LOGLEVEL = logging.ERROR self.conn_inactivity = int(self.conn_inactivity) if ssh_pkey: @@ -122,20 +136,55 @@ def remote_connect( self.server.start() logger.debug('Started SSH server') - logger.debug('Connecting to ISPyB (db_user=%s db_name=%s)...', db_user, db_name) - self.conn = pymysql.connect( - user=db_user, - password=db_pass, - host='127.0.0.1', - port=self.server.local_bind_port, - database=db_name, - ) + # Try to connect to the database + # a number of times (because it is known to fail) + # before giving up... + connect_attempts = 0 + self.conn = None + while self.conn is None and connect_attempts < PYMYSQL_OE_RECONNECT_ATTEMPTS: + try: + self.conn = pymysql.connect( + user=db_user, + password=db_pass, + host='127.0.0.1', + port=self.server.local_bind_port, + database=db_name, + connect_timeout=PYMYSQL_CONNECT_TIMEOUT_S, + read_timeout=PYMYSQL_READ_TIMEOUT_S, + write_timeout=PYMYSQL_WRITE_TIMEOUT_S, + ) + except OperationalError as oe_e: + if connect_attempts == 0: + # So we only log our connection attempts once + # an error has occurred - to avoid flooding the log + logger.info( + 'Connecting to MySQL database (db_user=%s db_name=%s)...', + db_user, + db_name, + ) + logger.warning('%s', repr(oe_e)) + connect_attempts += 1 + time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S) + except Exception as e: + if connect_attempts == 0: + # So we only log our connection attempts once + # an error has occurred - to avoid flooding the log + logger.info( + 'Connecting to MySQL database (db_user=%s db_name=%s)...', + db_user, + db_name, + ) + logger.warning('Unexpected %s', repr(e)) + connect_attempts += 1 + time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S) if self.conn is not None: - logger.debug('Connected') + if connect_attempts > 0: + logger.info('Connected') self.conn.autocommit = True else: - logger.debug('Failed to connect') + if connect_attempts > 0: + logger.info('Failed to connect') self.server.stop() raise ISPyBConnectionException self.last_activity_ts = time.time() diff --git a/api/security.py b/api/security.py index 4c2afc19..8773aa31 100644 --- a/api/security.py +++ b/api/security.py @@ -72,19 +72,22 @@ def get_remote_conn(force_error_display=False) -> Optional[SSHConnector]: # If a host is not defined other properties are useless. if not credentials["host"]: if logging.DEBUG >= logger.level or force_error_display: - logger.info("No ISPyB host - cannot return a connector") + logger.debug("No ISPyB host - cannot return a connector") return None # Try to get an SSH connection (aware that it might fail) + logger.debug("Creating remote connector with credentials: %s", credentials) conn: Optional[SSHConnector] = None try: conn = SSHConnector(**credentials) except Exception: - # Log the exception if DEBUG level or lower/finer? - # The following will not log if the level is set to INFO for example. if logging.DEBUG >= logger.level or force_error_display: logger.info("credentials=%s", credentials) - logger.exception("Got the following exception creating SSHConnector...") + logger.exception("Got the following exception creating Connector...") + if conn: + logger.debug("Got remote connector") + else: + logger.debug("Failed to get a remote connector") return conn @@ -106,6 +109,7 @@ def get_conn(force_error_display=False) -> Optional[Connector]: logger.info("No ISPyB host - cannot return a connector") return None + logger.info("Creating connector with credentials: %s", credentials) conn: Optional[Connector] = None try: conn = Connector(**credentials) @@ -115,6 +119,10 @@ def get_conn(force_error_display=False) -> Optional[Connector]: if logging.DEBUG >= logger.level or force_error_display: logger.info("credentials=%s", credentials) logger.exception("Got the following exception creating Connector...") + if conn: + logger.debug("Got connector") + else: + logger.debug("Did not get a connector") return conn diff --git a/viewer/services.py b/viewer/services.py index 4e208f57..61fca9e4 100644 --- a/viewer/services.py +++ b/viewer/services.py @@ -15,14 +15,18 @@ logger = logging.getLogger(__name__) -DELAY = 10 +# Service query timeout +SERVICE_QUERY_TIMEOUT_S = 28 # Default timeout for any request calls +# Used for keycloak atm. REQUEST_TIMEOUT_S = 5 _NEO4J_LOCATION: str = settings.NEO4J_QUERY _NEO4J_AUTH: str = settings.NEO4J_AUTH +# TIMEOUT is no longer used. +# A service timeout is considered a service that is degraded class State(str, Enum): NOT_CONFIGURED = "NOT_CONFIGURED" DEGRADED = "DEGRADED" @@ -122,7 +126,7 @@ async def wrapper_service_query(*args, **kwargs): # TimeoutError is not caught executor = futures.ThreadPoolExecutor() try: - async with asyncio.timeout(DELAY): + async with asyncio.timeout(SERVICE_QUERY_TIMEOUT_S): future = loop.run_in_executor( executor, functools.partial(func, *args, **kwargs) ) @@ -134,18 +138,20 @@ async def wrapper_service_query(*args, **kwargs): except TimeoutError: # Timeout is an "expected" condition for a service that's expected - # to be running but may be degraded so we don't log it unless debugging. - logger.debug("Service query '%s' timed out", func.__name__) - state = State.TIMEOUT + # to be running but is taking too long to report its state + # and is also considered DEGRADED. + state = State.DEGRADED except Exception as exc: # unknown error with the query logger.exception(exc, exc_info=True) state = State.ERROR - # name and ID are 1nd and 0th params respectively. - # alternative solution for this would be to return just a + # ID and Name are the 1st and 2nd params respectively. + # Alternative solution for this would be to return just a # state and have the service_queries() map the results to the # correct values + if state not in [State.OK, State.NOT_CONFIGURED]: + logger.info('"%s" is %s', args[1], state.name) return {"id": args[0], "name": args[1], "state": state} return wrapper_service_query diff --git a/viewer/target_loader.py b/viewer/target_loader.py index 4e72cc8a..ba4aaa19 100644 --- a/viewer/target_loader.py +++ b/viewer/target_loader.py @@ -52,6 +52,10 @@ # assemblies and xtalforms XTALFORMS_FILE = "assemblies.yaml" +# holding horses for now +# # assigned xtalforms, not all are referenced in meta_aligner +# ASSIGNED_XTALFORMS_FILE = "assigned_xtalforms.yaml" + # target name, nothing else CONFIG_FILE = "config*.yaml" @@ -737,9 +741,6 @@ def process_experiment( logger.debug("incoming data: %s", item_data) experiment_name, data = item_data - if "aligned_files" not in data.keys(): - return None - extract = functools.partial( self._extract, data=data, @@ -1480,7 +1481,7 @@ def process_bundle(self): self.report.log(logging.ERROR, msg) raise FileExistsError(msg) - if project_created and committer.pk == settings.ANONYMOUS_USER: + if project_created and self.project.title in settings.PUBLIC_TAS_LIST: # type: ignore[attr-defined] assert self.project self.project.open_to_public = True self.project.save() @@ -1694,35 +1695,45 @@ def process_bundle(self): # memo to self: there used to be some code to test the # position of the iterator in existing entries. This # was because it was assumed, that when adding v2 - # uploads, it can bring a long new observations under + # uploads, it can bring along new observations under # existing experiment. Following discussions with # Conor, it seems that this will not be the case. But # should it agin be, this code was deleted on # 2024-03-04, if you need to check for so in so_group.filter(code__isnull=True): - code_prefix = experiment_objects[so.experiment.code].index_data[ - "code_prefix" - ] - # iter_pos = next(suffix) - # code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}" - code = ( - f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}" - ) - - # test uniqueness for target - # TODO: this should ideally be solved by db engine, before - # rushing to write the trigger, have think about the - # loader concurrency situations - if SiteObservation.objects.filter( - experiment__experiment_upload__target=self.target, - code=code, - ).exists(): - msg = ( - f"short code {code} already exists for this target; " - + "specify a code_prefix to resolve this conflict" - ) - self.report.log(logging.ERROR, msg) + if so.experiment.type == 1: + # manual. code is pdb code + code = f"{so.experiment.code}-{next(suffix)}" + # NB! at the time of writing this piece of + # code, I haven't seen an example of the data + # so I only have a very vague idea how this is + # going to work. The way I understand it now, + # they cannot belong to separate groups so + # there's no need for different iterators. But + # could be I need to split them up + else: + # model building. generate code + code_prefix = experiment_objects[so.experiment.code].index_data[ + "code_prefix" + ] + # iter_pos = next(suffix) + # code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}" + code = f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}" + + # test uniqueness for target + # TODO: this should ideally be solved by db engine, before + # rushing to write the trigger, have think about the + # loader concurrency situations + if SiteObservation.objects.filter( + experiment__experiment_upload__target=self.target, + code=code, + ).exists(): + msg = ( + f"short code {code} already exists for this target; " + + "specify a code_prefix to resolve this conflict" + ) + self.report.log(logging.ERROR, msg) so.code = code so.save()