Skip to content

Commit

Permalink
Implement queuing priority #56
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-austin committed Dec 6, 2024
1 parent e1ac190 commit d061400
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 15 deletions.
17 changes: 17 additions & 0 deletions src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,24 @@ anonUserName=anon/anon
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000

# Limit the number maximum of active RESTORING downloads. Does not affect user submitted carts,
# but queued requests will only be started when there are less than this many RESTORING downloads.
# Negative values will start all queued jobs immediately, regardless of load.
queue.maxActiveDownloads = 10

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
# When automatically moving jobs from the queued to the PREPARING state, all Downloads
# from Users with priority 1 will be scheduled before 2 and so on.
# InstrumentScientists can either be identified for specific Instrument.names, or a global default
# InvestigationUsers can either be identified for specific InvestigationUser.roles, or a global default
# Authenticated Users without InstrumentScientist or InvestigationUser status will use the authenticated priority
# Anyone who does not meet a specific priority class will use the default
# Users meeting multiple criteria will use the highest priority available (lowest number)
queue.priority.instrumentScientist.instruments = {"ABC": 1}
queue.priority.instrumentScientist.default = 2
queue.priority.investigationUser.roles = {"ABC": 3}
queue.priority.investigationUser.default = 4
queue.priority.authenticated = 5
queue.priority.default = 0
56 changes: 55 additions & 1 deletion src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;

import java.util.Collections;
import java.net.URLEncoder;

import org.icatproject.topcat.httpclient.*;
Expand Down Expand Up @@ -269,6 +269,60 @@ public List<JsonObject> getEntities(String entityType, List<Long> entityIds) thr
return out;
}

/**
* @param userName ICAT User.name to check for access to the queue
* @throws TopcatException If the user has a non-positive priority value (or
* another internal error is triggered)
*/
public void checkQueueAllowed(String userName) throws TopcatException {
if (getQueuePriority(userName) < 1) {
throw new ForbiddenException("Queuing Downloads forbidden");
}
}

/**
* If explicitly set via InstrumentScientist or InvestigationUser mappings,
* the highest priority (lowest value) will be returned.
* Otherwise, if authenticated, the authenticated user default will be returned.
* Otherwise, global default will be returned.
*
* @param userName ICAT User.name to determine the queue priority of
* @return int representing the queue priority. <1 indicates disabled, >=1
* indicates enabled with higher values having lower priority.
* @throws TopcatException
*/
public int getQueuePriority(String userName) throws TopcatException {
PriorityMap priorityMap = PriorityMap.getInstance();
HashMap<Integer, String> mapping = priorityMap.getMapping();
List<Integer> keyList = new ArrayList<>(mapping.keySet());
Collections.sort(keyList);
for (Integer priority : keyList) {
if (checkUser(userName, mapping.get(priority)) > 0) {
return priority;
}
}

if (!userName.equals(Properties.getInstance().getProperty("anonUserName"))) {
return priorityMap.getAuthenticatedPriority();
} else {
return priorityMap.getDefaultPriority();
}
}

/**
* @param userName ICAT User.name to determine the queue priority of
* @param condition JPQL condition representing the possible ways a user can
* have priority
* @return size of the results, 0 means use did not have priority, 1 means they
* did
* @throws TopcatException
*/
int checkUser(String userName, String condition) throws TopcatException {
String query = "SELECT user FROM User user WHERE user.name = '" + userName + "' AND (" + condition + ")";
JsonArray results = submitQuery(query);
return results.size();
}

protected String[] getAdminUserNames() throws Exception {
return Properties.getInstance().getProperty("adminUserNames", "").split("([ ]*,[ ]*|[ ]+)");
}
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/org/icatproject/topcat/PriorityMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.icatproject.topcat;

import java.io.ByteArrayInputStream;
import java.util.HashMap;

import org.icatproject.topcat.exceptions.InternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;

public class PriorityMap {

private static PriorityMap instance = null;

public synchronized static PriorityMap getInstance() throws InternalException {
if (instance == null) {
instance = new PriorityMap();
}
return instance;
}

private int defaultPriority;
private int authenticatedPriority;
private HashMap<Integer, String> mapping = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(PriorityMap.class);

public PriorityMap() {
Properties properties = Properties.getInstance();

String defaultString = properties.getProperty("queue.priority.default", "0");
defaultPriority = Integer.valueOf(defaultString);

String authenticatedString = properties.getProperty("queue.priority.authenticated", defaultString);
setAuthenticatedPriority(authenticatedString);

String property = "queue.priority.investigationUser.default";
String investigationUserString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(investigationUserString), "user.investigationUsers IS NOT EMPTY");

property = "queue.priority.instrumentScientist.default";
String instrumentScientistString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(instrumentScientistString), "user.instrumentScientists IS NOT EMPTY");

String investigationUserProperty = properties.getProperty("queue.priority.investigationUser.roles");
String investigationUserCondition = "EXISTS ( SELECT o FROM InvestigationUser o WHERE o.role='";
parseObject(investigationUserProperty, investigationUserCondition);

