diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index e155cfa..5bef899 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -4,32 +4,34 @@ on: push jobs: test: - - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 strategy: matrix: - python-version: ['3.8', '3.9', '3.10'] + python-version: ["3.9", "3.10"] steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - env: - PIP_INDEX_URL: https://pypi.pacificclimate.org/simple - run: | - sudo apt-get update - sudo apt-get install libgit2-dev libudunits2-dev libnetcdf-dev libcurl4-openssl-dev libssl-dev libfontconfig1-dev libxml2-dev libharfbuzz-dev libfribidi-dev libfreetype6-dev libpng-dev libtiff5-dev libjpeg-dev - pip install -r requirements.txt - pip install -e .[dev] - Rscript install_pkgs.R r_requirements.txt - - name: Test with pytest (full) - if: github.ref == 'refs/heads/master' - run: | - py.test -m "not online" -v - - name: Test with pytest (fast) - if: github.ref != 'refs/heads/master' - run: | - py.test -m "not online and not slow" -v + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + env: + PIP_INDEX_URL: https://pypi.pacificclimate.org/simple + run: | + sudo apt-get update + sudo apt-get install r-base libgit2-dev libudunits2-dev libnetcdf-dev libcurl4-openssl-dev libssl-dev libfontconfig1-dev libxml2-dev libharfbuzz-dev libfribidi-dev libfreetype6-dev libpng-dev libtiff5-dev libjpeg-dev + pip install -r requirements.txt + pip install -e .[dev] + - name: Set GITHUB_PAT env var for R + run: echo "GITHUB_PAT=${{ secrets.GITHUB_TOKEN }}" >> $GITHUB_ENV + - name: Install R packages + run: Rscript install_pkgs.R r_requirements.txt + - name: Test with 
def handle_cancel(environ, start_response):
    """Handle a WSGI request asking to cancel a running WPS process.

    The request must be a ``POST`` whose body is a JSON object of the form
    ``{"uuid": "<process uuid>"}``. The matching ``ProcessInstance`` row is
    looked up through ``get_session()``. If it is ``STARTED`` or ``PAUSED``,
    ``SIGINT`` is sent to its recorded PID, the tracked response (if any) is
    cleaned, and the stored status is set to ``WPS_STATUS.FAILED``.

    Args:
        environ: WSGI environment dictionary.
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list containing the JSON-encoded response body. Every
        code path returns such a list, as WSGI requires. Statuses used:
        200 (cancelled), 400 (bad Content-Length / JSON / missing uuid),
        403 (not permitted to signal the PID), 404 (unknown uuid, no PID, or
        PID no longer exists), 405 (not POST), 409 (process is not running),
        500 (any other failure).
    """

    def error(msg, code):
        return _simple_json_response(start_response, {"error": msg}, code)

    if environ["REQUEST_METHOD"].upper() != "POST":
        return error("Method not allowed, use POST", "405 Method Not Allowed")

    # A malformed header is a client error, not a server crash.
    try:
        content_length = int(environ.get("CONTENT_LENGTH") or 0)
    except ValueError:
        return error("Invalid Content-Length header", "400 Bad Request")

    try:
        # Clamp at zero: a negative size would make read() consume the
        # stream until EOF.
        body = environ["wsgi.input"].read(max(content_length, 0))
        data = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return error("Invalid JSON", "400 Bad Request")

    # Valid JSON may still be a list, string, number, ... with no .get().
    if not isinstance(data, dict):
        return error("Request body must be a JSON object", "400 Bad Request")

    process_uuid = data.get("uuid")
    if not process_uuid:
        return error("Missing 'uuid' in request body", "400 Bad Request")

    session = get_session()
    try:
        process = session.query(ProcessInstance).filter_by(uuid=process_uuid).first()
        if not process or not process.pid:
            return error("Process UUID not found or no PID recorded.", "404 Not Found")

        # Previously this case fell through and returned None, which is not
        # a valid WSGI response. Report it explicitly instead.
        if process.status not in {WPS_STATUS.STARTED, WPS_STATUS.PAUSED}:
            return error(
                f"Process {process_uuid} is not running and cannot be cancelled.",
                "409 Conflict",
            )

        pid = process.pid
        try:
            # NOTE(review): the registry is a module-level dict, so this only
            # finds the response if it was tracked in this same OS process;
            # confirm against the configured PyWPS processing mode.
            response = get_response(process_uuid)
            os.kill(pid, signal.SIGINT)  # Graceful termination
            if response:
                response.clean()
            store_status(
                process_uuid, WPS_STATUS.FAILED, "Process cancelled by user", 100
            )
        except ProcessLookupError:
            return error(f"Process {pid} not found.", "404 Not Found")
        except PermissionError:
            return error(f"Permission denied to stop process {pid}.", "403 Forbidden")

        return _simple_json_response(
            start_response,
            {"message": f"Process {process_uuid} (PID {pid}) cancelled."},
            "200 OK",
        )
    except Exception as e:
        # Top-level boundary of the endpoint: report instead of propagating.
        return error(f"Failed to update status: {str(e)}", "500 Internal Server Error")
    finally:
        session.close()


