Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e0dc4be
feat: add workflow schedule trigger support
ACAne0320 Aug 24, 2025
081c61c
[autofix.ci] apply automated fixes
autofix-ci[bot] Aug 24, 2025
b7a3393
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 27, 2025
55272f1
refactor: use beat_schedule instead of custom schedule
ACAne0320 Aug 27, 2025
3be64ac
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 28, 2025
9ff8318
refactor: update workflow_schedule_plans table structrue & add daily …
ACAne0320 Aug 28, 2025
57502a4
feat: sync workflow schedule params when publishing workflow
ACAne0320 Aug 28, 2025
aaa2c74
chore: internal func move to ScheduleService
ACAne0320 Aug 29, 2025
3b9d3a3
feat(test): add ScheduleService unit tests
ACAne0320 Aug 29, 2025
29fa80f
chore: update schedule node default config & remove enable param
ACAne0320 Aug 29, 2025
5b49cda
chore: remove unused import and clean up comments in ScheduleService
ACAne0320 Aug 29, 2025
7f4c403
refactor: simplify schedule polling logic and reduce code complexity
ACAne0320 Aug 29, 2025
2bab93b
chore: reformat
ACAne0320 Aug 29, 2025
f0ac008
feat: add start-beat
ACAne0320 Aug 29, 2025
9bb0646
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 29, 2025
5ccca1e
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 1, 2025
66591bc
refactor: use Pydantic.Basemodel handling schedule plan
ACAne0320 Sep 2, 2025
8882df7
feat: add ScheduleNodeError handling & convert def that do not refere…
ACAne0320 Sep 2, 2025
3a3178a
refactor: simplify workflow schedule processing and remove pre-check …
ACAne0320 Sep 2, 2025
3e0414e
refactor: convert def that do not reference ScheduleService to utils …
ACAne0320 Sep 2, 2025
04994d6
chore: update schedule_service unit_tests
ACAne0320 Sep 2, 2025
d77cb4c
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 2, 2025
38ffd95
refactor: update convert_12h_to_24h to raise ValueErrors for invalid …
ACAne0320 Sep 2, 2025
8040580
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 3, 2025
a7fbc9f
feat: add VisualConfig & ScheduleExecutionError class
ACAne0320 Sep 3, 2025
80b28e7
chore: replace datetime.now() with naive_utc_now()
ACAne0320 Sep 3, 2025
42fa1c4
chore: enhance schedule update logic and error handling
ACAne0320 Sep 3, 2025
bce2674
chore: improve error handling and logging in run_schedule_trigger func
ACAne0320 Sep 3, 2025
30bb55d
chore: enhance visual_to_cron method
ACAne0320 Sep 4, 2025
22f3f2a
feat: enhance visual_to_cron method with VisualConfig integration and…
ACAne0320 Sep 4, 2025
a00435c
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 8, 2025
30e562d
feat: improve schedule dispatch with streaming and parallel processing
ACAne0320 Sep 8, 2025
3500d47
chore: remove redundant return statements in account retrieval and sc…
ACAne0320 Sep 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ ENABLE_CLEAN_MESSAGES=false
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
ENABLE_DATASETS_QUEUE_MONITOR=false
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true
# Interval time in minutes for polling scheduled workflows(default: 1 min)
WORKFLOW_SCHEDULE_POLLER_INTERVAL=1
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100

# Position configuration
POSITION_TOOL_PINS=
Expand Down
12 changes: 12 additions & 0 deletions api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,18 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable check upgradable plugin task",
default=True,
)
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: bool = Field(
description="Enable workflow schedule poller task",
default=True,
)
WORKFLOW_SCHEDULE_POLLER_INTERVAL: int = Field(
description="Workflow schedule poller interval in minutes",
default=1,
)
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: int = Field(
description="Maximum number of schedules to process in each poll batch",
default=100,
)


class PositionConfig(BaseSettings):
Expand Down
5 changes: 5 additions & 0 deletions api/core/workflow/nodes/node_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from core.workflow.nodes.template_transform import TemplateTransformNode
from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.trigger_plugin import TriggerPluginNode
from core.workflow.nodes.trigger_schedule import TriggerScheduleNode
from core.workflow.nodes.trigger_webhook import TriggerWebhookNode
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
Expand Down Expand Up @@ -142,4 +143,8 @@
LATEST_VERSION: TriggerPluginNode,
"1": TriggerPluginNode,
},
NodeType.TRIGGER_SCHEDULE: {
LATEST_VERSION: TriggerScheduleNode,
"1": TriggerScheduleNode,
},
}
3 changes: 3 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode

__all__ = ["TriggerScheduleNode"]
51 changes: 51 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Literal, Optional, Union

from pydantic import BaseModel, Field

from core.workflow.nodes.base import BaseNodeData


class TriggerScheduleNodeData(BaseNodeData):
"""
Trigger Schedule Node Data
"""

mode: str = Field(default="visual", description="Schedule mode: visual or cron")
frequency: Optional[str] = Field(
default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly"
)
cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode")
visual_config: Optional[dict] = Field(default=None, description="Visual configuration details")
timezone: str = Field(default="UTC", description="Timezone for schedule execution")


class ScheduleConfig(BaseModel):
node_id: str
cron_expression: str
timezone: str = "UTC"


class SchedulePlanUpdate(BaseModel):
node_id: Optional[str] = None
cron_expression: Optional[str] = None
timezone: Optional[str] = None


class VisualConfig(BaseModel):
"""Visual configuration for schedule trigger"""

# For hourly frequency
on_minute: Optional[int] = Field(default=0, ge=0, le=59, description="Minute of the hour (0-59)")

# For daily, weekly, monthly frequencies
time: Optional[str] = Field(default="12:00 PM", description="Time in 12-hour format (e.g., '2:30 PM')")

