12 changes: 12 additions & 0 deletions .github/actions/run-test/action.yml
@@ -54,10 +54,22 @@ runs:
shell: bash
run: |
uv pip install --system -r examples/applications/requirements_applications.txt
uv pip install --system -r examples/ray_compat/requirements.txt
for example in "./examples"/*.py; do
echo "Running $example"
python "$example"
done
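# skip_tests.txt lists the example filenames to skip, one per line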
readarray -t skip_tests < examples/ray_compat/skip_tests.txt
for example in "./examples/ray_compat"/*.py; do
filename=$(basename "$example")
if [[ " ${skip_tests[*]} " =~ [[:space:]]${filename}[[:space:]] ]]; then
echo "Skipping $example"
continue
fi

echo "Running $example"
python "$example"
done
for example in "./examples/applications"/*.py; do
if python -c 'import sys; sys.exit(not sys.version_info <= (3, 10))'; then
if [ "$example" = "./examples/applications/yfinance_historical_price.py" ]; then
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -26,6 +26,7 @@ Content

tutorials/quickstart
tutorials/features
tutorials/compatibility/ray
tutorials/configuration
tutorials/examples
tutorials/development/devcontainer
129 changes: 129 additions & 0 deletions docs/source/tutorials/compatibility/ray.rst
@@ -0,0 +1,129 @@
.. _ray_compatibility:

Ray Compatibility Layer
=======================

Scaler is a lightweight distributed computation engine similar to Ray. It supports many of the same concepts as Ray, including
remote functions (known as tasks in Scaler), futures, cluster object storage, and labels (known as capabilities in Scaler), and it ships with comparable monitoring tools.

Unlike Ray, Scaler supports local clusters and also integrates easily with multiple cloud providers out of the box, including AWS EC2 and IBM Symphony,
with more providers planned for the future. You can view our `roadmap on GitHub <https://github.com/finos/opengris-scaler/discussions/333>`_
for details on upcoming cloud integrations.

Scaler provides a compatibility layer that allows developers familiar with the `Ray <https://www.ray.io/>`_ API to adopt Scaler with minimal code changes.

Quickstart
----------

To start using Scaler's Ray compatibility layer, ensure you have `opengris-scaler <https://pypi.org/project/opengris-scaler/>`_ installed in your Python environment.

Then import ``scaler.compat.ray`` in your application after importing ``ray`` and before using any Ray APIs.

This import patches the ``ray`` module, allowing you to use Ray's API as you normally would.

.. code-block:: python

import ray
import scaler.compat.ray

# existing Ray app

This will start a new local scheduler and cluster combo. To use an existing cluster, pass the address of the scheduler to ``scaler_init()``:

.. code-block:: python

import ray
from scaler.compat.ray import scaler_init

# connects to an existing cluster
# when an address is provided, a local cluster is not started
scaler_init(address="tcp://<scheduler-ip>:<scheduler-port>")

# existing Ray app

You can also pass scheduler and cluster configuration options to ``scaler_init()`` to customize the implicitly-created local cluster:

.. code-block:: python

import ray
from scaler.compat.ray import scaler_init

# overrides the number of workers in the implicitly-created local cluster (defaults to number of CPU cores)
scaler_init(n_workers=5)

# existing Ray app

Remote Function Limitations
---------------------------

``ray.remote()`` accepts many parameters, but Scaler's compatibility layer only supports ``num_returns``. Other parameters will be ignored.
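
For example, a task declared with ``num_returns=2`` produces two object references. Below is a minimal sketch (the task name is illustrative; ``num_cpus`` is included only to show that ignored options are accepted without effect):

.. code-block:: python

    import ray
    import scaler.compat.ray  # noqa: F401

    # num_returns is honoured; num_cpus is silently ignored by the compatibility layer
    @ray.remote(num_returns=2, num_cpus=1)
    def divmod_task(a, b):
        return a // b, a % b

    quotient_ref, remainder_ref = divmod_task.remote(7, 3)
    assert ray.get(quotient_ref) == 2
    assert ray.get(remainder_ref) == 1

    ray.shutdown()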

Shutting Down
-------------

The implicitly-created local cluster is a subprocess with global scope and won't be shut down automatically.
This can keep your process running after your program's work has completed, so when using the implicit local cluster
it is important to call ``ray.shutdown()`` once your program is done.
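
One way to make this robust (a minimal sketch, not a required pattern) is to wrap the workload so the cluster is torn down even if an exception is raised:

.. code-block:: python

    import ray
    import scaler.compat.ray  # noqa: F401

    try:
        # ... submit tasks and collect results here ...
        pass
    finally:
        # tears down the implicitly-created local cluster subprocess
        ray.shutdown()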

A Note about the Actor Model
----------------------------

Ray supports a powerful actor model that allows for stateful computation. This is not currently supported by Scaler, but is planned for a future release.

This documentation will be updated when actor support is added. For now, please see our `roadmap on GitHub <https://github.com/finos/opengris-scaler/discussions/333>`_ for more details.

Decorating a class with ``@ray.remote`` will raise a ``NotImplementedError``.
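
A short sketch of what this looks like in practice:

.. code-block:: python

    import ray
    import scaler.compat.ray  # noqa: F401

    try:
        @ray.remote
        class Counter:  # actors are not yet supported by the compatibility layer
            pass
    except NotImplementedError:
        print("actor support is planned but not yet available")

    ray.shutdown()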

Full Examples
-------------

See `the examples directory <https://github.com/finos/opengris-scaler/tree/main/examples/ray_compat>`_ for complete Scaler Ray compatibility layer examples, including:

* ``basic_local_cluster.py``: Demonstrates using the Scaler Ray compatibility layer with the implicitly-created local cluster.
* ``basic_remote_cluster.py``: Demonstrates using the Scaler Ray compatibility layer with an existing remote cluster.
* ``batch_prediction.py``: Demonstrates using Scaler's Ray compatibility layer for batch prediction, copied from Ray Core's documentation.
* ``highly_parallel.py``: Demonstrates highly parallel computations, copied from Ray Core's documentation.
* ``map_reduce.py``: Demonstrates a MapReduce pattern using Scaler's Ray compatibility layer, copied from Ray Core's documentation.
* ``plot_hyperparameter.py``: Demonstrates hyperparameter tuning and plotting, copied from Ray Core's documentation.
* ``web_crawler.py``: Demonstrates a web crawling example, copied from Ray Core's documentation.

Supported APIs
--------------

The compatibility layer supports a subset of Ray Core's API.

Below is a comprehensive list of the supported APIs. Functions and classes not in this list are unsupported;
depending on the API, they are no-ops, return a mock object, or raise a ``NotImplementedError`` (see `Unsupported APIs`_ below).

Core API
~~~~~~~~

* ``@ray.remote``: Only supports remote functions. Decorating a class with ``@ray.remote`` will raise a ``NotImplementedError``.
* ``ray.shutdown()``
* ``ray.is_initialized()``
* ``ray.get()``
* ``ray.put()``
* ``ray.wait()``
* ``ray.cancel()``
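
A minimal sketch exercising several of these calls against the implicit local cluster:

.. code-block:: python

    import ray
    import scaler.compat.ray  # noqa: F401

    @ray.remote
    def square(x):
        return x * x

    refs = [square.remote(i) for i in range(4)]

    # block until at least two of the tasks have finished
    ready, not_ready = ray.wait(refs, num_returns=2)

    # fetch every result, and use the cluster object store directly
    print(ray.get(refs))  # [0, 1, 4, 9]
    data_ref = ray.put([1, 2, 3])
    assert ray.get(data_ref) == [1, 2, 3]

    ray.shutdown()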

Ray Utilities (``ray.util``)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* ``ray.util.as_completed()``
* ``ray.util.map_unordered()``
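
The exact signatures are not documented here. As a rough sketch, assuming ``ray.util.as_completed()`` accepts an iterable of object references and yields each one as it finishes (mirroring ``concurrent.futures.as_completed``):

.. code-block:: python

    import ray
    import scaler.compat.ray  # noqa: F401

    @ray.remote
    def slow_square(x):
        return x * x

    refs = [slow_square.remote(i) for i in range(4)]

    # assumption: yields references in completion order, not submission order
    for ref in ray.util.as_completed(refs):
        print(ray.get(ref))

    ray.shutdown()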

Unsupported APIs
~~~~~~~~~~~~~~~~

The following APIs are not supported by the Scaler Ray compatibility layer.

Some functions will be no-ops or return a mock object while others will raise a ``NotImplementedError`` exception.

* ``ray.init()``: No-op. Use ``scaler_init()`` from ``scaler.compat.ray`` instead.
* ``ray.get_actor()``: Returns a mock object.
* ``ray.method()``: Raises ``NotImplementedError``.
* ``ray.actor``: Raises ``NotImplementedError``.
* ``ray.runtime_context``: Raises ``NotImplementedError``.
* ``ray.cross_language``: Raises ``NotImplementedError``.
* ``ray.get_gpu_ids()``: Raises ``NotImplementedError``.
* ``ray.get_runtime_context()``: Raises ``NotImplementedError``.
* ``ray.kill()``: Raises ``NotImplementedError``.
26 changes: 26 additions & 0 deletions examples/ray_compat/basic_local_cluster.py
@@ -0,0 +1,26 @@
"""This is a basic example showing the minimal changes needed to start using Scaler for a Ray application."""

import ray

# this patches the ray module
import scaler.compat.ray # noqa: F401


def main():
# the scaler is implicitly initialized here
# see basic_remote_cluster.py for more advanced usage
@ray.remote
def my_function():
return 1

# this is executed by the local scaler cluster
future = my_function.remote()
assert ray.get(future) == 1

# the implicitly-created cluster is globally-scoped
# so we need to shut it down explicitly
ray.shutdown()


if __name__ == "__main__":
main()
35 changes: 35 additions & 0 deletions examples/ray_compat/basic_remote_cluster.py
@@ -0,0 +1,35 @@
"""This is a basic example showing the minimal changes needed to start using Scaler for a Ray application."""

import ray

from scaler.cluster.combo import SchedulerClusterCombo

# this patches the ray module
from scaler.compat.ray import scaler_init


# this is an example and we don't have a real remote cluster here
# so for demonstration purposes we just start a local cluster
def start_remote_cluster() -> SchedulerClusterCombo:
return SchedulerClusterCombo(n_workers=1)


def main(address: str):
# explicitly init the scaler
# we explicitly provide the address of the remote scheduler
scaler_init(address=address)

@ray.remote
def my_function():
return 1

# this is executed by the remote scaler cluster
future = my_function.remote()
assert ray.get(future) == 1


if __name__ == "__main__":
combo = start_remote_cluster()
main(combo.get_address())

combo.shutdown()
78 changes: 78 additions & 0 deletions examples/ray_compat/batch_prediction.py
@@ -0,0 +1,78 @@
"""
This example was copied from https://docs.ray.io/en/latest/ray-core/examples/batch_prediction.html

Like in `highly_parallel.py`, only minimal changes are needed for the example to work on Scaler.
"""


import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import ray

# changed line 1/2
import scaler.compat.ray # noqa: F401


def load_model():
# A dummy model.
def model(batch: pd.DataFrame) -> pd.DataFrame:
# Dummy payload so copying the model will actually copy some data
# across nodes.
model.payload = np.zeros(100_000_000) # type: ignore[attr-defined]
return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})

return model


def main():
@ray.remote
def make_prediction(model, shard_path):
df = pq.read_table(shard_path).to_pandas()
result = model(df)

# Write out the prediction result.
# NOTE: unless the driver will have to further process the
# result (other than simply writing out to storage system),
# writing out at remote task is recommended, as it can avoid
# congesting or overloading the driver.
# ...

# Here we just return the size of the result in this example.
return len(result)

# 12 files, one for each remote task.
input_files = [
f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
for i in range(12)
]

# ray.put() the model just once to local object store, and then pass the
# reference to the remote tasks.
model = load_model()
model_ref = ray.put(model)

result_refs = []

# Launch all prediction tasks.
for file in input_files:
# Launch a prediction task by passing model reference and shard file to it.
# NOTE: it would be highly inefficient if you are passing the model itself
# like make_prediction.remote(model, file), which in order to pass the model
# to remote node will ray.put(model) for each task, potentially overwhelming
# the local object store and causing out-of-disk error.
result_refs.append(make_prediction.remote(model_ref, file))

results = ray.get(result_refs)

# Let's check prediction output size.
for r in results:
print("Prediction output size:", r)


if __name__ == "__main__":
main()

# changed line 2/2
ray.shutdown()
50 changes: 50 additions & 0 deletions examples/ray_compat/highly_parallel.py
@@ -0,0 +1,50 @@
"""
This example was copied from https://docs.ray.io/en/latest/ray-core/examples/highly_parallel.html

Only one or two changes are needed to make this example work on Scaler.
The first is to import the compatibility layer, which patches Ray Core's API.
The second is to call `ray.shutdown()`, which is necessary only when using a local cluster.
"""

import random
import time
from fractions import Fraction

import ray

# this is one of only two changed lines
import scaler.compat.ray # noqa: F401

# Let's start Ray
ray.init(address="auto")
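# NOTE: under the compatibility layer, ray.init() is a no-op;
# the local Scaler cluster is created implicitly instead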


def main():
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x * x + y * y <= 1:
in_count += 1
return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT) # type: ignore[call-arg]
pi4 = ray.get(future) # noqa: F841
end = time.time()
dur = end - start
print(f"Running {SAMPLE_COUNT} tests took {dur} seconds")


if __name__ == "__main__":
main()

# this is the second changed line
# we need to explicitly shut down the implicit cluster
ray.shutdown()