Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
412ca17
Use PyWPS 4.6.0, .references for OPeNDAP inputs
QSparks Feb 14, 2025
01fb841
Handle OPeNDAP vs. local data in select_args util
QSparks Feb 14, 2025
1fe48fb
Revert select_args_from_input_list
QSparks Feb 14, 2025
ebf3b0f
extract .data if it exists in select_args_from_input_list
QSparks Feb 18, 2025
af6a5cb
Revert .data extract, add logging
QSparks Feb 19, 2025
fcb7a19
Use as_reference for ComplexInput
QSparks Feb 19, 2025
2ac6641
Black
QSparks Feb 19, 2025
0d2c00d
Revet as_reference usage
QSparks Feb 19, 2025
6dd6276
Test OPeNDAP input fix
QSparks Feb 21, 2025
f2e5efa
Revert test OPeNDAP input fix
QSparks Feb 21, 2025
a7abef7
Log wps_CI inputs
QSparks Feb 25, 2025
022eb8d
Test set gcm and obs files to href
QSparks Feb 25, 2025
75b86aa
Test override collect_args for gcm and obs files
QSparks Feb 25, 2025
5153ba4
Remove as_reference to allow container starts
QSparks Feb 25, 2025
9d98fd9
Use updated wps_tools
QSparks Feb 27, 2025
76f9aa7
Install git in Dockerfile
QSparks Feb 27, 2025
9a82fc0
Use python3.9 required by wps_tools
QSparks Feb 27, 2025
68e789f
Use nchelpers v5.5.11
QSparks Feb 27, 2025
526ab0c
Add fine-grained progress steps
QSparks Mar 3, 2025
d1358bf
Use wps-tools v2.1.2 on pypi
QSparks Mar 5, 2025
07d19ff
Add cancellation process
QSparks Mar 6, 2025
ae822a3
Add check for failed status
QSparks Mar 7, 2025
3ec8557
SIGTERM target process in wps_cancel
QSparks Mar 7, 2025
58eeed0
Revert sigterm calls in wps_cancel
QSparks Mar 8, 2025
961acb2
Move WPS cancellation to a separate endpoint
QSparks Mar 10, 2025
b2d796f
Add logging to wsgi.py
QSparks Mar 10, 2025
ba06924
Prevent updates after cancellation
QSparks Mar 11, 2025
13445d4
Update in-memory status on cancel
QSparks Mar 11, 2025
c48d21a
Add graceful termination on cancel
QSparks Mar 11, 2025
91c72c6
Check running before cancel
QSparks Mar 14, 2025
aaa5409
Add sqlite3 to Dockerfile, adjust wps.cfg.template
QSparks Mar 14, 2025
e8bc5be
Launch next process on cancel
QSparks Mar 18, 2025
03d05e7
Revert launch next process on cancel
QSparks Mar 19, 2025
38aaa30
replace os.kill with safe cancel using psutil and signal
QSparks Mar 24, 2025
d858320
Remove timeout check and allow DB-based self-cancel
QSparks Mar 24, 2025
1964167
Test cooperative cancel with os.kill in R callback
QSparks Mar 25, 2025
7db6642
Revert to OS-level process termination
QSparks Mar 25, 2025
14ba2df
Set DB status when process is failed
QSparks Mar 25, 2025
1828690
Track response objects to enable cleanup
QSparks Mar 26, 2025
2f24a06
Use response.uuid for tracking
QSparks Mar 26, 2025
ee8d760
Include response cleanup on failure
QSparks Mar 26, 2025
e7a08bc
Include cleanup on in-response failure
QSparks Mar 26, 2025
eede1ec
Add cancel_process.md
QSparks Mar 26, 2025
0aa396b
First cut fixing CI
QSparks Mar 27, 2025
31c7a4b
Install r-base in python-ci
QSparks Mar 27, 2025
fd3123a
CI Fix
QSparks Mar 31, 2025
f9c26aa
Use tagged wps-tools v2.1.3
QSparks Apr 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,34 @@ on: push

jobs:
test:

runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10']
python-version: ["3.9", "3.10"]

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
env:
PIP_INDEX_URL: https://pypi.pacificclimate.org/simple
run: |
sudo apt-get update
sudo apt-get install libgit2-dev libudunits2-dev libnetcdf-dev libcurl4-openssl-dev libssl-dev libfontconfig1-dev libxml2-dev libharfbuzz-dev libfribidi-dev libfreetype6-dev libpng-dev libtiff5-dev libjpeg-dev
pip install -r requirements.txt
pip install -e .[dev]
Rscript install_pkgs.R r_requirements.txt
- name: Test with pytest (full)
if: github.ref == 'refs/heads/master'
run: |
py.test -m "not online" -v
- name: Test with pytest (fast)
if: github.ref != 'refs/heads/master'
run: |
py.test -m "not online and not slow" -v
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
env:
PIP_INDEX_URL: https://pypi.pacificclimate.org/simple
run: |
sudo apt-get update
sudo apt-get install r-base libgit2-dev libudunits2-dev libnetcdf-dev libcurl4-openssl-dev libssl-dev libfontconfig1-dev libxml2-dev libharfbuzz-dev libfribidi-dev libfreetype6-dev libpng-dev libtiff5-dev libjpeg-dev
pip install -r requirements.txt
pip install -e .[dev]
- name: Set GITHUB_PAT env var for R
run: echo "GITHUB_PAT=${{ secrets.GITHUB_TOKEN }}" >> $GITHUB_ENV
- name: Install R packages
run: Rscript install_pkgs.R r_requirements.txt
- name: Test with pytest (full)
if: github.ref == 'refs/heads/master'
run: |
py.test -m "not online" -v
- name: Test with pytest (fast)
if: github.ref != 'refs/heads/master'
run: |
py.test -m "not online and not slow" -v
12 changes: 8 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ COPY . /tmp
COPY requirements.txt ./

RUN apt-get update && \
apt-get install -y --no-install-recommends python3.8 python3-pip && \
pip install -U pip && \
pip install -r requirements.txt && \
pip install gunicorn
apt-get install -y --no-install-recommends python3.9 python3-pip sqlite3 git && \
# Use Python 3.9 explicitly
update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.9 1 && \
update-alternatives --set python3 /usr/bin/python3.9 && \
# Update pip for Python 3.9
python3.9 -m pip install -U pip && \
python3.9 -m pip install -r requirements.txt && \
python3.9 -m pip install gunicorn

EXPOSE 5000
CMD gunicorn --bind=0.0.0.0:5000 --timeout 0 chickadee.wsgi:application
68 changes: 68 additions & 0 deletions chickadee/cancel_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import json
import os
import signal
from pywps.dblog import store_status, get_session, ProcessInstance
from pywps.response.status import WPS_STATUS
from chickadee.response_tracker import get_response


def handle_cancel(environ, start_response):
def error(msg, code):
return _simple_json_response(start_response, {"error": msg}, code)

if environ["REQUEST_METHOD"].upper() != "POST":
return error("Method not allowed, use POST", "405 Method Not Allowed")

content_length = int(environ.get("CONTENT_LENGTH") or 0)
try:
body = environ["wsgi.input"].read(content_length)
data = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return error("Invalid JSON", "400 Bad Request")

process_uuid = data.get("uuid")
if not process_uuid:
return error("Missing 'uuid' in request body", "400 Bad Request")

session = get_session()
try:
process = session.query(ProcessInstance).filter_by(uuid=process_uuid).first()
if not process or not process.pid:
return error("Process UUID not found or no PID recorded.", "404 Not Found")

pid = process.pid
try:
response = get_response(process_uuid)

if process.status in {WPS_STATUS.STARTED, WPS_STATUS.PAUSED}:
os.kill(pid, signal.SIGINT) # Graceful termination
if response:
response.clean()
store_status(
process_uuid, WPS_STATUS.FAILED, "Process cancelled by user", 100
)
return _simple_json_response(
start_response,
{"message": f"Process {process_uuid} (PID {pid}) cancelled."},
"200 OK",
)

except ProcessLookupError:
return error(f"Process {pid} not found.", "404 Not Found")
except PermissionError:
return error(f"Permission denied to stop process {pid}.", "403 Forbidden")

except Exception as e:
return error(f"Failed to update status: {str(e)}", "500 Internal Server Error")
finally:
session.close()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, good error handling 👍


def _simple_json_response(start_response, data, status="200 OK"):
body = json.dumps(data).encode("utf-8")
headers = [
("Content-Type", "application/json; charset=utf-8"),
("Content-Length", str(len(body))),
]
start_response(status, headers)
return [body]
183 changes: 98 additions & 85 deletions chickadee/processes/wps_CI.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from wps_tools import logging, R, io, error_handling
import chickadee.utils as util
import chickadee.io as chick_io
from chickadee.response_tracker import track_response, untrack_response


class CI(Process):
Expand Down Expand Up @@ -50,104 +51,116 @@ def __init__(self):
)

def _handler(self, request, response):
args = io.collect_args(request.inputs, self.workdir)
(
gcm_file,
obs_file,
output_file,
num_cores,
loglevel,
) = util.select_args_from_input_list(args, self.handler_inputs)

