diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f309a92e..5256d9b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ jobs: fail-fast: true matrix: os: ["ubuntu-latest"] - python-version: ["3.9", "3.10"] + python-version: ["3.10", "3.11", "3.12"] steps: - name: Checkout source @@ -50,7 +50,7 @@ jobs: uses: conda-incubator/setup-miniconda@v2 with: miniconda-version: "latest" - python-version: "3.9" + python-version: "3.12" - name: Run import tests shell: bash -l {0} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 30fd3238..36fa23bd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,13 +10,13 @@ jobs: - name: Checkout source uses: actions/checkout@v2 - - name: Set up Python 3.9 + - name: Set up Python 3.12 uses: actions/setup-python@v1 with: - python-version: 3.9 + python-version: 3.12 - name: Install pypa/build - run: python -m pip install build wheel + run: python -m pip install build wheel setuptools - name: Build distributions shell: bash -l {0} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ba24797..a2c64c16 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,12 +1,12 @@ repos: - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 23.10.1 hooks: - id: black language_version: python3 exclude: versioneer.py - repo: https://github.com/pycqa/flake8 - rev: 3.9.2 + rev: 6.1.0 hooks: - id: flake8 language_version: python3 diff --git a/.readthedocs.yml b/.readthedocs.yml index 1285a81e..31f71416 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -19,4 +19,4 @@ submodules: build: os: ubuntu-22.04 tools: - python: "3" + python: "3.11" diff --git a/README.rst b/README.rst index f806ccae..19eb49b2 100644 --- a/README.rst +++ b/README.rst @@ -24,6 +24,8 @@ Dask Cloud Provider :alt: Conda Forge -Native Cloud integration for Dask. 
This library intends to allow people to -create dask clusters on a given cloud provider with no set up other than having -credentials. +Native Cloud integration for Dask. + +This library provides tools to enable Dask clusters to more natively integrate with the cloud. +It includes cluster managers to create dask clusters on a given cloud provider using native resources, +plugins to more closely integrate Dask components with the cloud platform they are running on and documentation to empower all folks running Dask on the cloud. diff --git a/ci/environment-3.9.yml b/ci/environment-3.11.yml similarity index 97% rename from ci/environment-3.9.yml rename to ci/environment-3.11.yml index 636f6102..8c4147de 100644 --- a/ci/environment-3.9.yml +++ b/ci/environment-3.11.yml @@ -3,7 +3,7 @@ channels: - defaults - conda-forge dependencies: - - python=3.9 + - python=3.11 - nomkl - pip # Dask diff --git a/ci/environment-3.12.yml b/ci/environment-3.12.yml new file mode 100644 index 00000000..b764807b --- /dev/null +++ b/ci/environment-3.12.yml @@ -0,0 +1,38 @@ +name: dask-cloudprovider-test +channels: + - defaults + - conda-forge +dependencies: + - python=3.12 + - nomkl + - pip + # Dask + - dask + # testing / CI + - flake8 + - ipywidgets + - pytest + - pytest-asyncio + - black >=20.8b1 + - pyyaml + # dask dependencies + - cloudpickle + - toolz + - cytoolz + - numpy + - partd + # distributed dependencies + - click >=6.6 + - msgpack-python + - psutil >=5.0 + - six + - sortedcontainers !=2.0.0,!=2.0.1 + - tblib + - tornado >=5 + - zict >=0.1.3 + # `event_loop_policy` change See https://github.com/dask/distributed/pull/4212 + - pytest-asyncio >=0.14.0 + - pytest-timeout + - pip: + - git+https://github.com/dask/dask.git@main + - git+https://github.com/dask/distributed@main diff --git a/ci/scripts/test_imports.sh b/ci/scripts/test_imports.sh index 16a4f652..2fb1f02b 100644 --- a/ci/scripts/test_imports.sh +++ b/ci/scripts/test_imports.sh @@ -3,9 +3,9 @@ set -o errexit test_import () 
{ - echo "Create environment: python=3.9 $1" + echo "Create environment: python=3.12 $1" # Create an empty environment - conda create -q -y -n test-imports -c conda-forge python=3.9 + conda create -q -y -n test-imports -c conda-forge python=3.12 conda activate test-imports pip install -e .[$1] echo "python -c '$2'" @@ -19,3 +19,5 @@ test_import "aws" "import dask_cloudprovider.aws" test_import "azure" "import dask_cloudprovider.azure" test_import "digitalocean" "import dask_cloudprovider.digitalocean" test_import "gcp" "import dask_cloudprovider.gcp" +test_import "ibm" "import dask_cloudprovider.ibm" +test_import "openstack" "import dask_cloudprovider.openstack" diff --git a/dask_cloudprovider/aws/ecs.py b/dask_cloudprovider/aws/ecs.py index 02db1d55..79e7bb0d 100644 --- a/dask_cloudprovider/aws/ecs.py +++ b/dask_cloudprovider/aws/ecs.py @@ -371,7 +371,7 @@ class Scheduler(Task): Any extra command line arguments to pass to dask-scheduler, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` Defaults to `None`, no extra command line arguments. - kwargs: Dict() + kwargs: Other kwargs to be passed to :class:`Task`. See :class:`Task` for parameter info. @@ -413,7 +413,7 @@ class Worker(Task): scheduler: str The address of the scheduler - kwargs: Dict() + kwargs: Other kwargs to be passed to :class:`Task`. """ @@ -484,6 +484,10 @@ class ECSCluster(SpecCluster, ConfigMixin): The docker image to use for the scheduler and worker tasks. Defaults to ``daskdev/dask:latest`` or ``rapidsai/rapidsai:latest`` if ``worker_gpu`` is set. + cpu_architecture: str (optional) + Runtime platform CPU architecture + + Defaults to ``X86_64``. scheduler_cpu: int (optional) The amount of CPU to request for the scheduler in milli-cpu (1/1024). @@ -678,7 +682,7 @@ class ECSCluster(SpecCluster, ConfigMixin): mounted in worker tasks. This setting controls whether volumes are also mounted in the scheduler task. Default ``False``. 
- **kwargs: dict + **kwargs: Additional keyword arguments to pass to ``SpecCluster``. Examples @@ -712,6 +716,7 @@ def __init__( fargate_workers=None, fargate_spot=None, image=None, + cpu_architecture="X86_64", scheduler_cpu=None, scheduler_mem=None, scheduler_port=8786, @@ -758,6 +763,7 @@ def __init__( self._fargate_workers = fargate_workers self._fargate_spot = fargate_spot self.image = image + self._cpu_architecture = cpu_architecture.upper() self._scheduler_cpu = scheduler_cpu self._scheduler_mem = scheduler_mem self._scheduler_port = scheduler_port @@ -1223,6 +1229,7 @@ async def _create_scheduler_task_definition_arn(self): if self._volumes and self._mount_volumes_on_scheduler else [], requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [], + runtimePlatform={"cpuArchitecture": self._cpu_architecture}, cpu=str(self._scheduler_cpu), memory=str(self._scheduler_mem), tags=dict_to_aws(self.tags), @@ -1297,6 +1304,7 @@ async def _create_worker_task_definition_arn(self): ], volumes=self._volumes if self._volumes else [], requiresCompatibilities=["FARGATE"] if self._fargate_workers else [], + runtimePlatform={"cpuArchitecture": self._cpu_architecture}, cpu=str(self._worker_cpu), memory=str(self._worker_mem), tags=dict_to_aws(self.tags), @@ -1388,7 +1396,7 @@ class FargateCluster(ECSCluster): Parameters ---------- - kwargs: dict + kwargs: Keyword arguments to be passed to :class:`ECSCluster`. 
Examples diff --git a/dask_cloudprovider/azure/tests/test_azurevm.py b/dask_cloudprovider/azure/tests/test_azurevm.py index 20aab38c..f9561bd4 100644 --- a/dask_cloudprovider/azure/tests/test_azurevm.py +++ b/dask_cloudprovider/azure/tests/test_azurevm.py @@ -66,7 +66,6 @@ def inc(x): @skip_without_credentials @pytest.mark.external async def test_create_cluster_sync(): - with AzureVMCluster() as cluster: with Client(cluster) as client: cluster.scale(1) @@ -84,7 +83,6 @@ def inc(x): @skip_without_credentials @pytest.mark.external async def test_create_rapids_cluster_sync(): - with AzureVMCluster( vm_size="Standard_NC12s_v3", docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9", diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 1a17ff45..fcf201aa 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -15,6 +15,7 @@ cloudprovider: scheduler_timeout: "5 minutes" # Length of inactivity to wait before closing the cluster image: "daskdev/dask:latest" # Docker image to use for non GPU tasks + cpu_architecture: "X86_64" # Runtime platform CPU architecture gpu_image: "rapidsai/rapidsai:latest" # Docker image to use for GPU tasks cluster_name_template: "dask-{uuid}" # Template to use when creating a cluster cluster_arn: "" # ARN of existing ECS cluster to use (if not set one will be created) @@ -117,3 +118,30 @@ cloudprovider: image: "ubuntu-20.04" # Operating System image to use docker_image: "daskdev/dask:latest" # docker image to use bootstrap: true # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed. 
+ + ibm: + api_key: null + image: "ghcr.io/dask/dask:latest" + region: us-east + project_id: null + scheduler_cpu: "1.0" + scheduler_mem: 4G + scheduler_timeout: 600 # seconds + worker_cpu: "2.0" + worker_mem: 8G + worker_threads: 1 + + openstack: + region: "RegionOne" # The name of the region where resources will be allocated in OpenStack. List available regions using: `openstack region list`. + size: null # Openstack flavors define the compute, memory, and storage capacity of computing instances. List available flavors using: `openstack flavor list` + auth_url: null # The authentication URL for the OpenStack Identity service (Keystone). Example: https://cloud.example.com:5000 + application_credential_id: null # The application credential id created in OpenStack. Create application credentials using: openstack application credential create + application_credential_secret: null # The secret associated with the application credential ID for authentication. + auth_type: "v3applicationcredential" # The type of authentication used, typically "v3applicationcredential" for using OpenStack application credentials. + network_id: null # The unique identifier for the internal/private network in OpenStack where the cluster VMs will be connected. List available networks using: `openstack network list` + image: null # The OS image name or id to use for the VM. List available images using: `openstack image list` + keypair_name: null # The name of the SSH keypair used for instance access. Ensure you have created a keypair or use an existing one. List available keypairs using: `openstack keypair list` + security_group: null # The security group name that defines firewall rules for instances. List available security groups using: `openstack security group list` + external_network_id: null # The ID of the external network used for assigning floating IPs. 
List available external networks using: `openstack network list --external` + create_floating_ip: false # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `True` if external connectivity is needed. + docker_image: "daskdev/dask:latest" # docker image to use diff --git a/dask_cloudprovider/gcp/instances.py b/dask_cloudprovider/gcp/instances.py index 681a42d1..d322b984 100644 --- a/dask_cloudprovider/gcp/instances.py +++ b/dask_cloudprovider/gcp/instances.py @@ -4,6 +4,7 @@ import json import sqlite3 +from typing import Optional, Any, Dict import dask from dask.utils import tmpfile @@ -106,11 +107,9 @@ def __init__( self.instance_labels = _instance_labels self.general_zone = "-".join(self.zone.split("-")[:2]) # us-east1-c -> us-east1 - self.service_account = service_account or self.config.get("service_account") def create_gcp_config(self): - subnetwork = f"projects/{self.network_projectid}/regions/{self.general_zone}/subnetworks/{self.network}" config = { "name": self.name, @@ -205,7 +204,6 @@ def create_gcp_config(self): return config async def create_vm(self): - self.cloud_init = self.cluster.render_process_cloud_init(self) self.gcp_config = self.create_gcp_config() @@ -496,6 +494,8 @@ class GCPCluster(VMCluster): service_account: str Service account that all VMs will run under. Defaults to the default Compute Engine service account for your GCP project. 
+ service_account_credentials: Optional[Dict[str, Any]] + Service account credentials to create the compute engine Vms Examples -------- @@ -589,10 +589,10 @@ def __init__( debug=False, instance_labels=None, service_account=None, + service_account_credentials: Optional[Dict[str, Any]] = None, **kwargs, ): - - self.compute = GCPCompute() + self.compute = GCPCompute(service_account_credentials) self.config = dask.config.get("cloudprovider.gcp", {}) self.auto_shutdown = ( @@ -644,13 +644,20 @@ def __init__( class GCPCompute: - """Wrapper for the ``googleapiclient`` compute object.""" + """ + Wrapper for the ``googleapiclient`` compute object. - def __init__(self): + Attributes + ---------- + service_account_credentials: Optional[dict] + Service account credentials to create the compute engine Vms + """ + + def __init__(self, service_account_credentials: Optional[dict[str, Any]] = None): + self.service_account_credentials = service_account_credentials or {} self._compute = self.refresh_client() def refresh_client(self): - if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", False): import google.oauth2.service_account # google-auth @@ -658,6 +665,13 @@ def refresh_client(self): os.environ["GOOGLE_APPLICATION_CREDENTIALS"], scopes=["https://www.googleapis.com/auth/cloud-platform"], ) + elif self.service_account_credentials: + import google.oauth2.service_account # google-auth + + creds = google.oauth2.service_account.Credentials.from_service_account_info( + self.service_account_credentials, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) else: import google.auth.credentials # google-auth diff --git a/dask_cloudprovider/gcp/tests/test_gcp.py b/dask_cloudprovider/gcp/tests/test_gcp.py index 915f5d88..bf96e9af 100644 --- a/dask_cloudprovider/gcp/tests/test_gcp.py +++ b/dask_cloudprovider/gcp/tests/test_gcp.py @@ -76,7 +76,6 @@ async def test_create_cluster(): async with GCPCluster( asynchronous=True, env_vars={"FOO": "bar"}, security=True ) as cluster: - 
assert cluster.status == Status.running cluster.scale(2) @@ -132,7 +131,6 @@ async def test_create_rapids_cluster(): auto_shutdown=True, bootstrap=False, ) as cluster: - assert cluster.status == Status.running cluster.scale(1) diff --git a/dask_cloudprovider/ibm/__init__.py b/dask_cloudprovider/ibm/__init__.py new file mode 100644 index 00000000..f91fab73 --- /dev/null +++ b/dask_cloudprovider/ibm/__init__.py @@ -0,0 +1 @@ +from .code_engine import IBMCodeEngineCluster diff --git a/dask_cloudprovider/ibm/code_engine.py b/dask_cloudprovider/ibm/code_engine.py new file mode 100644 index 00000000..e056de12 --- /dev/null +++ b/dask_cloudprovider/ibm/code_engine.py @@ -0,0 +1,485 @@ +import json +import time +import urllib3 +import threading +import random + +import dask +from dask_cloudprovider.generic.vmcluster import ( + VMCluster, + VMInterface, + SchedulerMixin, + WorkerMixin, +) + +from distributed.core import Status +from distributed.security import Security + +try: + from ibm_code_engine_sdk.code_engine_v2 import CodeEngineV2 + from ibm_cloud_sdk_core.authenticators import IAMAuthenticator +except ImportError as e: + msg = ( + "Dask Cloud Provider IBM requirements are not installed.\n\n" + "Please either conda or pip install as follows:\n\n" + " conda install -c conda-forge dask-cloudprovider # either conda install\n" + ' pip install "dask-cloudprovider[ibm]" --upgrade # or python -m pip install' + ) + raise ImportError(msg) from e + + +urllib3.disable_warnings() + + +class IBMCodeEngine(VMInterface): + def __init__( + self, + cluster: str, + config, + image: str = None, + region: str = None, + project_id: str = None, + scheduler_cpu: str = None, + scheduler_mem: str = None, + scheduler_timeout: int = None, + worker_cpu: str = None, + worker_mem: str = None, + worker_threads: int = None, + api_key: str = None, + **kwargs, + ): + super().__init__(**kwargs) + self.cluster = cluster + self.config = config + self.image = image + self.region = region + 
self.project_id = project_id + self.scheduler_cpu = scheduler_cpu + self.scheduler_mem = scheduler_mem + self.scheduler_timeout = scheduler_timeout + self.worker_cpu = worker_cpu + self.worker_mem = worker_mem + self.worker_threads = worker_threads + self.api_key = api_key + + authenticator = IAMAuthenticator(self.api_key, url="https://iam.cloud.ibm.com") + authenticator.set_disable_ssl_verification( + True + ) # Disable SSL verification for the authenticator + + self.code_engine_service = CodeEngineV2(authenticator=authenticator) + self.code_engine_service.set_service_url( + "https://api." + self.region + ".codeengine.cloud.ibm.com/v2" + ) + self.code_engine_service.set_disable_ssl_verification( + True + ) # Disable SSL verification for the service instance + + async def create_vm(self): + # Deploy a scheduler on a Code Engine application + # It allows listening on a specific port and exposing it to the public + if "scheduler" in self.name: + self.code_engine_service.create_app( + project_id=self.project_id, + image_reference=self.image, + name=self.name, + run_commands=self.command, + image_port=8786, + scale_cpu_limit=self.cpu, + scale_min_instances=1, + scale_concurrency=1000, + scale_memory_limit=self.memory, + scale_request_timeout=self.cluster.scheduler_timeout, + run_env_variables=[ + { + "type": "literal", + "name": "DASK_INTERNAL_INHERIT_CONFIG", + "key": "DASK_INTERNAL_INHERIT_CONFIG", + "value": dask.config.serialize(dask.config.global_config), + } + ], + ) + + # Create a ConfigMap with the Dask configuration once time + self.code_engine_service.create_config_map( + project_id=self.project_id, + name=self.cluster.uuid, + data={ + "DASK_INTERNAL_INHERIT_CONFIG": dask.config.serialize( + dask.config.global_config + ), + }, + ) + + # This loop waits for the app to be ready, then returns the internal and public URLs + while True: + response = self.code_engine_service.get_app( + project_id=self.project_id, + name=self.name, + ) + app = response.get_result() 
+ if app["status"] == "ready": + break + + time.sleep(0.5) + + internal_url = app["endpoint_internal"].split("//")[1] + public_url = app["endpoint"].split("//")[1] + + return internal_url, public_url + + # Deploy a worker on a Code Engine job run + else: + + def create_job_run_thread(): + retry_delay = 1 + + # Add an exponential sleep to avoid overloading the Code Engine API + for attempt in range(5): + try: + self.code_engine_service.create_job_run( + project_id=self.project_id, + image_reference=self.image, + name=self.name, + run_commands=self.command, + scale_cpu_limit=self.cpu, + scale_memory_limit=self.memory, + run_env_variables=[ + { + "type": "config_map_key_reference", + "reference": self.cluster.uuid, + "name": "DASK_INTERNAL_INHERIT_CONFIG", + "key": "DASK_INTERNAL_INHERIT_CONFIG", + } + ], + ) + return + except Exception: + time.sleep(retry_delay) + retry_delay *= 2 + retry_delay += random.uniform(0, 1) + + raise Exception("Maximum retry attempts reached") + + # Create a thread to create multiples job runs in parallel + job_run_thread = threading.Thread(target=create_job_run_thread) + job_run_thread.start() + + async def destroy_vm(self): + self.cluster._log(f"Deleting Instance: {self.name}") + + if "scheduler" in self.name: + self.code_engine_service.delete_app( + project_id=self.project_id, + name=self.name, + ) + else: + self.code_engine_service.delete_job_run( + project_id=self.project_id, + name=self.name, + ) + try: + self.code_engine_service.delete_config_map( + project_id=self.project_id, + name=self.cluster.uuid, + ) + except Exception: + pass + + +class IBMCodeEngineScheduler(SchedulerMixin, IBMCodeEngine): + """Scheduler running in a GCP instance.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.cpu = self.cluster.scheduler_cpu + self.memory = self.cluster.scheduler_mem + + self.command = [ + "python", + "-m", + "distributed.cli.dask_scheduler", + "--protocol", + "ws", + ] + + async def start(self): + 
self.cluster._log( + f"Launching cluster with the following configuration: " + f"\n Source Image: {self.image} " + f"\n Region: {self.region} " + f"\n Project id: {self.project_id} " + f"\n Scheduler CPU: {self.cpu} " + f"\n Scheduler Memory: {self.memory} " + f"\n Scheduler Timeout: {self.cluster.scheduler_timeout} " + f"\n Worker CPU: {self.cluster.worker_cpu} " + f"\n Worker Memory: {self.cluster.worker_mem} " + f"\n Worker Threads: {self.cluster.worker_threads} " + ) + self.cluster._log(f"Creating scheduler instance {self.name}") + + # It must use the external URL with the "wss" protocol and port 443 to establish a + # secure WebSocket connection between the client and the scheduler. + self.internal_ip, self.external_ip = await self.create_vm() + self.address = f"wss://{self.external_ip}:443" + + await self.wait_for_scheduler() + + self.cluster.scheduler_internal_ip = self.internal_ip + self.cluster.scheduler_external_ip = self.external_ip + self.cluster.scheduler_port = self.port + self.status = Status.running + + +class IBMCodeEngineWorker(WorkerMixin, IBMCodeEngine): + def __init__( + self, + *args, + worker_class: str = "distributed.cli.Nanny", + worker_options: dict = {}, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.worker_class = worker_class + self.worker_options = worker_options + self.cpu = self.cluster.worker_cpu + self.memory = self.cluster.worker_mem + + # On this case, the worker must connect to the scheduler internal URL with the "ws" protocol and port 80 + internal_scheduler = f"ws://{self.cluster.scheduler_internal_ip}:80" + + self.command = [ + "python", + "-m", + "distributed.cli.dask_spec", + internal_scheduler, + "--spec", + json.dumps( + { + "cls": self.worker_class, + "opts": { + **worker_options, + "name": self.name, + "nthreads": self.cluster.worker_threads, + }, + } + ), + ] + + async def start(self): + self.cluster._log(f"Creating worker instance {self.name}") + await self.create_vm() + self.status = Status.running + + 
+class IBMCodeEngineCluster(VMCluster): + """Cluster running on IBM Code Engine. + + This cluster manager builds a Dask cluster running on IBM Code Engine. + + When configuring your cluster, you may find it useful to refer to the IBM Cloud documentation for available options. + + https://cloud.ibm.com/docs/codeengine + + Parameters + ---------- + image: str + The Docker image to run on all instances. This image must have a valid Python environment and have ``dask`` + installed in order for the ``dask-scheduler`` and ``dask-worker`` commands to be available. + region: str + The IBM Cloud region to launch your cluster in. + + See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-regions + project_id: str + Your IBM Cloud project ID. This must be set either here or in your Dask config. + scheduler_cpu: str + The amount of CPU to allocate to the scheduler. + + See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + scheduler_mem: str + The amount of memory to allocate to the scheduler. + + See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + scheduler_timeout: int + The timeout for the scheduler in seconds. + worker_cpu: str + The amount of CPU to allocate to each worker. + + See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + worker_mem: str + The amount of memory to allocate to each worker. + + See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo + worker_threads: int + The number of threads to use on each worker. + debug: bool, optional + More information will be printed when constructing clusters to enable debugging. + + Notes + ----- + + **Credentials** + + In order to use the IBM Cloud API, you will need to set up an API key. You can create an API key in the IBM Cloud + console. + + The best practice way of doing this is to pass an API key to be used by workers. You can set this API key as an + environment variable. Here is a small example to help you do that. 
+ + To expose your IBM API KEY, use this command: + export DASK_CLOUDPROVIDER__IBM__API_KEY=xxxxx + + **Certificates** + + This backend will need to use a Let's Encrypt certificate (ISRG Root X1) to connect the client to the scheduler + between websockets. More information can be found here: https://letsencrypt.org/certificates/ + + Examples + -------- + + Create the cluster. + + >>> from dask_cloudprovider.ibm import IBMCodeEngineCluster + >>> cluster = IBMCodeEngineCluster(n_workers=1) + Launching cluster with the following configuration: + Source Image: daskdev/dask:latest + Region: eu-de + Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 + Scheduler CPU: 0.25 + Scheduler Memory: 1G + Scheduler Timeout: 600 + Worker CPU: 2 + Worker Memory: 4G + Creating scheduler dask-xxxxxxxx-scheduler + Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 + Scheduler is running + Creating worker instance dask-xxxxxxxx-worker-xxxxxxxx + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + Do some work. + + >>> import dask.array as da + >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) + >>> arr.mean().compute() + 0.5001550986751964 + + Close the cluster + + >>> cluster.close() + Deleting Instance: dask-xxxxxxxx-worker-xxxxxxxx + Deleting Instance: dask-xxxxxxxx-scheduler + + You can also do this all in one go with context managers to ensure the cluster is created and cleaned up. + + >>> with IBMCodeEngineCluster(n_workers=1) as cluster: + ... with Client(cluster) as client: + ... 
print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) + Launching cluster with the following configuration: + Source Image: daskdev/dask:latest + Region: eu-de + Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 + Scheduler CPU: 0.25 + Scheduler Memory: 1G + Scheduler Timeout: 600 + Worker CPU: 2 + Worker Memory: 4G + Creating scheduler dask-xxxxxxxx-scheduler + Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 + Scheduler is running + Creating worker instance dask-xxxxxxxx-worker-xxxxxxxx + 0.5000812282861661 + Deleting Instance: dask-xxxxxxxx-worker-xxxxxxxx + Deleting Instance: dask-xxxxxxxx-scheduler + + """ + + def __init__( + self, + image: str = None, + region: str = None, + project_id: str = None, + scheduler_cpu: str = None, + scheduler_mem: str = None, + scheduler_timeout: int = None, + worker_cpu: str = None, + worker_mem: str = None, + worker_threads: int = 1, + debug: bool = False, + **kwargs, + ): + self.config = dask.config.get("cloudprovider.ibm", {}) + self.scheduler_class = IBMCodeEngineScheduler + self.worker_class = IBMCodeEngineWorker + + self.image = image or self.config.get("image") + self.region = region or self.config.get("region") + self.project_id = project_id or self.config.get("project_id") + api_key = self.config.get("api_key") + self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu") + self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem") + self.scheduler_timeout = scheduler_timeout or self.config.get( + "scheduler_timeout" + ) + self.worker_cpu = worker_cpu or self.config.get("worker_cpu") + self.worker_mem = worker_mem or self.config.get("worker_mem") + self.worker_threads = worker_threads or self.config.get("worker_threads") + + self.debug = debug + + self.options = { + "cluster": self, + "config": self.config, + "image": self.image, + "region": self.region, + "project_id": self.project_id, + "scheduler_cpu": 
self.scheduler_cpu, + "scheduler_mem": self.scheduler_mem, + "scheduler_timeout": self.scheduler_timeout, + "worker_cpu": self.worker_cpu, + "worker_mem": self.worker_mem, + "worker_threads": self.worker_threads, + "api_key": api_key, + } + self.scheduler_options = {**self.options} + self.worker_options = {**self.options} + + # https://letsencrypt.org/certificates/ --> ISRG Root X1 + sec = Security( + require_encryption=False, + tls_ca_file=( + "-----BEGIN CERTIFICATE-----\n" + "MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAw\n" + "TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh\n" + "cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4\n" + "WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJu\n" + "ZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBY\n" + "MTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygc\n" + "h77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+\n" + "0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6U\n" + "A5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sW\n" + "T8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyH\n" + "B5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UC\n" + "B5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUv\n" + "KBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWn\n" + "OlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTn\n" + "jh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbw\n" + "qHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CI\n" + "rU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNV\n" + "HRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkq\n" + "hkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZL\n" + "ubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ\n" + "3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KK\n" + 
"NFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5\n" + "ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7Ur\n" + "TkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdC\n" + "jNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVc\n" + "oyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq\n" + "4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPA\n" + "mRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57d\n" + "emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=\n" + "-----END CERTIFICATE-----" + ), + ) + super().__init__(security=sec, debug=debug, **kwargs) diff --git a/dask_cloudprovider/ibm/tests/test_code_engine.py b/dask_cloudprovider/ibm/tests/test_code_engine.py new file mode 100644 index 00000000..ec84a421 --- /dev/null +++ b/dask_cloudprovider/ibm/tests/test_code_engine.py @@ -0,0 +1,108 @@ +import pytest + +import dask + +codeengine = pytest.importorskip("ibm_code_engine_sdk.code_engine_v2") + +from dask_cloudprovider.ibm.code_engine import IBMCodeEngineCluster +from dask.distributed import Client +from distributed.core import Status + + +async def skip_without_credentials(): + if dask.config.get("cloudprovider.ibm.api_key") is None: + pytest.skip( + """ + You must configure a IBM API key to run this test. + + Either set this in your config + + # cloudprovider.yaml + cloudprovider: + ibm: + api_key: "your_api_key" + + Or by setting it as an environment variable + + export DASK_CLOUDPROVIDER__IBM__API_KEY="your_api_key" + + """ + ) + + if dask.config.get("cloudprovider.ibm.project_id") is None: + pytest.skip( + """ + You must configure a IBM project id to run this test. 
+ + Either set this in your config + + # cloudprovider.yaml + cloudprovider: + ibm: + project_id: "your_project_id" + + Or by setting it as an environment variable + + export DASK_CLOUDPROVIDER__IBM__PROJECT_ID="your_project_id" + + """ + ) + + if dask.config.get("cloudprovider.ibm.region") is None: + pytest.skip( + """ + You must configure a IBM project id to run this test. + + Either set this in your config + + # cloudprovider.yaml + cloudprovider: + ibm: + region: "your_region" + + Or by setting it as an environment variable + + export DASK_CLOUDPROVIDER__IBM__REGION="your_region" + + """ + ) + + +@pytest.mark.asyncio +async def test_init(): + await skip_without_credentials() + cluster = IBMCodeEngineCluster(asynchronous=True) + assert cluster.status == Status.created + + +@pytest.mark.asyncio +@pytest.mark.timeout(1200) +@pytest.mark.external +async def test_create_cluster(): + async with IBMCodeEngineCluster(asynchronous=True) as cluster: + cluster.scale(2) + await cluster + assert len(cluster.workers) == 2 + + async with Client(cluster, asynchronous=True) as client: + + def inc(x): + return x + 1 + + assert await client.submit(inc, 10).result() == 11 + + +@pytest.mark.asyncio +@pytest.mark.timeout(1200) +@pytest.mark.external +async def test_create_cluster_sync(): + with IBMCodeEngineCluster() as cluster: + with Client(cluster) as client: + cluster.scale(1) + client.wait_for_workers(1) + assert len(cluster.workers) == 1 + + def inc(x): + return x + 1 + + assert client.submit(inc, 10).result() == 11 diff --git a/dask_cloudprovider/openstack/__init__.py b/dask_cloudprovider/openstack/__init__.py new file mode 100644 index 00000000..91ca9fe0 --- /dev/null +++ b/dask_cloudprovider/openstack/__init__.py @@ -0,0 +1 @@ +from .instances import OpenStackCluster diff --git a/dask_cloudprovider/openstack/instances.py b/dask_cloudprovider/openstack/instances.py new file mode 100644 index 00000000..903a4a16 --- /dev/null +++ b/dask_cloudprovider/openstack/instances.py @@ 
-0,0 +1,372 @@ +import asyncio +import dask + +from dask_cloudprovider.generic.vmcluster import ( + VMCluster, + VMInterface, + SchedulerMixin, + WorkerMixin, +) + +from distributed.core import Status + +try: + from openstack import connection +except ImportError as e: + msg = ( + "Dask Cloud Provider OpenStack requirements are not installed.\n\n" + "Please pip install as follows:\n\n" + ' pip install "openstacksdk" ' + ) + raise ImportError(msg) from e + + +class OpenStackInstance(VMInterface): + def __init__( + self, + cluster, + config, + region: str = None, + size: str = None, + image: str = None, + docker_image: str = None, + env_vars: str = None, + extra_bootstrap: str = None, + **kwargs, + ): + super().__init__(**kwargs) + self.instance = None + self.cluster = cluster + self.config = config + self.region = region + self.size = size + self.image = image + self.env_vars = env_vars + self.bootstrap = True + self.docker_image = docker_image + self.extra_bootstrap = extra_bootstrap + + async def create_vm(self): + conn = connection.Connection( + region_name=self.region, + auth_url=self.config["auth_url"], + application_credential_id=self.config["application_credential_id"], + application_credential_secret=self.config["application_credential_secret"], + compute_api_version="2", + identity_interface="public", + auth_type="v3applicationcredential", + ) + + self.instance = conn.create_server( + name=self.name, + image=self.image, + flavor=self.size, # Changed 'flavor_id' to 'flavor' + key_name=self.config["keypair_name"], # Add the keypair name here + nics=[ + {"net-id": self.config["network_id"]} + ], # Changed from 'networks' to 'nics' + userdata=self.cluster.render_process_cloud_init(self), + security_groups=[self.config["security_group"]], + ) + + # Wait for the instance to be up and running + while self.instance.status.lower() != "active": + await asyncio.sleep(0.1) + self.instance = conn.compute.get_server(self.instance.id) + + # Retrieve the internal IP 
address + self.internal_ip = await self.get_internal_ip(conn) + + # Check if a floating IP should be created and assigned + if self.config.get("create_floating_ip", False): + self.external_ip = await self.create_and_assign_floating_ip(conn) + else: + self.external_ip = await self.get_external_ip(conn) + + self.cluster._log( + f"{self.name}\n\tInternal IP: {self.internal_ip}\n\tExternal IP: " + f"{self.external_ip if self.external_ip else 'None'}" + ) + return self.internal_ip, self.external_ip + + async def get_internal_ip(self, conn): + """Fetch the internal IP address from the OpenStack instance.""" + instance = conn.compute.get_server(self.instance.id) + for network in instance.addresses.values(): + for addr in network: + if addr["OS-EXT-IPS:type"] == "fixed": + return addr["addr"] + return None + + async def get_external_ip(self, conn): + """Fetch the external IP address from the OpenStack instance, if it exists.""" + instance = conn.compute.get_server(self.instance.id) + for network in instance.addresses.values(): + for addr in network: + if addr["OS-EXT-IPS:type"] == "floating": + return addr["addr"] + return None + + async def create_and_assign_floating_ip(self, conn): + """Create and assign a floating IP to the instance.""" + try: + # Create a floating IP + floating_ip = await self.cluster.call_async( + conn.network.create_ip, + floating_network_id=self.config["external_network_id"], + ) + + # Assign the floating IP to the server + await self.cluster.call_async( + conn.compute.add_floating_ip_to_server, + server=self.instance.id, + address=floating_ip.floating_ip_address, + ) + + return floating_ip.floating_ip_address + except Exception as e: + self.cluster._log(f"Failed to create or assign floating IP: {str(e)}") + return None + + async def destroy_vm(self): + conn = connection.Connection( + region_name=self.region, + auth_url=self.config["auth_url"], + application_credential_id=self.config["application_credential_id"], + 
application_credential_secret=self.config["application_credential_secret"], + compute_api_version="2", + identity_interface="public", + auth_type="v3applicationcredential", + ) + + # Handle floating IP disassociation and deletion if applicable + if self.config.get( + "create_floating_ip", False + ): # Checks if floating IPs were configured to be created + try: + # Retrieve all floating IPs associated with the instance + floating_ips = conn.network.ips(port_id=self.instance.id) + for ip in floating_ips: + # Disassociate and delete the floating IP + conn.network.update_ip(ip, port_id=None) + conn.network.delete_ip(ip.id) + self.cluster._log(f"Deleted floating IP {ip.floating_ip_address}") + except Exception as e: + self.cluster._log( + f"Failed to clean up floating IPs for instance {self.name}: {str(e)}" + ) + return # Exit if floating IP cleanup fails + + # Then, attempt to delete the instance + try: + instance = conn.compute.get_server(self.instance.id) + if instance: + await self.cluster.call_async(conn.compute.delete_server, instance.id) + self.cluster._log(f"Terminated instance {self.name}") + else: + self.cluster._log(f"Instance {self.name} not found or already deleted.") + except Exception as e: + self.cluster._log(f"Failed to terminate instance {self.name}: {str(e)}") + + async def start_vm(self): + # Code to start the instance + pass # Placeholder to ensure correct indentation + + async def stop_vm(self): + # Code to stop the instance + pass # Placeholder to ensure correct indentation + + +class OpenStackScheduler(SchedulerMixin, OpenStackInstance): + """Scheduler running on an OpenStack Instance.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + async def start(self): + await self.start_scheduler() + self.status = Status.running + + async def start_scheduler(self): + self.cluster._log( + f"Launching cluster with the following configuration: " + f"\n OS Image: {self.image} " + f"\n Flavor: {self.size} " + f"\n Docker Image: 
{self.docker_image} " + f"\n Security Group: {self.config['security_group']} " + ) + self.cluster._log("Creating scheduler instance") + self.internal_ip, self.external_ip = await self.create_vm() + + # Choose the IP based on the access type configuration + if self.config.get("create_floating_ip", True): + # If public access is required and a floating IP is created + self.address = f"{self.cluster.protocol}://{self.external_ip}:{self.port}" + else: + # Use internal IP if no external access is configured + self.address = f"{self.cluster.protocol}://{self.internal_ip}:{self.port}" + + await self.wait_for_scheduler() + + # Storing IPs for cluster-wide use, if necessary + self.cluster.scheduler_internal_ip = self.internal_ip + self.cluster.scheduler_external_ip = self.external_ip + self.cluster.scheduler_port = self.port + + +class OpenStackWorker(WorkerMixin, OpenStackInstance): + """Worker running on a OpenStack Instance.""" + + +class OpenStackCluster(VMCluster): + """Cluster running on Openstack VM Instances + + This cluster manager constructs a Dask cluster running on generic Openstack cloud + + When configuring your cluster you may find it useful to install the 'python-openstackclient' + client for querying the Openstack APIs for available options. + + https://github.com/openstack/python-openstackclient + + Parameters + ---------- + + region: str + The name of the region where resources will be allocated in OpenStack. + Typically set to 'default' unless specified in your cloud configuration. + + List available regions using: `openstack region list`. + auth_url: str + The authentication URL for the OpenStack Identity service (Keystone). + Example: https://cloud.example.com:5000 + application_credential_id: str + The application credential id created in OpenStack. + + Create application credentials using: openstack application credential create + application_credential_secret: str + The secret associated with the application credential ID for authentication. 
+ auth_type: str + The type of authentication used, typically "v3applicationcredential" for + using OpenStack application credentials. + network_id: str + The unique identifier for the internal/private network in OpenStack where the cluster + VMs will be connected. + + List available networks using: `openstack network list` + image: str + The OS image name or id to use for the VM. Dask Cloudprovider will bootstrap Ubuntu + based images automatically. Other images require Docker and for GPUs + the NVIDIA Drivers and NVIDIA Docker. + + List available images using: `openstack image list` + keypair_name: str + The name of the SSH keypair used for instance access. Ensure you have created a keypair + or use an existing one. + + List available keypairs using: `openstack keypair list` + security_group: str + The security group name that defines firewall rules for instances. + + The default is `default`. Please ensure the following accesses are configured: + - egress 0.0.0.0/0 on all ports for downloading docker images and general data access + - ingress /8 on all ports for internal communication of workers + - ingress 0.0.0.0/0 on 8786-8787 for external accessibility of the dashboard/scheduler + - (optional) ingress 0.0.0.0/0 on 22 for ssh access + + List available security groups using: `openstack security group list` + create_floating_ip: bool + Specifies whether to assign a floating IP to each instance, enabling external + access. Set to `True` if external connectivity is needed. + external_network_id: str + The ID of the external network used for assigning floating IPs. + + List available external networks using: `openstack network list --external` + n_workers: int (optional) + Number of workers to initialise the cluster with. Defaults to ``0``. + worker_module: str + The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker`` + worker_options: dict + Params to be passed to the worker class.
+ See :class:`distributed.worker.Worker` for default worker class. + If you set ``worker_module`` then refer to the docstring for the custom worker class. + scheduler_options: dict + Params to be passed to the scheduler class. + See :class:`distributed.scheduler.Scheduler`. + env_vars: dict + Environment variables to be passed to the worker. + extra_bootstrap: list[str] (optional) + Extra commands to be run during the bootstrap phase. + docker_image: string (optional) + The Docker image to run on all instances. + + This image must have a valid Python environment and have ``dask`` installed in order for the + ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended the Python + environment matches your local environment where ``OpenStackCluster`` is being created from. + + For GPU instance types the Docker image must have NVIDIA drivers and ``dask-cuda`` installed. + + By default the ``daskdev/dask:latest`` image will be used. + + Example + -------- + + >>> from dask_cloudprovider.openstack import OpenStackCluster + >>> cluster = OpenStackCluster(n_workers=1) + Launching cluster with the following configuration: + OS Image: ubuntu-22-04 + Flavor: 4vcpu-8gbram-50gbdisk + Docker Image: daskdev/dask:latest + Security Group: all-open + Creating scheduler instance + dask-9b85a5f8-scheduler + Internal IP: 10.0.30.148 + External IP: None + Waiting for scheduler to run at 10.0.30.148:8786 + Scheduler is running + Creating worker instance + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + >>> import dask.array as da + >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) + >>> arr.mean().compute() + + >>> client.close() + >>> cluster.close() + Terminated instance dask-07280176-worker-319005a2 + Terminated instance dask-07280176-scheduler + """ + + def __init__( + self, + region: str = None, + size: str = None, + image: str = None, + docker_image: str = None, + debug: bool = False, + bootstrap: bool = True, +
**kwargs, + ): + self.config = dask.config.get("cloudprovider.openstack", {}) + self.scheduler_class = OpenStackScheduler + self.worker_class = OpenStackWorker + self.debug = debug + self.bootstrap = ( + bootstrap if bootstrap is not None else self.config.get("bootstrap") + ) + self.options = { + "cluster": self, + "config": self.config, + "region": region if region is not None else self.config.get("region"), + "size": size if size is not None else self.config.get("size"), + "image": image if image is not None else self.config.get("image"), + "docker_image": docker_image or self.config.get("docker_image"), + } + self.scheduler_options = {**self.options} + self.worker_options = {**self.options} + + if "extra_bootstrap" not in kwargs: + kwargs["extra_bootstrap"] = self.config.get("extra_bootstrap") + + super().__init__(debug=debug, **kwargs) diff --git a/dask_cloudprovider/openstack/tests/test_instances.py b/dask_cloudprovider/openstack/tests/test_instances.py new file mode 100644 index 00000000..eae815bf --- /dev/null +++ b/dask_cloudprovider/openstack/tests/test_instances.py @@ -0,0 +1,72 @@ +import pytest +import dask +from dask_cloudprovider.openstack.instances import OpenStackCluster +from dask.distributed import Client +from distributed.core import Status + +# Optional: Skips tests if OpenStack credentials are not set + + +async def skip_without_credentials(config): + if ( + config.get("auth_url") is None + or config.get("application_credential_secret") is None + ): + pytest.skip( + """ + You must configure OpenStack credentials to run this test. 
+ + Set this in your config file or environment variables: + + # cloudprovider.yaml + cloudprovider: + openstack: + auth_url: "your_auth_url" + application_credential_id: "your_app_cred_id" + application_credential_secret: "your_app_cred_secret" + """ + ) + + +@pytest.fixture +async def config(): + return dask.config.get("cloudprovider.openstack", {}) + + +@pytest.fixture +@pytest.mark.external +async def cluster(config): + await skip_without_credentials(config) + + async with OpenStackCluster(asynchronous=True) as cluster: + yield cluster + + +@pytest.mark.asyncio +async def test_init(): + cluster = OpenStackCluster(asynchronous=True) + assert cluster.status == Status.created + + +@pytest.mark.asyncio +@pytest.mark.timeout(600) +async def test_create_cluster(cluster): + assert cluster.status == Status.running + cluster.scale(1) + await cluster + assert len(cluster.workers) == 1 + + async with Client(cluster, asynchronous=True) as client: + + def inc(x): + return x + 1 + + assert await client.submit(inc, 10).result() == 11 + + +@pytest.mark.asyncio +async def test_get_cloud_init(): + cloud_init = OpenStackCluster.get_cloud_init( + docker_args="--privileged", + ) + assert " --privileged " in cloud_init diff --git a/doc/requirements-docs.txt b/doc/requirements-docs.txt index 3228340f..1ba263a7 100644 --- a/doc/requirements-docs.txt +++ b/doc/requirements-docs.txt @@ -1,3 +1,12 @@ numpydoc sphinx dask-sphinx-theme>=3.0.5 +# FIXME: This workaround is required until we have sphinx>=5, as enabled by +# dask-sphinx-theme no longer pinning sphinx-book-theme==0.2.0. This is +# tracked in https://github.com/dask/dask-sphinx-theme/issues/68. 
+# +sphinxcontrib-applehelp<1.0.5 +sphinxcontrib-devhelp<1.0.6 +sphinxcontrib-htmlhelp<2.0.5 +sphinxcontrib-serializinghtml<1.1.10 +sphinxcontrib-qthelp<1.0.7 diff --git a/doc/source/alternatives.rst b/doc/source/alternatives.rst new file mode 100644 index 00000000..4f9f809d --- /dev/null +++ b/doc/source/alternatives.rst @@ -0,0 +1,51 @@ +Alternatives +============ + +Many tools and services exist today for deploying Dask clusters, many of which are commonly used on the cloud. +This project aims to provide cloud native plugins and tools for Dask which can often complement other approaches. + +Community tools +--------------- + +Dask has a `vibrant ecosystem of community tooling for deploying Dask `_ on various platforms, many of which can be used on public clouds. + +Kubernetes +^^^^^^^^^^ + +`Kubernetes `_ is an extremely popular project for managing cloud workloads and is part of the broader `Cloud Native Computing Foundation (CNCF) `_ ecosystem. + +Dask has many options for `deploying clusters on Kubernetes `_. + +HPC on Cloud +^^^^^^^^^^^^ + +Many popular HPC scheduling tools are used on the cloud and support features such as elastic scaling. +If you are already leveraging HPC tools like `SLURM on the cloud `_ then `Dask has great integration with HPC schedulers `_. + +Hadoop/Spark/Yarn +^^^^^^^^^^^^^^^^^ + +Many cloud platforms have popular managed services for running Apache Spark workloads. + +If you're already using a managed map-reduce service like `Amazon EMR `_ then check out `dask-yarn `_. + +Nebari +^^^^^^ + +`Nebari `_ is an open source data science platform which can be run locally or on a cloud platform of your choice. +It includes a managed Dask service built on `Dask Gateway `_ for managing Dask clusters.
+ +Managed Services +---------------- + +Cloud vendors and third-party companies also offer managed Dask clusters as a service. + +Coiled +^^^^^^ + +`Coiled `_ is a mature managed Dask service that spawns clusters in your cloud account and allows you to manage them via a central control plane. + +Saturn Cloud +^^^^^^^^^^^^ + +`Saturn Cloud `_ is a managed data science platform with hosted Dask clusters or the option to deploy them in your own AWS account. diff --git a/doc/source/ibm.rst b/doc/source/ibm.rst new file mode 100644 index 00000000..c0ed620a --- /dev/null +++ b/doc/source/ibm.rst @@ -0,0 +1,58 @@ +IBM Cloud +============ + +.. currentmodule:: dask_cloudprovider.ibm + +.. autosummary:: + IBMCodeEngineCluster + +Overview +-------- + +Authentication +^^^^^^^^^^^^^^ + +To authenticate with IBM Cloud you must first generate an +`API key `_. + +Then you must put this in your Dask configuration at ``cloudprovider.ibm.api_key``. This can be done by +adding the API key to your YAML configuration or exporting an environment variable. + +.. code-block:: yaml + + # ~/.config/dask/cloudprovider.yaml + + cloudprovider: + ibm: + api_key: "your_api_key" + +.. code-block:: console + + $ export DASK_CLOUDPROVIDER__IBM__API_KEY="your_api_key" + +Project ID +^^^^^^^^^^ + +To use Dask Cloudprovider with IBM Cloud you must also configure your `Project ID `_. +This can be found at the top of the IBM Cloud dashboard. + +Your Project ID must be added to your Dask config file. + +.. code-block:: yaml + + # ~/.config/dask/cloudprovider.yaml + cloudprovider: + ibm: + project_id: "your_project_id" + +Or via an environment variable. + +.. code-block:: console + + $ export DASK_CLOUDPROVIDER__IBM__PROJECT_ID="your_project_id" + +Code Engine +----------- + +..
autoclass:: IBMCodeEngineCluster + :members: \ No newline at end of file diff --git a/doc/source/index.rst b/doc/source/index.rst index c2e57968..a87aaf94 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -3,6 +3,15 @@ Dask Cloud Provider *Native Cloud integration for Dask.* +This package contains open source tools to help you deploy and operate Dask clusters on the cloud. +It contains cluster managers which can help you launch clusters using native cloud resources like VMs or containers, +it has tools and plugins for use in ANY cluster running on the cloud and is a great source of documentation for Dask cloud deployments. + +It is by no means the "complete" or "only" way to run Dask on the cloud, check out the :doc:`alternatives` page for more tools. + +Cluster managers +---------------- + This package provides classes for constructing and managing ephemeral Dask clusters on various cloud platforms. @@ -52,6 +61,26 @@ this code. with Client(cluster) as client: # Do some Dask things +Plugins +------- + +Dask components like Schedulers and Workers can benefit from being cloud-aware. +This project has plugins and tools that extend these components. + +One example is having the workers check for termination warnings when running on ephemeral/spot instances and begin migrating data to other workers. + +For Azure VMs you could use the :class:`dask_cloudprovider.azure.AzurePreemptibleWorkerPlugin` to do this. +It can be used on any cluster that has workers running on Azure VMs, not just ones created with :class:`dask_cloudprovider.azure.AzureVMCluster`. + +.. code-block:: python + + from distributed import Client + client = Client("") + + from dask_cloudprovider.azure import AzurePreemptibleWorkerPlugin + client.register_worker_plugin(AzurePreemptibleWorkerPlugin()) + + .. toctree:: :maxdepth: 2 :hidden: @@ -59,6 +88,7 @@ this code. installation.rst config.rst + alternatives.rst .. toctree:: :maxdepth: 2 @@ -70,6 +100,8 @@ this code. 
gcp.rst azure.rst hetzner.rst + ibm.rst + openstack.rst .. toctree:: :maxdepth: 2 diff --git a/doc/source/installation.rst b/doc/source/installation.rst index b953cbd6..c28a2be0 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -16,7 +16,9 @@ You can also restrict your install to just a specific cloud provider by giving t $ pip install dask-cloudprovider[azure]  # or $ pip install dask-cloudprovider[azureml]  # or $ pip install dask-cloudprovider[digitalocean]  # or - $ pip install dask-cloudprovider[gcp] + $ pip install dask-cloudprovider[gcp]  # or + $ pip install dask-cloudprovider[ibm]  # or + $ pip install dask-cloudprovider[openstack] Conda ----- diff --git a/doc/source/openstack.rst b/doc/source/openstack.rst new file mode 100644 index 00000000..cb0a743d --- /dev/null +++ b/doc/source/openstack.rst @@ -0,0 +1,75 @@ +Openstack +============ + +.. currentmodule:: dask_cloudprovider.openstack + +.. autosummary:: + OpenStackCluster + +Overview +-------- + +Authentication +^^^^^^^^^^^^^^ + +To authenticate with the OpenStack Identity service (Keystone) + +1) Get your Authentication URL (auth_url) for OpenStack Identity service (Keystone) and put it in your Dask configuration at ``cloudprovider.openstack.auth_url``. + +2) Get your `region `_ and put it in your Dask configuration at ``cloudprovider.openstack.region``. + .. code-block:: console + + $ openstack region list + +-----------+---------------+-------------+ + | Region | Parent Region | Description | + +-----------+---------------+-------------+ + | RegionOne | None | | + +-----------+---------------+-------------+ + +3) Generate an `application credential `_. + + .. 
code-block:: console + + $ openstack application credential create dask --unrestricted + +--------------+----------------------------------------------------------------------------------------+ + | Field | Value | + +--------------+----------------------------------------------------------------------------------------+ + | description | None | + | expires_at | None | + | id | 0a0372dbedfb4e82ab66449c3316ef1e | + | name | dask | + | project_id | e99b6f4b9bf84a9da27e20c9cbfe887a | + | roles | Member anotherrole | + | secret | ArOy6DYcLeLTRlTmfvF1TH1QmRzYbmD91cbVPOHL3ckyRaLXlaq5pTGJqvCvqg6leEvTI1SQeX3QK-3iwmdPxg | + | unrestricted | True | + +--------------+----------------------------------------------------------------------------------------+ + + and put ``application_credential_id`` and ``application_credential_secret`` in your Dask configuration at ``cloudprovider.openstack.application_credential_id`` + and ``cloudprovider.openstack.application_credential_secret``. + +All of these variables can be gathered from either `OpenStack RC file `_ +or `clouds.yaml file `_. + +Example Config File +^^^^^^^^^^^^^^^^^^^ +.. code-block:: yaml + + # ~/.config/dask/cloudprovider.yaml + + cloudprovider: + openstack: + region: "RegionOne" + auth_url: "https://cloud.home.karatosun.xyz:5000" + application_credential_id: "0a0372dbedfb4e82ab66449c3316ef1e" + application_credential_secret: "ArOy6DYcLeLTRlTmfvF1TH1QmRzYbmD91cbVPOHL3ckyRaLXlaq5pTGJqvCvqg6leEvTI1SQeX3QK-3iwmdPxg" + auth_type: "v3applicationcredential" + +You can also export them as environment variables. + +.. code-block:: console + + $ export DASK_CLOUDPROVIDER__OPENSTACK__APPLICATION_CREDENTIAL_ID="0a0372dbedfb4e82ab66449c3316ef1e" + + +..
autoclass:: OpenStackCluster + :members: diff --git a/doc/source/troubleshooting.rst b/doc/source/troubleshooting.rst index 17975626..eb47e3bf 100644 --- a/doc/source/troubleshooting.rst +++ b/doc/source/troubleshooting.rst @@ -79,3 +79,14 @@ and ``extra_bootstrap`` argument where you can provide additional bash commands cluster = AzureVMCluster(... docker_image="my_private_image:latest", extra_bootstrap=["docker login -u 'username' -p 'password'"]) + +If you need to access Artifact/Container Registry in GCP, one way of doing it would be to authenticate Docker with +`gcloud credential helper `_ by adding extra bootstrap params similar to +the ones below: + +.. code-block:: python + + from dask_cloudprovider.gcp import GCPCluster + cluster = GCPCluster(... + docker_image=f"{region}-docker.pkg.dev/{project}/{repo}/{image}:{tag}", + extra_bootstrap=[f"gcloud auth configure-docker {region}-docker.pkg.dev"]) diff --git a/examples/OpenstackCluster-scorepredict.ipynb b/examples/OpenstackCluster-scorepredict.ipynb new file mode 100644 index 00000000..b8d83fb3 --- /dev/null +++ b/examples/OpenstackCluster-scorepredict.ipynb @@ -0,0 +1,1270 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "bc03588b-e539-4d6f-8c5c-13b9c5a499e1", + "metadata": {}, + "source": [ + "# Score and Predict Large Datasets with Dask Openstack Cloud Provider" + ] + }, + { + "cell_type": "markdown", + "id": "58a2fdac-0f03-48df-bc6b-286515761ba6", + "metadata": {}, + "source": [ + "This example combines the [Score and Predict Large Datasets example at dask-examples git repository](https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb) with Dask Openstack Cloud Provider.\n", + "\n", + "Details: https://examples.dask.org/machine-learning/parallel-prediction.html" + ] + }, + { + "cell_type": "markdown", + "id": "d7269385-d165-4d33-884d-0cf9b9f1590d", + "metadata": {}, + "source": [ + "## Prerequisites" + ] + }, + { + "cell_type": "markdown", + "id": 
"4db66a79-0648-40d3-8c7d-f1b8d69f6c14", + "metadata": {}, + "source": [ + "```bash\n", + "pip install dask-cloudprovider\n", + "pip install numpy scikit-learn dask_ml\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 46, + "id": "b1973e66-8d3b-4f30-bcac-9e506a1d0ca3", + "metadata": {}, + "outputs": [], + "source": [ + "# > (optional) Let's disable the deprecation warnings to improve readability.\n", + "import warnings\n", + "warnings.filterwarnings('ignore', category=UserWarning)" + ] + }, + { + "cell_type": "markdown", + "id": "4000888b-e720-4ea2-955c-9bcbee7a5db3", + "metadata": {}, + "source": [ + "## Create and Connect to Dask Cluster with Openstack Cloud Provider" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "id": "5b745083-bb26-4fe9-a4f6-f7d049628a79", + "metadata": {}, + "outputs": [], + "source": [ + "import dask\n", + "import dask_cloudprovider\n", + "from dask_cloudprovider.openstack import OpenStackCluster\n", + "from dask.distributed import Client, progress" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "id": "169db10e-b9cf-484f-9d3d-115272eec85d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Launching cluster with the following configuration: \n", + " OS Image: ubuntu-22-04 \n", + " Flavor: 4vcpu-8gbram-50gbdisk \n", + " Docker Image: armagankaratosun/dask-ml:latest \n", + " Security Group: all-open \n", + "Creating scheduler instance\n", + "dask-d523cb04-scheduler\n", + "\tInternal IP: 10.0.30.30\n", + "\tExternal IP: None\n", + "Waiting for scheduler to run at 10.0.30.30:8786\n", + "Scheduler is running\n", + "Creating worker instance\n", + "Creating worker instance\n", + "dask-d523cb04-worker-96fd7205\n", + "\tInternal IP: 10.0.30.95\n", + "\tExternal IP: None\n", + "dask-d523cb04-worker-e0f7591e\n", + "\tInternal IP: 10.0.30.107\n", + "\tExternal IP: None\n" + ] + } + ], + "source": [ + "cluster = OpenStackCluster(n_workers=2, shutdown_on_close=True,
docker_image=\"armagankaratosun/dask-ml:latest\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "abdbec11-1154-43d3-91c1-55d03d5935dc", + "metadata": {}, + "outputs": [], + "source": [ + "client = Client(cluster)" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "9c413324-904c-41af-bf10-a82ecf10d2c0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Client

\n", + "

Client-6f338508-661e-11ef-936f-2a7e43ef1ca5

\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "
Connection method: Cluster objectCluster type: instances.OpenStackCluster
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + "
\n", + "\n", + " \n", + "\n", + " \n", + "
\n", + "

Cluster Info

\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

OpenStackCluster

\n", + "

3b3f98df

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + " \n", + " Workers: 2\n", + "
\n", + " Total threads: 8\n", + " \n", + " Total memory: 15.32 GiB\n", + "
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-c3c079d0-f8e5-4ce6-8f11-d26e8654fe07

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tls://10.0.30.30:8786\n", + " \n", + " Workers: 2\n", + "
\n", + " Dashboard: http://10.0.30.30:8787/status\n", + " \n", + " Total threads: 8\n", + "
\n", + " Started: 4 minutes ago\n", + " \n", + " Total memory: 15.32 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-d523cb04-worker-96fd7205

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.0.30.95:40477\n", + " \n", + " Total threads: 4\n", + "
\n", + " Dashboard: http://10.0.30.95:43771/status\n", + " \n", + " Memory: 7.66 GiB\n", + "
\n", + " Nanny: tls://10.0.30.95:44951\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-e5rljt6_\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-d523cb04-worker-e0f7591e

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.0.30.107:35117\n", + " \n", + " Total threads: 4\n", + "
\n", + " Dashboard: http://10.0.30.107:46297/status\n", + " \n", + " Memory: 7.66 GiB\n", + "
\n", + " Nanny: tls://10.0.30.107:43675\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-5l075jui\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 55, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client" + ] + }, + { + "cell_type": "markdown", + "id": "bac247cc-e089-4a2d-9f6b-b5f565cf90d1", + "metadata": {}, + "source": [ + "## Score and Predict Large Datasets" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "id": "68f0aec9-36af-4818-b9b3-c9af6dcede84", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import dask.array as da\n", + "from sklearn.datasets import make_classification" + ] + }, + { + "cell_type": "markdown", + "id": "0dcb00a8-6ba6-40bb-bcbc-e30e47af0ec1", + "metadata": {}, + "source": [ + "We'll generate a small random dataset with scikit-learn." + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "id": "74433cd6-5997-424f-9004-723f6748ad21", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[ 1.53682958, -1.39869399],\n", + " [ 1.36917601, -0.63734411],\n", + " [ 0.50231787, -0.45910529],\n", + " [ 1.83319262, -1.29808229],\n", + " [ 1.04235568, 1.12152929]])" + ] + }, + "execution_count": 57, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "X_train, y_train = make_classification(\n", + " n_features=2, n_redundant=0, n_informative=2,\n", + " random_state=1, n_clusters_per_class=1, n_samples=1000)\n", + "X_train[:5]" + ] + }, + { + "cell_type": "markdown", + "id": "54d40a88-145d-4fb0-bafd-b6d5f852f5dd", + "metadata": {}, + "source": [ + "And we'll clone that dataset many times with dask.array. X_large and y_large represent our larger than memory dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "id": "92067947-c8ca-4bd6-a589-9b8c877b96ca", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Array Chunk
Bytes 1.53 MiB 15.62 kiB
Shape (100000, 2) (1000, 2)
Dask graph 100 chunks in 2 graph layers
Data type float64 numpy.ndarray
\n", + "
\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 2\n", + " 100000\n", + "\n", + "
" + ], + "text/plain": [ + "dask.array" + ] + }, + "execution_count": 58, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Scale up: increase N, the number of times we replicate the data.\n", + "N = 100\n", + "X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)\n", + " for _ in range(N)])\n", + "y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)\n", + " for _ in range(N)])\n", + "X_large" + ] + }, + { + "cell_type": "markdown", + "id": "ea21bdbb-fbe5-4801-8132-b41d4d62f190", + "metadata": {}, + "source": [ + "Since our training dataset fits in memory, we can use a scikit-learn estimator as the actual estimator fit during training. But we know that we'll want to predict for a large dataset, so we'll wrap the scikit-learn estimator with ParallelPostFit." + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "id": "3df99eb7-c643-4f00-a41c-d3e4f0f37a12", + "metadata": {}, + "outputs": [], + "source": [ + "from sklearn.linear_model import LogisticRegressionCV\n", + "from dask_ml.wrappers import ParallelPostFit" + ] + }, + { + "cell_type": "code", + "execution_count": 60, + "id": "aa0dfd51-ab6b-4709-bd5a-465c1d0f8350", + "metadata": {}, + "outputs": [], + "source": [ + "clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring=\"r2\")" + ] + }, + { + "cell_type": "markdown", + "id": "86d0c69c-2a06-4f6b-b59f-99301786f9d7", + "metadata": {}, + "source": [ + "See the note in the dask-ml documentation about when and why a scoring parameter is needed: https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit.\n", + "\n", + "Now we'll call clf.fit. Dask-ML does nothing here, so this step can only use datasets that fit in memory." + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "id": "7157844d-3d66-41d0-8360-0b4e4a700ff6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')
In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
" + ], + "text/plain": [ + "ParallelPostFit(estimator=LogisticRegressionCV(cv=3), scoring='r2')" + ] + }, + "execution_count": 61, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "clf.fit(X_train, y_train)" + ] + }, + { + "cell_type": "markdown", + "id": "262f1ee3-475e-4542-b662-e33be7302f78", + "metadata": {}, + "source": [ + "Now that training is done, we'll turn to predicting for the full (larger than memory) dataset." + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "id": "fd23ebc8-8355-4a01-8493-b9e1f6ceb42a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Array Chunk
Bytes 781.25 kiB 7.81 kiB
Shape (100000,) (1000,)
Dask graph 100 chunks in 3 graph layers
Data type int64 numpy.ndarray
\n", + "
\n", + " \n", + "\n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + "\n", + " \n", + " 100000\n", + " 1\n", + "\n", + "
" + ], + "text/plain": [ + "dask.array<_predict, shape=(100000,), dtype=int64, chunksize=(1000,), chunktype=numpy.ndarray>" + ] + }, + "execution_count": 62, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "y_pred = clf.predict(X_large)\n", + "y_pred" + ] + }, + { + "cell_type": "markdown", + "id": "7e59a188-6de6-43fe-9648-54a4b5596c0d", + "metadata": {}, + "source": [ + "`y_pred` is a Dask array. Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine.\n", + "\n", + "Or we can check the model's score on the entire large dataset. The computation will be done in parallel, and no single machine will have to hold all the data." + ] + }, + { + "cell_type": "markdown", + "id": "276d251a-81aa-4d4a-b155-38a1c02ae115", + "metadata": {}, + "source": [ + "## Results" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "7782c30a-a237-49a0-840d-ebae6dedc6ab", + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client, performance_report\n", + "from IPython.display import IFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "id": "8bbd740d-81d4-4b4f-84c7-42cefde782e5", + "metadata": {}, + "outputs": [], + "source": [ + "with performance_report(filename=\"dask-report.html\"):\n", + " # Place your Dask computations here\n", + " # For example, a delayed operation or compute()\n", + " result = clf.score(X_large, y_large) " + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "8f01f103-ec33-48e9-b7ce-fbac32f90e62", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "np.float64(0.596)" + ] + }, + "execution_count": 65, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "id": "932eb174-8587-478a-bb68-ccf951ed6b25", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", +
" " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 66, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Display the performance report within the notebook\n", + "IFrame(src=\"dask-report.html\", width=\"100%\", height=\"500px\")" + ] + }, + { + "cell_type": "markdown", + "id": "251cacbb-f69d-4484-991a-e2671876a66c", + "metadata": {}, + "source": [ + "## Clean Up" + ] + }, + { + "cell_type": "markdown", + "id": "a4678b96-915c-482d-bbe5-c1b5d250dd46", + "metadata": {}, + "source": [ + "After the prediction, we can close the client and throw away the Dask OpenStack cloud cluster." + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "id": "1a861ef5-f98c-4ea2-8883-36eaf03eec2d", + "metadata": {}, + "outputs": [], + "source": [ + "client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "id": "19781f45", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Terminated instance dask-d523cb04-worker-96fd7205\n", + "Terminated instance dask-d523cb04-worker-e0f7591e\n", + "Terminated instance dask-d523cb04-scheduler\n" + ] + } + ], + "source": [ + "cluster.close()" + ] + }, + { + "cell_type": "markdown", + "id": "fab56ddf-700c-46e5-80f2-055a8b01d421", + "metadata": {}, + "source": [ + "## Credits" + ] + }, + { + "cell_type": "markdown", + "id": "3d409e66-838a-4fe6-948c-7d2fc9935fd9", + "metadata": {}, + "source": [ + "All credit for [Score and Predict Large Datasets](https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb) belongs to its respective developers\n", + "\n", + "* https://examples.dask.org/machine-learning/parallel-prediction.html\n", + "* https://github.com/dask/dask-examples/blob/main/machine-learning/parallel-prediction.ipynb" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { +
"name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/setup.py b/setup.py index 64ca18a5..c00dbcdb 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,8 @@ "digitalocean": ["python-digitalocean>=1.15.0"], "gcp": ["google-api-python-client>=1.12.5", "google-auth>=1.23.0"], "hetzner": ["hcloud>=1.10.0"], + "ibm": ["ibm_code_engine_sdk>=3.1.0"], + "openstack": ["openstacksdk>=3.3.0"], } extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs) @@ -37,5 +39,5 @@ [console_scripts] dask-ecs=dask_cloudprovider.cli.ecs:go """, - python_requires=">=3.9", + python_requires=">=3.10", ) diff --git a/versioneer.py b/versioneer.py index 722989c3..d38b5b79 100644 --- a/versioneer.py +++ b/versioneer.py @@ -343,9 +343,9 @@ def get_config_from_root(root): # configparser.NoOptionError (if it lacks "VCS="). See the docstring at # the top of versioneer.py for instructions on writing your setup.cfg . setup_cfg = os.path.join(root, "setup.cfg") - parser = configparser.SafeConfigParser() + parser = configparser.ConfigParser() with open(setup_cfg, "r") as f: - parser.readfp(f) + parser.read_file(f) VCS = parser.get("versioneer", "VCS") # mandatory def get(parser, name):