Skip to content

Commit

Permalink
Merge pull request #317 from jbernal0019/master
Browse files Browse the repository at this point in the history
Add support for 'groupByInstance' special parameter for plugins of type 'ts'
  • Loading branch information
jbernal0019 authored Apr 6, 2021
2 parents 7a0e2e0 + 34fd0da commit d2ed29e
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ ENTRYPOINT ["/usr/src/docker-entrypoint.sh"]
EXPOSE 8000

# Start ChRIS production server
CMD ["mod_wsgi-express", "start-server", "config/wsgi.py", "--host", "0.0.0.0", "--port", "8000", "--processes", "4", "--server-root", "/home/localuser/mod_wsgi-0.0.0.0:8000"]
#mod_wsgi-express setup-server config/wsgi.py --host 0.0.0.0 --port 8000 --processes 4 --server-name localhost --server-root /home/localuser/mod_wsgi-0.0.0.0:8000
CMD ["mod_wsgi-express", "start-server", "config/wsgi.py", "--host", "0.0.0.0", "--port", "8000", \
"--processes", "4", "--limit-request-body", "5368709120", "--server-root", "/home/localuser/mod_wsgi-0.0.0.0:8000"]
#to start daemon:
#/home/localuser/mod_wsgi-0.0.0.0:8000/apachectl start
#to stop deamon
Expand Down
47 changes: 30 additions & 17 deletions chris_backend/plugininstances/services/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,17 @@ def get_plugin_instance_path_parameters(self):

def get_ts_plugin_instance_input_objs(self):
"""
Get a dictionary with keys that are the output dir of each input plugin instance
to this 'ts' plugin instance. The values of this dictionary are lists of all the
objects under the corresponding output dir key that match a filter for each of
the provided plugin instances.
Get a tuple whose first element is a dictionary with keys that are the ids of
each input plugin instance to this 'ts' plugin instance. The values of
this dictionary are also dictionaries containing the output dir of the plugin
instances and the list of all the objects under the output dir that match a
regular expression. The second element of the tuple indicates the value of the
'groupByInstance' flag for this 'ts' plugin instance.
"""
job_id = self.str_job_id
# extract the 'ts' plugin's special parameters from the DB
plg_inst_ids = regexs = []
group_by_instance = False
if self.c_plugin_inst.plugin.meta.type == 'ts':
for param_inst in self.l_plugin_inst_param_instances:
if param_inst.plugin_param.name == 'plugininstances':
Expand All @@ -296,6 +299,8 @@ def get_ts_plugin_instance_input_objs(self):
elif param_inst.plugin_param.name == 'filter':
# string param that represents a comma-separated list of regular expr
regexs = param_inst.value.split(',') if param_inst.value else []
elif param_inst.plugin_param.name == 'groupByInstance':
group_by_instance = param_inst.value
d_objs = {}
for i, inst_id in enumerate(plg_inst_ids):
try:
Expand All @@ -316,10 +321,12 @@ def get_ts_plugin_instance_input_objs(self):
raise
if (i < len(regexs)) and regexs[i]:
r = re.compile(regexs[i])
d_objs[output_path] = [obj for obj in l_ls if r.search(obj)]
d_objs[plg_inst.id] = {'output_path': output_path,
'objs': [obj for obj in l_ls if r.search(obj)]}
else:
d_objs[output_path] = l_ls
return d_objs
d_objs[plg_inst.id] = {'output_path': output_path,
'objs': l_ls}
return d_objs, group_by_instance

