Skip to content

Commit 003bf92

Browse files
authored
Make threads the default executor (#621)
* Make `threads` the default executor
* Fix chunking in store tests and run on multiple executors
* Use single-threaded executor in test_default_spec_config_override to avoid memory error
* Don't measure peak mem with threads executor
1 parent 4d79a26 commit 003bf92

File tree

6 files changed

+60
-23
lines changed

6 files changed

+60
-23
lines changed

cubed/core/array.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,9 @@ def compute(
274274
if executor is None:
275275
executor = arrays[0].spec.executor
276276
if executor is None:
277-
from cubed.runtime.executors.local import SingleThreadedExecutor
277+
from cubed.runtime.executors.local import ThreadsExecutor
278278

279-
executor = SingleThreadedExecutor()
279+
executor = ThreadsExecutor()
280280

281281
_return_in_memory_array = kwargs.pop("_return_in_memory_array", True)
282282
plan.execute(

cubed/runtime/executors/local.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent
1919
from cubed.runtime.utils import (
2020
execution_stats,
21+
execution_timing,
2122
handle_callbacks,
2223
handle_operation_start_callbacks,
2324
profile_memray,
@@ -61,9 +62,14 @@ def execute_dag(
6162
[callback.on_task_end(event) for callback in callbacks]
6263

6364

65+
@execution_timing
66+
def run_func_threads(input, func=None, config=None, name=None, compute_id=None):
67+
return func(input, config=config)
68+
69+
6470
@profile_memray
6571
@execution_stats
66-
def run_func(input, func=None, config=None, name=None, compute_id=None):
72+
def run_func_processes(input, func=None, config=None, name=None, compute_id=None):
6773
return func(input, config=config)
6874

6975

@@ -142,7 +148,11 @@ def create_futures_func_multiprocessing(input, **kwargs):
142148

143149

144150
def pipeline_to_stream(
145-
concurrent_executor: Executor, name: str, pipeline: CubedPipeline, **kwargs
151+
concurrent_executor: Executor,
152+
run_func: Callable,
153+
name: str,
154+
pipeline: CubedPipeline,
155+
**kwargs,
146156
) -> Stream:
147157
return stream.iterate(
148158
map_unordered(
@@ -200,15 +210,17 @@ async def async_execute_dag(
200210
mp_context=context,
201211
max_tasks_per_child=max_tasks_per_child,
202212
)
213+
run_func = run_func_processes
203214
else:
204215
concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
216+
run_func = run_func_threads
205217
try:
206218
if not compute_arrays_in_parallel:
207219
# run one pipeline at a time
208220
for name, node in visit_nodes(dag, resume=resume):
209221
handle_operation_start_callbacks(callbacks, name)
210222
st = pipeline_to_stream(
211-
concurrent_executor, name, node["pipeline"], **kwargs
223+
concurrent_executor, run_func, name, node["pipeline"], **kwargs
212224
)
213225
async with st.stream() as streamer:
214226
async for _, stats in streamer:
@@ -218,7 +230,7 @@ async def async_execute_dag(
218230
# run pipelines in the same topological generation in parallel by merging their streams
219231
streams = [
220232
pipeline_to_stream(
221-
concurrent_executor, name, node["pipeline"], **kwargs
233+
concurrent_executor, run_func, name, node["pipeline"], **kwargs
222234
)
223235
for name, node in gen
224236
]

cubed/runtime/utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,33 @@ def execute_with_stats(function, *args, **kwargs):
4040
)
4141

4242

43+
def execute_with_timing(function, *args, **kwargs):
44+
"""Invoke function and measure timing information.
45+
46+
Returns the result of the function call and a stats dictionary.
47+
"""
48+
49+
function_start_tstamp = time.time()
50+
result = function(*args, **kwargs)
51+
function_end_tstamp = time.time()
52+
return result, dict(
53+
function_start_tstamp=function_start_tstamp,
54+
function_end_tstamp=function_end_tstamp,
55+
)
56+
57+
4358
def execution_stats(func):
4459
"""Decorator to measure timing information and peak memory usage of a function call."""
4560

4661
return partial(execute_with_stats, func)
4762

4863

64+
def execution_timing(func):
65+
"""Decorator to measure timing information of a function call."""
66+
67+
return partial(execute_with_timing, func)
68+
69+
4970
def execute_with_memray(function, input, **kwargs):
5071
# only run memray if installed, and only for first input (for operations that run on block locations)
5172
if (

cubed/tests/test_core.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,43 +127,43 @@ def test_from_zarr(tmp_path, spec, executor, path):
127127
)
128128

129129

130-
def test_store(tmp_path, spec):
130+
def test_store(tmp_path, spec, executor):
131131
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
132132

133133
store = tmp_path / "source.zarr"
134-
target = zarr.empty(a.shape, store=store)
134+
target = zarr.empty(a.shape, chunks=a.chunksize, store=store)
135135

136-
cubed.store(a, target)
136+
cubed.store(a, target, executor=executor)
137137
assert_array_equal(target[:], np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))
138138

139139

140-
def test_store_multiple(tmp_path, spec):
140+
def test_store_multiple(tmp_path, spec, executor):
141141
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
142142
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
143143

144144
store1 = tmp_path / "source1.zarr"
145-
target1 = zarr.empty(a.shape, store=store1)
145+
target1 = zarr.empty(a.shape, chunks=a.chunksize, store=store1)
146146
store2 = tmp_path / "source2.zarr"
147-
target2 = zarr.empty(b.shape, store=store2)
147+
target2 = zarr.empty(b.shape, chunks=b.chunksize, store=store2)
148148

149-
cubed.store([a, b], [target1, target2])
149+
cubed.store([a, b], [target1, target2], executor=executor)
150150
assert_array_equal(target1[:], np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))
151151
assert_array_equal(target2[:], np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]))
152152

153153

154-
def test_store_fails(tmp_path, spec):
154+
def test_store_fails(tmp_path, spec, executor):
155155
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
156156
b = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
157157
store = tmp_path / "source.zarr"
158-
target = zarr.empty(a.shape, store=store)
158+
target = zarr.empty(a.shape, chunks=a.chunksize, store=store)
159159

160160
with pytest.raises(
161161
ValueError, match=r"Different number of sources \(2\) and targets \(1\)"
162162
):
163-
cubed.store([a, b], [target])
163+
cubed.store([a, b], [target], executor=executor)
164164

165165
with pytest.raises(ValueError, match="All sources must be cubed array objects"):
166-
cubed.store([1], [target])
166+
cubed.store([1], [target], executor=executor)
167167

168168

169169
@pytest.mark.parametrize("path", [None, "sub", "sub/group"])
@@ -370,7 +370,9 @@ def test_default_spec_config_override():
370370
# override default spec to increase allowed_mem
371371
from cubed import config
372372

373-
with config.set({"spec.allowed_mem": "4GB"}):
373+
with config.set(
374+
{"spec.allowed_mem": "4GB", "spec.executor_name": "single-threaded"}
375+
):
374376
a = xp.ones((20000, 10000), chunks=(10000, 10000))
375377
b = xp.negative(a)
376378
assert_array_equal(b.compute(), -np.ones((20000, 10000)))

docs/configuration.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ These properties can be passed directly to the {py:class}`Spec <cubed.Spec>` con
9595
| Property | Default | Description |
9696
|--------------------|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
9797
| `work_dir` | `None` | The directory path (specified as an fsspec URL) used for storing intermediate data. If not set, the user's temporary directory is used. |
98-
| `allowed_mem` | `"2GB"` | The total memory available to a worker for running a task. This includes any `reserved_mem` that has been set. |
99-
| `reserved_mem` | `"100MB"` | The memory reserved on a worker for non-data use when running a task |
100-
| `executor_name` | `"single-threaded"` | The executor for running computations. One of `"single-threaded"`, `"threads"`, `"processes"`, `"beam"`, `"coiled"`, `"dask"`, `"lithops"`, `"modal"`. |
98+
| `allowed_mem` | `"2GB"` | The total memory available to a worker for running a task. This includes any `reserved_mem` that has been set. |
99+
| `reserved_mem` | `"100MB"` | The memory reserved on a worker for non-data use when running a task |
100+
| `executor_name` | `"threads"` | The executor for running computations. One of `"single-threaded"`, `"threads"`, `"processes"`, `"beam"`, `"coiled"`, `"dask"`, `"lithops"`, `"modal"`. |
101101
| `executor_options` | `None` | Options to pass to the executor on construction. See below for possible options for each executor. |
102102
| `zarr_compressor` | `"default"`| The compressor used by Zarr for intermediate data. If not specified, or set to `"default"`, Zarr will use the default Blosc compressor. If set to `None`, compression is disabled, which can be a good option when using local storage. Use a dictionary (or nested YAML) to configure arbitrary compression using Numcodecs. |
103103

docs/user-guide/executors.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ Cubed provides a variety of executors for running the tasks in a computation, wh
66

77
## Local single-machine executors
88

9-
If you don't specify an executor then the local in-process single-threaded Python executor is used. This is a very simple executor (called `single-threaded`) that is intended for testing on small amounts of data before running larger computations using the `processes` executor on a single machine, or a distributed executor in the cloud.
9+
If you don't specify an executor, then the local in-process multi-threaded Python executor is used by default. This is called the `threads` executor. It doesn't require any setup, so it is useful for quickly getting started and for running on datasets that don't fit in memory but can fit on a single machine's disk.
1010

11-
The `processes` executor runs on a single machine, and uses all the cores on the machine. It doesn't require any set up so it is useful for quickly getting started and running on datasets that don't fit in memory, but can fit on a single machine's disk.
11+
The `processes` executor also runs on a single machine, and uses all the cores on the machine. However, unlike the `threads` executor, each task runs in a separate process, which avoids GIL contention but adds some overhead in process startup time and communication. Typically, running with `processes` is more performant than `threads`, but it is worth trying both on your workload to see which is best.
12+
13+
There is a third local executor called `single-threaded` that runs tasks sequentially in a single thread, and is intended for testing on small amounts of data.
1214

1315
## Which cloud service executor should I use?
1416

0 commit comments

Comments (0)