Skip to content

Commit 0509ea8

Browse files
RamenModeKevin Xuebtovar
authored
Implementation of taskvine allpairs/map/reduce (#4011)
* Implementation of taskvine allpairs/map/reduce * lint * lint v2 * cleanup code * cleanup reduce * add test * remove debug print * cleanup map * format * allpairs in terms of map * format * do not create lib in map * error on lib name --------- Co-authored-by: Kevin Xue <[email protected]> Co-authored-by: Benjamin Tovar <[email protected]>
1 parent 8db0273 commit 0509ea8

File tree

3 files changed

+279
-22
lines changed

3 files changed

+279
-22
lines changed

taskvine/src/bindings/python3/ndcctools/taskvine/futures.py

+135-22
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
1-
21
from . import cvine
32
import hashlib
4-
from collections import deque
5-
from concurrent.futures import Executor
6-
from concurrent.futures import Future
7-
from concurrent.futures import FIRST_COMPLETED
8-
from concurrent.futures import FIRST_EXCEPTION
9-
from concurrent.futures import ALL_COMPLETED
10-
from concurrent.futures._base import PENDING
11-
from concurrent.futures._base import CANCELLED
12-
from concurrent.futures._base import FINISHED
3+
from collections import deque, namedtuple
4+
from concurrent.futures import (
5+
Executor,
6+
Future,
7+
FIRST_COMPLETED,
8+
FIRST_EXCEPTION,
9+
ALL_COMPLETED,
10+
)
11+
from concurrent.futures._base import PENDING, CANCELLED, FINISHED
1312
from concurrent.futures import TimeoutError
14-
from collections import namedtuple
13+
1514
from .task import (
1615
PythonTask,
1716
FunctionCall,
1817
FunctionCallNoResult,
1918
)
19+
2020
from .manager import (
2121
Factory,
2222
Manager,
2323
)
2424

25+
import math
2526
import os
2627
import time
2728
import textwrap
29+
from functools import partial
30+
from collections.abc import Sequence
2831

2932
RESULT_PENDING = 'result_pending'
3033

@@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
109112
f.module_manager.submit(f._task)
110113

111114
start = time.perf_counter()
112-
result_timeout = min(timeout, 5) if timeout is not None else 5
115+
result_timeout = max(1, min(timeout, 5)) if timeout else 5
113116

114117
def _iterator():
115118
# iterate over the queue of futures, yielding completed futures and
@@ -133,22 +136,39 @@ def _iterator():
133136
assert result != RESULT_PENDING
134137
yield f
135138

136-
if (
137-
fs and timeout is not None
138-
and time.perf_counter() - start > timeout
139-
):
139+
if fs and timeout and time.perf_counter() - start > timeout:
140140
raise TimeoutError()
141141

142142
return _iterator()
143143

144144

145+
def run_iterable(fn, *args):
    """Apply ``fn`` to every positional argument and return the results as a list.

    The results are in the same order as ``args``; with no arguments the
    result is an empty list.
    """
    return [fn(arg) for arg in args]
147+
148+
149+
def reduction_tree(fn, *args, n=2):
    """Reduce ``args`` to one value by applying ``fn`` to groups of ``n`` values.

    Every round splits the current values into consecutive groups of at most
    ``n`` elements (the last group may be shorter) and replaces each group by
    ``fn(group)``.  Rounds repeat until a single value is left.  ``fn`` is
    called with ONE list argument, so callables such as ``max`` or ``sum``
    fit directly.

    Values that are VineFuture objects are replaced by their ``result()``
    before they are grouped.

    Parameters:
    - fn:   callable taking a list of up to ``n`` values, returning one value
    - args: the values (or VineFutures) to reduce
    - n:    number of values combined per call of ``fn``; must be at least 2

    Returns the reduced value.  A single argument is returned as-is, without
    calling ``fn``.

    Raises IndexError when called without any value to reduce.
    """
    # With groups of fewer than 2 values the number of entries never shrinks.
    assert n > 1, "n must be at least 2"

    entries = [f.result() if isinstance(f, VineFuture) else f for f in args]
    if not entries:
        # Same exception type as indexing the empty list, but with a message
        # that names the actual problem.
        raise IndexError("reduction_tree() of an empty sequence")

    while len(entries) > 1:
        # Slicing past the end simply yields a shorter last group, so the
        # group starts do not need to be rounded up to a multiple of n.
        reduced = [fn(entries[i:i + n]) for i in range(0, len(entries), n)]
        entries = [r.result() if isinstance(r, VineFuture) else r for r in reduced]

    return entries[0]
161+
145162
##
146163
# \class FuturesExecutor
147164
#
148165
# TaskVine FuturesExecutor object
149166
#
150167
# This class acts as an interface for the creation of Futures
168+
169+
151170
class FuturesExecutor(Executor):
171+
152172
def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
153173
self.manager = Manager(port=port)
154174
self.port = self.manager.port
@@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
173193
else:
174194
self.factory = None
175195

196+
def map(self, fn, iterable, library_name=None, chunk_size=1):
    """Apply ``fn`` to every item of ``iterable`` using one task per chunk.

    The sequence is cut into consecutive chunks of ``chunk_size`` items.  Each
    chunk is submitted as a task that runs ``run_iterable(fn, *chunk)``, and a
    final task concatenates the per-chunk lists in order.

    Parameters:
    - fn:           callable applied to each item
    - iterable:     a Sequence (it is sliced and measured, so a plain
                    iterator is rejected)
    - library_name: must be empty; library function calls are not supported
    - chunk_size:   number of items handled by a single task, at least 1

    Returns what ``self.submit`` returns for the final concatenating task; its
    result is the list of ``fn(item)`` values in input order.

    Raises NotImplementedError when ``library_name`` is given.
    """
    assert chunk_size > 0
    assert isinstance(iterable, Sequence)

    # Reject the unsupported option up front, before any task is created and
    # regardless of whether there is anything to map.
    if library_name:
        raise NotImplementedError("Using a library not currently supported.")

    def wait_for_map_resolution(*futures_batch):
        result = []
        for f in futures_batch:
            result.extend(f.result() if isinstance(f, VineFuture) else f)
        return result

    fn_wrapped = partial(run_iterable, fn)

    # Walk the sequence by index instead of repeatedly re-slicing the
    # remainder, which would copy the tail once per chunk.
    tasks = [
        self.submit(self.future_task(fn_wrapped, *iterable[start:start + chunk_size]))
        for start in range(0, len(iterable), chunk_size)
    ]

    return self.submit(self.future_task(wait_for_map_resolution, *tasks))
220+
221+
def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2):
    """Reduce a sequence to a single value with a tree of tasks.

    The sequence is cut into consecutive chunks; each chunk is dispatched as
    one task that runs ``reduction_tree(fn, *chunk, n=fn_arity)``.  The
    futures of one level form the input of the next level, until a single
    future is left.  Before a chunk is submitted, ``result()`` is called on
    any VineFuture it contains, so a level is only submitted once the results
    it depends on have been retrieved.

    Parameters:
    - fn:           function that receives a list of up to ``fn_arity`` values
    - iterable:     a Sequence of values to reduce
    - library_name: must be empty; library function calls are not supported
    - chunk_size:   number of elements grouped into a single task; raised to
                    ``fn_arity`` when smaller
    - fn_arity:     arity of ``fn``; the elements of a chunk are reduced
                    arity-wise

    Returns the future of the last remaining task (a task is submitted even
    for a one-element sequence).

    Raises NotImplementedError when ``library_name`` is given, and IndexError
    when ``iterable`` is empty.
    """
    assert chunk_size > 1
    assert fn_arity > 1
    assert isinstance(iterable, Sequence)

    if library_name:
        raise NotImplementedError("Using a library not currently supported.")
    if not iterable:
        # Same exception type as before, raised deliberately and with a
        # message instead of surfacing from indexing an empty list.
        raise IndexError("reduce() of an empty sequence")

    chunk_size = max(fn_arity, chunk_size)

    level = iterable
    while True:
        next_level = []
        for start in range(0, len(level), chunk_size):
            heads = [
                f.result() if isinstance(f, VineFuture) else f
                for f in level[start:start + chunk_size]
            ]
            next_level.append(
                self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity))
            )

        if len(next_level) == 1:
            return next_level[0]
        level = next_level
