Skip to content
16 changes: 15 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,30 @@

### Enhancements

- Enhanced `appligator` with Dockerfile generation and improved Airflow integration:
- Enhanced `appligator` with Dockerfile generation and improved Airflow DAG generation:
- Added `appligator.airflow.gen_dockerfile.generate` for Jinja2-template-based
Dockerfile generation. Produces a two-stage pixi build with support for
non-editable local package installs. The runtime base image is configurable
via `base_image` (default: `debian:bookworm-slim`).
- Added `--skip-build` flag to the `appligator` CLI to skip Docker image
building and only generate DAG files, using the provided `--image-name` directly.
Skipping the build is now the default; use `--no-skip-build` to opt in to building.
- Updated `appligator.airflow.run_step` with `coerce_inputs` (casts Airflow
Jinja string params to their declared types) and `_XComEncoder` (serialises
Pydantic models and other non-JSON-native objects for XCom output).
- Added `--secret-name` option to inject Kubernetes secrets as environment
variables into every generated pod (repeatable).
- Added resource requests and limits support via four new CLI options
(`--cpu-request`, `--memory-request`, `--cpu-limit`, `--memory-limit`).
- Added volume support via two new model types (`PvcMount`, `ConfigMapMount`) and
two new repeatable CLI options:
- `--pvc-mount name:claim_name:mount_path` mounts a PersistentVolumeClaim.
- `--config-map-mount name:config_map_name:mount_path[:sub_path]` mounts a
ConfigMap, with optional `sub_path` for single-file mounts.
- Added `--config-file PATH` option to load all Kubernetes options from an
`appligator-config.yaml` file (`image_name`, `dag_name`, `secret_names`,
resource fields, `pvc_mounts`, `config_map_mounts`). CLI flags take
precedence over file values.
- The Cuiman client package has been enhanced by _job result openers_,
which ease working with the results of a process job (#65):
- Client classes now have a method
Expand Down
15 changes: 14 additions & 1 deletion appligator/src/appligator/airflow/gen_workflow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# https://opensource.org/license/apache-2-0.

from appligator.airflow.ir import workflow_to_ir
from appligator.airflow.models import ConfigMapMount, PvcMount, ResourceRequirements
from appligator.airflow.renderer import AirflowRenderer
from procodile.workflow import WorkflowStepRegistry

Expand All @@ -11,6 +12,10 @@ def gen_workflow_dag(
dag_id: str,
registry: WorkflowStepRegistry,
image: str,
env_from_secrets: list[str] | None = None,
resources: ResourceRequirements | None = None,
pvc_mounts: list[PvcMount] | None = None,
config_map_mounts: list[ConfigMapMount] | None = None,
) -> str:
"""Generates a fully-formed Airflow DAG Python file."""

Expand All @@ -21,7 +26,15 @@ def gen_workflow_dag(
raise ValueError("Image name is required to generate dag.")

# operator-agnostic intermediate representation
ir = workflow_to_ir(registry, dag_id, image_name=image)
ir = workflow_to_ir(
registry,
dag_id,
image_name=image,
env_from_secrets=env_from_secrets,
resources=resources,
pvc_mounts=pvc_mounts,
config_map_mounts=config_map_mounts,
)

dag_code = AirflowRenderer().render(ir)

Expand Down
44 changes: 43 additions & 1 deletion appligator/src/appligator/airflow/handlers/k8s_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,48 @@ def render(self, task: TaskIR) -> str:

inputs = render_task_inputs(task.inputs)

env_from_block = ""
if task.env_from_secrets:
entries = ", ".join(
f"k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name={name!r}))"
for name in task.env_from_secrets
)
env_from_block = f"\n env_from=[{entries}],"

resources_block = ""
if task.resources:
r = task.resources
requests = {k: v for k, v in {"cpu": r.cpu_request, "memory": r.memory_request}.items() if v}
limits = {k: v for k, v in {"cpu": r.cpu_limit, "memory": r.memory_limit}.items() if v}
requests_str = f"requests={requests!r}, " if requests else ""
limits_str = f"limits={limits!r}" if limits else ""
resources_block = f"\n container_resources=k8s.V1ResourceRequirements({requests_str}{limits_str}),"

volumes_block = ""
volume_mounts_block = ""
if task.pvc_mounts or task.config_map_mounts:
vol_entries = []
mount_entries = []
for pvc in task.pvc_mounts:
vol_entries.append(
f"k8s.V1Volume(name={pvc.name!r}, "
f"persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name={pvc.claim_name!r}))"
)
mount_entries.append(
f"k8s.V1VolumeMount(name={pvc.name!r}, mount_path={pvc.mount_path!r})"
)
for cm in task.config_map_mounts:
vol_entries.append(
f"k8s.V1Volume(name={cm.name!r}, "
f"config_map=k8s.V1ConfigMapVolumeSource(name={cm.config_map_name!r}))"
)
sub = f", sub_path={cm.sub_path!r}" if cm.sub_path else ""
mount_entries.append(
f"k8s.V1VolumeMount(name={cm.name!r}, mount_path={cm.mount_path!r}{sub})"
)
volumes_block = f"\n volumes=[{', '.join(vol_entries)}],"
volume_mounts_block = f"\n volume_mounts=[{', '.join(mount_entries)}],"

return f"""
tasks["{task.id}"] = KubernetesPodOperator(
task_id="{task.id}",
Expand All @@ -34,7 +76,7 @@ def render(self, task: TaskIR) -> str:
"func_qualname": "{task.func_qualname}",
"inputs": {{{inputs}}},
"output_keys": {task.outputs},
}})],
}})],{env_from_block}{resources_block}{volumes_block}{volume_mounts_block}
do_xcom_push=True,
)
"""
25 changes: 23 additions & 2 deletions appligator/src/appligator/airflow/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@

from typing import Any

from appligator.airflow.models import TaskIR, WorkflowIR
from appligator.airflow.models import (
ConfigMapMount,
PvcMount,
ResourceRequirements,
TaskIR,
WorkflowIR,
)
from gavicore.models import InputDescription
from procodile import WorkflowStepRegistry
from procodile.workflow import FINAL_STEP_ID


def workflow_to_ir(
registry: WorkflowStepRegistry, workflow_id: str, image_name: str = ""
registry: WorkflowStepRegistry,
workflow_id: str,
image_name: str = "",
env_from_secrets: list[str] | None = None,
resources: ResourceRequirements | None = None,
pvc_mounts: list[PvcMount] | None = None,
config_map_mounts: list[ConfigMapMount] | None = None,
) -> WorkflowIR:
"""
Convert a WorkflowStepRegistry into a fully normalized WorkflowIR (Workflow
Expand All @@ -38,6 +50,7 @@ def workflow_to_ir(
registry: A WorkflowStepRegistry produced by the workflow DSL.
workflow_id: The DAG/workflow identifier.
image_name: Optional Container image used for Kubernetes tasks.
resources: Optional CPU/memory requests and limits applied to every task.

Returns:
A WorkflowIR representing the complete workflow execution graph.
Expand All @@ -62,6 +75,10 @@ def workflow_to_ir(
func_module=main_step.function.__module__,
func_qualname=main_step.function.__qualname__,
image=image_name,
env_from_secrets=env_from_secrets,
resources=resources,
pvc_mounts=pvc_mounts or [],
config_map_mounts=config_map_mounts or [],
inputs={name: f"param:{name}" for name in params},
outputs=list((main_step.description.outputs or {}).keys()),
depends_on=[],
Expand Down Expand Up @@ -94,6 +111,10 @@ def workflow_to_ir(
func_module=step.function.__module__,
func_qualname=step.function.__qualname__,
image=image_name,
env_from_secrets=env_from_secrets,
resources=resources,
pvc_mounts=pvc_mounts or [],
config_map_mounts=config_map_mounts or [],
inputs=inputs,
outputs=list((step.description.outputs or {}).keys()),
depends_on=sorted(set(depends_on)),
Expand Down
36 changes: 36 additions & 0 deletions appligator/src/appligator/airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,37 @@
]


class PvcMount(BaseModel):
    """A PersistentVolumeClaim volume with its mount point.

    Carried through ``TaskIR`` into the generated DAG, where it is rendered
    as a ``k8s.V1Volume`` (backed by a ``V1PersistentVolumeClaimVolumeSource``)
    plus a matching ``k8s.V1VolumeMount`` on each Kubernetes pod task.
    """

    # Volume name; links the rendered V1Volume to its V1VolumeMount.
    name: str
    # Name of the existing PersistentVolumeClaim to attach.
    claim_name: str
    # Absolute path inside the container where the volume is mounted.
    mount_path: str


class ConfigMapMount(BaseModel):
    """A ConfigMap volume with its mount point.

    Carried through ``TaskIR`` into the generated DAG, where it is rendered
    as a ``k8s.V1Volume`` (backed by a ``V1ConfigMapVolumeSource``) plus a
    matching ``k8s.V1VolumeMount`` on each Kubernetes pod task.
    """

    # Volume name; links the rendered V1Volume to its V1VolumeMount.
    name: str
    # Name of the existing ConfigMap to attach.
    config_map_name: str
    # Path inside the container where the ConfigMap is mounted.
    mount_path: str
    # Optional sub-path for mounting a single file out of the ConfigMap;
    # when None, the whole ConfigMap is mounted as a directory.
    sub_path: str | None = None


class ResourceRequirements(BaseModel):
    """
    CPU and memory resource requests and limits for a container.

    Values follow the Kubernetes quantity syntax (e.g. "500m", "2", "256Mi", "1Gi").
    All fields are optional; only the non-None ones are emitted into the generated DAG
    (as the ``requests``/``limits`` of a ``k8s.V1ResourceRequirements``).
    """

    # Requested (guaranteed) CPU quantity, e.g. "500m".
    cpu_request: str | None = None
    # Requested (guaranteed) memory quantity, e.g. "256Mi".
    memory_request: str | None = None
    # Maximum CPU quantity the container may use, e.g. "2".
    cpu_limit: str | None = None
    # Maximum memory quantity the container may use, e.g. "1Gi".
    memory_limit: str | None = None


class TaskIR(BaseModel):
"""
Operator-agnostic description of a single executable task.
Expand All @@ -27,6 +58,7 @@ class TaskIR(BaseModel):
image: Container image for container-based runtimes (if applicable).
command: Optional command override.
env: Optional environment variables.
resources: Optional CPU/memory requests and limits.
inputs: Mapping of input names to param:/xcom: references.
outputs: List of output keys produced by the task.
depends_on: List of upstream task IDs.
Expand All @@ -44,6 +76,10 @@ class TaskIR(BaseModel):
image: str | None = None
command: list[str] | None = None
env: dict[str, str] | None = None
env_from_secrets: list[str] | None = None
resources: ResourceRequirements | None = None
pvc_mounts: list[PvcMount] = Field(default_factory=list)
config_map_mounts: list[ConfigMapMount] = Field(default_factory=list)

# Data flow
inputs: dict[str, str] = Field(default_factory=dict)
Expand Down
15 changes: 11 additions & 4 deletions appligator/src/appligator/airflow/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def render(self, workflow: WorkflowIR) -> str:
"""
lines: list[str] = []

lines.extend(render_header())
needs_k8s = any(
t.env_from_secrets or t.resources or t.pvc_mounts or t.config_map_mounts
for t in workflow.tasks
)
lines.extend(render_header(needs_k8s=needs_k8s))

lines.append(render_dag_open(workflow))

Expand All @@ -62,16 +66,19 @@ def render_task(self, task: TaskIR) -> str:
raise ValueError(f"No operator adapter for task {task.id}")


def render_header() -> list[str]:
return [
def render_header(needs_k8s: bool = False) -> list[str]:
    """Return the import lines emitted at the top of a generated DAG file.

    Args:
        needs_k8s: When True, additionally emit the ``kubernetes.client``
            models import needed by env-from/resource/volume blocks.

    Returns:
        The header lines, each entry one line of the generated file
        (a trailing empty string separates the header from what follows).
    """
    header = [
        "import json",
        "from datetime import datetime",
        "\nfrom airflow import DAG",
        "from airflow.models.param import Param",
        "from airflow.providers.standard.operators.python import PythonOperator",
        "from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator",
        "",
    ]
    if not needs_k8s:
        return header
    # The k8s models import is only valid/needed when some task uses
    # secrets, resources, or volume mounts.
    return header + ["from kubernetes.client import models as k8s", ""]


def render_dag_open(workflow: WorkflowIR) -> str:
Expand Down
Loading
Loading