def manage_plugin_instance_app_empty_inputdir(self):
"""
Expand Down Expand Up @@ -446,7 +453,8 @@ def _handle_app_unextpath_parameters(self, unextpath_parameters_dict):
if not obj_output_path.startswith(outputdir + '/'):
obj_output_path = outputdir + '/' + obj.split('/')[-1]
try:
self.swift_manager.copy_obj(obj, obj_output_path)
if not self.swift_manager.obj_exists(obj_output_path):
self.swift_manager.copy_obj(obj, obj_output_path)
except ClientException as e:
logger.error(f'[CODE09,{job_id}]: Error while copying file '
f'from {obj} to {obj_output_path} in swift storage, '
Expand All @@ -459,20 +467,25 @@ def _handle_app_unextpath_parameters(self, unextpath_parameters_dict):
self.str_job_id)
self._register_output_files(obj_output_path_list)

def _handle_app_ts_unextracted_input_objs(self, d_ts_input_objs):
def _handle_app_ts_unextracted_input_objs(self, d_ts_input_objs, group_by_instance):
"""
Internal method to handle a 'ts' plugin's input instances' filtered objects
that are not extracted from object storage.
(which are not extracted from object storage).
"""
job_id = self.str_job_id
outputdir = self.c_plugin_inst.get_output_path()
obj_output_path_list = []
for plg_inst_outputdir in d_ts_input_objs:
obj_list = d_ts_input_objs[plg_inst_outputdir]
for plg_inst_id in d_ts_input_objs:
plg_inst_output_path = d_ts_input_objs[plg_inst_id]['output_path']
obj_list = d_ts_input_objs[plg_inst_id]['objs']
plg_inst_outputdir = outputdir
if group_by_instance:
plg_inst_outputdir = os.path.join(outputdir, str(plg_inst_id))
for obj in obj_list:
obj_output_path = obj.replace(plg_inst_outputdir, outputdir, 1)
obj_output_path = obj.replace(plg_inst_output_path, plg_inst_outputdir, 1)
try:
self.swift_manager.copy_obj(obj, obj_output_path)
if not self.swift_manager.obj_exists(obj_output_path):
self.swift_manager.copy_obj(obj, obj_output_path)
except ClientException as e:
logger.error(f'[CODE09,{job_id}]: Error while copying file '
f'from {obj} to {obj_output_path} in swift storage, '
Expand Down Expand Up @@ -524,10 +537,10 @@ def _handle_finished_successfully_status(self):
if d_unextpath_params:
self._handle_app_unextpath_parameters(d_unextpath_params)

# register files from filtered input instance paths for 'ts' plugins
# register files from filtered input instance paths ('ts' plugins)
if self.c_plugin_inst.plugin.meta.type == 'ts':
d_ts_input_objs = self.get_ts_plugin_instance_input_objs()
self._handle_app_ts_unextracted_input_objs(d_ts_input_objs)
d_ts_input_objs, tf = self.get_ts_plugin_instance_input_objs()
self._handle_app_ts_unextracted_input_objs(d_ts_input_objs, tf)
except Exception:
self.c_plugin_inst.status = 'cancelled' # giving up
else:
Expand Down
180 changes: 180 additions & 0 deletions chris_backend/plugininstances/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import time
import io
import os
from unittest import mock, skip

from django.test import TestCase, TransactionTestCase, tag
Expand Down Expand Up @@ -304,6 +305,117 @@ def test_integration_plugin_instance_create_success(self):
# delete files from swift storage
self.swift_manager.delete_obj(self.user_space_path + 'test.txt')

@tag('integration')
def test_integration_ts_plugin_instance_create_success(self):
# create an FS plugin instance
user = User.objects.get(username=self.username)
plugin = Plugin.objects.get(meta__name="pacspull")
(fs_plg_inst, tf) = PluginInstance.objects.get_or_create(
plugin=plugin, owner=user, compute_resource=plugin.compute_resources.all()[0])

# upload FS plugin instace output file to Swift storage
path = os.path.join(fs_plg_inst.get_output_path(), 'test.txt')
with io.StringIO("test file") as test_file:
self.swift_manager.upload_obj(path, test_file.read(),
content_type='text/plain')
(fs_plg_inst_file, tf) = PluginInstanceFile.objects.get_or_create(plugin_inst=fs_plg_inst)
fs_plg_inst_file.fname.name = path
fs_plg_inst_file.save()
fs_plg_inst.status = 'finishedSuccessfully'
fs_plg_inst.save()

# add a TS plugin to the system
plugin_parameters = [{'name': 'plugininstances', 'type': 'string',
'action': 'store', 'optional': True,
'flag': '--plugininstances', 'short_flag': '--plugininstances',
'help': 'test plugin parameter', 'ui_exposed': True},

{'name': 'filter', 'type': 'string',
'action': 'store', 'optional': True,
'flag': '--filter',
'short_flag': '-f',
'help': 'test plugin parameter', 'ui_exposed': True}
]
self.plg_data = {'description': 'A toplological copy ts plugin',
'version': '0.1',
'dock_image': 'fnndsc/pl-topologicalcopy',
'execshell': 'python3',
'selfpath': '/usr/local/bin',
'selfexec': 'topologicalcopy'}

self.plg_meta_data = {'name': 'topologicalcopy',
'title': 'TS copy plugin',
'license': 'MIT',
'type': 'ts',
'icon': 'http://github.com/plugin',
'category': 'Utility',
'stars': 0,
'authors': 'FNNDSC ([email protected])'}

self.plugin_repr = self.plg_data.copy()
self.plugin_repr.update(self.plg_meta_data)
self.plugin_repr['parameters'] = plugin_parameters

(compute_resource, tf) = ComputeResource.objects.get_or_create(
name="host", compute_url=COMPUTE_RESOURCE_URL)

data = self.plg_meta_data.copy()
(pl_meta, tf) = PluginMeta.objects.get_or_create(**data)
data = self.plg_data.copy()
(plugin, tf) = Plugin.objects.get_or_create(meta=pl_meta, **data)
plugin.compute_resources.set([compute_resource])
plugin.save()

# add plugin's parameters
parameters = plugin_parameters
PluginParameter.objects.get_or_create(
plugin=plugin,
name=parameters[0]['name'],
type=parameters[0]['type'],
flag=parameters[0]['flag'])

# make POST API request to create a ts plugin instance
create_read_url = reverse("plugininstance-list", kwargs={"pk": plugin.id})
post = json.dumps(
{"template": {"data": [{"name": "previous_id", "value": fs_plg_inst.id},
{"name": "plugininstances", "value": str(fs_plg_inst.id)},
{"name": "filter", "value": ".txt$"}]}})
self.client.login(username=self.username, password=self.password)
response = self.client.post(create_read_url, data=post,
content_type=self.content_type)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

# instance must be 'started' before checking its status
pl_inst = PluginInstance.objects.get(pk=response.data['id'])
for _ in range(10):
time.sleep(3)
pl_inst.refresh_from_db()
if pl_inst.status == 'started': break
self.assertEqual(pl_inst.status, 'started') # instance must be started

# In the following we keep checking the status until the job ends with
# 'finishedSuccessfully'. The code runs in a lazy loop poll with a
# max number of attempts at 10 second intervals.
plg_inst_manager = PluginInstanceManager(pl_inst)
maxLoopTries = 10
currentLoop = 1
b_checkAgain = True
time.sleep(10)
while b_checkAgain:
str_responseStatus = plg_inst_manager.check_plugin_instance_app_exec_status()
if str_responseStatus == 'finishedSuccessfully':
b_checkAgain = False
elif currentLoop < maxLoopTries:
time.sleep(10)
if currentLoop == maxLoopTries:
b_checkAgain = False
currentLoop += 1
self.assertEqual(pl_inst.status, 'finishedSuccessfully')
self.assertEqual(pl_inst.files.count(), 3)

# delete files from swift storage
self.swift_manager.delete_obj(path)

def test_plugin_instance_create_failure_unauthenticated(self):
response = self.client.post(self.create_read_url, data=self.post,
content_type=self.content_type)
Expand Down Expand Up @@ -603,6 +715,74 @@ def test_plugin_instance_descendant_list_failure_unauthenticated(self):
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)


class PluginInstanceSplitListViewTests(ViewTests):
"""
Test the plugininstancesplit-list view.
"""

def setUp(self):
super(PluginInstanceSplitListViewTests, self).setUp()

user = User.objects.get(username=self.username)

# create an 'fs' plugin instance
plugin = Plugin.objects.get(meta__name="pacspull")
(self.fs_inst, tf) = PluginInstance.objects.get_or_create(
plugin=plugin, owner=user, compute_resource=plugin.compute_resources.all()[0])

# create a 'ts' plugin
(pl_meta, tf) = PluginMeta.objects.get_or_create(name='pl-topologicalcopy', type='ts')
(plugin_ts, tf) = Plugin.objects.get_or_create(meta=pl_meta, version='0.1')
plugin_ts.compute_resources.set([self.compute_resource])
plugin_ts.save()

self.create_read_url = reverse("plugininstancesplit-list", kwargs={"pk": self.fs_inst.id})

def test_plugin_instance_split_create_failure_access_denied(self):
post = json.dumps({"template": {"data": [{"name": "filter", "value": ""}]}})
self.client.login(username=self.other_username, password=self.other_password)
response = self.client.post(self.create_read_url, data=post,
content_type=self.content_type)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_plugin_instance_split_create_success(self):
post = json.dumps({"template": {"data": [{"name": "filter", "value": ""}]}})

# add parameters to the plugin before the POST request
plugin = Plugin.objects.get(meta__name="pl-topologicalcopy")
PluginParameter.objects.get_or_create(plugin=plugin, name='filter', type='string')
PluginParameter.objects.get_or_create(plugin=plugin, name='plugininstances',
type='string')

self.client.login(username=self.username, password=self.password)

# make API requests
response = self.client.post(self.create_read_url, data=post,
content_type=self.content_type)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

self.fs_inst.status = 'finishedSuccessfully'
self.fs_inst.save()
with mock.patch.object(views.run_plugin_instance, 'delay',
return_value=None) as delay_mock:

response = self.client.post(self.create_read_url, data=post,
content_type=self.content_type)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# check that the run_plugin_instance task was called with appropriate args
delay_mock.assert_called_once()

def test_plugin_instance_split_list_success(self):
self.client.login(username=self.username, password=self.password)
response = self.client.get(self.create_read_url)
# response should contain all the instances in the tree
self.assertContains(response, "filter")

def test_plugin_instance_split_list_failure_unauthenticated(self):
response = self.client.get(self.create_read_url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)


class PluginInstanceParameterListViewTests(ViewTests):
"""
Test the plugininstance-parameter-list view.
Expand Down
2 changes: 1 addition & 1 deletion chris_backend/plugininstances/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def perform_create(self, serializer):
if cr_name:
compute_resource = plg_topologcopy.compute_resources.get(name=cr_name)
else:
compute_resource = instance.compute_resource
compute_resource = plg_topologcopy.compute_resources.first()

plg_filter_param = plg_topologcopy.parameters.get(name='filter')
plg_plugininstances_param = plg_topologcopy.parameters.get(name='plugininstances')
Expand Down
4 changes: 2 additions & 2 deletions docker-deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ if [[ "$1" == 'up' ]]; then
export STOREBASE=$(pwd)/FS/remote
windowBottom

title -d 1 "Starting chris_stack single-machine production deployment on swarm using " " ./docker-compose.yml"
title -d 1 "Starting chris_stack production deployment on swarm using " " ./docker-compose.yml"
declare -a A_CONTAINER=(
"fnndsc/chris"
"fnndsc/chris_store"
Expand Down Expand Up @@ -114,7 +114,7 @@ fi

if [[ "$1" == 'down' ]]; then

title -d 1 "Destroying chris_stack single-machine production deployment on swarm" "from ./docker-compose.yml"
title -d 1 "Destroying chris_stack production deployment on swarm" "from ./docker-compose.yml"
echo
docker stack rm chris_stack
docker swarm leave --force
Expand Down

0 comments on commit d2ed29e

Please sign in to comment.