Fix cache update on first call (#524)
* fix: Better public collection and fix for initial cache response

* fix: Fix initial cache setting & faster failure update

* fix: Fix timeout reference

---------

Co-authored-by: Alan Christie <[email protected]>
alanbchristie and Alan Christie authored Feb 9, 2024
1 parent 7016c1b commit dbc0412
Showing 2 changed files with 109 additions and 62 deletions.
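The headline fix: in the old needs_updating() (visible in the diff below) the first call for a user created a cache entry stamped with the current time, so the empty entry looked fresh and nothing was collected until the whole cache period had elapsed. The new code gives a brand-new entry an EXPIRES_AT of 'now', so the first call always triggers a collection. A minimal standalone sketch of the new pattern (not the repository's code; the names and the 30-minute value are illustrative):

from datetime import datetime, timedelta
from typing import Any, Dict

# Illustrative value; the real period comes from settings.SECURITY_CONNECTOR_CACHE_MINUTES.
CACHE_MAX_AGE = timedelta(minutes=30)

cache: Dict[str, Dict[str, Any]] = {}

def cache_needs_updating(username: str) -> bool:
    """True if the results for 'username' must be (re)collected."""
    now = datetime.now()
    if username not in cache:
        # First sight of this user: create an entry that has already expired,
        # so the very first call reports that a collection is needed.
        cache[username] = {"RESULTS": set(), "TIMESTAMP": None, "EXPIRES_AT": now}
    return now >= cache[username]["EXPIRES_AT"]

def populate_cache(username: str, results: set) -> None:
    """Store freshly collected results and push the expiry forward."""
    now = datetime.now()
    cache[username] = {
        "RESULTS": set(results),
        "TIMESTAMP": now,
        "EXPIRES_AT": now + CACHE_MAX_AGE,
    }

if __name__ == "__main__":
    assert cache_needs_updating("alice")      # first call: collection needed
    populate_cache("alice", {"proposal-1"})
    assert not cache_needs_updating("alice")  # now served from the cache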
169 changes: 108 additions & 61 deletions api/security.py
@@ -1,7 +1,7 @@
# pylint: skip-file
import logging
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Union
from wsgiref.util import FileWrapper
@@ -19,11 +19,17 @@

logger: logging.Logger = logging.getLogger(__name__)

# A list of cached security results.
# Results use the key 'RESULTS' and the collection time uses the key 'TIMESTAMP'.
USER_LIST_DICT: Dict[str, Any] = {}
# Period to cache user lists in seconds
USER_LIST_CACHE_SECONDS: int = settings.SECURITY_CONNECTOR_CACHE_MINUTES * 60
# Sets of cached query results, indexed by username.
# The cache uses the key 'RESULTS', the collection time uses the key 'TIMESTAMP',
# and the cache expiry time uses the key 'EXPIRES_AT'.
USER_PROPOSAL_CACHE: Dict[str, Dict[str, Any]] = {}
# Period to cache user lists in seconds (on successful reads from the connector)
USER_PROPOSAL_CACHE_MAX_AGE: timedelta = timedelta(
minutes=settings.SECURITY_CONNECTOR_CACHE_MINUTES
)
# A short period, used when collection of results fails.
# This ensures a rapid retry on failure.
USER_PROPOSAL_CACHE_RETRY_TIMEOUT: timedelta = timedelta(seconds=7)

# example test:
# from rest_framework.test import APIRequestFactory
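The constants above replace the old integer USER_LIST_CACHE_SECONDS with timedelta values, so staleness becomes a direct datetime comparison instead of arithmetic on epoch seconds. A small illustration of the equivalence (the 30-minute figure is illustrative, not the deployment's setting):

from datetime import datetime, timedelta

SECURITY_CONNECTOR_CACHE_MINUTES = 30  # illustrative; really read from Django settings
max_age = timedelta(minutes=SECURITY_CONNECTOR_CACHE_MINUTES)

collected_at = datetime.now()
expires_at = collected_at + max_age

# Old style: time.time() - timestamp >= SECURITY_CONNECTOR_CACHE_MINUTES * 60
# New style: compare datetimes directly
print(datetime.now() >= expires_at)  # False immediately after collection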
@@ -151,47 +157,87 @@ def get_queryset(self):
q_filter = self.get_q_filter(proposal_list)
return self.queryset.filter(q_filter).distinct()

def get_open_proposals(self):
def _get_open_proposals(self):
"""
Returns the list of proposals anybody can access.
They are defined via an environment variable
and are made available as a list of strings (Project titles)
Returns the set of proposals anybody can access.
These consist of any Projects that are marked "open_to_public"
and any defined via an environment variable.
"""
return settings.PUBLIC_TAS_LIST
open_proposals = set(
Project.objects.filter(open_to_public=True).values_list("title", flat=True)
)
open_proposals.update(settings.PUBLIC_TAS_LIST)
return open_proposals

def get_proposals_for_user_from_django(self, user):
# Get the list of proposals for the user
def _get_proposals_for_user_from_django(self, user):
prop_ids = set()
# Get the set() of proposals for the user
if user.pk is None:
logger.warning("user.pk is None")
return []
else:
prop_ids = list(
prop_ids.update(
Project.objects.filter(user_id=user.pk).values_list("title", flat=True)
)
logger.debug(
"Got %s proposals for user %s: %s",
logger.info(
"Got %s proposals for '%s': %s",
len(prop_ids),
user.username,
prop_ids,
)
return prop_ids

def needs_updating(self, user):
"""Returns true of the data collected for a user is out of date."""
current_time = time.time()
if user.username not in USER_LIST_DICT:
USER_LIST_DICT[user.username] = {"RESULTS": [], "TIMESTAMP": current_time}
if (
current_time - USER_LIST_DICT[user.username]["TIMESTAMP"]
>= USER_LIST_CACHE_SECONDS
):
# Clear the cache (using the current time as the new timestamp)
USER_LIST_DICT[user.username]["TIMESTAMP"] = current_time
return True
# Cached results are still valid...
return False

def run_query_with_connector(self, conn, user):
return prop_ids

def _cache_needs_updating(self, user):
"""True of the data for a user now needs to be collected
(e.g. the cache is out of date). The response is also True for the first
call for each user. When data is successfully collected you need to
call '_populate_cache()' with the user and new cache content.
This will set the cache content and the cache timestamp.
"""
now = datetime.now()
if user.username not in USER_PROPOSAL_CACHE:
# Unknown user - initialise the entry for this user.
# And make sure it immediately expires!
USER_PROPOSAL_CACHE[user.username] = {
"RESULTS": set(),
"TIMESTAMP": None,
"EXPIRES_AT": now,
}

# Has the cache expired?
return now >= USER_PROPOSAL_CACHE[user.username]["EXPIRES_AT"]

def _populate_cache(self, user, new_content):
"""Called by code that collects content to replace the cache with new content,
this is typically from '_get_proposals_from_connector()'. The underlying map's
TIMESTAMP for the user will also be set (to 'now') to reflect the time the
cache was most recently populated.
"""
username = user.username
USER_PROPOSAL_CACHE[username]["RESULTS"] = new_content.copy()
# Set the timestamp (which records when the cache was populated with 'stuff')
# and ensure it will now expire after USER_PROPOSAL_CACHE_MAX_AGE.
now = datetime.now()
USER_PROPOSAL_CACHE[username]["TIMESTAMP"] = now
USER_PROPOSAL_CACHE[username]["EXPIRES_AT"] = now + USER_PROPOSAL_CACHE_MAX_AGE
logger.info(
"USER_PROPOSAL_CACHE populated for '%s' (expires at %s)",
username,
USER_PROPOSAL_CACHE[username]["EXPIRES_AT"],
)

def _mark_cache_collection_failure(self, user):
"""Called by code that collects content to indicate that although the cache
should have been collected it has not (trough some other problem).
Under these circumstances the cache will not be updated but we have the opportunity
to set a new, short, 'expiry' time. In this way, cache collection will occur
again soon. The cache and its timestamp are left intact.
"""
now = datetime.now()
USER_PROPOSAL_CACHE[user.username]["EXPIRES_AT"] = (
now + USER_PROPOSAL_CACHE_RETRY_TIMEOUT
)

def _run_query_with_connector(self, conn, user):
core = conn.core
try:
rs = core.retrieve_sessions_for_person_login(user.username)
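_populate_cache() and _mark_cache_collection_failure() above give an entry one of two expiry windows: the full USER_PROPOSAL_CACHE_MAX_AGE after a successful collection, or the short USER_PROPOSAL_CACHE_RETRY_TIMEOUT when collection fails, leaving the previous RESULTS and TIMESTAMP untouched. A standalone sketch of the failure path (not the repository's code; the entry and values are illustrative):

from datetime import datetime, timedelta

MAX_AGE = timedelta(minutes=30)       # illustrative stand-in for USER_PROPOSAL_CACHE_MAX_AGE
RETRY_TIMEOUT = timedelta(seconds=7)  # mirrors USER_PROPOSAL_CACHE_RETRY_TIMEOUT

entry = {
    "RESULTS": {"proposal-1"},
    "TIMESTAMP": datetime.now(),
    "EXPIRES_AT": datetime.now() + MAX_AGE,
}

def mark_collection_failure(cache_entry: dict) -> None:
    """On failure, only shorten the expiry so a retry happens soon;
    the cached RESULTS and TIMESTAMP are deliberately left intact."""
    cache_entry["EXPIRES_AT"] = datetime.now() + RETRY_TIMEOUT

mark_collection_failure(entry)
retry_in = entry["EXPIRES_AT"] - datetime.now()
print(entry["RESULTS"], round(retry_in.total_seconds()))  # {'proposal-1'} 7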
@@ -204,33 +250,36 @@ def run_query_with_connector(self, conn, user):
conn.server.stop()
return rs

def get_proposals_for_user_from_ispyb(self, user):
# Read (update) the results for this user or just return the cache?
needs_updating = self.needs_updating(user)
logger.info("user=%s needs_updating=%s", user.username, needs_updating)
if needs_updating:
def _get_proposals_for_user_from_ispyb(self, user):
if self._cache_needs_updating(user):
logger.info("user='%s' (needs_updating)", user.username)
if conn := get_configured_connector():
logger.info("Got a connector for '%s'", user.username)
logger.debug("Got a connector for '%s'", user.username)
self._get_proposals_from_connector(user, conn)
else:
logger.info("Failed to get a connector for '%s'", user.username)
logger.warning("Failed to get a connector for '%s'", user.username)
self._mark_cache_collection_failure(user)

# The cache has either beb updated, has not changed or is empty.
# Return what wqe can.
cached_prop_ids = USER_LIST_DICT[user.username]["RESULTS"]
# The cache has either been updated, has not changed or is empty.
# Return what we have for the user. If required, public (open) proposals
# will be added to what we return.
cached_prop_ids = USER_PROPOSAL_CACHE[user.username]["RESULTS"]
logger.info(
"Returning %s cached proposals for '%s': %s",
"Got %s proposals for '%s': %s",
len(cached_prop_ids),
user.username,
cached_prop_ids,
)
return cached_prop_ids

def _get_proposals_from_connector(self, user, conn):
"""Updates the USER_LIST_DICT with the results of a query
and marks it as populated.
"""
assert user
assert conn

rs = self.run_query_with_connector(conn=conn, user=user)
rs = self._run_query_with_connector(conn=conn, user=user)

# Typically you'll find the following fields in each item
# in the rs response: -
@@ -275,16 +324,16 @@ def _get_proposals_from_connector(self, user, conn):

# Always display the collected results for the user.
# These will be cached.
logger.info(
"Got %s proposals from %s records for user %s: %s",
logger.debug(
"%s proposals from %s records for '%s': %s",
len(prop_id_set),
len(rs),
user.username,
prop_id_set,
)

# Replace the cache with what we've found
USER_LIST_DICT[user.username]["RESULTS"] = list(prop_id_set)
# Replace the cache with what we've collected
self._populate_cache(user, prop_id_set)

def get_proposals_for_user(self, user, restrict_to_membership=False):
"""Returns a list of proposals that the user has access to.
@@ -299,7 +348,7 @@ def get_proposals_for_user(self, user, restrict_to_membership=False):
"""
assert user

proposals = []
proposals = set()
ispyb_user = os.environ.get("ISPYB_USER")
logger.debug(
"ispyb_user=%s restrict_to_membership=%s",
Expand All @@ -309,23 +358,21 @@ def get_proposals_for_user(self, user, restrict_to_membership=False):
if ispyb_user:
if user.is_authenticated:
logger.info("Getting proposals from ISPyB...")
proposals = self.get_proposals_for_user_from_ispyb(user)
proposals = self._get_proposals_for_user_from_ispyb(user)
else:
username = user.username or "UNKNOWN"
logger.info("No proposals (user '%s' is not authenticated)", username)
else:
logger.info("Getting proposals from Django...")
proposals = self.get_proposals_for_user_from_django(user)
proposals = self._get_proposals_for_user_from_django(user)

# We have all the proposals where the user has membership.
# We have all the proposals where the user has authority.
# Add open/public proposals?
if not restrict_to_membership:
# Here we're not restricting proposals to those where the user is a member,
# so we add those projects/proposals that everyone has access to.
for open_proposal in self.get_open_proposals():
if open_proposal not in proposals:
proposals.append(open_proposal)
proposals.update(self._get_open_proposals())

return proposals
# Return the set() as a list()
return list(proposals)

def get_q_filter(self, proposal_list):
"""Returns a Q expression representing a (potentially complex) table filter."""
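get_proposals_for_user() now accumulates proposals in a set() and merges the open/public proposals with set.update(), replacing the old loop that appended only titles not already present; callers still receive a list. A tiny illustration of why the set makes deduplication automatic (the titles are made up):

user_proposals = {"lb100", "lb200"}          # hypothetical titles for the user
open_proposals = {"lb200", "open-project"}   # hypothetical open/public titles

user_proposals.update(open_proposals)        # duplicates collapse automatically
print(sorted(user_proposals))                # ['lb100', 'lb200', 'open-project']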
2 changes: 1 addition & 1 deletion fragalysis/settings.py
@@ -469,7 +469,7 @@
'api.security': {'level': 'INFO'},
'asyncio': {'level': 'WARNING'},
'celery': {'level': 'INFO'},
'django': {'level': 'WARNING'},
'django': {'level': 'ERROR'},
'mozilla_django_oidc': {'level': 'WARNING'},
'urllib3': {'level': 'WARNING'},
'paramiko': {'level': 'WARNING'},
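The settings.py change raises the 'django' logger from WARNING to ERROR inside the project's LOGGING configuration. A minimal dictConfig-style fragment showing where that per-logger level lives (a generic sketch, not the repository's full LOGGING dictionary):

import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "loggers": {
        "api.security": {"level": "INFO", "handlers": ["console"]},
        "django": {"level": "ERROR", "handlers": ["console"]},  # was WARNING before this commit
    },
}

logging.config.dictConfig(LOGGING)
logging.getLogger("django").warning("suppressed")  # below ERROR, so not emitted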