pipeline.py
"""A Python script that creates a DVC pipeline.
Include scheduling?
We could define a once per day task,
and we break the DVC pipeline up accordingly.
Will want a way to distribute the stages.
If someone runs
calkit run
from the terminal,
and calkit sees a calkit.py or pipeline.py file?
we convert the script into a DVC pipeline then run that
This would actually be a pure extension of the existing system
Kind of similar to Dagster
Should this handle chunked dependencies,
automatically breaking the logic into multiple stages,
one per chunk or partition.
I guess it would also need to partition the outputs.
Is it possible this would be scalable?
We'd want DVC to remotely run necessary stages,
either in processes or different machines,
in parallel.
Stage command could look like this to enable distributed workers,
who run their stage and push
calkit run-stage-remote --remote-name my-hpc --stage get_data_partition_1 some command
calkit run-stage-remote --worker-pool some command
calkit run detects any remote runs and waits until they're all done,
then collects up all of the commits they merged together?
Git history then becomes a log of tasks being run?
We could query for task history from git log?
Can we also store different partitions on different machines,
or have them write direct to the DVC remote,
so our local project can stay clean.
Can we make this work with marimo notebooks?
Can we schedule these pipeline runs in the cloud?
GitHub Actions is an option.
That could enable event-driven workflows, e.g., run the pipeline
and push on all pushes to main.
"""
from typing import Annotated

import pandas as pd
import plotly.graph_objects as go

from calkit.pipeline import Dataset, Dependency, Figure, Output, Pipeline

# The pipeline as a whole depends on the environment definition
pipeline = Pipeline(deps=[Dependency(path="environment.yml")])

# Module-level objects can be referenced from within stages
my_list = list(range(5))


def helper_func():
    pass


@pipeline.stage(other_arg=55)
def collect_data(size: int = 10) -> Annotated[
    pd.DataFrame,
    Dataset(path="data/something.parquet", title="The data"),
]:
    something = helper_func()
    list2 = my_list
    return pd.DataFrame(range(size))


@pipeline.stage
def plot_data(
    data: Annotated[pd.DataFrame, Dependency(path="data/something.parquet")]
) -> Annotated[
    go.Figure, Figure(path="figures/plot.json", title="The figure")
]:
    return data.plot(backend="plotly")


if __name__ == "__main__":
    pipeline.run()
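For the parallel-execution idea in the notes above, one possible scheduler shape is to walk the stage graph topologically and dispatch every ready stage at once. This is a stdlib-only sketch (it makes no claims about calkit's actual internals) using `graphlib` and a thread pool; a remote variant could submit `calkit run-stage-remote ...` commands instead of calling the stage in-process, then poll for completion before marking it done.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(graph, run_stage):
    """Run stages in dependency order, in parallel where possible.

    ``graph`` maps each stage name to the set of stages it depends on;
    ``run_stage`` executes a single stage and returns its result.
    """
    ts = TopologicalSorter(graph)
    ts.prepare()
    results = {}
    with ThreadPoolExecutor() as pool:
        futures = {}
        while ts.is_active():
            # Dispatch every stage whose dependencies are satisfied
            for stage in ts.get_ready():
                futures[pool.submit(run_stage, stage)] = stage
            # Block until at least one in-flight stage finishes
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for fut in done:
                stage = futures.pop(fut)
                results[stage] = fut.result()
                # Unlocks any stages that were waiting on this one
                ts.done(stage)
    return results
```

For the two stages in this file, the graph would be `{"plot_data": {"collect_data"}, "collect_data": set()}`; with many data partitions, each `get_data_partition_N` stage would be an independent root node and would run concurrently.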