256+
257+
def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1):
    """Compute ``fn(a, b)`` for every pair of a row item and a column item.

    Every pair is tagged with its (row, column) position and handed to
    ``self.map``; a final task arranges the mapped values into a table.

    Parameters:
    - fn:            callable of two arguments (row item, column item)
    - iterable_rows: a Sequence providing the row items
    - iterable_cols: a Sequence providing the column items
    - library_name:  passed through to ``self.map``
    - chunk_size:    passed through to ``self.map``; at least 1

    Returns what ``self.submit`` returns for the final task; its result is a
    list of ``len(iterable_rows)`` lists with ``len(iterable_cols)`` entries,
    where entry ``[i][j]`` is ``fn(iterable_rows[i], iterable_cols[j])``.
    """
    assert chunk_size > 0
    assert isinstance(iterable_rows, Sequence)
    assert isinstance(iterable_cols, Sequence)

    def wait_for_allpairs_resolution(row_size, col_size, mapped):
        # Start from a zero-filled table; every cell is overwritten below.
        table = [[0] * col_size for _ in range(row_size)]

        if isinstance(mapped, VineFuture):
            mapped = mapped.result()

        for entry in mapped:
            if isinstance(entry, VineFuture):
                entry = entry.result()
            row, col, value = entry
            table[row][col] = value

        return table

    def wrap_idx(args):
        # Carry the position along with the value so the table can be
        # rebuilt from the flat list of results.
        i, j, a, b = args
        return (i, j, fn(a, b))

    indexed_pairs = [
        (i, j, a, b)
        for i, a in enumerate(iterable_rows)
        for j, b in enumerate(iterable_cols)
    ]
    mapped = self.map(wrap_idx, indexed_pairs, library_name, chunk_size)

    table_task = self.future_task(
        wait_for_allpairs_resolution,
        len(iterable_rows),
        len(iterable_cols),
        mapped,
    )
    return self.submit(table_task)
