Can a "later" part of the DAG depend on an "earlier" part? #450
-
Hi, I'm enjoying pytask! Reducing what I have to a minimal example: assume there's a file somewhere out on the internet that lists the names of the files I need to generate. I currently have something analogous to the following:

```python
# task_defer_pass_1.py
import json
from pathlib import Path
from typing import Annotated

from pytask import Product, task

BLD = Path(__file__).parent.parent.parent.resolve() / "build"
json_definition_path = Path(BLD / "filelist.json")


@task
def download_remote_file(output: Annotated[Path, Product] = json_definition_path):
    # Imagine this is downloading a remote file instead of writing out a static one.
    output.write_text(json.dumps(["file1.txt", "file2.txt", "file3.txt"]))
```

```python
# task_defer_pass_2.py
import json
from pathlib import Path
from typing import Annotated

from pytask import Product, task

from .task_defer_pass_1 import BLD, json_definition_path

if json_definition_path.exists():
    for f in json.loads(json_definition_path.read_text()):

        @task
        def create_file(output: Annotated[Path, Product] = Path(BLD / f)):
            output.write_text("This is a file")
```

That lets me run pytask twice: the first run downloads the file list, and the second run picks it up and creates the listed files. My suspicion from reading the code is that the challenge would be in the collection step, since the whole DAG is built before anything is executed. Is there some secret technique I'm missing?
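For illustration, the same two-pass pattern can also feed a downstream task that depends on everything the second pass generates, which is the "later depends on earlier" shape in the title. A minimal sketch, assuming pytask's convention of treating plain `Path` arguments (including lists of paths) as dependencies; the module name, the `aggregate_files` task, and the archive name are made up and not part of the original example:

```python
# task_defer_pass_3.py -- hypothetical third module building on the example above
import json
from pathlib import Path
from typing import Annotated
from zipfile import ZipFile

from pytask import Product, task

from .task_defer_pass_1 import BLD, json_definition_path

if json_definition_path.exists():
    file_paths = [BLD / f for f in json.loads(json_definition_path.read_text())]

    @task
    def aggregate_files(
        paths: list[Path] = file_paths,
        output: Annotated[Path, Product] = BLD / "all_files.zip",
    ):
        # Bundle every generated file into one archive.
        with ZipFile(output, "w") as archive:
            for path in paths:
                archive.write(path, arcname=path.name)
```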
-
Hi @egerlach! I am happy you are enjoying pytask. You are right: there is no mechanism for deferred collection and execution of tasks yet, and your solution is exactly the workaround I would have proposed. Another solution could be to define more of the second tasks than you need and let the unnecessary tasks exit early (see the sketch at the end of this reply). Apart from that, I like the idea of lazy task-loading. Here are just some quick thoughts; let me know if you have questions or ideas.

What could an interface for the second task look like? We could add another keyword to the `@task` decorator:

```python
@task(is_ready=lambda *x: json_definition_path.exists())
def create_file(output: Annotated[Path, Product] = Path(BLD / f)):
    output.write_text("This is a file.")
```

Internally, we could then defer collecting and executing the task until the condition evaluates to true.
The problem is now, how do we repeat the task for all files? Unfortunately, I recently removed `@pytask.mark.parametrize`, which would have allowed something like this:

```python
def _yield_files(path):
    files = json.loads(path.read_text())
    for file in files:
        yield file


@task(is_ready=lambda *x: json_definition_path.exists())
@pytask.mark.parametrize("output", _yield_files(BLD / "filelist.json"))
def create_file(output: Annotated[Path, Product]):
    output.write_text("This is a file.")
```

Maybe we have to think about a comeback. What are others doing?
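To make the "define more tasks than you need" idea above concrete, here is a minimal sketch. It assumes a hard upper bound `MAX_FILES` and references outputs ordinally rather than by the names in the list; the helper `_nth_filename` and the output names are made up for illustration:

```python
# Hypothetical sketch of "define more tasks than you need and let the rest exit early".
import json
from pathlib import Path
from typing import Annotated

from pytask import Product, task

from .task_defer_pass_1 import BLD, json_definition_path

MAX_FILES = 20  # assumed upper bound on how many files the list can ever contain


def _nth_filename(index):
    """Return the index-th filename from the list, or None if it is not available (yet)."""
    if not json_definition_path.exists():
        return None
    names = json.loads(json_definition_path.read_text())
    return names[index] if index < len(names) else None


for i in range(MAX_FILES):

    @task(id=str(i))
    def create_file(
        index: int = i,
        output: Annotated[Path, Product] = BLD / f"generated_{i}.txt",
    ):
        name = _nth_filename(index)
        if name is None:
            # Superfluous task: nothing to do for this slot, so write a placeholder and exit early.
            output.write_text("")
            return
        output.write_text("This is a file")
```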
-
Okay, I like the hack of having N of the secondary tasks and referencing them ordinally instead of by path. I think I'm going to do that in the "now" timeframe.

I'll admit, I'm thinking about this more abstractly because I don't fully understand the internals yet. The abstract concept I'm toying with in my head is that you have some task generator node in the graph which, when the run starts, is an amorphous cloud of potential tasks. It depends on some other tasks being complete to be able to start its generation. Once it has done its generation, that sub-graph of the DAG is realized and the entire DAG can be executed.

The first challenge I imagine is how to connect this cloud into the rest of the DAG. The inputs are easy, but the outputs of a generator are more challenging. I like the idea of your `is_ready` keyword. I see two cases for a task that depends on a set of generated tasks. Either it needs all of the generated outputs at once, for example to zip them up:
```python
@task
def after_generator(
    paths : NodeSet[Path] = NodeSet(from=generator_task),
    output : Annotated[Path, Output] = BLD / "somefile.zip",
):
    with ZipFile(output, "w") as zip:
        for path in paths.values:
            zip.write(path)
```
Or the depending task is itself another generator that creates one new task (and one new output) per generated input:

```python
@task(generator=True)
def after_first_generator(
    paths : NodeSet[Path] = NodeSet(from=generator_task),
    output : Annotated[NodeSet[Path], Output] = NodeSet(something_smart_here),
):
    for path in paths:
        output_path = BLD / "another_dir" / path.name

        @task
        def copy_file(src : Path = path, dest : Annotated[Path, Output] = output_path):
            shutil.copy(src, dest)

        yield copy_file
        output.add(output_path)
```

I don't love either of those as examples, but I hope they help convey the general idea I'm thinking about.

The other big part of this that I see is how to modify the algorithm to handle a dynamic DAG. The way I'm understanding the current algorithm is that there's a number of steps, and if we think about them as a pipeline we get something like: collect the tasks, build the DAG from them, and then execute it.
I imagine that deferred tasks could be handled by repeating part of that pipeline until every node is ready:

```python
while any(not n.is_ready() for n in dag):
    plan = sorter.fulfill_dependencies_of_nodes_with_more_nodes_to_generate(dag)
    somethingsomething.execute(plan)
    more_nodes = somethingsomething.collect_deferred_tasks(dag)
    nodes.extend(more_nodes)
    dag = somethingsomething.compute_dag(nodes)
```

Then the algorithm could continue as usual, only a greater fraction of the nodes would already be fulfilled. It's possible that a new hook might not be needed, but that would rely on the existing hooks being safe to run more than once.

The idea that I'm peddling here is to have the system realize that it has an incomplete graph, then execute just enough of that graph that any incomplete nodes can complete. In most cases, at least the ones that I'm thinking of, the steps needed to get to a complete graph are small compared to what you need to do once you get there.

The more I think about it, the more I like the idea of embedding more of this information into the graph itself using richer node types. It allows the use of the Python typing system to catch certain things, like making sure that any outputs from a task generator are an output collection. It also allows successive generators to decouple from one another because they depend on an abstraction. Things would get messy if you had a generator which generated some concrete tasks along with some other generators, but that just feels like bad design.

Not to get too off topic, but the idea of having richer node metadata could also enable wildcard tasks like Makefile pattern rules. Imagine this, which can't work as written but I think conveys the idea:

```python
class CompileCToObj(WildcardTaskProtocol):
    def dependencies(self, out : Annotated[Path, Wildcard("Obj file")]) -> dict[str, Annotated[Path, Wildcard("C file")]]:
        return {"in": out.with_suffix(".c")}  # this becomes kwargs when calling execute()

    def execute(self, in : Annotated[Path, Wildcard("C file")], out : Annotated[Path, Output, Wildcard("Obj file")]):
        out = compile(in)
```

Anything that depended on an object file would then match this rule, and the rule would be instantiated into a concrete compile task for that file. A non-pytask sketch of this pattern-rule idea follows at the end of this reply.

Overall, that's a lot of thoughts, and not a lot of them thought through to their consequences. I hope this riffing on your thoughts is at least somewhat useful.
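To illustrate the Makefile-style pattern-rule idea above outside of any particular task runner, here is a small self-contained sketch in the spirit of `%.o: %.c`. All names (`Rule`, `compile_c`, `build`) are made up for illustration and are not part of pytask:

```python
# Hypothetical, minimal pattern-rule resolver: derive a target's inputs from its name.
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass
class Rule:
    target_suffix: str                            # e.g. ".o"
    dependencies: Callable[[Path], list[Path]]    # derive inputs from the requested target
    action: Callable[[Path, list[Path]], None]    # build the target from its inputs


def compile_c(target: Path, deps: list[Path]) -> None:
    # Stand-in for invoking a compiler, e.g. `cc -c deps[0] -o target`.
    target.write_bytes(b"object code for " + deps[0].name.encode())


RULES = [
    Rule(".o", lambda out: [out.with_suffix(".c")], compile_c),
]


def build(target: Path) -> None:
    """Find a rule whose suffix matches the target and run it on the derived inputs."""
    for rule in RULES:
        if target.suffix == rule.target_suffix:
            deps = rule.dependencies(target)
            rule.action(target, deps)
            return
    raise ValueError(f"No rule to make target {target}")
```

A task graph could use the same lookup when a requested product has no explicit producer: match it against the registered wildcard rules and instantiate a concrete task on the fly.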
-
FYI @tobiasraabe, Apache Airflow supports dynamic DAGs as well. I'm not familiar with Airflow at all, but at first glance their way looks sorta complicated to me :)
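For context, Airflow calls this dynamic task mapping (added around Airflow 2.3): an upstream task returns a list at runtime and a downstream task is expanded over it. A rough, untested sketch from memory; the DAG and task names are made up, and this is Airflow's API, not pytask's:

```python
import pendulum
from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def generate_files():
    @task
    def get_filenames() -> list[str]:
        # In the real use case this would download and parse filelist.json.
        return ["file1.txt", "file2.txt", "file3.txt"]

    @task
    def create_file(name: str) -> None:
        with open(name, "w") as fh:
            fh.write("This is a file")

    # One create_file task instance is mapped per element returned at runtime.
    create_file.expand(name=get_filenames())


generate_files()
```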
-
Hi @egerlach, I have finished the implementation in #487, which should help you with your problem. If you have some time, can you please read the how-to guide and give me some feedback?