def _simple_json_response(start_response, data, status="200 OK"):
    """Start a WSGI response with a JSON body and return that body.

    Args:
        start_response: WSGI ``start_response`` callable.
        data: JSON-serialisable object to send.
        status: Full HTTP status line, e.g. ``"404 Not Found"``.

    Returns:
        A one-element list with the UTF-8 encoded JSON document.
    """
    body = json.dumps(data).encode("utf-8")
    headers = [
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(body))),
    ]
    start_response(status, headers)
    return [body]
util.select_args_from_input_list(args, self.handler_inputs) - - logging.log_handler( - self, - response, - "Starting Process", - util.logger, - log_level=loglevel, - process_step="start", - ) - - logging.log_handler( - self, - response, - "Importing R package 'ClimDown'", - util.logger, - log_level=loglevel, - process_step="get_ClimDown", - ) - climdown = R.get_package("ClimDown") - - logging.log_handler( - self, - response, - "Setting R options", - util.logger, - log_level=loglevel, - process_step="set_R_options", - ) - # Uses general_options_input - util.set_general_options( - *util.select_args_from_input_list(args, chick_io.general_options_input) - ) - - # Uses ci_options_input - util.set_ci_options( - *util.select_args_from_input_list(args, chick_io.ci_options_input) - ) - - # Set parallelization - logging.log_handler( - self, - response, - "Setting parallelization", - util.logger, - log_level=loglevel, - process_step="parallelization", - ) - doPar = R.get_package("doParallel") - doPar.registerDoParallel(cores=num_cores) - - logging.log_handler( - self, - response, - "Processing CI downscaling", - util.logger, - log_level=loglevel, - process_step="process", - ) - - with TemporaryDirectory() as td: - try: - output_path = td + "/" + output_file - climdown.ci_netcdf_wrapper(gcm_file, obs_file, output_path) - except RRuntimeError as e: - error_handling.custom_process_error(e) - - # stop parallelization - doPar.stopImplicitCluster() - + track_response(response.uuid, response) + try: + args = io.collect_args(request.inputs, self.workdir) + ( + gcm_file, + obs_file, + output_file, + num_cores, + loglevel, + ) = util.select_args_from_input_list(args, self.handler_inputs) + util.raise_if_failed(response) logging.log_handler( self, response, - "Building final output", + "Starting Process", util.logger, log_level=loglevel, - process_step="build_output", + process_step="start", + ) + util.raise_if_failed(response) + logging.log_handler( + self, + response, + "Importing R 
package 'ClimDown'", + util.logger, + log_level=loglevel, + process_step="get_ClimDown", + ) + climdown = R.get_package("ClimDown") + util.raise_if_failed(response) + logging.log_handler( + self, + response, + "Setting R options", + util.logger, + log_level=loglevel, + process_step="set_R_options", + ) + # Uses general_options_input + util.set_general_options( + *util.select_args_from_input_list(args, chick_io.general_options_input) ) - response.outputs["output"].file = output_path + # Uses ci_options_input + util.set_ci_options( + *util.select_args_from_input_list(args, chick_io.ci_options_input) + ) - # Clear R global env - robjects.r("rm(list=ls())") + # Set parallelization + logging.log_handler( + self, + response, + "Setting parallelization", + util.logger, + log_level=loglevel, + process_step="parallelization", + ) + util.raise_if_failed(response) + doPar = R.get_package("doParallel") + doPar.registerDoParallel(cores=num_cores) logging.log_handler( self, response, - "Process Complete", + "Processing CI downscaling", util.logger, log_level=loglevel, - process_step="complete", + process_step="process", + ) + util.raise_if_failed(response) + set_r_monitor, remove_r_monitor = util.create_r_progress_monitor( + self, response, util.logger, loglevel ) - return response + with TemporaryDirectory() as td: + try: + output_path = td + "/" + output_file + set_r_monitor() + climdown.ci_netcdf_wrapper(gcm_file, obs_file, output_path) + remove_r_monitor() + except RRuntimeError as e: + remove_r_monitor() + error_handling.custom_process_error(e) + + # stop parallelization + doPar.stopImplicitCluster() + util.raise_if_failed(response) + logging.log_handler( + self, + response, + "Building final output", + util.logger, + log_level=loglevel, + process_step="build_output", + ) + + response.outputs["output"].file = output_path + + # Clear R global env + robjects.r("rm(list=ls())") + + logging.log_handler( + self, + response, + "Process Complete", + util.logger, + 
# Fixed R console messages mapped to the completion percentage to report.
_PROGRESS_MARKERS = {
    "Calculating daily anomalies on the GCM": 23,
    "Creating cache file for the interpolated GCM": 26,
    "Interpolating the GCM daily anomalies to observation grid": 29,
    "Check observations file": 58,
    "Reading the monthly climatologies from the observations": 61,
    "Calculating the monthly factor across the GCM time series": 64,
    "Adding the monthly climatologies to the interpolated GCM": 67,
}

