diff --git a/samples/scripts/pushNotifications/README.md b/samples/scripts/pushNotifications/README.md
new file mode 100644
index 000000000..5ff8783c9
--- /dev/null
+++ b/samples/scripts/pushNotifications/README.md
@@ -0,0 +1,89 @@
+# Pub/Sub Bucket Notifications - Local Testing Plan
+
+This folder contains scripts to locally test DIVE's bucket notification handling as implemented by the `bucket_notifications` plugin endpoints (`/api/v1/bucket_notifications/gcs` and the routing endpoint).
+
+It provides:
+
+- A MinIO setup script to stand up a local S3-compatible endpoint, create a bucket, and seed sample data (based on `samples/scripts/assetStoreImport/minIOConfig.py`).
+- A script to modify bucket contents (uploads, overwrites, deletes) to simulate new data arriving that should trigger re-imports once notifications are wired up.
+- A script to send mock GCS Pub/Sub push messages to the server endpoint for end-to-end testing without GCP.
+
+## Background
+
+See `docs/Deployment-Storage.md` → "S3 and MinIO Mirroring" and "Pub/Sub notifications". The server exposes:
+
+- `POST /api/v1/bucket_notifications/gcs` for receiving push-delivered Pub/Sub messages. On `OBJECT_FINALIZE`, the server locates a matching read-only S3/GCS assetstore and re-imports the relevant path beneath the configured mount root.
+- `POST /api/v1/bucket_notifications/routing/:id` for configuring `notification_router_rules` on an assetstore (the folder or collection that serves as the mount root).
+
+Relevant server code: `server/bucket_notifications/views.py` and `server/bucket_notifications/models.py`.
+
+## Prerequisites
+
+- Docker installed and running
+- Python 3.8+
+- DIVE backend running and reachable (e.g., http://localhost:8010)
+- An S3/MinIO assetstore configured in Girder Admin pointing to the MinIO bucket created by these scripts:
+  - Type: Amazon S3
+  - Service: `http://<minio-ip>:9000` (use the actual IP printed by the setup script)
+  - Bucket: `dive-sample-data`
+  - Read only: checked
+  - Region: `us-east-1` (or any string)
+  - Access Key / Secret Key: values printed by the setup script
+
+After creating the assetstore, set notification routing via:
+
+```bash
+curl -X POST \
+  -H "Girder-Token: <admin-token>" \
+  -H "Content-Type: application/json" \
+  http://localhost:8010/api/v1/bucket_notifications/routing/<assetstoreId> \
+  -d '{"data": {"folderId": "<mountRootFolderId>"}}'
+```
+
+`<assetstoreId>` is the Girder ID of the S3 assetstore, and `folderId` is the folder to use as the mount root (a `collectionId` may be supplied instead to mount at a collection).
+
+## Scripts
+
+1) setup_minio.py
+
+Launches MinIO and seeds data into the `dive-sample-data` bucket. Prints the MinIO container IP and test credentials. Adapted from `samples/scripts/assetStoreImport/minIOConfig.py`.
+
+2) modify_bucket.py
+
+Performs object operations to simulate new data arrival:
+- Upload a new folder with images
+- Overwrite an existing object
+- Optionally delete an object
+
+3) send_gcs_push.py
+
+Sends a mock GCS Pub/Sub push payload to DIVE at `/api/v1/bucket_notifications/gcs` with `eventType=OBJECT_FINALIZE`, targeting a given `bucketId` and `objectId`.
+
+## End-to-End Test Flow
+
+1. Run setup_minio.py to start MinIO and seed data.
+2. Configure an S3 assetstore in Girder to point to the printed MinIO service and bucket.
+3. Configure notification routing on that assetstore to point at your chosen mount root folder.
+4. Run modify_bucket.py to upload new files or folders into the MinIO bucket.
+5. For local-only testing, run send_gcs_push.py with a matching bucket and object path to simulate the Pub/Sub push (the equivalent raw request is sketched below).
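+
+For reference, the raw push request that `send_gcs_push.py` assembles is sketched below; the payload shape mirrors `build_payload` in that script. The `bucketId`/`objectId` values are placeholders to replace with your own, `messageId` and `publishTime` are illustrative, and the base64 `data` field is just an empty JSON document.
+
+```bash
+curl -X POST \
+  -H "Content-Type: application/json" \
+  http://localhost:8010/api/v1/bucket_notifications/gcs \
+  -d '{
+        "message": {
+          "attributes": {
+            "bucketId": "dive-sample-data",
+            "objectId": "new-sequence/",
+            "eventType": "OBJECT_FINALIZE"
+          },
+          "data": "e30K",
+          "messageId": "local-test-1",
+          "publishTime": "2024-01-01T00:00:00+00:00"
+        },
+        "subscription": "local-testing"
+      }'
+```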
+
+Example commands for the full flow (the scripts declare their dependencies inline, so they can be run with `uv run --script`):
+
+```bash
+uv run --script setup_minio.py -d ./sample
+
+uv run --script modify_bucket.py upload \
+  --bucket dive-sample-data \
+  --prefix new-sequence/ \
+  --local-path ./newData
+
+uv run --script send_gcs_push.py \
+  --server http://localhost:8010 \
+  --bucket dive-sample-data \
+  --object "new-sequence/"
+```
+
+Check the DIVE UI: new datasets should appear or update under the configured mount folder as the server processes the import triggered by the notification.
+
+## Notes
+
+- The server only handles `OBJECT_FINALIZE` notifications for automatic imports.
+- For true GCP testing, configure a Pub/Sub push subscription to `https://<your-server>/api/v1/bucket_notifications/gcs` as described in `docs/Deployment-Storage.md`.
+
+
diff --git a/samples/scripts/pushNotifications/modify_bucket.py b/samples/scripts/pushNotifications/modify_bucket.py
new file mode 100644
index 000000000..007e5d0f9
--- /dev/null
+++ b/samples/scripts/pushNotifications/modify_bucket.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+# /// script
+# requires-python = ">=3.8"
+# dependencies = [
+#     "click",
+# ]
+# ///
+import subprocess
+import click
+import os
+import uuid
+
+
+def mc_cmd(*args, capture_output=False):
+    """Run an mc command inside the minio_client container."""
+    result = subprocess.run(
+        ["docker", "exec", "minio_client", "mc", *args],
+        check=True,
+        text=True,
+        capture_output=capture_output,
+    )
+    return result.stdout if capture_output else None
+
+
+def docker_exec_mkdir(container_path):
+    """Ensure a directory exists inside the container."""
+    subprocess.run(
+        ["docker", "exec", "minio_client", "mkdir", "-p", container_path],
+        check=True,
+    )
+
+
+def docker_cp_to_container(local_path, container_path):
+    """Copy a local file or directory into the minio_client container."""
+    container_dir = os.path.dirname(container_path)
+    docker_exec_mkdir(container_dir)
+    subprocess.run(
+        ["docker", "cp", local_path, f"minio_client:{container_path}"],
+        check=True,
+    )
+
+
+def docker_exec_rm(container_path):
+    """Remove temporary files/folders from inside the container."""
+    subprocess.run(
+        ["docker", "exec", "minio_client", "rm", "-rf", container_path],
+        check=False,
+    )
+
+
+@click.group()
+def cli():
+    """CLI for managing MinIO bucket objects."""
+    pass
+
+
+@cli.command()
+@click.option("--bucket", required=True, help="Bucket name, e.g., dive-sample-data")
+@click.option("--prefix", required=True, help="Destination prefix inside the bucket")
+@click.option("--local-path", required=True, type=click.Path(exists=True), help="Local folder or file to upload")
+def upload(bucket, prefix, local_path):
+    """Upload a file or folder to bucket/prefix."""
+    temp_id = uuid.uuid4().hex
+    container_temp_path = f"/tmp/upload_{temp_id}"
+    container_upload_path = os.path.join(container_temp_path, os.path.basename(local_path))
+
+    click.echo(f"Copying {local_path} into container...")
+    docker_exec_mkdir(container_temp_path)
+    docker_cp_to_container(local_path, container_upload_path)
+
+    try:
+        click.echo("Uploading to MinIO...")
+        mc_cmd("cp", "-r", container_upload_path, f"local/{bucket}/{prefix}")
+        click.echo("✅ Upload complete")
+    finally:
+        click.echo("Cleaning up temporary files...")
+        docker_exec_rm(container_temp_path)
+
+
+@cli.command()
+@click.option("--bucket", required=True)
+@click.option("--object", "object_path", required=True, help="Object path inside bucket to overwrite")
+@click.option("--local-file", required=True, type=click.Path(exists=True))
+def overwrite(bucket, object_path, local_file):
"""Overwrite a single object with a local file.""" + temp_id = uuid.uuid4().hex + container_temp_dir = f"/tmp/overwrite_{temp_id}" + container_temp_file = os.path.join(container_temp_dir, os.path.basename(local_file)) + + click.echo(f"Copying {local_file} into container...") + docker_exec_mkdir(container_temp_dir) + docker_cp_to_container(local_file, container_temp_file) + + try: + click.echo("Overwriting object in MinIO...") + mc_cmd("cp", container_temp_file, f"local/{bucket}/{object_path}") + click.echo("✅ Overwrite complete") + finally: + click.echo("Cleaning up temporary files...") + docker_exec_rm(container_temp_dir) + + +@cli.command() +@click.option("--bucket", required=True) +@click.option("--object", "object_path", required=True) +def delete(bucket, object_path): + """Delete an object from the bucket.""" + click.echo(f"Deleting object: {bucket}/{object_path}...") + mc_cmd("rm", f"local/{bucket}/{object_path}") + click.echo("✅ Delete complete") + + +if __name__ == "__main__": + cli() diff --git a/samples/scripts/pushNotifications/send_gcs_push.py b/samples/scripts/pushNotifications/send_gcs_push.py new file mode 100644 index 000000000..ef8688aed --- /dev/null +++ b/samples/scripts/pushNotifications/send_gcs_push.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.8" +# dependencies = [ +# "click", +# "requests", +# ] +# /// +import click +import requests +import json +from datetime import datetime, timezone +import base64 +import uuid + + +def build_payload(bucket_id: str, object_id: str, event_type: str = "OBJECT_FINALIZE") -> dict: + now = datetime.now(timezone.utc).isoformat() + fake_data = base64.b64encode(b"{}\n").decode("utf-8") + return { + "message": { + "attributes": { + "bucketId": bucket_id, + "objectId": object_id, + "eventType": event_type, + }, + "data": fake_data, + "messageId": str(uuid.uuid4()), + "publishTime": now, + }, + "subscription": "local-testing", + } + + +@click.command() +@click.option("--server", default="http://localhost:8010", show_default=True, help="DIVE server base URL") +@click.option("--bucket", default="dive-sample-data", required=True, help="Bucket name as configured in assetstore") +@click.option("--object", "object_path", required=True, help="Object path that was finalized") +def main(server, bucket, object_path): + url = f"{server}/api/v1/bucket_notifications/gcs" + payload = build_payload(bucket, object_path) + resp = requests.post(url, json={"message": payload["message"], "subscription": payload["subscription"]}) + click.echo(f"POST {url} -> {resp.status_code}") + if resp.status_code >= 400: + click.echo(resp.text) + + +if __name__ == "__main__": + main() + + diff --git a/samples/scripts/pushNotifications/setup_minio.py b/samples/scripts/pushNotifications/setup_minio.py new file mode 100644 index 000000000..999c53507 --- /dev/null +++ b/samples/scripts/pushNotifications/setup_minio.py @@ -0,0 +1,188 @@ +# /// script +# requires-python = ">=3.8" +# dependencies = [ +# "click", +# ] +# /// +import subprocess +import time +import click +import os +import json +from pathlib import Path + + +DEFAULT_ACCESS_KEY = "OMKF2I2NOD7JGYZ9XHE3" +DEFAULT_SECRET_KEY = "xbze+fJ6Wrfplq17JjSCZZJSz7AxEwRWm1MZXH2O" + + +def get_container_ip(container_name: str, network_name: str) -> str: + result = subprocess.run( + ["docker", "inspect", container_name], capture_output=True, text=True, check=True + ) + info = json.loads(result.stdout) + ip_address = info[0]["NetworkSettings"]["Networks"][network_name]["IPAddress"] + return 
ip_address + + +@click.command() +@click.option( + "--data-dir", + "-d", + default="../assetStoreImport/sample", + show_default=True, + type=click.Path(file_okay=False), + help="Folder to host in MinIO bucket", +) +@click.option("--api-port", default=9000, show_default=True, help="Port for S3 API access") +@click.option( + "--console-port", default=9001, show_default=True, help="Port for MinIO Console access" +) +@click.option("--bucket", default="dive-sample-data", show_default=True, help="Bucket name") +@click.option("--access-key", default=DEFAULT_ACCESS_KEY, show_default=True) +@click.option("--secret-key", default=DEFAULT_SECRET_KEY, show_default=True) +def main(data_dir, api_port, console_port, bucket, access_key, secret_key): + """ + Launch MinIO and a persistent mc container, configure a bucket and user. + """ + data_dir = Path(data_dir).resolve() + data_dir.mkdir(parents=True, exist_ok=True) + + minio_container = "minio_server" + mc_container = "minio_client" + + uid = os.getuid() + gid = os.getgid() + + # Stop/remove existing containers + for c in [minio_container, mc_container]: + subprocess.run(["docker", "rm", "-f", c], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Create a shared Docker network + subprocess.run(["docker", "network", "create", "dive_default"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Start MinIO server + click.echo(f"Starting MinIO server with data dir: {data_dir}") + subprocess.run( + [ + "docker", + "run", + "-d", + "--name", + minio_container, + "--network", + "dive_default", + "-p", + f"{api_port}:9000", + "-p", + f"{console_port}:9001", + "-e", + "MINIO_ROOT_USER=rootuser", + "-e", + "MINIO_ROOT_PASSWORD=rootpass123", + "minio/minio", + "server", + "/data", + "--console-address", + ":9001", + ], + check=True, + ) + + # Give MinIO time to start + click.echo("Waiting for MinIO server to start...") + time.sleep(5) + + mc_config_dir = Path.home() / ".mc" + mc_config_dir.mkdir(parents=True, exist_ok=True) + os.chown(mc_config_dir, uid, gid) + + # Start persistent mc container with config volume + click.echo("Starting persistent mc client container...") + subprocess.run( + [ + "docker", + "run", + "-dit", + "--name", + mc_container, + "--network", + "dive_default", + "-v", + f"{data_dir}:/data", + "--entrypoint=/bin/sh", + "minio/mc", + ], + check=True, + ) + + minio_ip = get_container_ip(minio_container, "dive_default") + + def mc_cmd(*args, capture_output=False): + result = subprocess.run( + ["docker", "exec", mc_container, "mc", *args], + check=True, + text=True, + capture_output=capture_output, + ) + return result.stdout if capture_output else None + + localhost = "localhost" + # Configure alias for root user + mc_cmd("alias", "set", "local", f"http://{minio_ip}:9000", "rootuser", "rootpass123") + + # Create a bucket + existing_buckets = mc_cmd("ls", "local", capture_output=True) or "" + if bucket in existing_buckets: + click.echo(f"Bucket {bucket} already exists, skipping creation.") + else: + click.echo(f"Creating bucket: {bucket} ...") + mc_cmd("mb", f"local/{bucket}") + + # Load sample data into bucket + click.echo(f"Uploading sample data from {data_dir} to bucket...") + mc_cmd("cp", "-r", "/data/.", f"local/{bucket}") + + # Create a new access key under rootuser if not present + output = mc_cmd("admin", "accesskey", "ls", "local", "rootuser", capture_output=True) or "" + has_existing_key = "AccessKey" in output + if not has_existing_key: + click.echo("Creating a new access key...") + mc_cmd( + "admin", + "accesskey", + 
"create", + "local", + "rootuser", + "--access-key", + access_key, + "--secret-key", + secret_key, + ) + + # Smoke test the new credentials + alias_name = "smoketest" + click.echo("Running smoke test with new access key...") + try: + mc_cmd("alias", "set", alias_name, f"http://{minio_ip}:9000", access_key, secret_key) + mc_cmd("ls", alias_name) # list buckets + click.echo("Smoke test passed! New access key works.") + except subprocess.CalledProcessError: + click.echo("Smoke test failed! Could not use new access key.") + + # remove the smoketest alias and stop client + mc_cmd("alias", "rm", alias_name) + # subprocess.run(["docker", "stop", mc_container], check=True) + # subprocess.run(["docker", "rm", mc_container], check=True) + + click.echo("\nMinIO setup complete!\n") + click.echo(f" Console: http://{localhost}:{console_port} (user: rootuser / rootpass123)") + click.echo(f" S3 API: http://{minio_ip}:{api_port}") + click.echo(f" Bucket: {bucket}") + click.echo(f" Access: {access_key} / {secret_key}") + + +if __name__ == "__main__": + main() + + diff --git a/server/bucket_notifications/models.py b/server/bucket_notifications/models.py index cbbe2e761..255882054 100644 --- a/server/bucket_notifications/models.py +++ b/server/bucket_notifications/models.py @@ -1,4 +1,5 @@ from pydantic import BaseModel +from typing import Optional from dive_server.crud import PydanticAccessControlModel @@ -33,4 +34,5 @@ def initialize(self): class NotificationRouterRule(BaseModel): - folderId: str + folderId: Optional[str] = None + collectionId: Optional[str] = None diff --git a/server/bucket_notifications/views.py b/server/bucket_notifications/views.py index 1961be9c9..d96258f3f 100644 --- a/server/bucket_notifications/views.py +++ b/server/bucket_notifications/views.py @@ -1,5 +1,5 @@ import os - +import json from bson.objectid import ObjectId from girder import logger from girder.api import access @@ -9,12 +9,14 @@ from girder.exceptions import RestException from girder.models.assetstore import Assetstore from girder.models.folder import Folder +from girder.models.collection import Collection from girder.models.user import User from dive_utils.types import AssetstoreModel, GirderModel - +from typing import Literal from .constants import AssetstoreRuleMarker from .models import GCSNotificationRecord, GCSPushNotificationPayload, NotificationRouterRule +from dive_server.event import process_dangling_annotation_files, convert_video_recursive class BucketNotification(Resource): @@ -27,7 +29,7 @@ def __init__(self): self.route("POST", ('gcs',), self.gcs_save_record) @staticmethod - def processNotification(store: AssetstoreModel, rootFolder: GirderModel, importPath: str): + def processNotification(store: AssetstoreModel, rootFolder: GirderModel, importPath: str, importDestType: Literal['folder', 'collection'] = 'folder'): """ Import at proper location """ @@ -74,12 +76,31 @@ def processNotification(store: AssetstoreModel, rootFolder: GirderModel, importP Assetstore().importData( store, target, - 'folder', + importDestType, {'importPath': realImportPath}, None, owner, force_recursive=False, ) + # Now need to post process the import to import the new data + if importDestType == 'folder': + # go through all sub folders and add a new script to convert + destinationFolder = Folder().findOne({"_id": ObjectId(target['_id'])}) + userId = destinationFolder['creatorId'] or destinationFolder['baseParentId'] + user = User().findOne({'_id': ObjectId(userId)}) + process_dangling_annotation_files(destinationFolder, user) + 
convert_video_recursive(destinationFolder, user) + if importDestType == 'collection': + logger.info(f'Processing collection import after for destination id {target["_id"]}') + destinationCollection = Collection().findOne({"_id": ObjectId(target['_id'])}) + userId = destinationCollection['creatorId'] or destinationCollection['baseParentId'] + user = User().findOne({'_id': ObjectId(userId)}) + child_folders = Folder().find({'parentId': ObjectId(target['_id'])}) + logger.info(f'Processing {child_folders.count()} child folders for collection import after for destination id {target["_id"]}') + for child in child_folders: + logger.info(f'Processing child folder {child["name"]} for collection import after for destination id {target["_id"]}') + process_dangling_annotation_files(child, user) + convert_video_recursive(child, user) @access.admin @autoDescribeRoute( @@ -117,15 +138,22 @@ def gcs_save_record(self, data: dict): AssetstoreRuleMarker: {'$exists': True}, 'bucket': payload.message.attributes.bucketId, # The only viable GSC Service string - 'service': 'https://storage.googleapis.com', + #'service': 'https://storage.googleapis.com', } ) if store is not None: rule = NotificationRouterRule(**store[AssetstoreRuleMarker]) - mountRoot = Folder().findOne({'_id': ObjectId(rule.folderId)}) - BucketNotification.processNotification( - store, mountRoot, payload.message.attributes.objectId - ) + logger.info(f'Processing notification for bucket {payload.message.attributes.bucketId} with rule {json.dumps(rule.dict())}') + if rule.folderId is not None: + mountRoot = Folder().findOne({'_id': ObjectId(rule.folderId)}) + elif rule.collectionId is not None: + mountRoot = Collection().findOne({'_id': ObjectId(rule.collectionId)}) + else: + logger.warning(f'No mount root found for bucket {payload.message.attributes.bucketId}') + return + BucketNotification.processNotification(store, mountRoot, payload.message.attributes.objectId, importDestType='folder' if rule.folderId is not None else 'collection') + else: + logger.warning(f'No store found for bucket {payload.message.attributes.bucketId}') except Exception as err: # exceptions must be swallowed to prevent pub/sub queue backups # message loss is always easily recoverable by running a manual diff --git a/server/dive_tasks/tasks.py b/server/dive_tasks/tasks.py index 609ffd1f3..e94e8d1f6 100644 --- a/server/dive_tasks/tasks.py +++ b/server/dive_tasks/tasks.py @@ -538,6 +538,7 @@ def convert_video( constants.OriginalFPSMarker: originalFps, constants.OriginalFPSStringMarker: avgFpsString, constants.FPSMarker: newAnnotationFps, + constants.MarkForPostProcess: False, "ffprobe_info": videostream[0], }, ) @@ -604,6 +605,7 @@ def convert_video( constants.OriginalFPSMarker: originalFps, constants.OriginalFPSStringMarker: avgFpsString, constants.FPSMarker: newAnnotationFps, + constants.MarkForPostProcess: False, "ffprobe_info": videostream[0], }, ) @@ -652,7 +654,7 @@ def convert_images(self: Task, folderId, user_id: str, user_login: str): gc.addMetadataToFolder( str(folderId), - {"annotate": True}, # mark the parent folder as able to annotate. + {"annotate": True, constants.MarkForPostProcess: False}, # mark the parent folder as able to annotate. )