289+
176290
def submit(self, fn, *args, **kwargs):
177291
if isinstance(fn, (FuturePythonTask, FutureFunctionCall)):
178292
self.manager.submit(fn)
@@ -240,15 +354,15 @@ def cancelled(self):
240354
return False
241355

242356
def running(self):
    """Return True when the manager reports this future's task as running.

    The state is read from the task's manager via ``task_state`` and compared
    with ``cvine.VINE_TASK_RUNNING``.
    """
    state = self._task._module_manager.task_state(self._task.id)
    return state == cvine.VINE_TASK_RUNNING
248362

249363
def done(self):
    """Return True when the manager reports this future's task as done.

    The state is read from the task's manager via ``task_state`` and compared
    with ``cvine.VINE_TASK_DONE``.
    """
    state = self._task._module_manager.task_state(self._task.id)
    return state == cvine.VINE_TASK_DONE
@@ -301,7 +415,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
301415
self.manager = manager
302416
self.library_name = library_name
303417
self._envs = []
304-
305418
self._future = VineFuture(self)
306419
self._has_retrieved = False
307420

@@ -326,7 +439,6 @@ def output(self, timeout="wait_forever"):
326439
self._saved_output = output['Result']
327440
else:
328441
self._saved_output = FunctionCallNoResult(output['Reason'])
329-
330442
except Exception as e:
331443
self._saved_output = e
332444
else:
@@ -400,6 +512,7 @@ def output(self, timeout="wait_forever"):
400512
# task or the exception object of a failed task.
401513
self._output = cloudpickle.loads(self._output_file.contents())
402514
except Exception as e:
515+
print(self._output_file.contents())
403516
# handle output file fetch/deserialization failures
404517
self._output = e
405518
self._output_loaded = True
+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#!/bin/sh

# Stop at the first command that fails.
set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

# Quoted: in shells that treat export as a regular command, an unquoted
# $(pwd) containing whitespace would be split into several arguments.
export PYTHONPATH="$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH"

