From dbc0412443c7e37dca380bd8e08cf9b75224a20b Mon Sep 17 00:00:00 2001 From: "Alan B. Christie" <29806285+alanbchristie@users.noreply.github.com> Date: Fri, 9 Feb 2024 14:00:04 +0000 Subject: [PATCH] Fix cache update on first call (#524) * fix: Better public collection and fix for initial cache response * fix: Fix initial cache setting & faster failure update * fix: Fix timeout reference --------- Co-authored-by: Alan Christie --- api/security.py | 169 ++++++++++++++++++++++++++--------------- fragalysis/settings.py | 2 +- 2 files changed, 109 insertions(+), 62 deletions(-) diff --git a/api/security.py b/api/security.py index 73533bb0..473146c6 100644 --- a/api/security.py +++ b/api/security.py @@ -1,7 +1,7 @@ # pylint: skip-file import logging import os -import time +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, Optional, Union from wsgiref.util import FileWrapper @@ -19,11 +19,17 @@ logger: logging.Logger = logging.getLogger(__name__) -# A list of cached security results. -# Results use the key 'RESULTS' and the collection time uses the key 'TIMESTAMP'. -USER_LIST_DICT: Dict[str, Any] = {} -# Period to cache user lists in seconds -USER_LIST_CACHE_SECONDS: int = settings.SECURITY_CONNECTOR_CACHE_MINUTES * 60 +# Sets of cached query results, indexed by username. +# The cache uses the key 'RESULTS', the collection time uses the key 'TIMESTAMP', +# and the time the cache expires uses the key 'EXPIRES_AT'. +USER_PROPOSAL_CACHE: Dict[str, Dict[str, Any]] = {} +# Period to cache user lists (on successful reads from the connector) +USER_PROPOSAL_CACHE_MAX_AGE: timedelta = timedelta( + minutes=settings.SECURITY_CONNECTOR_CACHE_MINUTES +) +# A short period, used when caching of results fails. +# This ensures a rapid retry on failure. 
+USER_PROPOSAL_CACHE_RETRY_TIMEOUT: timedelta = timedelta(seconds=7) # example test: # from rest_framework.test import APIRequestFactory @@ -151,47 +157,87 @@ def get_queryset(self): q_filter = self.get_q_filter(proposal_list) return self.queryset.filter(q_filter).distinct() - def get_open_proposals(self): + def _get_open_proposals(self): """ - Returns the list of proposals anybody can access. - They are defined via an environment variable - and are made available as a list of strings (Project titles) + Returns the set of proposals anybody can access. + These consist of any Projects that are marked "open_to_public" + and any defined via an environment variable. """ - return settings.PUBLIC_TAS_LIST + open_proposals = set( + Project.objects.filter(open_to_public=True).values_list("title", flat=True) + ) + open_proposals.update(settings.PUBLIC_TAS_LIST) + return open_proposals - def get_proposals_for_user_from_django(self, user): - # Get the list of proposals for the user + def _get_proposals_for_user_from_django(self, user): + prop_ids = set() + # Get the set() of proposals for the user if user.pk is None: logger.warning("user.pk is None") - return [] else: - prop_ids = list( + prop_ids.update( Project.objects.filter(user_id=user.pk).values_list("title", flat=True) ) - logger.debug( - "Got %s proposals for user %s: %s", + logger.info( + "Got %s proposals for '%s': %s", len(prop_ids), user.username, prop_ids, ) - return prop_ids - - def needs_updating(self, user): - """Returns true of the data collected for a user is out of date.""" - current_time = time.time() - if user.username not in USER_LIST_DICT: - USER_LIST_DICT[user.username] = {"RESULTS": [], "TIMESTAMP": current_time} - if ( - current_time - USER_LIST_DICT[user.username]["TIMESTAMP"] - >= USER_LIST_CACHE_SECONDS - ): - # Clear the cache (using the current time as the new timestamp) - USER_LIST_DICT[user.username]["TIMESTAMP"] = current_time - return True - # Cached results are still valid... 
- return False - - def run_query_with_connector(self, conn, user): + return prop_ids + + def _cache_needs_updating(self, user): + """True if the data for a user now needs to be collected + (e.g. the cache is out of date). The response is also True for the first + call for each user. When data is successfully collected you need to + call '_populate_cache()' with the user and new cache content. + This will set the cache content and the cache timestamp. + """ + now = datetime.now() + if user.username not in USER_PROPOSAL_CACHE: + # Unknown user - initialise the entry for this user. + # And make sure it immediately expires! + USER_PROPOSAL_CACHE[user.username] = { + "RESULTS": set(), + "TIMESTAMP": None, + "EXPIRES_AT": now, + } + + # Has the cache expired? + return now >= USER_PROPOSAL_CACHE[user.username]["EXPIRES_AT"] + + def _populate_cache(self, user, new_content): + """Called by code that collects content to replace the cache with new content, + this is typically from '_get_proposals_from_connector()'. The underlying map's + TIMESTAMP for the user will also be set (to 'now') to reflect the time the + cache was most recently populated. + """ + username = user.username + USER_PROPOSAL_CACHE[username]["RESULTS"] = new_content.copy() + # Set the timestamp (which records when the cache was populated with 'stuff') + # and ensure it will now expire after USER_PROPOSAL_CACHE_MAX_AGE. + now = datetime.now() + USER_PROPOSAL_CACHE[username]["TIMESTAMP"] = now + USER_PROPOSAL_CACHE[username]["EXPIRES_AT"] = now + USER_PROPOSAL_CACHE_MAX_AGE + logger.info( + "USER_PROPOSAL_CACHE populated for '%s' (expires at %s)", + username, + USER_PROPOSAL_CACHE[username]["EXPIRES_AT"], + ) + + def _mark_cache_collection_failure(self, user): + """Called by code that collects content to indicate that although the cache + should have been collected it has not (through some other problem). 
+ Under these circumstances the cache will not be updated but we have the opportunity + to set a new, short, 'expiry' time. In this way, cache collection will occur + again soon. The cache and its timestamp are left intact. + """ + now = datetime.now() + USER_PROPOSAL_CACHE[user.username]["EXPIRES_AT"] = ( + now + USER_PROPOSAL_CACHE_RETRY_TIMEOUT + ) + + def _run_query_with_connector(self, conn, user): core = conn.core try: rs = core.retrieve_sessions_for_person_login(user.username) @@ -204,22 +250,22 @@ def run_query_with_connector(self, conn, user): conn.server.stop() return rs - def get_proposals_for_user_from_ispyb(self, user): - # Read (update) the results for this user or just return the cache? - needs_updating = self.needs_updating(user) - logger.info("user=%s needs_updating=%s", user.username, needs_updating) - if needs_updating: + def _get_proposals_for_user_from_ispyb(self, user): + if self._cache_needs_updating(user): + logger.info("user='%s' (needs_updating)", user.username) if conn := get_configured_connector(): - logger.info("Got a connector for '%s'", user.username) + logger.debug("Got a connector for '%s'", user.username) self._get_proposals_from_connector(user, conn) else: - logger.info("Failed to get a connector for '%s'", user.username) + logger.warning("Failed to get a connector for '%s'", user.username) + self._mark_cache_collection_failure(user) - # The cache has either beb updated, has not changed or is empty. - # Return what wqe can. - cached_prop_ids = USER_LIST_DICT[user.username]["RESULTS"] + # The cache has either been updated, has not changed or is empty. + # Return what we have for the user. If required, public (open) proposals + # will be added to what we return. 
+ cached_prop_ids = USER_PROPOSAL_CACHE[user.username]["RESULTS"] logger.info( - "Returning %s cached proposals for '%s': %s", + "Got %s proposals for '%s': %s", len(cached_prop_ids), user.username, cached_prop_ids, @@ -227,10 +273,13 @@ def get_proposals_for_user_from_ispyb(self, user): return cached_prop_ids def _get_proposals_from_connector(self, user, conn): + """Updates the USER_PROPOSAL_CACHE with the results of a query + and marks it as populated. + """ assert user assert conn - rs = self.run_query_with_connector(conn=conn, user=user) + rs = self._run_query_with_connector(conn=conn, user=user) # Typically you'll find the following fields in each item # in the rs response: - @@ -275,16 +324,16 @@ def _get_proposals_from_connector(self, user, conn): # Always display the collected results for the user. # These will be cached. - logger.info( - "Got %s proposals from %s records for user %s: %s", + logger.debug( + "%s proposals from %s records for '%s': %s", len(prop_id_set), len(rs), user.username, prop_id_set, ) - # Replace the cache with what we've found - USER_LIST_DICT[user.username]["RESULTS"] = list(prop_id_set) + # Replace the cache with what we've collected + self._populate_cache(user, prop_id_set) def get_proposals_for_user(self, user, restrict_to_membership=False): """Returns a list of proposals that the user has access to. 
@@ -299,7 +348,7 @@ def get_proposals_for_user(self, user, restrict_to_membership=False): """ assert user - proposals = [] + proposals = set() ispyb_user = os.environ.get("ISPYB_USER") logger.debug( "ispyb_user=%s restrict_to_membership=%s", @@ -309,23 +358,21 @@ def get_proposals_for_user(self, user, restrict_to_membership=False): if ispyb_user: if user.is_authenticated: logger.info("Getting proposals from ISPyB...") - proposals = self.get_proposals_for_user_from_ispyb(user) + proposals = self._get_proposals_for_user_from_ispyb(user) else: username = user.username or "UNKNOWN" logger.info("No proposals (user '%s' is not authenticated)", username) else: logger.info("Getting proposals from Django...") - proposals = self.get_proposals_for_user_from_django(user) + proposals = self._get_proposals_for_user_from_django(user) - # We have all the proposals where the user has membership. + # We have all the proposals where the user has authority. + # Add open/public proposals? if not restrict_to_membership: - # Here we're not restricting proposals to those where the user is a member, - # so we add those projects/proposals that everyone has access to. - for open_proposal in self.get_open_proposals(): - if open_proposal not in proposals: - proposals.append(open_proposal) + proposals.update(self._get_open_proposals()) - return proposals + # Return the set() as a list() + return list(proposals) def get_q_filter(self, proposal_list): """Returns a Q expression representing a (potentially complex) table filter.""" diff --git a/fragalysis/settings.py b/fragalysis/settings.py index 02409d1e..93885b01 100644 --- a/fragalysis/settings.py +++ b/fragalysis/settings.py @@ -469,7 +469,7 @@ 'api.security': {'level': 'INFO'}, 'asyncio': {'level': 'WARNING'}, 'celery': {'level': 'INFO'}, - 'django': {'level': 'WARNING'}, + 'django': {'level': 'ERROR'}, 'mozilla_django_oidc': {'level': 'WARNING'}, 'urllib3': {'level': 'WARNING'}, 'paramiko': {'level': 'WARNING'},