# For weekly frequency
weekdays: Optional[list[Literal["sun", "mon", "tue", "wed", "thu", "fri", "sat"]]] = Field(
default=None, description="List of weekdays to run on"
)

# For monthly frequency
monthly_days: Optional[list[Union[int, Literal["last"]]]] = Field(
default=None, description="Days of month to run on (1-31 or 'last')"
)
31 changes: 31 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/exc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from core.workflow.nodes.base.exc import BaseNodeError


class ScheduleNodeError(BaseNodeError):
"""Base schedule node error."""

pass


class ScheduleNotFoundError(ScheduleNodeError):
"""Schedule not found error."""

pass


class ScheduleConfigError(ScheduleNodeError):
"""Schedule configuration error."""

pass


class ScheduleExecutionError(ScheduleNodeError):
"""Schedule execution error."""

pass


class TenantOwnerNotFoundError(ScheduleExecutionError):
"""Tenant owner not found error for schedule execution."""

pass
62 changes: 62 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Any, Optional

from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData


class TriggerScheduleNode(BaseNode):
_node_type = NodeType.TRIGGER_SCHEDULE

_node_data: TriggerScheduleNodeData

def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = TriggerScheduleNodeData(**data)

def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy

def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config

def _get_title(self) -> str:
return self._node_data.title

def _get_description(self) -> Optional[str]:
return self._node_data.desc

def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict

def get_base_node_data(self) -> BaseNodeData:
return self._node_data

@classmethod
def version(cls) -> str:
return "1"

@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
return {
"type": "trigger-schedule",
"config": {
"mode": "visual",
"frequency": "weekly",
"visual_config": {"time": "11:30 AM", "on_minute": 0, "weekdays": ["sun"], "monthly_days": [1]},
"timezone": "UTC",
},
}

def _run(self) -> NodeRunResult:
current_time = datetime.now(UTC)
node_outputs = {"current_time": current_time.isoformat()}

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=node_outputs,
)
4 changes: 2 additions & 2 deletions api/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ if [[ "${MODE}" == "worker" ]]; then
if [[ -z "${CELERY_QUEUES}" ]]; then
if [[ "${EDITION}" == "CLOUD" ]]; then
# Cloud edition: separate queues for dataset and trigger tasks
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule"
else
# Community edition (SELF_HOSTED): dataset and workflow have separate queues
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule"
fi
else
DEFAULT_QUEUES="${CELERY_QUEUES}"
Expand Down
1 change: 1 addition & 0 deletions api/events/event_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .create_installed_app_when_app_created import handle
from .create_site_record_when_app_created import handle
from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
from .sync_workflow_schedule_when_app_published import handle
from .update_app_dataset_join_when_app_model_config_updated import handle
from .update_app_dataset_join_when_app_published_workflow_updated import handle
from .update_app_triggers_when_app_published_workflow_updated import handle
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging
from typing import Optional, cast

from sqlalchemy import select
from sqlalchemy.orm import Session

from core.workflow.nodes.trigger_schedule.entities import SchedulePlanUpdate
from events.app_event import app_published_workflow_was_updated
from extensions.ext_database import db
from models import AppMode, Workflow, WorkflowSchedulePlan
from services.schedule_service import ScheduleService

logger = logging.getLogger(__name__)


@app_published_workflow_was_updated.connect
def handle(sender, **kwargs):
"""
Handle app published workflow update event to sync workflow_schedule_plans table.

When a workflow is published, this handler will:
1. Extract schedule trigger nodes from the workflow graph
2. Compare with existing workflow_schedule_plans records
3. Create/update/delete schedule plans as needed
"""
app = sender
if app.mode != AppMode.WORKFLOW.value:
return

published_workflow = kwargs.get("published_workflow")
published_workflow = cast(Workflow, published_workflow)

sync_schedule_from_workflow(tenant_id=app.tenant_id, app_id=app.id, workflow=published_workflow)


def sync_schedule_from_workflow(tenant_id: str, app_id: str, workflow: Workflow) -> Optional[WorkflowSchedulePlan]:
"""
Sync schedule plan from workflow graph configuration.

Args:
tenant_id: Tenant ID
app_id: App ID
workflow: Published workflow instance

Returns:
Updated or created WorkflowSchedulePlan, or None if no schedule node
"""
with Session(db.engine) as session:
schedule_config = ScheduleService.extract_schedule_config(workflow)

existing_plan = session.scalar(
select(WorkflowSchedulePlan).where(
WorkflowSchedulePlan.tenant_id == tenant_id,
WorkflowSchedulePlan.app_id == app_id,
)
)

if not schedule_config:
if existing_plan:
logger.info("No schedule node in workflow for app %s, removing schedule plan", app_id)
ScheduleService.delete_schedule(session=session, schedule_id=existing_plan.id)
session.commit()
return None

if existing_plan:
updates = SchedulePlanUpdate(
node_id=schedule_config.node_id,
cron_expression=schedule_config.cron_expression,
timezone=schedule_config.timezone,
)
updated_plan = ScheduleService.update_schedule(
session=session,
schedule_id=existing_plan.id,
updates=updates,
)
session.commit()
return updated_plan
else:
new_plan = ScheduleService.create_schedule(
session=session,
tenant_id=tenant_id,
app_id=app_id,
config=schedule_config,
)
session.commit()
return new_plan
6 changes: 6 additions & 0 deletions api/extensions/ext_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
"schedule": crontab(minute="0", hour="2"),
}
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
imports.append("schedule.workflow_schedule_task")
beat_schedule["workflow_schedule_task"] = {
"task": "schedule.workflow_schedule_task.poll_workflow_schedules",
"schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL),
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

return celery_app
Loading