logging.log_handler(
self,
response,
"Starting Process",
util.logger,
log_level=loglevel,
process_step="start",
)

logging.log_handler(
self,
response,
"Importing R package 'ClimDown'",
util.logger,
log_level=loglevel,
process_step="get_ClimDown",
)
climdown = R.get_package("ClimDown")

logging.log_handler(
self,
response,
"Setting R options",
util.logger,
log_level=loglevel,
process_step="set_R_options",
)
# Uses general_options_input
util.set_general_options(
*util.select_args_from_input_list(args, chick_io.general_options_input)
)

# Uses ci_options_input
util.set_ci_options(
*util.select_args_from_input_list(args, chick_io.ci_options_input)
)

# Set parallelization
logging.log_handler(
self,
response,
"Setting parallelization",
util.logger,
log_level=loglevel,
process_step="parallelization",
)
doPar = R.get_package("doParallel")
doPar.registerDoParallel(cores=num_cores)

logging.log_handler(
self,
response,
"Processing CI downscaling",
util.logger,
log_level=loglevel,
process_step="process",
)

with TemporaryDirectory() as td:
try:
output_path = td + "/" + output_file
climdown.ci_netcdf_wrapper(gcm_file, obs_file, output_path)
except RRuntimeError as e:
error_handling.custom_process_error(e)

# stop parallelization
doPar.stopImplicitCluster()

track_response(response.uuid, response)
try:
args = io.collect_args(request.inputs, self.workdir)
(
gcm_file,
obs_file,
output_file,
num_cores,
loglevel,
) = util.select_args_from_input_list(args, self.handler_inputs)
util.raise_if_failed(response)
logging.log_handler(
self,
response,
"Building final output",
"Starting Process",
util.logger,
log_level=loglevel,
process_step="build_output",
process_step="start",
)
util.raise_if_failed(response)
logging.log_handler(
self,
response,
"Importing R package 'ClimDown'",
util.logger,
log_level=loglevel,
process_step="get_ClimDown",
)
climdown = R.get_package("ClimDown")
util.raise_if_failed(response)
logging.log_handler(
self,
response,
"Setting R options",
util.logger,
log_level=loglevel,
process_step="set_R_options",
)
# Uses general_options_input
util.set_general_options(
*util.select_args_from_input_list(args, chick_io.general_options_input)
)

response.outputs["output"].file = output_path
# Uses ci_options_input
util.set_ci_options(
*util.select_args_from_input_list(args, chick_io.ci_options_input)
)

# Clear R global env
robjects.r("rm(list=ls())")
# Set parallelization
logging.log_handler(
self,
response,
"Setting parallelization",
util.logger,
log_level=loglevel,
process_step="parallelization",
)
util.raise_if_failed(response)
doPar = R.get_package("doParallel")
doPar.registerDoParallel(cores=num_cores)

logging.log_handler(
self,
response,
"Process Complete",
"Processing CI downscaling",
util.logger,
log_level=loglevel,
process_step="complete",
process_step="process",
)
util.raise_if_failed(response)
set_r_monitor, remove_r_monitor = util.create_r_progress_monitor(
self, response, util.logger, loglevel
)

return response
with TemporaryDirectory() as td:
try:
output_path = td + "/" + output_file
set_r_monitor()
climdown.ci_netcdf_wrapper(gcm_file, obs_file, output_path)
remove_r_monitor()
except RRuntimeError as e:
remove_r_monitor()
error_handling.custom_process_error(e)

# stop parallelization
doPar.stopImplicitCluster()
util.raise_if_failed(response)
logging.log_handler(
self,
response,
"Building final output",
util.logger,
log_level=loglevel,
process_step="build_output",
)

response.outputs["output"].file = output_path

# Clear R global env
robjects.r("rm(list=ls())")

logging.log_handler(
self,
response,
"Process Complete",
util.logger,
log_level=loglevel,
process_step="complete",
)

return response
finally:
untrack_response(response.uuid)
2 changes: 1 addition & 1 deletion chickadee/processes/wps_rerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self):
status_supported=True,
)

def read_analogues_file(sef, analogues, analogues_name):
def read_analogues_file(self, analogues, analogues_name):
try:
return R.load_rdata_to_python(analogues, analogues_name)
except (RRuntimeError, ProcessError, IndexError):
Expand Down
13 changes: 13 additions & 0 deletions chickadee/response_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
response_registry = {}


def track_response(uuid, response):
response_registry[uuid] = response


def get_response(uuid):
return response_registry.get(uuid)


def untrack_response(uuid):
response_registry.pop(uuid, None)
Loading
Loading