# Files used to exchange the exit status and the listening port with the
# python test program started by run().
STATUS_FILE=vine.status
PORT_FILE=vine.port
14+
15+
# Return 0 when the test can run here, 1 when it has to be skipped.
check_needed()
{
    # No python interpreter configured for the tests.
    if [ -z "${CCTOOLS_PYTHON_TEST_EXEC}" ]
    then
        return 1
    fi

    # Poncho currently requires ast.unparse to serialize the function,
    # which only became available in Python 3.9. Some older platforms
    # (e.g. almalinux8) will not have this natively.
    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse"
    then
        return 1
    fi

    # In some limited build circumstances (e.g. macos build on github),
    # poncho doesn't work due to lack of conda-pack or cloudpickle
    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack"
    then
        return 1
    fi
    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle"
    then
        return 1
    fi

    return 0
}
31+
32+
# Remove the status and port files a previous run may have left behind.
prepare()
{
    rm -f "$STATUS_FILE" "$PORT_FILE"
    return 0
}
38+
39+
# Start the python test program and a worker, then exit with 0 when the
# program reported exit status 0 and with 1 (after printing the logs)
# otherwise.
run()
{
    # Run the test program in the background; its exit status is written to
    # the status file once it finishes. The interpreter is quoted the same
    # way as in check_needed.
    ( "${CCTOOLS_PYTHON_TEST_EXEC}" vine_python_future_hof.py "$PORT_FILE"; echo $? > "$STATUS_FILE" ) &

    # wait at most 15 seconds for vine to find a port.
    wait_for_file_creation "$PORT_FILE" 15

    run_taskvine_worker "$PORT_FILE" worker.log --cores 2 --memory 2000 --disk 2000

    # wait for vine to exit.
    wait_for_file_creation "$STATUS_FILE" 15

    # retrieve exit status
    status=$(cat "$STATUS_FILE")

    # String comparison on purpose: an empty or non-numeric status makes a
    # numeric test fail with an error, which would skip this branch and
    # report success. Anything other than "0" counts as a failure.
    if [ "$status" != "0" ]
    then
        # display log files in case of failure.
        logfile=$(latest_vine_debug_log)

        # An empty name must not reach the commands below: `[ -f ]` without
        # an operand is true, and `cat` without an operand reads stdin.
        if [ -n "${logfile}" ] && [ -f "${logfile}" ]
        then
            echo "master log:"
            cat "${logfile}"
        fi

        if [ -f worker.log ]
        then
            echo "worker log:"
            cat worker.log
        fi

        exit 1
    fi

    exit 0
}
74+
75+
# Delete everything the test created: the status and port files and the
# vine-run-info directory.
clean()
{
    rm -f "$STATUS_FILE" "$PORT_FILE"
    rm -rf vine-run-info
    exit 0
}
82+
83+
84+
# Hand the script's arguments to dispatch. dispatch is not defined in this
# script; presumably it comes from test_runner_common.sh sourced at the top
# and selects one of check_needed/prepare/run/clean -- TODO confirm.
dispatch "$@"
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#! /usr/bin/env python
2+
3+
import sys
4+
import ndcctools.taskvine as vine
5+
6+
# Path of the file that main() writes the manager's listening port to.
# It is the first command-line argument; without it the usage line is
# printed and the IndexError is re-raised, which ends the script.
try:
    port_file = sys.argv[1]
except IndexError:
    sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
    raise
12+
13+
def main():
    """Check allpairs, map and reduce of FuturesExecutor against known results.

    The port the manager listens on is printed and written to ``port_file``
    before any work is submitted.
    """
    executor = vine.FuturesExecutor(
        port=[9123, 9129], manager_name="vine_hof_test", factory=False
    )

    print("listening on port {}".format(executor.manager.port))
    with open(port_file, "w") as f:
        f.write(str(executor.manager.port))

    nums = list(range(101))
    nums_total = sum(nums)

    # allpairs: a 3 x 101 multiplication table
    rows = 3
    mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result()
    assert sum(mult_table[1]) == nums_total
    assert sum(sum(r) for r in mult_table) == sum(nums_total * n for n in range(rows))

    # map: chunk sizes that leave a short last chunk
    for size in (10, 13):
        doubles = executor.map(lambda x: 2*x, nums, chunk_size=size).result()
        assert sum(doubles) == nums_total*2

    # reduce: arities below, within and above the length of the input
    for arity in (2, 25, 1000):
        maximum = executor.reduce(max, nums, fn_arity=arity).result()
        assert maximum == 100

    # reduce: chunks larger than the arity
    maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result()
    assert maximum == 100

    minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result()
    assert minimum == 0

    total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result()
    assert total == nums_total
52+
53+
54+
55+
56+
if __name__ == "__main__":
57+
main()
58+
59+
60+
# vim: set sts=4 sw=4 ts=4 expandtab ft=python:

0 commit comments

Comments
 (0)