# Chunked-progress messages; compiled once instead of on every console write.
_INTERPOLATION_RE = re.compile(r"Interpolating timesteps (\d+) - (\d+) / (\d+)")
_CLIMATOLOGY_RE = re.compile(
    r"Applying climatologies to file .* steps (\d+) : (\d+) / (\d+)"
)


def _stored_status_is_failed(uuid):
    """Return True if the stored ProcessInstance row for ``uuid`` is FAILED."""
    session = get_session()
    try:
        process = session.query(ProcessInstance).filter_by(uuid=uuid).first()
        return bool(process and process.status == WPS_STATUS.FAILED)
    finally:
        session.close()


def _parse_r_progress(text):
    """Translate one chunk of R console output into ``(message, percentage)``.

    Returns ``None`` when the text carries no recognised progress information
    or when the reported total is 0, since no fraction can be computed.
    """
    for marker, percentage in _PROGRESS_MARKERS.items():
        if marker in text:
            return marker, percentage

    # 29% to 55%
    match = _INTERPOLATION_RE.search(text)
    if match:
        start, end, total = map(int, match.groups())
        if total == 0:
            return None
        percentage = 29 + (end / total) * 26  # 26 = (55 - 29)
        return f"Interpolating timesteps {start}-{end} of {total}", int(percentage)

    # 67% to 95%
    match = _CLIMATOLOGY_RE.search(text)
    if match:
        start, end, total = map(int, match.groups())
        if total == 0:
            return None
        percentage = 67 + (end / total) * 28  # 28 = (95 - 67)
        return (
            f"Applying climatologies to timesteps {start}-{end} of {total}",
            int(percentage),
        )

    return None


def raise_if_failed(response):
    """Abort the handler if the process has been marked as failed.

    Checks the in-memory ``response.status`` first, then the status stored in
    the PyWPS database. The stored status is how the check sees a FAILED
    status written outside this handler.

    Args:
        response: The execute response of the running process; must expose
            ``status``, ``uuid``, ``update_status`` and ``clean``.

    Raises:
        ProcessError: If either status is ``WPS_STATUS.FAILED``. The response
            is cleaned before raising.
    """
    # Check in-memory response status
    if response.status == WPS_STATUS.FAILED:
        response.clean()
        raise ProcessError("Process failed.")

    if _stored_status_is_failed(response.uuid):
        # update_status() takes (message, percentage) and reports the
        # response's current status, so set the status on the response first.
        # NOTE(review): assumes PyWPS's public update_status(message,
        # status_percentage) signature -- confirm against the pinned version.
        response.status = WPS_STATUS.FAILED
        response.update_status("Process failed.", 100)
        response.clean()
        raise ProcessError("Process failed.")


