This library facilitates keeping track of dependencies between python functions, and running them asyncronously and/or in parallel.
pip install dgraph
This library exports three functions.
Creates the dependency graph.
The dependencies
argument is a dictionary that maps functions to the functions
they depend on. The keys in the dictionary are functions and the values are list
of functions. The order matters because the the results of the functions that a
function depends on will be fed in to that function in the order that the
functions are listed in the dependencies dict. Functions that return None will
not have their results fed into functions that depend on them.
This will throw for a dependency dictionary with cyclic dependencies, or if there are functions that are depended on but are not present as keys in the dependencies dictionary.
Runs the graph in a single thread.
import dgraph
def a(): return 5
def b(x): return x + 10
def c(x, y): return x + y + 20
dependencies = {
c: [a, b],
b: [a],
a: [],
}
graph = dgraph.create_graph(dependencies)
results = dgraph.run(graph)
assert results == {a: 5, b: 15, c: 40}
Note that, for example, the result of a
is fed to b
, and the results of a
and b
are fed to c
.
Similar to dgraph.run
, but runs functions asyncronously. All functions must be
coroutines. This is useful when many of the functions are bottlenecked by io.
sleep
here and elsewhere is how long to wait before checking if a new function
can be run.
Similar to dgraph.run
, but runs each new function in a new process for up to
ncores
processes as new functions become available. This is useful when many
of the functions are bottlenecked by cpu.
Similar to dgraph.run
, but runs each new function asynchronously on one of up
to ncores
cores. All functions must be coroutines. This is useful when
functions are bottlenecked by both cpu and io.
import time
import dgraph
async def ab(x): return x + 10
async def ac(x, y): return x + y + 20
async def along_task(): time.sleep(.02); return 5
dependencies = {
ac: [along_task, ab],
ab: [along_task],
along_task: [],
}
graph = dgraph.create_graph(dependencies)
results = dgraph.run_async(graph)
assert results == {along_task: 5, ab: 15, ac: 40}