String instrumentScientistProperty = properties.getProperty("queue.priority.instrumentScientist.instruments");
String instrumentScientistCondition = "EXISTS ( SELECT o FROM InstrumentScientist o WHERE o.instrument.name='";
parseObject(instrumentScientistProperty, instrumentScientistCondition);
}

/**
* Set the minimum priority for all authenticated Users. This cannot be lower
* than the defaultPriority, which will be used instead if this is the case.
*
* @param authenticatedString The value read from the run.properties file
*/
private void setAuthenticatedPriority(String authenticatedString) {
authenticatedPriority = Integer.valueOf(authenticatedString);
if (authenticatedPriority < 1 && defaultPriority >= 1) {
String msg = "queue.priority.authenticated disabled with value " + authenticatedString;
msg += " but queue.priority.default enabled with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
} else if (authenticatedPriority >= 1 && authenticatedPriority > defaultPriority) {
String msg = "queue.priority.authenticated enabled with value " + authenticatedString;
msg += " but queue.priority.default supersedes with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
}
}

/**
* Extracts each key from a JsonObject, and appends this to the JPQL condition
* for this priority level with OR.
*
* @param propertyString String representing a JsonObject from the
* run.properties file, or null
* @param conditionPrefix JPQL condition which will be formatted with each key
* in the object
*/
private void parseObject(String propertyString, String conditionPrefix) {
if (propertyString == null) {
return;
}
JsonReader reader = Json.createReader(new ByteArrayInputStream(propertyString.getBytes()));
JsonObject object = reader.readObject();
for (String key : object.keySet()) {
int priority = object.getInt(key);
updateMapping(priority, conditionPrefix + key + "' AND o.user=user )");
}
}

/**
* Appends the newCondition to the mapping at the specified priority level using
* OR.
*
* @param priority Priority of the new condition
* @param newCondition Fully formatted JPQL condition
*/
private void updateMapping(int priority, String newCondition) {
if (priority < 1) {
logger.warn("Non-positive priority found in mapping, ignoring entry");
return;
} else if (authenticatedPriority >= 1 && priority >= authenticatedPriority) {
logger.warn("Priority set in mapping would be superseded by queue.priority.authenticated, ignoring entry");
return;
}

String oldCondition = mapping.get(priority);
if (oldCondition != null) {
mapping.put(priority, oldCondition + " OR " + newCondition);
} else {
mapping.put(priority, newCondition);
}
}

/**
* @return Mapping of priority level to a JPQL condition which defines the Users
* who have this priority
*/
public HashMap<Integer, String> getMapping() {
return mapping;
}

/**
* @return The priority which applies to all authenticated users
*/
public int getAuthenticatedPriority() {
return authenticatedPriority;
}

/**
* @return The priority which applies to all users, included anonymous access
*/
public int getDefaultPriority() {
return defaultPriority;
}
}
71 changes: 57 additions & 14 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.net.URL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -273,9 +276,13 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient) thr
}

/**
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the maxActiveDownloads limit.
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the
* maxActiveDownloads limit.
* Downloads will be prepared in order of priority, with all Downloads from
* Users with a value of 1 being prepared first, then 2 and so on.
*
* @param maxActiveDownloads Limit on the number of concurrent jobs with RESTORING status
* @param maxActiveDownloads Limit on the number of concurrent jobs with
* RESTORING status
* @throws Exception
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
Expand All @@ -287,25 +294,61 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
String restoringCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.RESTORING";
String pausedCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PAUSED";

String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
if (maxActiveDownloads > 0) {
int freeActiveDownloads = maxActiveDownloads - activeDownloads.size();
if (freeActiveDownloads <= 0) {
String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();
maxActiveDownloads -= activeDownloads.size();
if (maxActiveDownloads <= 0) {
return;
}
queuedQueryString += " order by download.createdAt limit " + Integer.toString(freeActiveDownloads);
}

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
List<Download> queuedDownloads = queuedDownloadsQuery.getResultList();

for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);

if (maxActiveDownloads <= 0) {
// No limits on how many to submit
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
}
} else {
HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String icatUrl = FacilityMap.getInstance().getIcatUrl(queuedDownload.getFacilityName());
IcatClient icatClient = new IcatClient(icatUrl, queuedDownload.getSessionId());
int priority = icatClient.getQueuePriority(queuedDownload.getUserName());
if (priority == 1) {
// Highest priority, prepare now
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
} else {
// Lower priority, add to mapping
mapping.putIfAbsent(priority, new ArrayList<>());
mapping.get(priority).add(queuedDownload);
}
}
List<Integer> keyList = Arrays.asList((Integer[]) mapping.keySet().toArray());
Collections.sort(keyList);
for (int key : keyList) {
// Prepare from mapping in priority order
List<Download> downloadList = mapping.get(key);
for (Download download : downloadList) {
download.setStatus(DownloadStatus.PREPARING);
prepareDownload(download, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ public Response queueVisitId(@PathParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
JsonArray datasets = icatClient.getDatasets(visitId);

long downloadId;
Expand Down Expand Up @@ -892,6 +893,7 @@ public Response queueFiles(@PathParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
JsonArray datafiles = icatClient.getDatafiles(files);

long downloadId;
Expand Down
Loading

0 comments on commit d061400

Please sign in to comment.