Skip to content

Commit c1784df

Browse files
authored
Dask compat fallback (#4059)
* auto fallback to compat DaskVine * add test
1 parent c732f7c commit c1784df

File tree

4 files changed

+164
-31
lines changed

4 files changed

+164
-31
lines changed

taskvine/src/bindings/python3/ndcctools/taskvine/__init__.py

+5-12
Original file line numberDiff line numberDiff line change
@@ -71,23 +71,16 @@ class DaskVineWarning(UserWarning):
7171
vd = Version(dask.__version__)
7272
vr = Version("2024.12.0")
7373
if vd < vr:
74-
warnings.warn("ndcctools.taskvine.DaskVine only works with dask version >= 2024.12.0", DaskVineWarning)
74+
raise ImportError
7575

7676
from .dask_executor import DaskVine
7777
from .dask_dag import DaskVineDag
78-
except ImportError as e:
79-
warnings.warn(f"DaskVine not available. Couldn't find module: {e.name}", DaskVineWarning)
78+
except (ImportError, ModuleNotFoundError):
79+
warnings.warn("Dask >= 2024.12.0 not available, using DaskVine legacy task graph representation.", DaskVineWarning)
8080

81-
##
82-
# DaskVine compatibility class.
83-
# See @ref dask_executor.DaskVine
84-
class DaskVine:
85-
exception = ImportError()
81+
from .compat import DaskVine
82+
from .compat import DaskVineDag
8683

87-
def __init__(*args, **kwargs):
88-
raise DaskVine.exception
89-
90-
DaskVine.exception = e
9184

9285
__all__ = [
9386
"Manager",

taskvine/src/bindings/python3/ndcctools/taskvine/compat/__init__.py

+10-19
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,23 @@
2424
# form ndcctools.taskvine.compat import DaskVine
2525
# @endcode
2626
#
27-
28-
import dask
2927
import warnings
3028
from packaging.version import Version
31-
vd = Version(dask.__version__)
32-
vr = Version("2024.12.0")
33-
if vd >= vr:
34-
warnings.warn("ndcctools.taskvine.compat only works with dask version < 2024.12.0")
35-
3629

3730
try:
38-
from .dask_executor import DaskVine
39-
from .dask_dag import DaskVineDag
40-
except ImportError as e:
41-
print(f"DaskVine not available. Couldn't find module: {e.name}")
31+
import dask
32+
33+
vd = Version(dask.__version__)
34+
vr = Version("2024.12.0")
4235

43-
##
44-
# DaskVine compatibility class.
45-
# See @ref dask_executor.DaskVine
46-
class DaskVine:
47-
exception = ImportError()
36+
if vd >= vr:
37+
warnings.warn("ndcctools.taskvine.compat only works with dask version < 2024.12.0")
38+
except (ImportError, ModuleNotFoundError):
39+
pass
4840

49-
def __init__(*args, **kwargs):
50-
raise DaskVine.exception
5141

52-
DaskVine.exception = e
42+
from .dask_executor import DaskVine
43+
from .dask_dag import DaskVineDag
5344

5445
__all__ = [
5546
"DaskVine",
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#!/bin/sh
2+
set -e
3+
4+
. ../../dttools/test/test_runner_common.sh
5+
6+
import_config_val CCTOOLS_PYTHON_TEST_EXEC
7+
import_config_val CCTOOLS_PYTHON_TEST_DIR
8+
9+
export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH
10+
export PATH=$(pwd)/../src/worker:$(pwd)/../../batch_job/src:$PATH
11+
12+
STATUS_FILE=vine.status
13+
PORT_FILE=vine.port
14+
15+
check_needed()
16+
{
17+
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
18+
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle; import dask" || return 1
19+
20+
return 0
21+
}
22+
23+
prepare()
24+
{
25+
rm -f $STATUS_FILE
26+
rm -f $PORT_FILE
27+
28+
return 0
29+
}
30+
31+
run()
32+
{
33+
${CCTOOLS_PYTHON_TEST_EXEC} vine_task_graph_compat.py
34+
echo $? > $STATUS_FILE
35+
36+
# retrieve taskvine exit status
37+
status=$(cat $STATUS_FILE)
38+
if [ $status -ne 0 ]
39+
then
40+
exit 1
41+
fi
42+
43+
exit 0
44+
}
45+
46+
clean()
47+
{
48+
rm -rf vine-run-info
49+
exit 0
50+
}
51+
52+
dispatch "$@"
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright (C) 2023- The University of Notre Dame
4+
# This software is distributed under the GNU General Public License.
5+
# See the file COPYING for details.
6+
7+
# This example shows TaskVine executing a manually constructed dask graph.
8+
# See vine_example_dask_delayed.py for an example where the graph
9+
# is constructed by dask.
10+
11+
import ndcctools.taskvine as vine
12+
from ndcctools.taskvine.compat import DaskVine
13+
import argparse
14+
import getpass
15+
import sys
16+
import traceback
17+
18+
19+
from operator import add # use add function in the example graph
20+
dsk_graph = {
21+
"x": 1,
22+
"y": 2,
23+
"z": (add, "x", "y"),
24+
"w": (sum, ["x", "y", "z"]),
25+
"v": [(sum, ["w", "z"]), 2],
26+
"t": (sum, "v")
27+
}
28+
29+
expected_result = 11
30+
31+
if __name__ == "__main__":
32+
parser = argparse.ArgumentParser(
33+
prog="vine_example_dask_graph.py",
34+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
35+
description="""This example shows TaskVine executing a manually constructed dask graph.
36+
See vine_example_dask_delayed.py for an example where the graph
37+
is constructed by dask.""")
38+
parser.add_argument(
39+
"--name",
40+
nargs="?",
41+
type=str,
42+
help="name to assign to the manager.",
43+
default=f"vine-dask-graph-{getpass.getuser()}",
44+
)
45+
parser.add_argument(
46+
"--port",
47+
nargs="?",
48+
type=int,
49+
help="port for the manager to listen for connections. If 0, pick any available.",
50+
default=9123,
51+
)
52+
parser.add_argument(
53+
"--disable-peer-transfers",
54+
action="store_true",
55+
help="disable transfers among workers.",
56+
default=False,
57+
)
58+
59+
args = parser.parse_args()
60+
61+
m = DaskVine(port=args.port, ssl=True)
62+
m.set_name(args.name)
63+
print(f"Listening for workers at port: {m.port}")
64+
65+
if args.disable_peer_transfers:
66+
m.disable_peer_transfers()
67+
68+
# checkpoint at even levels when nodes have at least one dependency
69+
def checkpoint(dag, key):
70+
if dag.depth_of(key) % 2 == 0 and len(dag.get_dependencies(key)) > 0:
71+
print(f"checkpoint for {key}")
72+
return True
73+
return False
74+
75+
f = vine.Factory(manager=m)
76+
f.cores = 4
77+
f.disk = 2000
78+
f.memory = 2000
79+
f.max_workers = 1
80+
f.min_workers = 1
81+
with f:
82+
desired_keys = ["t", "v"]
83+
desired_keys = list(dsk_graph.keys())
84+
print(f"dask graph example is:\n{dsk_graph}")
85+
print(f"desired keys are {desired_keys}")
86+
87+
try:
88+
results = m.get(dsk_graph, desired_keys, lazy_transfers=True, checkpoint_fn=checkpoint, resources={"cores": 1})
89+
print({k: v for k, v in zip(desired_keys, results)})
90+
except Exception:
91+
traceback.print_exc()
92+
93+
print("Terminating workers...", end="")
94+
95+
print("done!")
96+
sys.exit(0)
97+
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:

0 commit comments

Comments
 (0)