def update_status_with_check(response, message, percentage):
    """Report progress without overwriting a stored FAILED status.

    If the database already marks the process as FAILED, the response is
    switched to FAILED before the update is written; otherwise the update is
    written with the response's current status.

    Args:
        response: The execute response of the running process.
        message: Status message to publish.
        percentage: Completion percentage to publish.
    """
    if _stored_status_is_failed(response.uuid):
        response.status = WPS_STATUS.FAILED
    response.update_status(message, percentage)


# Using Rpy2 callbacks to monitor process progress.
# See https://rpy2.github.io/doc/latest/html/callbacks.html#write-console
def create_r_progress_monitor(process_instance, response, logger, log_level):
    """Build install/uninstall hooks for an R console progress monitor.

    While installed, every chunk of R console output is forwarded to the
    previous console writer. The stored process status is then checked for
    cancellation, and recognised progress messages are published through
    ``update_status_with_check``.

    Args:
        process_instance: Unused; kept for interface compatibility.
        response: The execute response of the running process.
        logger: Logger that receives progress and cancellation messages.
        log_level: Unused; kept for interface compatibility.

    Returns:
        A ``(set_monitor, remove_monitor)`` pair of zero-argument callables
        that install the monitor and restore the writer captured when this
        function was called.
    """
    original_console_write = callbacks.consolewrite_print

    # Callback to capture R console output and update progress
    def custom_console_write(text):
        original_console_write(text)

        if _stored_status_is_failed(response.uuid):
            logger.info("Process was cancelled. Sending interrupt to R.")
            response.clean()
            # NOTE(review): this evaluates R code from inside an R console
            # callback; confirm rpy2 propagates the resulting error.
            robjects.r("stop('Process cancelled by user')")
            raise ProcessError("Process failed.")

        progress = _parse_r_progress(text)
        if progress is not None:
            message, percentage = progress
            update_status_with_check(response, message, percentage)
            logger.info(message)

    def set_monitor():
        # Set the R console monitoring callback
        callbacks.consolewrite_print = custom_console_write

    def remove_monitor():
        # Restore the original R console callback
        callbacks.consolewrite_print = original_console_write

    return set_monitor, remove_monitor
