Better MySQL/Tunnel control (1403/1407) (#570)
* stashing

* Target loader now accepts experiments marked as 'manual'

* Attempt to reduce pop-up "flicker" (1403) (#569)

* fix: Attempt to debug timeout errors

* fix: More logging on service_query

* fix: Various timeout adjustments

* fix: Removed exception during timeout

* fix: Explicit log on SSH connection error

* fix: Retry attempts for MySQL connections

* fix: Service timeout now 28 (was 17)

* fix: Add pymysql read and write timeouts

* fix: Quieter (expected) connection failure handling

* fix: TIMEOUT now DEGRADED

* fix: Fix while loop exit conditions

* fix: Better loop logic

* style: services logging reduced and back to debug

* fix: SSHTunnel logging now ERROR (was DEBUG)

* fix: Quieter security

* fix: More failures permitted (and debug tweaks)

* fix: Leaner logging

* fix: Leaner logging (only report when we're having trouble)

* fix: Better constant name

* fix: Reduced service logging

* docs: Doc tweak

* fix: Minor log tweak

* fix: Fixed duplicate log content

---------

Co-authored-by: Alan Christie <[email protected]>

---------

Co-authored-by: Kalev Takkis <[email protected]>
Co-authored-by: Alan Christie <[email protected]>
3 people authored Apr 5, 2024
1 parent ee5fd26 commit 3f59569
Showing 4 changed files with 125 additions and 51 deletions.
75 changes: 62 additions & 13 deletions api/remote_ispyb_connector.py
@@ -11,9 +11,21 @@
ISPyBNoResultException,
ISPyBRetrieveFailed,
)
from pymysql.err import OperationalError

logger: logging.Logger = logging.getLogger(__name__)

# Timeout to allow the pymysql.connect() method to connect to the DB.
# The default, if not specified, is 10 seconds.
PYMYSQL_CONNECT_TIMEOUT_S = 3
PYMYSQL_READ_TIMEOUT_S = 3
PYMYSQL_WRITE_TIMEOUT_S = 10
# MySQL DB connection attempts.
# An attempt to cope with intermittent OperationalError exceptions
# that are seen to occur at "busy times". See m2ms-1403.
PYMYSQL_OE_RECONNECT_ATTEMPTS = 5
PYMYSQL_EXCEPTION_RECONNECT_DELAY_S = 1


class SSHConnector(Connector):
def __init__(
@@ -53,6 +65,7 @@ def __init__(
'db_pass': pw,
'db_name': db,
}
logger.debug("Creating remote connector: %s", creds)
self.remote_connect(**creds)
logger.debug(
"Started remote ssh_host=%s ssh_user=%s local_bind_port=%s",
@@ -61,6 +74,7 @@
self.server.local_bind_port,
)
else:
logger.debug("Creating connector")
self.connect(
user=user,
pw=pw,
@@ -83,9 +97,9 @@ def remote_connect(
db_pass,
db_name,
):
sshtunnel.SSH_TIMEOUT = 10.0
sshtunnel.TUNNEL_TIMEOUT = 10.0
sshtunnel.DEFAULT_LOGLEVEL = logging.CRITICAL
sshtunnel.SSH_TIMEOUT = 5.0
sshtunnel.TUNNEL_TIMEOUT = 5.0
sshtunnel.DEFAULT_LOGLEVEL = logging.ERROR
self.conn_inactivity = int(self.conn_inactivity)

if ssh_pkey:
@@ -122,20 +136,55 @@ def remote_connect(
self.server.start()
logger.debug('Started SSH server')

logger.debug('Connecting to ISPyB (db_user=%s db_name=%s)...', db_user, db_name)
self.conn = pymysql.connect(
user=db_user,
password=db_pass,
host='127.0.0.1',
port=self.server.local_bind_port,
database=db_name,
)
# Try to connect to the database
# a number of times (because it is known to fail)
# before giving up...
connect_attempts = 0
self.conn = None
while self.conn is None and connect_attempts < PYMYSQL_OE_RECONNECT_ATTEMPTS:
try:
self.conn = pymysql.connect(
user=db_user,
password=db_pass,
host='127.0.0.1',
port=self.server.local_bind_port,
database=db_name,
connect_timeout=PYMYSQL_CONNECT_TIMEOUT_S,
read_timeout=PYMYSQL_READ_TIMEOUT_S,
write_timeout=PYMYSQL_WRITE_TIMEOUT_S,
)
except OperationalError as oe_e:
if connect_attempts == 0:
# So we only log our connection attempts once
# an error has occurred - to avoid flooding the log
logger.info(
'Connecting to MySQL database (db_user=%s db_name=%s)...',
db_user,
db_name,
)
logger.warning('%s', repr(oe_e))
connect_attempts += 1
time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S)
except Exception as e:
if connect_attempts == 0:
# So we only log our connection attempts once
# an error has occurred - to avoid flooding the log
logger.info(
'Connecting to MySQL database (db_user=%s db_name=%s)...',
db_user,
db_name,
)
logger.warning('Unexpected %s', repr(e))
connect_attempts += 1
time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S)

if self.conn is not None:
logger.debug('Connected')
if connect_attempts > 0:
logger.info('Connected')
self.conn.autocommit = True
else:
logger.debug('Failed to connect')
if connect_attempts > 0:
logger.info('Failed to connect')
self.server.stop()
raise ISPyBConnectionException
self.last_activity_ts = time.time()
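For reference, a minimal standalone sketch of the bounded-retry connection pattern the hunk above introduces. Constant names mirror the diff; `connect_with_retry` is a hypothetical helper and is not part of the repository:

```python
import logging
import time

import pymysql
from pymysql.err import OperationalError

logger = logging.getLogger(__name__)

PYMYSQL_CONNECT_TIMEOUT_S = 3
PYMYSQL_READ_TIMEOUT_S = 3
PYMYSQL_WRITE_TIMEOUT_S = 10
PYMYSQL_OE_RECONNECT_ATTEMPTS = 5
PYMYSQL_EXCEPTION_RECONNECT_DELAY_S = 1


def connect_with_retry(db_user, db_pass, port, db_name):
    """Connect to MySQL through the tunnel's local port, retrying on failure."""
    conn = None
    connect_attempts = 0
    while conn is None and connect_attempts < PYMYSQL_OE_RECONNECT_ATTEMPTS:
        try:
            conn = pymysql.connect(
                user=db_user,
                password=db_pass,
                host='127.0.0.1',
                port=port,
                database=db_name,
                connect_timeout=PYMYSQL_CONNECT_TIMEOUT_S,
                read_timeout=PYMYSQL_READ_TIMEOUT_S,
                write_timeout=PYMYSQL_WRITE_TIMEOUT_S,
            )
        except OperationalError as oe_e:
            # Expected at busy times (see m2ms-1403) - wait briefly and retry.
            logger.warning('%s', repr(oe_e))
            connect_attempts += 1
            time.sleep(PYMYSQL_EXCEPTION_RECONNECT_DELAY_S)
    if conn is None:
        raise ConnectionError('Failed to connect after retries')
    return conn
```

Bounding both the per-attempt timeouts and the number of attempts keeps a busy or unreachable database from stalling the caller indefinitely.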
16 changes: 12 additions & 4 deletions api/security.py
@@ -72,19 +72,22 @@ def get_remote_conn(force_error_display=False) -> Optional[SSHConnector]:
# If a host is not defined other properties are useless.
if not credentials["host"]:
if logging.DEBUG >= logger.level or force_error_display:
logger.info("No ISPyB host - cannot return a connector")
logger.debug("No ISPyB host - cannot return a connector")
return None

# Try to get an SSH connection (aware that it might fail)
logger.debug("Creating remote connector with credentials: %s", credentials)
conn: Optional[SSHConnector] = None
try:
conn = SSHConnector(**credentials)
except Exception:
# Log the exception if DEBUG level or lower/finer?
# The following will not log if the level is set to INFO for example.
if logging.DEBUG >= logger.level or force_error_display:
logger.info("credentials=%s", credentials)
logger.exception("Got the following exception creating SSHConnector...")
logger.exception("Got the following exception creating Connector...")
if conn:
logger.debug("Got remote connector")
else:
logger.debug("Failed to get a remote connector")

return conn

@@ -106,6 +109,7 @@ def get_conn(force_error_display=False) -> Optional[Connector]:
logger.info("No ISPyB host - cannot return a connector")
return None

logger.info("Creating connector with credentials: %s", credentials)
conn: Optional[Connector] = None
try:
conn = Connector(**credentials)
@@ -115,6 +119,10 @@
if logging.DEBUG >= logger.level or force_error_display:
logger.info("credentials=%s", credentials)
logger.exception("Got the following exception creating Connector...")
if conn:
logger.debug("Got connector")
else:
logger.debug("Did not get a connector")

return conn

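As a small aside, the helpers above gate their noisiest diagnostics on the logger level (or an explicit `force_error_display`). A minimal sketch of that pattern, with a hypothetical `make_connector()` standing in for `Connector(**credentials)`:

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def make_connector() -> object:
    """Hypothetical stand-in for Connector(**credentials)."""
    return object()


def get_connector(force_error_display: bool = False) -> Optional[object]:
    conn: Optional[object] = None
    try:
        conn = make_connector()
    except Exception:
        # Only emit the exception if the logger is at DEBUG,
        # or the caller explicitly asked to see errors.
        if logging.DEBUG >= logger.level or force_error_display:
            logger.exception("Got the following exception creating Connector...")
    if conn:
        logger.debug("Got connector")
    else:
        logger.debug("Did not get a connector")
    return conn
```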
20 changes: 13 additions & 7 deletions viewer/services.py
@@ -15,14 +15,18 @@

logger = logging.getLogger(__name__)

DELAY = 10
# Service query timeout
SERVICE_QUERY_TIMEOUT_S = 28
# Default timeout for any request calls
# Used for keycloak atm.
REQUEST_TIMEOUT_S = 5

_NEO4J_LOCATION: str = settings.NEO4J_QUERY
_NEO4J_AUTH: str = settings.NEO4J_AUTH


# TIMEOUT is no longer used.
# A service timeout is considered a service that is degraded
class State(str, Enum):
NOT_CONFIGURED = "NOT_CONFIGURED"
DEGRADED = "DEGRADED"
@@ -122,7 +126,7 @@ async def wrapper_service_query(*args, **kwargs):
# TimeoutError is not caught
executor = futures.ThreadPoolExecutor()
try:
async with asyncio.timeout(DELAY):
async with asyncio.timeout(SERVICE_QUERY_TIMEOUT_S):
future = loop.run_in_executor(
executor, functools.partial(func, *args, **kwargs)
)
@@ -134,18 +138,20 @@

except TimeoutError:
# Timeout is an "expected" condition for a service that's expected
# to be running but may be degraded so we don't log it unless debugging.
logger.debug("Service query '%s' timed out", func.__name__)
state = State.TIMEOUT
# to be running but is taking too long to report its state
# and is also considered DEGRADED.
state = State.DEGRADED
except Exception as exc:
# unknown error with the query
logger.exception(exc, exc_info=True)
state = State.ERROR

# name and ID are 1nd and 0th params respectively.
# alternative solution for this would be to return just a
# ID and Name are the 1st and 2nd params respectively.
# Alternative solution for this would be to return just a
# state and have the service_queries() map the results to the
# correct values
if state not in [State.OK, State.NOT_CONFIGURED]:
logger.info('"%s" is %s', args[1], state.name)
return {"id": args[0], "name": args[1], "state": state}

return wrapper_service_query
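A reduced sketch of the decorator pattern `wrapper_service_query` uses: run the blocking health check in a worker thread, bound it with `asyncio.timeout()` (Python 3.11+), and map a timeout to DEGRADED rather than an error. The real decorator also handles NOT_CONFIGURED and returns the service id and name alongside the state; the `check_database` example here is hypothetical:

```python
import asyncio
import functools
import logging
from concurrent import futures
from enum import Enum

logger = logging.getLogger(__name__)

SERVICE_QUERY_TIMEOUT_S = 28


class State(str, Enum):
    OK = "OK"
    DEGRADED = "DEGRADED"
    ERROR = "ERROR"


def service_query(func):
    """Turn a blocking check into an async call bounded by a timeout."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        executor = futures.ThreadPoolExecutor()
        try:
            async with asyncio.timeout(SERVICE_QUERY_TIMEOUT_S):
                future = loop.run_in_executor(
                    executor, functools.partial(func, *args, **kwargs)
                )
                state = State.OK if await future else State.DEGRADED
        except TimeoutError:
            # A slow service is "expected" - report it as degraded, not broken.
            state = State.DEGRADED
        except Exception as exc:
            logger.exception(exc, exc_info=True)
            state = State.ERROR
        return state

    return wrapper


@service_query
def check_database() -> bool:
    # A blocking connectivity check would go here.
    return True


# asyncio.run(check_database()) -> State.OK
```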
65 changes: 38 additions & 27 deletions viewer/target_loader.py
@@ -52,6 +52,10 @@
# assemblies and xtalforms
XTALFORMS_FILE = "assemblies.yaml"

# holding horses for now
# # assigned xtalforms, not all are referenced in meta_aligner
# ASSIGNED_XTALFORMS_FILE = "assigned_xtalforms.yaml"

# target name, nothing else
CONFIG_FILE = "config*.yaml"

@@ -737,9 +741,6 @@ def process_experiment(
logger.debug("incoming data: %s", item_data)
experiment_name, data = item_data

if "aligned_files" not in data.keys():
return None

extract = functools.partial(
self._extract,
data=data,
@@ -1480,7 +1481,7 @@ def process_bundle(self):
self.report.log(logging.ERROR, msg)
raise FileExistsError(msg)

if project_created and committer.pk == settings.ANONYMOUS_USER:
if project_created and self.project.title in settings.PUBLIC_TAS_LIST: # type: ignore[attr-defined]
assert self.project
self.project.open_to_public = True
self.project.save()
@@ -1694,35 +1695,45 @@ def process_bundle(self):
# memo to self: there used to be some code to test the
# position of the iterator in existing entries. This
# was because it was assumed, that when adding v2
# uploads, it can bring a long new observations under
# uploads, it can bring along new observations under
# existing experiment. Following discussions with
# Conor, it seems that this will not be the case. But
# should it agin be, this code was deleted on
# 2024-03-04, if you need to check

for so in so_group.filter(code__isnull=True):
code_prefix = experiment_objects[so.experiment.code].index_data[
"code_prefix"
]
# iter_pos = next(suffix)
# code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}"
code = (
f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}"
)

# test uniqueness for target
# TODO: this should ideally be solved by db engine, before
# rushing to write the trigger, have think about the
# loader concurrency situations
if SiteObservation.objects.filter(
experiment__experiment_upload__target=self.target,
code=code,
).exists():
msg = (
f"short code {code} already exists for this target; "
+ "specify a code_prefix to resolve this conflict"
)
self.report.log(logging.ERROR, msg)
if so.experiment.type == 1:
# manual. code is pdb code
code = f"{so.experiment.code}-{next(suffix)}"
# NB! at the time of writing this piece of
# code, I haven't seen an example of the data
# so I only have a very vague idea how this is
# going to work. The way I understand it now,
# they cannot belong to separate groups so
# there's no need for different iterators. But
# could be I need to split them up
else:
# model building. generate code
code_prefix = experiment_objects[so.experiment.code].index_data[
"code_prefix"
]
# iter_pos = next(suffix)
# code = f"{code_prefix}{so.experiment.code.split('-')[1]}{iter_pos}"
code = f"{code_prefix}{so.experiment.code.split('-')[1]}{next(suffix)}"

# test uniqueness for target
# TODO: this should ideally be solved by db engine, before
# rushing to write the trigger, have think about the
# loader concurrency situations
if SiteObservation.objects.filter(
experiment__experiment_upload__target=self.target,
code=code,
).exists():
msg = (
f"short code {code} already exists for this target; "
+ "specify a code_prefix to resolve this conflict"
)
self.report.log(logging.ERROR, msg)

so.code = code
so.save()
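To make the new branch above concrete, a hedged sketch of the short-code rules it implements: a 'manual' experiment (type == 1) reuses its experiment/PDB code plus a numeric suffix, while a model-building experiment composes the `code_prefix`, the numeric part of the experiment code and a counter shared across the group. Example values are hypothetical:

```python
import itertools
from typing import Iterator


def short_code(
    experiment_code: str,
    experiment_type: int,
    code_prefix: str,
    suffix: Iterator[int],
) -> str:
    """Sketch of the short-code generation added in the hunk above."""
    if experiment_type == 1:
        # 'manual' experiment: the experiment code is already a PDB code
        return f"{experiment_code}-{next(suffix)}"
    # model building: prefix + numeric part of the experiment code + counter
    return f"{code_prefix}{experiment_code.split('-')[1]}{next(suffix)}"


# Hypothetical usage - one counter shared across the group of observations:
counter = itertools.count(1)
print(short_code("Mpro-x1234", 0, "v", counter))  # -> vx12341
print(short_code("7ABC", 1, "", counter))         # -> 7ABC-2
```

The loader then checks each generated code against existing SiteObservation rows for the target and logs a conflict, suggesting a `code_prefix`, when a duplicate is found.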
