Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PACSSeries registration over celery task and WebSockets for streaming DICOM receive progress messages #572

Merged
merged 14 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ COPY chris_backend/ ./
RUN if [ "$ENVIRONMENT" = "production" ]; then \
env DJANGO_SETTINGS_MODULE=config.settings.common ./manage.py collectstatic; fi

CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w", "4", "config.wsgi:application"]
CMD ["daphne", "-p", "8000", "config.asgi:application"]

HEALTHCHECK --interval=30s --timeout=5s \
CMD curl -f http://localhost:8000/api/v1/users/ || exit 1
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ just prefer podman # or
just prefer docker
```

With Podman, RabbitMQ might fail to start. Simply retry the command. See https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573

</details>

### Just Commands
Expand Down
15 changes: 14 additions & 1 deletion chris_backend/config/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,22 @@
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from core.websockets.urls import websocket_urlpatterns
from core.websockets.auth import TokenQsAuthMiddleware

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from channels.security.websocket import AllowedHostsOriginValidator


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production')

application = get_asgi_application()
django_asgi_app = get_asgi_application()

# see https://channels.readthedocs.io/en/3.x/installation.html
application = ProtocolTypeRouter({
'http': django_asgi_app,
'websocket': AllowedHostsOriginValidator(
TokenQsAuthMiddleware(URLRouter(websocket_urlpatterns))
),
})
3 changes: 2 additions & 1 deletion chris_backend/config/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

# Application definition
INSTALLED_APPS = [
'daphne',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
Expand Down Expand Up @@ -99,7 +100,7 @@
},
]

WSGI_APPLICATION = 'config.wsgi.application'
ASGI_APPLICATION = 'config.asgi.application'


# Database
Expand Down
3 changes: 3 additions & 0 deletions chris_backend/config/settings/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@
CORS_EXPOSE_HEADERS = ['Allow', 'Content-Type', 'Content-Length']


# NATS settings
NATS_ADDRESS = 'nats://nats:4222'

# Celery settings

#CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
Expand Down
5 changes: 5 additions & 0 deletions chris_backend/config/settings/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ def get_secret(setting, secret_type=env):
CORS_ALLOWED_ORIGINS = get_secret('DJANGO_CORS_ALLOWED_ORIGINS', env.list)


# NATS SETTINGS
# ------------------------------------------------------------------------------
NATS_ADDRESS = get_secret('NATS_ADDRESS')


# CELERY SETTINGS
# ------------------------------------------------------------------------------
CELERY_BROKER_URL = get_secret('CELERY_BROKER_URL')
Expand Down
18 changes: 0 additions & 18 deletions chris_backend/config/wsgi.py

This file was deleted.

1 change: 1 addition & 0 deletions chris_backend/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
{'queue': 'periodic'},
'plugininstances.tasks.cancel_waiting_plugin_instances':
{'queue': 'periodic'},
'pacsfiles.tasks.register_pacs_series': {'queue': 'main2'}
}
app.conf.update(task_routes=task_routes)

Expand Down
2 changes: 1 addition & 1 deletion chris_backend/core/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __call__(self, request):
return self.get_response(request)

def process_exception(self, request, exception):
print(exception)
print(exception, flush=True)
mime = request.META.get('HTTP_ACCEPT')
if mime != 'text/html':
return api_500(request)
54 changes: 27 additions & 27 deletions chris_backend/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,31 +124,31 @@ def authenticate(self, request):
# Check if 'download_token' is in the request query params
if 'download_token' in request.query_params:
token = request.query_params['download_token']
err_msg = f'Invalid file download token: {token}'

try:
info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
err_msg = f'Expired file download token: {token}'
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)
except jwt.InvalidTokenError:
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)

try:
user = User.objects.get(username=info['user'])
except User.DoesNotExist:
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)

try:
token_obj = FileDownloadToken.objects.get(owner=user, token=token)
except FileDownloadToken.DoesNotExist:
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)

token_obj.delete() # one-time-use token, we could instead set revoked=true
return user, None

return authenticate_token(token), None
return super(TokenAuthSupportQueryString, self).authenticate(request)


def authenticate_token(token: str) -> User:
err_msg = f'Invalid file download token: {token}'
try:
info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
err_msg = f'Expired file download token: {token}'
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)
except jwt.InvalidTokenError:
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)

try:
user = User.objects.get(username=info['user'])
except User.DoesNotExist:
logger.error(err_msg)
raise exceptions.AuthenticationFailed(err_msg)

token_obj = FileDownloadToken.objects.filter(owner=user, token=token).first()
if token_obj is None:
raise exceptions.AuthenticationFailed(err_msg)

token_obj.delete() # one-time-use token, we could instead set revoked=true
return user
Empty file.
53 changes: 53 additions & 0 deletions chris_backend/core/websockets/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Websockets authentication.

Notes
-----

``channels.auth.AuthMiddlewareStack`` depends on HTTP headers, however HTTP headers
cannot be set for websockets in the web browser. https://stackoverflow.com/a/4361358

A common pattern is to put a token in the query string. We will re-use the file "downloadtokens"
for this purpose.
"""

import urllib.parse
from typing import AnyStr

from channels.db import database_sync_to_async
from django.contrib.auth.models import User
from rest_framework.exceptions import AuthenticationFailed

from core.views import authenticate_token


class TokenQsAuthMiddleware:
"""
Authenticate the request using :class:`TokenAuthSupportQueryString`.

Based on
https://channels.readthedocs.io/en/3.x/topics/authentication.html#custom-authentication
"""

def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
scope['user'] = await self._authenticate(scope)
return await self.app(scope, receive, send)

@database_sync_to_async
def _authenticate(self, scope) -> User | None:
params = _parse_qs_last_string(scope.get('query_string', b''))
if token := params.get('token', None):
try:
return authenticate_token(token)
except AuthenticationFailed:
return None
return None


def _parse_qs_last_string(qs: AnyStr, encoding='utf-8') -> dict[str, str]:
if isinstance(qs, bytes):
qs = qs.decode(encoding=encoding)
return {k: v[-1] for k, v in urllib.parse.parse_qs(qs).items()}
11 changes: 11 additions & 0 deletions chris_backend/core/websockets/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
Async routes.
"""

from django.urls import re_path
from pacsfiles import consumers

websocket_urlpatterns = [
re_path(r'v1/pacs/ws/',
consumers.PACSFileProgress.as_asgi()),
]
88 changes: 88 additions & 0 deletions chris_backend/pacsfiles/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import rest_framework.permissions
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from rest_framework import permissions

from pacsfiles.lonk import (
LonkClient,
validate_subscription,
LonkWsSubscription,
Lonk,
)
from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly


class PACSFileProgress(AsyncJsonWebsocketConsumer):
"""
A WebSockets endpoint which relays progress messages from NATS sent by *oxidicom* to a client.
"""

permission_classes = (
permissions.IsAuthenticated,
IsChrisOrIsPACSUserReadOnly,
)

async def connect(self):
if not await self._has_permission():
return await self.close()
self.client: LonkClient = await LonkClient.connect(
settings.NATS_ADDRESS
)
await self.accept()

async def receive_json(self, content, **kwargs):
if validate_subscription(content):
await self._subscribe(
content['pacs_name'], content['SeriesInstanceUID']
)
return
await self.close(code=400, reason='Invalid subscription')

async def _subscribe(self, pacs_name: str, series_instance_uid: str):
"""
Subscribe to progress notifications about the reception of a DICOM series.
"""
try:
await self.client.subscribe(
pacs_name, series_instance_uid, lambda msg: self.send_json(msg)
)
response = Lonk(
pacs_name=pacs_name,
SeriesInstanceUID=series_instance_uid,
message=LonkWsSubscription(subscription='subscribed'),
)
await self.send_json(response)
except Exception as e:
response = Lonk(
pacs_name=pacs_name,
SeriesInstanceUID=series_instance_uid,
message=LonkWsSubscription(subscription='error'),
)
await self.send_json(response)
await self.close(code=500)
raise e

async def disconnect(self, code):
await super().disconnect(code)
await self.client.close()

@database_sync_to_async
def _has_permission(self) -> bool:
"""
Manual permissions check.

django-channels is going to handle authentication for us,
but we need to implement permissions ourselves.
"""
self.user = self.scope.get('user', None)
if self.user is None:
return False
if getattr(self, 'method', None) is None:
# make it work with ``IsChrisOrIsPACSUserReadOnly``
self.method = rest_framework.permissions.SAFE_METHODS[0]

return all(
permission().has_permission(self, self.__class__)
for permission in self.permission_classes
)
Loading
Loading