create_app(cfgfiles=None): config_files.extend(cfgfiles) if "PYWPS_CFG" in os.environ: config_files.append(os.environ["PYWPS_CFG"]) - service = Service(processes=processes, cfgfiles=config_files) - return service + + try: + wps_service = Service(processes=processes, cfgfiles=config_files) + except Exception as e: + logger.error(f"Service initialization failed: {str(e)}") + raise + + def application(environ, start_response): + try: + path_info = environ.get("PATH_INFO", "") + logger.debug(f"Request to path: {path_info}") + + if path_info == "/wps/cancel-process": + return handle_cancel(environ, start_response) + return wps_service(environ, start_response) + + except Exception as e: + logger.error(f"Request failed: {str(e)}", exc_info=True) + start_response( + "500 Internal Server Error", [("Content-type", "text/plain")] + ) + return [b"Internal server error"] + + return application application = create_app() diff --git a/docs/cancel_process.md b/docs/cancel_process.md new file mode 100644 index 0000000..dd6b9b6 --- /dev/null +++ b/docs/cancel_process.md @@ -0,0 +1,62 @@ +### Cancelling a Running WPS Process + +The `handle_cancel` function provides an HTTP endpoint for **gracefully cancelling a running WPS process** by sending it a `SIGINT` and updating its status in the PyWPS database. + +#### Endpoint + +``` +POST /wps/cancel-process +``` + +This endpoint is handled by the `handle_cancel` function via WSGI middleware. It is only accessible using the POST method. + +#### Request Format + +**Headers**: + `Content-Type: application/json` + +**Body**: + +```json +{ "uuid": "your-process-uuid" } +``` + +- `uuid` is required and must refer to a running WPS process with a recorded PID in the PyWPS database. + +#### Successful Response + +```json +{ "message": "Process your-process-uuid (PID 12345) cancelled." 
} +``` + +HTTP Status: `200 OK` + +#### Error Responses + +| Status Code | Reason | +| --------------------------- | ------------------------------------------------------ | +| `400 Bad Request` | Missing or invalid JSON, or UUID not provided | +| `403 Forbidden` | Permission denied when trying to send signal | +| `404 Not Found` | No process found for the given UUID or no PID recorded | +| `405 Method Not Allowed` | Request method was not POST | +| `500 Internal Server Error` | Internal database or signal error | + +--- + +#### Internals + +- Uses `pywps.dblog.get_session()` to query the process from the database. +- Sends `SIGINT` (graceful stop) to the stored PID using `os.kill()`. +- Updates process status using `store_status()` to reflect cancellation as `WPS_STATUS.FAILED`. +- Cleans up the temporary directories using [response.clean()](https://github.com/geopython/pywps/blob/10dd07a9ee55c3033e240fa882eebadfc3ac4ad8/pywps/app/Process.py#L333). +- Closes the database session. + +--- + +#### Example `curl` Request + +```bash +curl -X POST http://localhost:5000/wps/cancel-process \ + -H "Content-Type: application/json" \ + -d '{"uuid": "abcd-1234-efgh-5678"}' +``` diff --git a/install_pkgs.R b/install_pkgs.R index 63641b6..d652a31 100644 --- a/install_pkgs.R +++ b/install_pkgs.R @@ -34,4 +34,4 @@ install.packages('githubinstall') # Install Climdown from GitHub branch library(githubinstall) -gh_install_packages('ClimDown', ref = 'ci-climatex') +gh_install_packages('pacificclimate/ClimDown', ref = 'ci-climatex') diff --git a/pavics-component/wps.cfg.template b/pavics-component/wps.cfg.template index 6a4a85d..2359fa2 100644 --- a/pavics-component/wps.cfg.template +++ b/pavics-component/wps.cfg.template @@ -2,10 +2,20 @@ outputurl = https://${PAVICS_FQDN_PUBLIC}/wpsoutputs outputpath = /data/wpsoutputs maxrequestsize = 65gb -parallelprocesses = 14 +maxprocesses = 6 +parallelprocesses = 2 +workdir = /tmp + +[processing] +mode = default + [logging] level = INFO 
-db_echo = false +db_echo = true +database = sqlite:////tmp/pywps.db +file = /tmp/pywps.log +format = %(asctime)s] [%(levelname)s] file=%(pathname)s line=%(lineno)s module=%(module)s function=%(funcName)s %(message)s + ${EXTRA_PYWPS_CONFIG} diff --git a/requirements.txt b/requirements.txt index f0afb1d..0afd7a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ -pywps==4.2.6 +pywps==4.6.0 jinja2 click psutil -nchelpers==5.5.7 +nchelpers==5.5.11 rpy2==3.3.6 werkzeug==1.0.1 -wps-tools[r]==2.0.0 +wps-tools[r]==2.1.3 \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py index 0db04c1..22cba38 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -33,6 +33,7 @@ def test_select_args_from_input_list(args, inputs): "n_pr_bool", "tasmax_units", "tasmin_units", + "tg_units", "pr_units", "max_gb", "start_date", @@ -44,6 +45,7 @@ def test_select_args_from_input_list(args, inputs): False, "farenheit", "farenheit", + "celsius", "mm", 0.5, date(1996, 9, 14), @@ -56,6 +58,7 @@ def test_set_general_options( n_pr_bool, tasmax_units, tasmin_units, + tg_units, pr_units, max_gb, start_date, @@ -66,6 +69,7 @@ def test_set_general_options( n_pr_bool, tasmax_units, tasmin_units, + tg_units, pr_units, max_gb, start_date, @@ -77,6 +81,7 @@ def test_set_general_options( assert list(base.getOption("target.units")) == [ tasmax_units, tasmin_units, + tg_units, pr_units, ] assert base.getOption("max.GB")[0] == max_gb diff --git a/tests/test_wps_CI.py b/tests/test_wps_CI.py index 79f85e2..235fc35 100644 --- a/tests/test_wps_CI.py +++ b/tests/test_wps_CI.py @@ -12,6 +12,8 @@ def build_params(gcm_file, obs_file, var, out_file): f"obs_file=@xlink:href={obs_file};" f"varname={var};" f"out_file={out_file};" + f"gcm_varname={var};" + f"obs_varname={var};" )