From 2e1e61edb2e55605fc7aac83dc056b959a32a1b6 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Tue, 27 Aug 2024 23:11:53 -0400 Subject: [PATCH 01/11] Replace WSGI with ASGI and add websocket authentication --- Dockerfile | 2 +- chris_backend/config/asgi.py | 15 ++++++- chris_backend/config/settings/common.py | 3 +- chris_backend/config/wsgi.py | 18 -------- chris_backend/core/middleware.py | 2 +- chris_backend/core/views.py | 54 +++++++++++------------ chris_backend/core/websockets/__init__.py | 0 chris_backend/core/websockets/auth.py | 53 ++++++++++++++++++++++ chris_backend/core/websockets/urls.py | 11 +++++ chris_backend/pacsfiles/consumers.py | 41 +++++++++++++++++ requirements/base.txt | 1 + 11 files changed, 151 insertions(+), 49 deletions(-) delete mode 100755 chris_backend/config/wsgi.py create mode 100755 chris_backend/core/websockets/__init__.py create mode 100755 chris_backend/core/websockets/auth.py create mode 100755 chris_backend/core/websockets/urls.py create mode 100755 chris_backend/pacsfiles/consumers.py diff --git a/Dockerfile b/Dockerfile index bbeb9bce..3f696b5e 100755 --- a/Dockerfile +++ b/Dockerfile @@ -39,7 +39,7 @@ COPY chris_backend/ ./ RUN if [ "$ENVIRONMENT" = "production" ]; then \ env DJANGO_SETTINGS_MODULE=config.settings.common ./manage.py collectstatic; fi -CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w", "4", "config.wsgi:application"] +CMD ["daphne", "-p", "8000", "config.asgi:application"] LABEL org.opencontainers.image.authors="FNNDSC " \ org.opencontainers.image.title="ChRIS Backend" \ diff --git a/chris_backend/config/asgi.py b/chris_backend/config/asgi.py index 789432b0..a909fe40 100755 --- a/chris_backend/config/asgi.py +++ b/chris_backend/config/asgi.py @@ -10,9 +10,22 @@ import os, sys sys.path.append(os.path.dirname(os.path.dirname(__file__))) +from core.websockets.urls import websocket_urlpatterns +from core.websockets.auth import TokenQsAuthMiddleware + +from channels.routing import ProtocolTypeRouter, URLRouter from django.core.asgi import get_asgi_application +from channels.security.websocket import AllowedHostsOriginValidator os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production') -application = get_asgi_application() +django_asgi_app = get_asgi_application() + +# see https://channels.readthedocs.io/en/3.x/installation.html +application = ProtocolTypeRouter({ + 'http': django_asgi_app, + 'websocket': AllowedHostsOriginValidator( + TokenQsAuthMiddleware(URLRouter(websocket_urlpatterns)) + ), +}) diff --git a/chris_backend/config/settings/common.py b/chris_backend/config/settings/common.py index c39ca94c..27e4a913 100755 --- a/chris_backend/config/settings/common.py +++ b/chris_backend/config/settings/common.py @@ -19,6 +19,7 @@ # Application definition INSTALLED_APPS = [ + 'daphne', 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', @@ -99,7 +100,7 @@ }, ] -WSGI_APPLICATION = 'config.wsgi.application' +ASGI_APPLICATION = 'config.asgi.application' # Database diff --git a/chris_backend/config/wsgi.py b/chris_backend/config/wsgi.py deleted file mode 100755 index 4de1837e..00000000 --- a/chris_backend/config/wsgi.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -WSGI config for chris_backend project. - -It exposes the WSGI callable as a module-level variable named ``application``. - -For more information on this file, see -https://docs.djangoproject.com/en/4.2/howto/deployment/wsgi/ -""" - -import os, sys -sys.path.append(os.path.dirname(os.path.dirname(__file__))) - -from django.core.wsgi import get_wsgi_application - - -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production') - -application = get_wsgi_application() diff --git a/chris_backend/core/middleware.py b/chris_backend/core/middleware.py index 29cebe47..fd926bc1 100755 --- a/chris_backend/core/middleware.py +++ b/chris_backend/core/middleware.py @@ -41,7 +41,7 @@ def __call__(self, request): return self.get_response(request) def process_exception(self, request, exception): - print(exception) + print(exception, flush=True) mime = request.META.get('HTTP_ACCEPT') if mime != 'text/html': return api_500(request) diff --git a/chris_backend/core/views.py b/chris_backend/core/views.py index 7fcab7f1..c0541214 100755 --- a/chris_backend/core/views.py +++ b/chris_backend/core/views.py @@ -124,31 +124,31 @@ def authenticate(self, request): # Check if 'download_token' is in the request query params if 'download_token' in request.query_params: token = request.query_params['download_token'] - err_msg = f'Invalid file download token: {token}' - - try: - info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) - except jwt.ExpiredSignatureError: - err_msg = f'Expired file download token: {token}' - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - except jwt.InvalidTokenError: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - try: - user = User.objects.get(username=info['user']) - except User.DoesNotExist: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - try: - token_obj = FileDownloadToken.objects.get(owner=user, token=token) - except FileDownloadToken.DoesNotExist: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - token_obj.delete() # one-time-use token, we could instead set revoked=true - return user, None - + return authenticate_token(token), None return super(TokenAuthSupportQueryString, self).authenticate(request) + + +def authenticate_token(token: str) -> User: + err_msg = f'Invalid file download token: {token}' + try: + info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) + except jwt.ExpiredSignatureError: + err_msg = f'Expired file download token: {token}' + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + except jwt.InvalidTokenError: + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + + try: + user = User.objects.get(username=info['user']) + except User.DoesNotExist: + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + + token_obj = FileDownloadToken.objects.filter(owner=user, token=token).first() + if token_obj is None: + raise exceptions.AuthenticationFailed(err_msg) + + token_obj.delete() # one-time-use token, we could instead set revoked=true + return user diff --git a/chris_backend/core/websockets/__init__.py b/chris_backend/core/websockets/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/chris_backend/core/websockets/auth.py b/chris_backend/core/websockets/auth.py new file mode 100755 index 00000000..b1ca99bf --- /dev/null +++ b/chris_backend/core/websockets/auth.py @@ -0,0 +1,53 @@ +""" +Websockets authentication. + +Notes +----- + +``channels.auth.AuthMiddlewareStack`` depends on HTTP headers, however HTTP headers +cannot be set for websockets in the web browser. https://stackoverflow.com/a/4361358 + +A common pattern is to put a token in the query string. We will re-use the file "downloadtokens" +for this purpose. +""" + +import urllib.parse +from typing import AnyStr + +from channels.db import database_sync_to_async +from django.contrib.auth.models import User +from rest_framework.exceptions import AuthenticationFailed + +from core.views import authenticate_token + + +class TokenQsAuthMiddleware: + """ + Authenticate the request using :class:`TokenAuthSupportQueryString`. + + Based on + https://channels.readthedocs.io/en/3.x/topics/authentication.html#custom-authentication + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + scope['user'] = await self._authenticate(scope) + return await self.app(scope, receive, send) + + @database_sync_to_async + def _authenticate(self, scope) -> User | None: + params = _parse_qs_last_string(scope.get('query_string', b'')) + if token := params.get('token', None): + try: + return authenticate_token(token) + except AuthenticationFailed: + return None + return None + + +def _parse_qs_last_string(qs: AnyStr, encoding='utf-8') -> dict[str, str]: + if isinstance(qs, bytes): + qs = qs.decode(encoding=encoding) + return {k: v[-1] for k, v in urllib.parse.parse_qs(qs).items()} diff --git a/chris_backend/core/websockets/urls.py b/chris_backend/core/websockets/urls.py new file mode 100755 index 00000000..3b315b28 --- /dev/null +++ b/chris_backend/core/websockets/urls.py @@ -0,0 +1,11 @@ +""" +Async routes. +""" + +from django.urls import re_path +from pacsfiles import consumers + +websocket_urlpatterns = [ + re_path(r'v1/pacs/progress/', + consumers.PACSFileProgress.as_asgi()), +] \ No newline at end of file diff --git a/chris_backend/pacsfiles/consumers.py b/chris_backend/pacsfiles/consumers.py new file mode 100755 index 00000000..bb6840f6 --- /dev/null +++ b/chris_backend/pacsfiles/consumers.py @@ -0,0 +1,41 @@ +import json + +import channels.exceptions +from channels.generic.websocket import WebsocketConsumer +from rest_framework import permissions + +from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly + + +class PACSFileProgress(WebsocketConsumer): + + permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,) + + def connect(self): + if not self._has_permission(): + raise channels.exceptions.DenyConnection() + self.accept() + + def _has_permission(self) -> bool: + """ + Manual permissions check. + + django-channels is going to handle authentication for us, + but we need to implement permissions ourselves. + """ + self.user = self.scope.get('user', None) + if self.user is None: + return False + return all( + permission().has_permission(self, self.__class__) + for permission in self.permission_classes + ) + + def disconnect(self, close_code): + pass + + def receive(self, text_data): + text_data_json = json.loads(text_data) + message = text_data_json["message"] + + self.send(text_data=json.dumps({"message": message})) \ No newline at end of file diff --git a/requirements/base.txt b/requirements/base.txt index 255fb9a2..e4131083 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -14,3 +14,4 @@ django-auth-ldap==4.5.0 PyYAML==6.0.1 whitenoise[brotli]==6.5.0 PyJWT===2.8.0 +channels[daphne]==4.1.0 From 6986e615079731ff835893f239c2b5ac57da5283 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Wed, 28 Aug 2024 14:04:40 -0400 Subject: [PATCH 02/11] Register PACSSeries from celery task instead of HTTP POST --- chris_backend/core/celery.py | 1 + chris_backend/pacsfiles/tasks.py | 38 ++++++++++++++++++++++++++++++++ chris_backend/pacsfiles/views.py | 2 +- 3 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 chris_backend/pacsfiles/tasks.py diff --git a/chris_backend/core/celery.py b/chris_backend/core/celery.py index 09fe8350..23fbd439 100755 --- a/chris_backend/core/celery.py +++ b/chris_backend/core/celery.py @@ -33,6 +33,7 @@ {'queue': 'periodic'}, 'plugininstances.tasks.cancel_waiting_plugin_instances': {'queue': 'periodic'}, + 'pacsfiles.tasks.register_pacs_series': {'queue': 'main2'} } app.conf.update(task_routes=task_routes) diff --git a/chris_backend/pacsfiles/tasks.py b/chris_backend/pacsfiles/tasks.py new file mode 100644 index 00000000..0f066cc3 --- /dev/null +++ b/chris_backend/pacsfiles/tasks.py @@ -0,0 +1,38 @@ +from celery import shared_task +from django.contrib.auth.models import User + +from .serializers import PACSSeriesSerializer + + +@shared_task +def register_pacs_series( + patient_id: str, + patient_name: str, + study_date: str, + study_instance_uid: str, + study_description: str, + series_description: str, + series_instance_uid: str, + pacs_name: str, + path: str, + ndicom: int +): + """ + Register a DICOM series (directory of DICOM files) existing in storage to the database, + """ + data = { + 'PatientID': patient_id, + 'PatientName': patient_name, + 'StudyDate': study_date, + 'StudyInstanceUID': study_instance_uid, + 'StudyDescription': study_description, + 'SeriesInstanceUID': series_instance_uid, + 'SeriesDescription': series_description, + 'pacs_name': pacs_name, + 'path': path, + 'ndicom': ndicom + } + serializer = PACSSeriesSerializer(data=data) + serializer.is_valid(raise_exception=True) + owner = User.objects.get(username='chris') + serializer.save(owner=owner) diff --git a/chris_backend/pacsfiles/views.py b/chris_backend/pacsfiles/views.py index d01e5204..7cb35fb7 100755 --- a/chris_backend/pacsfiles/views.py +++ b/chris_backend/pacsfiles/views.py @@ -16,7 +16,7 @@ class PACSSeriesList(generics.ListCreateAPIView): """ A view for the collection of PACS Series. """ - http_method_names = ['get', 'post'] + http_method_names = ['get'] queryset = PACSSeries.objects.all() serializer_class = PACSSeriesSerializer permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,) From c2207b596c772b74d4b3265b17e4e04c09bff675 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Wed, 28 Aug 2024 18:56:15 -0400 Subject: [PATCH 03/11] Change order of parameters --- chris_backend/pacsfiles/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chris_backend/pacsfiles/tasks.py b/chris_backend/pacsfiles/tasks.py index 0f066cc3..dc7d09cf 100644 --- a/chris_backend/pacsfiles/tasks.py +++ b/chris_backend/pacsfiles/tasks.py @@ -11,8 +11,8 @@ def register_pacs_series( study_date: str, study_instance_uid: str, study_description: str, - series_description: str, series_instance_uid: str, + series_description: str, pacs_name: str, path: str, ndicom: int From e3d158e33174072bdaff037f914fdfb775276d5e Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Fri, 6 Sep 2024 00:25:27 -0400 Subject: [PATCH 04/11] Adapt pacsfiles registration tests to call task instead of HTTP POST --- chris_backend/pacsfiles/tasks.py | 62 +++++++---- chris_backend/pacsfiles/tests/test_tasks.py | 108 ++++++++++++++++++++ chris_backend/pacsfiles/tests/test_views.py | 79 -------------- unregister_pacsfiles.sh | 15 +++ 4 files changed, 165 insertions(+), 99 deletions(-) mode change 100644 => 100755 chris_backend/pacsfiles/tasks.py create mode 100755 chris_backend/pacsfiles/tests/test_tasks.py create mode 100755 unregister_pacsfiles.sh diff --git a/chris_backend/pacsfiles/tasks.py b/chris_backend/pacsfiles/tasks.py old mode 100644 new mode 100755 index dc7d09cf..8f09fbbc --- a/chris_backend/pacsfiles/tasks.py +++ b/chris_backend/pacsfiles/tasks.py @@ -1,3 +1,4 @@ +from typing import Optional from celery import shared_task from django.contrib.auth.models import User @@ -6,33 +7,54 @@ @shared_task def register_pacs_series( - patient_id: str, - patient_name: str, - study_date: str, - study_instance_uid: str, - study_description: str, - series_instance_uid: str, - series_description: str, - pacs_name: str, - path: str, - ndicom: int + PatientID: str, + StudyDate: str, + StudyInstanceUID: str, + SeriesInstanceUID: str, + pacs_name: str, + path: str, + ndicom: int, + PatientName: Optional[str] = None, + PatientBirthDate: Optional[str] = None, + PatientAge: Optional[int] = None, + PatientSex: Optional[str] = None, + AccessionNumber: Optional[str] = None, + Modality: Optional[str] = None, + ProtocolName: Optional[str] = None, + StudyDescription: Optional[str] = None, + SeriesDescription: Optional[str] = None, ): """ - Register a DICOM series (directory of DICOM files) existing in storage to the database, + Register a DICOM series (directory of DICOM files) to the database. + + Pre-condition: DICOM files *must* exist in storage before running this task. """ data = { - 'PatientID': patient_id, - 'PatientName': patient_name, - 'StudyDate': study_date, - 'StudyInstanceUID': study_instance_uid, - 'StudyDescription': study_description, - 'SeriesInstanceUID': series_instance_uid, - 'SeriesDescription': series_description, + 'PatientID': PatientID, + 'PatientName': PatientName, + 'PatientBirthDate': PatientBirthDate, + 'PatientAge': PatientAge, + 'PatientSex': PatientSex, + 'StudyDate': StudyDate, + 'AccessionNumber': AccessionNumber, + 'Modality': Modality, + 'ProtocolName': ProtocolName, + 'StudyInstanceUID': StudyInstanceUID, + 'StudyDescription': StudyDescription, + 'SeriesInstanceUID': SeriesInstanceUID, + 'SeriesDescription': SeriesDescription, 'pacs_name': pacs_name, 'path': path, - 'ndicom': ndicom + 'ndicom': ndicom, } - serializer = PACSSeriesSerializer(data=data) + serializer = PACSSeriesSerializer(data=_filter_some_values(data)) serializer.is_valid(raise_exception=True) owner = User.objects.get(username='chris') serializer.save(owner=owner) + + +def _filter_some_values(x: dict[str, any]) -> dict[str, any]: + """ + Remove entries where the value is ``None``.` + """ + return {k: v for k, v in x.items() if v is not None} diff --git a/chris_backend/pacsfiles/tests/test_tasks.py b/chris_backend/pacsfiles/tests/test_tasks.py new file mode 100755 index 00000000..30da2844 --- /dev/null +++ b/chris_backend/pacsfiles/tests/test_tasks.py @@ -0,0 +1,108 @@ +import logging +import json +import io +from unittest import mock + +from django.test import TestCase, tag +from django.conf import settings +from django.contrib.auth.models import User, Group +from django.urls import reverse + +from rest_framework import status +from rest_framework.exceptions import ValidationError + +from core.models import ChrisFolder +from core.storage import connect_storage +from pacsfiles.models import PACS, PACSSeries, PACSFile +from pacsfiles.tasks import register_pacs_series + + +class PACSSeriesCreateTests(TestCase): + """ + Test creating PACS series using the task function. + """ + + def setUp(self): + self.storage_manager = connect_storage(settings) + + def tearDown(self): + super().tearDown() + test_data_dir = 'SERVICES/PACS/MyPACS/123456-crazy' + if self.storage_manager.path_exists(test_data_dir): + self.storage_manager.delete_path(test_data_dir) + + @tag('integration') + def test_integration_pacs_series_create_success(self): + series_instance_uid = '1.1.3432.54.6545674765.765434' + series_dir = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1/file2.dcm' + path = series_dir + '/file2.dcm' + fake_dicom_content = b'test file content' + # upload fake file to storage + self.storage_manager.upload_obj(path, fake_dicom_content) + + # invoke the task to register the file + register_pacs_series( + path=path, + ndicom=1, + PatientID='12345', + PatientName='crazy', + PatientSex='O', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + StudyDescription='brain crazy study', + SeriesInstanceUID=series_instance_uid, + SeriesDescription='SAG T1 MPRAGE', + pacs_name='MyPACS', + ) + + # assert PACSSeries and PACSFile were created + series = PACSSeries.objects.get(SeriesInstanceUID=series_instance_uid) + self.assertEqual(series.StudyDescription, 'brain crazy study') + + def test_pacs_series_create_failure_already_exists(self): + pacs_path = 'SERVICES/PACS/MyPACS' + series_path = ( + f'{pacs_path}/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE' + ) + + owner = User.objects.get(username='chris') + series_folder, _ = ChrisFolder.objects.get_or_create( + path=series_path, owner=owner + ) + pacs_folder, _ = ChrisFolder.objects.get_or_create( + path=pacs_path, owner=owner + ) + pacs, _ = PACS.objects.get_or_create( + identifier='MyPACS', folder=pacs_folder + ) + existing_series, _ = PACSSeries.objects.get_or_create( + PatientID='123456', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + SeriesInstanceUID='2.4.3432.54.845674765.763345', + pacs=pacs, + folder=series_folder, + ) + + file_content = b'example DICOM which should not be registered' + self.storage_manager.upload_obj( + f'{series_path}/file1.dcm', file_content + ) + expected = ( + f'A DICOM series with SeriesInstanceUID={existing_series.SeriesInstanceUID} ' + 'already registered for pacs MyPACS' + ) + with self.assertRaisesRegex(ValidationError, expected): + register_pacs_series( + PatientID='12345', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + SeriesInstanceUID='2.4.3432.54.845674765.763345', + pacs_name='MyPACS', + path=series_path, + ndicom=1, + PatientName='crazy', + PatientSex='O', + StudyDescription='brain_crazy_study', + SeriesDescription='SAG T1 MPRAGE', + ) diff --git a/chris_backend/pacsfiles/tests/test_views.py b/chris_backend/pacsfiles/tests/test_views.py index b4c70be5..cd044282 100755 --- a/chris_backend/pacsfiles/tests/test_views.py +++ b/chris_backend/pacsfiles/tests/test_views.py @@ -90,85 +90,6 @@ class PACSSeriesListViewTests(PACSViewTests): def setUp(self): super(PACSSeriesListViewTests, self).setUp() self.create_read_url = reverse("pacsseries-list") - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1' - self.post = json.dumps( - {"template": {"data": [{"name": "path", "value": path}, - {"name": "ndicom", "value": 1}, - {"name": "PatientID", "value": "12345"}, - {"name": "PatientName", "value": "crazy"}, - {"name": "PatientSex", "value": "O"}, - {"name": "StudyDate", "value": '2020-07-15'}, - {"name": "StudyInstanceUID", - "value": '1.1.3432.54.6545674765.765434'}, - {"name": "StudyDescription", "value": "brain_crazy_study"}, - {"name": "SeriesInstanceUID", - "value": "2.4.3432.54.845674765.763346"}, - {"name": "SeriesDescription", "value": "SAG T1 MPRAGE"}, - {"name": "pacs_name", "value": "MyPACS"}]}}) - - @tag('integration') - def test_integration_pacs_series_create_success(self): - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1' - # upload file to storage - with io.StringIO("test file") as file1: - self.storage_manager.upload_obj(path + '/file2.dcm', file1.read(), - content_type='text/plain') - - # make the POST request using the chris user - self.client.login(username=self.chris_username, password=self.chris_password) - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - # delete file from storage - self.storage_manager.delete_obj(path + '/file2.dcm') - - def test_pacs_series_create_failure_unauthenticated(self): - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) - - def test_pacs_series_create_failure_already_exists(self): - self.client.login(username=self.chris_username, password=self.chris_password) - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE' - - owner = User.objects.get(username=self.chris_username) - (series_folder, _) = ChrisFolder.objects.get_or_create(path=path, - owner=owner) - pacs = PACS.objects.get(identifier='MyPACS') - PACSSeries.objects.get_or_create(PatientID='123456', - StudyDate='2020-07-15', - StudyInstanceUID='1.1.3432.54.6545674765.765434', - SeriesInstanceUID='2.4.3432.54.845674765.763345', - pacs=pacs, - folder=series_folder) - - post = json.dumps( - {"template": {"data": [{"name": "path", "value": path}, - {"name": "ndicom", "value": 1}, - {"name": "PatientID", "value": "12345"}, - {"name": "PatientName", "value": "crazy"}, - {"name": "PatientSex", "value": "O"}, - {"name": "StudyDate", "value": '2020-07-15'}, - {"name": "StudyInstanceUID", - "value": '1.1.3432.54.6545674765.765434'}, - {"name": "StudyDescription", - "value": "brain_crazy_study"}, - {"name": "SeriesInstanceUID", - "value": "2.4.3432.54.845674765.763345"}, - {"name": "SeriesDescription", - "value": "SAG T1 MPRAGE"}, - {"name": "pacs_name", "value": "MyPACS"}]}}) - - response = self.client.post(self.create_read_url, data=post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - def test_pacs_series_create_failure_permission_denied_not_chris_user(self): - self.client.login(username=self.username, password=self.password) - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_pacs_series_list_success(self): self.client.login(username=self.username, password=self.password) diff --git a/unregister_pacsfiles.sh b/unregister_pacsfiles.sh new file mode 100755 index 00000000..9b35f185 --- /dev/null +++ b/unregister_pacsfiles.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +docker compose -f docker-compose_dev.yml exec chris_dev pip install tqdm +docker compose -f docker-compose_dev.yml exec chris_dev python manage.py shell -c ' +from tqdm import tqdm +from pacsfiles.models import PACSSeries, PACSFile + +with tqdm(PACSFile.objects.all()) as pbar: + for pacs_file in pbar: + _ = pacs_file.delete() + +with tqdm(PACSSeries) as pbar: + for pacs_series in pbar: + _ = pacs_series.delete() +' From 4fdc5a148d6fb44f47fde8d270393620d21b7949 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 16:00:06 -0400 Subject: [PATCH 05/11] Add oxidicom and nats --- docker-compose_just.yml | 44 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/docker-compose_just.yml b/docker-compose_just.yml index 4ad7bb4d..e2bd280c 100755 --- a/docker-compose_just.yml +++ b/docker-compose_just.yml @@ -100,6 +100,12 @@ services: image: docker.io/library/rabbitmq:3 volumes: - rabbitmq:/var/lib/rabbitmq + healthcheck: + test: rabbitmq-diagnostics -q ping + start_period: 20s + retries: 3 + timeout: 10s + interval: 5s networks: local: aliases: @@ -148,6 +154,44 @@ services: - local userns_mode: "host" + oxidicom: + image: ghcr.io/fnndsc/oxidicom:pr-3 # https://github.com/FNNDSC/oxidicom/pull/3 + ports: + - "11111:11111" + volumes: + - chris_files:/data + # https://chrisproject.org/docs/oxidicom#environment-variables + environment: + OXIDICOM_AMQP_ADDRESS: "amqp://queue:5672" + OXIDICOM_QUEUE_NAME: main2 + OXIDICOM_NATS_ADDRESS: "nats:4222" + OXIDICOM_PROGRESS_INTERVAL: "100ms" + OXIDICOM_FILES_ROOT: /data + OXIDICOM_LISTENER_PORT: 11111 + OXIDICOM_SCP_AET: CUBEDEVOXIDE + OXIDICOM_SCP_PROMISCUOUS: "true" + OXIDICOM_VERBOSE: "true" + user: ${UID}:${GID} + networks: + local: + depends_on: + nats: + condition: service_healthy + rabbitmq: + condition: service_healthy + nats: + image: docker.io/library/nats:2.10.20-alpine3.20 + ports: + - "4222:4222" + - "8222:8222" + networks: + local: + healthcheck: + test: wget http://localhost:8222/healthz -q -S -O - + start_period: 20s + retries: 3 + timeout: 10s + interval: 5s lldap: image: docker.io/nitnelave/lldap:stable ports: From c4ebc75470f016341e00e3e499c6125b680116e8 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 20:29:26 -0400 Subject: [PATCH 06/11] Retry startup 5 times --- githubActions/main.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/githubActions/main.js b/githubActions/main.js index ca7d97be..242dc4f9 100755 --- a/githubActions/main.js +++ b/githubActions/main.js @@ -6,6 +6,16 @@ const JUST_COMMAND = process.env.INPUT_COMMAND; const script = ` set -x just prefer ${CONTAINER_ENGINE} + +for i in {1..5}; do + just start-ancillary && start=good && break +done + +if [ "$start" != "good" ]; then + echo "::error ::Failed to start ancillary services." + exit 1 +fi + just ${JUST_COMMAND} rc=$? if [ "$rc" != '0' ]; then From d1e3cafed1fbeec0f25309b364a03b3eda6c0d0d Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 20:36:04 -0400 Subject: [PATCH 07/11] Add links to https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 --- README.md | 2 ++ githubActions/main.js | 3 +++ 2 files changed, 5 insertions(+) diff --git a/README.md b/README.md index 657a460e..e6c142f5 100755 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ just prefer podman # or just prefer docker ``` +With Podman, RabbitMQ might fail to start. Simply retry the command. See https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 + ### Just Commands diff --git a/githubActions/main.js b/githubActions/main.js index 242dc4f9..423385df 100755 --- a/githubActions/main.js +++ b/githubActions/main.js @@ -7,8 +7,11 @@ const script = ` set -x just prefer ${CONTAINER_ENGINE} +# start-up is being retried as a workaround for +# https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 for i in {1..5}; do just start-ancillary && start=good && break + echo "::warning ::Ancillary services failed to start. Attempt=$i" done if [ "$start" != "good" ]; then From b68145b9173c242364d1f27ce95a4a9b6292724b Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 20:40:53 -0400 Subject: [PATCH 08/11] Remove rabbitmq healthcheck and oxidicom Works around https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 by avoiding the problem --- docker-compose_just.yml | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/docker-compose_just.yml b/docker-compose_just.yml index e2bd280c..f46fd2a2 100755 --- a/docker-compose_just.yml +++ b/docker-compose_just.yml @@ -100,12 +100,6 @@ services: image: docker.io/library/rabbitmq:3 volumes: - rabbitmq:/var/lib/rabbitmq - healthcheck: - test: rabbitmq-diagnostics -q ping - start_period: 20s - retries: 3 - timeout: 10s - interval: 5s networks: local: aliases: @@ -154,31 +148,6 @@ services: - local userns_mode: "host" - oxidicom: - image: ghcr.io/fnndsc/oxidicom:pr-3 # https://github.com/FNNDSC/oxidicom/pull/3 - ports: - - "11111:11111" - volumes: - - chris_files:/data - # https://chrisproject.org/docs/oxidicom#environment-variables - environment: - OXIDICOM_AMQP_ADDRESS: "amqp://queue:5672" - OXIDICOM_QUEUE_NAME: main2 - OXIDICOM_NATS_ADDRESS: "nats:4222" - OXIDICOM_PROGRESS_INTERVAL: "100ms" - OXIDICOM_FILES_ROOT: /data - OXIDICOM_LISTENER_PORT: 11111 - OXIDICOM_SCP_AET: CUBEDEVOXIDE - OXIDICOM_SCP_PROMISCUOUS: "true" - OXIDICOM_VERBOSE: "true" - user: ${UID}:${GID} - networks: - local: - depends_on: - nats: - condition: service_healthy - rabbitmq: - condition: service_healthy nats: image: docker.io/library/nats:2.10.20-alpine3.20 ports: From 596f422dbb3e91bfcf43d42ea6703a24fe457238 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 21:43:46 -0400 Subject: [PATCH 09/11] Test websocket connection and auth --- chris_backend/core/websockets/urls.py | 2 +- chris_backend/pacsfiles/consumers.py | 41 +++++++------- .../pacsfiles/tests/test_consumers.py | 54 +++++++++++++++++++ 3 files changed, 78 insertions(+), 19 deletions(-) create mode 100644 chris_backend/pacsfiles/tests/test_consumers.py diff --git a/chris_backend/core/websockets/urls.py b/chris_backend/core/websockets/urls.py index 3b315b28..f7217b3f 100755 --- a/chris_backend/core/websockets/urls.py +++ b/chris_backend/core/websockets/urls.py @@ -6,6 +6,6 @@ from pacsfiles import consumers websocket_urlpatterns = [ - re_path(r'v1/pacs/progress/', + re_path(r'v1/pacs/ws/', consumers.PACSFileProgress.as_asgi()), ] \ No newline at end of file diff --git a/chris_backend/pacsfiles/consumers.py b/chris_backend/pacsfiles/consumers.py index bb6840f6..7acfec40 100755 --- a/chris_backend/pacsfiles/consumers.py +++ b/chris_backend/pacsfiles/consumers.py @@ -1,21 +1,31 @@ -import json - -import channels.exceptions -from channels.generic.websocket import WebsocketConsumer +import rest_framework.permissions +from channels.db import database_sync_to_async +from channels.generic.websocket import AsyncJsonWebsocketConsumer from rest_framework import permissions from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly -class PACSFileProgress(WebsocketConsumer): +class PACSFileProgress(AsyncJsonWebsocketConsumer): + """ + A WebSockets endpoint which relays progress messages from NATS sent by *oxidicom* to a client. + """ permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,) - def connect(self): - if not self._has_permission(): - raise channels.exceptions.DenyConnection() - self.accept() + async def connect(self): + if not await self._has_permission(): + await self.close() + else: + await self.accept() + + async def receive_json(self, content, **kwargs): + ... + async def disconnect(self, code): + ... + + @database_sync_to_async def _has_permission(self) -> bool: """ Manual permissions check. @@ -26,16 +36,11 @@ def _has_permission(self) -> bool: self.user = self.scope.get('user', None) if self.user is None: return False + if getattr(self, 'method', None) is None: + # make it work with ``IsChrisOrIsPACSUserReadOnly`` + self.method = rest_framework.permissions.SAFE_METHODS[0] + return all( permission().has_permission(self, self.__class__) for permission in self.permission_classes ) - - def disconnect(self, close_code): - pass - - def receive(self, text_data): - text_data_json = json.loads(text_data) - message = text_data_json["message"] - - self.send(text_data=json.dumps({"message": message})) \ No newline at end of file diff --git a/chris_backend/pacsfiles/tests/test_consumers.py b/chris_backend/pacsfiles/tests/test_consumers.py new file mode 100644 index 00000000..58baa41c --- /dev/null +++ b/chris_backend/pacsfiles/tests/test_consumers.py @@ -0,0 +1,54 @@ +import jwt +from channels.db import database_sync_to_async +from django.conf import settings +from django.contrib.auth.models import User, Group + +# note: use TransactionTestCase instead of TestCase for async tests that speak to DB. +# See https://stackoverflow.com/a/71763849 +from django.test import TransactionTestCase + +from channels.testing import WebsocketCommunicator +from django.utils import timezone + +from core.models import FileDownloadToken +from core.websockets.auth import TokenQsAuthMiddleware + +from pacsfiles.consumers import PACSFileProgress + + +class PACSFileProgressTests(TransactionTestCase): + + def setUp(self): + self.username = 'PintoGideon' + self.password = 'gideon1234' + self.email = 'gideon@example.org' + self.user = User.objects.create_user(username=self.username, + email=self.email, + password=self.password) + pacs_grp, _ = Group.objects.get_or_create(name='pacs_users') + self.user.groups.set([pacs_grp]) + self.user.save() + + async def test_my_consumer(self): + token = await self._get_download_token() + app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) + communicator = WebsocketCommunicator(app, f'v1/pacs/ws/?token={token.token}') + connected, subprotocol = await communicator.connect() + assert connected + + async def test_unauthenticated_not_connected(self): + app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) + communicator = WebsocketCommunicator(app, 'v1/pacs/ws/') # no token + connected, subprotocol = await communicator.connect() + assert not connected + + @database_sync_to_async + def _get_download_token(self) -> FileDownloadToken: + """ + Copy-pasted from + https://github.com/FNNDSC/ChRIS_ultron_backEnd/blob/7bcccc2031386955875ef4e9758025577f5ee067/chris_backend/userfiles/tests/test_views.py#L210-L213 + """ + dt = timezone.now() + timezone.timedelta(minutes=10) + token = jwt.encode({'user': self.user.username, 'exp': dt}, settings.SECRET_KEY, + algorithm='HS256') + return FileDownloadToken.objects.create(token=token, owner=self.user) \ No newline at end of file From 66a770420680941e5e5686d6764be2f33f9bf5d1 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Sun, 8 Sep 2024 21:50:09 -0400 Subject: [PATCH 10/11] Add NATS_ADDRESS environment variable --- chris_backend/config/settings/local.py | 3 +++ chris_backend/config/settings/production.py | 5 +++++ docker-compose_just.yml | 2 ++ requirements/base.txt | 1 + 4 files changed, 11 insertions(+) diff --git a/chris_backend/config/settings/local.py b/chris_backend/config/settings/local.py index 296cd494..2cac4fa0 100755 --- a/chris_backend/config/settings/local.py +++ b/chris_backend/config/settings/local.py @@ -161,6 +161,9 @@ CORS_EXPOSE_HEADERS = ['Allow', 'Content-Type', 'Content-Length'] +# NATS settings +NATS_ADDRESS = 'nats://nats:4222' + # Celery settings #CELERY_BROKER_URL = 'amqp://guest:guest@localhost' diff --git a/chris_backend/config/settings/production.py b/chris_backend/config/settings/production.py index 94f04bcc..2e9da7f9 100755 --- a/chris_backend/config/settings/production.py +++ b/chris_backend/config/settings/production.py @@ -128,6 +128,11 @@ def get_secret(setting, secret_type=env): CORS_ALLOWED_ORIGINS = get_secret('DJANGO_CORS_ALLOWED_ORIGINS', env.list) +# NATS SETTINGS +# ------------------------------------------------------------------------------ +NATS_ADDRESS = get_secret('NATS_ADDRESS') + + # CELERY SETTINGS # ------------------------------------------------------------------------------ CELERY_BROKER_URL = get_secret('CELERY_BROKER_URL') diff --git a/docker-compose_just.yml b/docker-compose_just.yml index f46fd2a2..f3ea8c62 100755 --- a/docker-compose_just.yml +++ b/docker-compose_just.yml @@ -39,6 +39,8 @@ services: condition: service_healthy rabbitmq: condition: service_started + nats: + condition: service_started cube-nonroot-user-volume-fix: condition: service_completed_successfully networks: diff --git a/requirements/base.txt b/requirements/base.txt index e4131083..6014b949 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -15,3 +15,4 @@ PyYAML==6.0.1 whitenoise[brotli]==6.5.0 PyJWT===2.8.0 channels[daphne]==4.1.0 +nats-py==2.9.0 \ No newline at end of file From 4c12c3bc557c597c5b467c128d6113e3aa86fb99 Mon Sep 17 00:00:00 2001 From: Jennings Zhang Date: Tue, 10 Sep 2024 17:02:17 -0400 Subject: [PATCH 11/11] Implement LONK-WS and test --- chris_backend/pacsfiles/consumers.py | 54 ++++- chris_backend/pacsfiles/lonk.py | 209 ++++++++++++++++++ chris_backend/pacsfiles/tests/mocks.py | 47 ++++ .../pacsfiles/tests/test_consumers.py | 89 +++++++- 4 files changed, 383 insertions(+), 16 deletions(-) create mode 100644 chris_backend/pacsfiles/lonk.py create mode 100644 chris_backend/pacsfiles/tests/mocks.py diff --git a/chris_backend/pacsfiles/consumers.py b/chris_backend/pacsfiles/consumers.py index 7acfec40..7ce50662 100755 --- a/chris_backend/pacsfiles/consumers.py +++ b/chris_backend/pacsfiles/consumers.py @@ -1,8 +1,15 @@ import rest_framework.permissions from channels.db import database_sync_to_async from channels.generic.websocket import AsyncJsonWebsocketConsumer +from django.conf import settings from rest_framework import permissions +from pacsfiles.lonk import ( + LonkClient, + validate_subscription, + LonkWsSubscription, + Lonk, +) from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly @@ -11,19 +18,54 @@ class PACSFileProgress(AsyncJsonWebsocketConsumer): A WebSockets endpoint which relays progress messages from NATS sent by *oxidicom* to a client. """ - permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,) + permission_classes = ( + permissions.IsAuthenticated, + IsChrisOrIsPACSUserReadOnly, + ) async def connect(self): if not await self._has_permission(): - await self.close() - else: - await self.accept() + return await self.close() + self.client: LonkClient = await LonkClient.connect( + settings.NATS_ADDRESS + ) + await self.accept() async def receive_json(self, content, **kwargs): - ... + if validate_subscription(content): + await self._subscribe( + content['pacs_name'], content['SeriesInstanceUID'] + ) + return + await self.close(code=400, reason='Invalid subscription') + + async def _subscribe(self, pacs_name: str, series_instance_uid: str): + """ + Subscribe to progress notifications about the reception of a DICOM series. + """ + try: + await self.client.subscribe( + pacs_name, series_instance_uid, lambda msg: self.send_json(msg) + ) + response = Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=LonkWsSubscription(subscription='subscribed'), + ) + await self.send_json(response) + except Exception as e: + response = Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=LonkWsSubscription(subscription='error'), + ) + await self.send_json(response) + await self.close(code=500) + raise e async def disconnect(self, code): - ... + await super().disconnect(code) + await self.client.close() @database_sync_to_async def _has_permission(self) -> bool: diff --git a/chris_backend/pacsfiles/lonk.py b/chris_backend/pacsfiles/lonk.py new file mode 100644 index 00000000..b4dc51bd --- /dev/null +++ b/chris_backend/pacsfiles/lonk.py @@ -0,0 +1,209 @@ +""" +Implementation of the "Light Oxidicom NotifiKations Encoding" + +See https://chrisproject.org/docs/oxidicom/lonk +""" +import asyncio +import enum +from sys import byteorder +from typing import ( + Self, + Callable, + TypedDict, + Literal, + TypeGuard, + Any, + Awaitable, +) + +import nats +from nats import NATS +from nats.aio.subscription import Subscription +from nats.aio.msg import Msg + + +class SubscriptionRequest(TypedDict): + """ + A request to subscribe to LONK notifications about a DICOM series. + """ + + pacs_name: str + SeriesInstanceUID: str + action: Literal['subscribe'] + + +def validate_subscription(data: Any) -> TypeGuard[SubscriptionRequest]: + if not isinstance(data, dict): + return False + return ( + data.get('action', None) == 'subscribe' + and isinstance(data.get('SeriesInstanceUID', None), str) + and isinstance(data.get('pacs_name', None), str) + ) + + +class LonkProgress(TypedDict): + """ + LONK "done" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + ndicom: int + + +class LonkError(TypedDict): + """ + LONK "error" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + error: str + + +class LonkDone(TypedDict): + """ + LONK "done" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + done: bool + + +class LonkWsSubscription(TypedDict): + """ + LONK-WS "subscribed" message. + + https://chrisproject.org/docs/oxidicom/lonk-ws#lonk-ws-subscription + """ + + subscription: Literal['subscribed', 'error'] + + +LonkMessageData = LonkProgress | LonkError | LonkDone | LonkWsSubscription +""" +Lonk message data. + +https://chrisproject.org/docs/oxidicom/lonk-ws#messages +""" + + +class Lonk(TypedDict): + """ + Serialized LONK message about a DICOM series. + + https://chrisproject.org/docs/oxidicom#lonk-message-encoding + """ + + SeriesInstanceUID: str + pacs_name: str + message: LonkMessageData + + +class LonkClient: + """ + "Light Oxidicom NotifiKations Encoding" client: + A client for the messages sent by *oxidicom* over NATS. + + https://chrisproject.org/docs/oxidicom/lonk + """ + + def __init__(self, nc: NATS): + self._nc = nc + self._subscriptions: list[Subscription] = [] + + @classmethod + async def connect(cls, servers: str | list[str]) -> Self: + return cls(await nats.connect(servers)) + + async def subscribe( + self, + pacs_name: str, + series_instance_uid: str, + cb: Callable[[Lonk], Awaitable[None]], + ): + subject = subject_of(pacs_name, series_instance_uid) + cb = _curry_message2json(pacs_name, series_instance_uid, cb) + subscription = await self._nc.subscribe(subject, cb=cb) + self._subscriptions.append(subscription) + return subscription + + async def close(self): + await asyncio.gather(*(s.unsubscribe() for s in self._subscriptions)) + await self._nc.close() + + +def subject_of(pacs_name: str, series_instance_uid: str) -> str: + """ + Get the NATS subject for a series. + + Equivalent to https://github.com/FNNDSC/oxidicom/blob/33838f22a5431a349b3b83a313035b8e22d16bb1/src/lonk.rs#L36-L48 + """ + return f'oxidicom.{_sanitize_topic_part(pacs_name)}.{_sanitize_topic_part(series_instance_uid)}' + + +def _sanitize_topic_part(s: str) -> str: + return ( + s.replace('\0', '') + .replace(' ', '_') + .replace('.', '_') + .replace('*', '_') + .replace('>', '_') + ) + + +def _message2json( + pacs_name: str, series_instance_uid: str, message: Msg +) -> Lonk: + return Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=_serialize_to_lonkws(message.data), + ) + + +def _curry_message2json( + pacs_name: str, + series_instance_uid: str, + cb: Callable[[Lonk], Awaitable[None]], +): + async def nats_callback(message: Msg): + lonk = _message2json(pacs_name, series_instance_uid, message) + await cb(lonk) + + return nats_callback + + +@enum.unique +class LonkMagicByte(enum.IntEnum): + """ + LONK message first magic byte. + """ + + DONE = 0x00 + PROGRESS = 0x01 + ERROR = 0x02 + + +def _serialize_to_lonkws(payload: bytes) -> LonkMessageData: + """ + Translate LONK binary encoding to LONK-WS JSON. + """ + if len(payload) == 0: + raise ValueError('Empty message') + data = payload[1:] + + match payload[0]: + case LonkMagicByte.DONE.value: + return LonkDone(done=True) + case LonkMagicByte.PROGRESS.value: + ndicom = int.from_bytes(data, 'little', signed=False) + return LonkProgress(ndicom=ndicom) + case LonkMagicByte.ERROR.value: + error = data.decode(encoding='utf-8') + return LonkError(error=error) + case _: + hexstr = ' '.join(hex(b) for b in payload) + raise ValueError(f'Unrecognized message: {hexstr}') diff --git a/chris_backend/pacsfiles/tests/mocks.py b/chris_backend/pacsfiles/tests/mocks.py new file mode 100644 index 00000000..4f4825bf --- /dev/null +++ b/chris_backend/pacsfiles/tests/mocks.py @@ -0,0 +1,47 @@ +from typing import Self + +import nats +from nats import NATS + +from pacsfiles import lonk +from pacsfiles.lonk import LonkMagicByte + + +class Mockidicom: + """ + A mock *oxidicom* which sends LONK messages to NATS. + + Somewhat similar to https://github.com/FNNDSC/oxidicom/blob/e6bb83d1ea2fbaf5bb4af7dbf518a4b1a2957f2d/src/lonk.rs + """ + + def __init__(self, nc: NATS): + self._nc = nc + + @classmethod + async def connect(cls, servers: str | list[str]) -> Self: + nc = await nats.connect(servers) + return cls(nc) + + async def send_progress( + self, pacs_name: str, SeriesInstanceUID: str, ndicom: int + ): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + u32 = ndicom.to_bytes(length=4, byteorder='little', signed=False) + data = LonkMagicByte.PROGRESS.value.to_bytes() + u32 + await self._nc.publish(subject, data) + + async def send_done(self, pacs_name: str, SeriesInstanceUID: str): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + await self._nc.publish(subject, LonkMagicByte.DONE.value.to_bytes()) + + async def send_error( + self, pacs_name: str, SeriesInstanceUID: str, error: str + ): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + data = LonkMagicByte.ERROR.value.to_bytes() + error.encode( + encoding='utf-8' + ) + await self._nc.publish(subject, data) + + async def close(self): + self._nc.close() diff --git a/chris_backend/pacsfiles/tests/test_consumers.py b/chris_backend/pacsfiles/tests/test_consumers.py index 58baa41c..458992a7 100644 --- a/chris_backend/pacsfiles/tests/test_consumers.py +++ b/chris_backend/pacsfiles/tests/test_consumers.py @@ -5,7 +5,7 @@ # note: use TransactionTestCase instead of TestCase for async tests that speak to DB. # See https://stackoverflow.com/a/71763849 -from django.test import TransactionTestCase +from django.test import TransactionTestCase, tag from channels.testing import WebsocketCommunicator from django.utils import timezone @@ -13,29 +13,95 @@ from core.models import FileDownloadToken from core.websockets.auth import TokenQsAuthMiddleware +from pacsfiles.lonk import ( + SubscriptionRequest, + Lonk, + LonkWsSubscription, + LonkProgress, + LonkDone, + LonkError, +) from pacsfiles.consumers import PACSFileProgress +from pacsfiles.tests.mocks import Mockidicom class PACSFileProgressTests(TransactionTestCase): - def setUp(self): self.username = 'PintoGideon' self.password = 'gideon1234' self.email = 'gideon@example.org' - self.user = User.objects.create_user(username=self.username, - email=self.email, - password=self.password) + self.user = User.objects.create_user( + username=self.username, email=self.email, password=self.password + ) pacs_grp, _ = Group.objects.get_or_create(name='pacs_users') self.user.groups.set([pacs_grp]) self.user.save() - async def test_my_consumer(self): + @tag('integration') + async def test_lonk_ws(self): token = await self._get_download_token() app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) - communicator = WebsocketCommunicator(app, f'v1/pacs/ws/?token={token.token}') + communicator = WebsocketCommunicator( + app, f'v1/pacs/ws/?token={token.token}' + ) connected, subprotocol = await communicator.connect() assert connected + oxidicom: Mockidicom = await Mockidicom.connect(settings.NATS_ADDRESS) + + series1 = {'pacs_name': 'MyPACS', 'SeriesInstanceUID': '1.234.567890'} + subscription_request = SubscriptionRequest( + action='subscribe', **series1 + ) + await communicator.send_json_to(subscription_request) + self.assertEqual( + await communicator.receive_json_from(), + Lonk( + message=LonkWsSubscription(subscription='subscribed'), + **series1, + ), + ) + series2 = {'pacs_name': 'MyPACS', 'SeriesInstanceUID': '5.678.90123'} + subscription_request = SubscriptionRequest( + action='subscribe', **series2 + ) + await communicator.send_json_to(subscription_request) + self.assertEqual( + await communicator.receive_json_from(), + Lonk( + message=LonkWsSubscription(subscription='subscribed'), + **series2, + ), + ) + + await oxidicom.send_progress(ndicom=1, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=1), **series1), + ) + await oxidicom.send_progress(ndicom=115, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=115), **series1), + ) + + await oxidicom.send_error(error='stuck in chimney', **series2) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkError(error='stuck in chimney'), **series2), + ) + + await oxidicom.send_progress(ndicom=192, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=192), **series1), + ) + await oxidicom.send_done(**series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkDone(done=True), **series1), + ) + async def test_unauthenticated_not_connected(self): app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) communicator = WebsocketCommunicator(app, 'v1/pacs/ws/') # no token @@ -49,6 +115,9 @@ def _get_download_token(self) -> FileDownloadToken: https://github.com/FNNDSC/ChRIS_ultron_backEnd/blob/7bcccc2031386955875ef4e9758025577f5ee067/chris_backend/userfiles/tests/test_views.py#L210-L213 """ dt = timezone.now() + timezone.timedelta(minutes=10) - token = jwt.encode({'user': self.user.username, 'exp': dt}, settings.SECRET_KEY, - algorithm='HS256') - return FileDownloadToken.objects.create(token=token, owner=self.user) \ No newline at end of file + token = jwt.encode( + {'user': self.user.username, 'exp': dt}, + settings.SECRET_KEY, + algorithm='HS256', + ) + return FileDownloadToken.objects.create(token=token, owner=self.user)