Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e0dc4be
feat: add workflow schedule trigger support
ACAne0320 Aug 24, 2025
081c61c
[autofix.ci] apply automated fixes
autofix-ci[bot] Aug 24, 2025
b7a3393
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 27, 2025
55272f1
refactor: use beat_schedule instead of custom schedule
ACAne0320 Aug 27, 2025
3be64ac
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 28, 2025
9ff8318
refactor: update workflow_schedule_plans table structure & add daily …
ACAne0320 Aug 28, 2025
57502a4
feat: sync workflow schedule params when publishing workflow
ACAne0320 Aug 28, 2025
aaa2c74
chore: internal func move to ScheduleService
ACAne0320 Aug 29, 2025
3b9d3a3
feat(test): add ScheduleService unit tests
ACAne0320 Aug 29, 2025
29fa80f
chore: update schedule node default config & remove enable param
ACAne0320 Aug 29, 2025
5b49cda
chore: remove unused import and clean up comments in ScheduleService
ACAne0320 Aug 29, 2025
7f4c403
refactor: simplify schedule polling logic and reduce code complexity
ACAne0320 Aug 29, 2025
2bab93b
chore: reformat
ACAne0320 Aug 29, 2025
f0ac008
feat: add start-beat
ACAne0320 Aug 29, 2025
9bb0646
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Aug 29, 2025
5ccca1e
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 1, 2025
66591bc
refactor: use Pydantic BaseModel for handling schedule plan
ACAne0320 Sep 2, 2025
8882df7
feat: add ScheduleNodeError handling & convert def that do not refere…
ACAne0320 Sep 2, 2025
3a3178a
refactor: simplify workflow schedule processing and remove pre-check …
ACAne0320 Sep 2, 2025
3e0414e
refactor: convert def that do not reference ScheduleService to utils …
ACAne0320 Sep 2, 2025
04994d6
chore: update schedule_service unit_tests
ACAne0320 Sep 2, 2025
d77cb4c
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 2, 2025
38ffd95
refactor: update convert_12h_to_24h to raise ValueErrors for invalid …
ACAne0320 Sep 2, 2025
8040580
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 3, 2025
a7fbc9f
feat: add VisualConfig & ScheduleExecutionError class
ACAne0320 Sep 3, 2025
80b28e7
chore: replace datetime.now() with naive_utc_now()
ACAne0320 Sep 3, 2025
42fa1c4
chore: enhance schedule update logic and error handling
ACAne0320 Sep 3, 2025
bce2674
chore: improve error handling and logging in run_schedule_trigger func
ACAne0320 Sep 3, 2025
30bb55d
chore: enhance visual_to_cron method
ACAne0320 Sep 4, 2025
22f3f2a
feat: enhance visual_to_cron method with VisualConfig integration and…
ACAne0320 Sep 4, 2025
a00435c
Merge remote-tracking branch 'upstream/feat/trigger' into feat/schedu…
ACAne0320 Sep 8, 2025
30e562d
feat: improve schedule dispatch with streaming and parallel processing
ACAne0320 Sep 8, 2025
3500d47
chore: remove redundant return statements in account retrieval and sc…
ACAne0320 Sep 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/core/workflow/nodes/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class NodeType(StrEnum):
DOCUMENT_EXTRACTOR = "document-extractor"
LIST_OPERATOR = "list-operator"
AGENT = "agent"
TRIGGER_SCHEDULE = "trigger-schedule"


class ErrorStrategy(StrEnum):
Expand Down
5 changes: 5 additions & 0 deletions api/core/workflow/nodes/node_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from core.workflow.nodes.start import StartNode
from core.workflow.nodes.template_transform import TemplateTransformNode
from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.trigger_schedule import TriggerScheduleNode
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2
Expand Down Expand Up @@ -132,4 +133,8 @@
"2": AgentNode,
"1": AgentNode,
},
NodeType.TRIGGER_SCHEDULE: {
LATEST_VERSION: TriggerScheduleNode,
"1": TriggerScheduleNode,
},
}
3 changes: 3 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode

__all__ = ["TriggerScheduleNode"]
20 changes: 20 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Optional

from pydantic import Field

from core.workflow.nodes.base import BaseNodeData


class TriggerScheduleNodeData(BaseNodeData):
    """
    Trigger Schedule Node Data.

    Configuration payload for a schedule-trigger node. Two modes are
    supported: "visual" (human-friendly frequency plus an optional
    ``visual_config`` detail dict) and "cron" (a raw cron expression).
    The node surfaces these fields verbatim as run outputs.
    """

    # Selects which of the two configuration styles below applies.
    mode: str = Field(default="visual", description="Schedule mode: visual or cron")
    # Only meaningful when mode == "visual"; the node falls back to
    # "unknown" in its outputs when this is unset.
    frequency: Optional[str] = Field(
        default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly"
    )
    # Only meaningful when mode == "cron".
    cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode")
    # Free-form details for visual mode; schema not enforced here —
    # presumably validated elsewhere (TODO confirm against ScheduleService).
    visual_config: Optional[dict] = Field(default=None, description="Visual configuration details")
    # IANA timezone name used when evaluating the schedule.
    timezone: str = Field(default="UTC", description="Timezone for schedule execution")
    # Soft on/off switch; the node still runs and reports this flag.
    enabled: bool = Field(default=True, description="Whether the schedule is enabled")
70 changes: 70 additions & 0 deletions api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Any, Optional

from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData


class TriggerScheduleNode(BaseNode):
    """Workflow entry node fired by a time-based schedule.

    Acts like a start node: it does no work of its own, and simply
    publishes the workflow's user/system inputs plus its own schedule
    configuration as outputs for downstream nodes.
    """

    _node_type = NodeType.TRIGGER_SCHEDULE

    _node_data: TriggerScheduleNodeData

    def init_node_data(self, data: Mapping[str, Any]) -> None:
        """Parse and validate the raw node config into typed data."""
        self._node_data = TriggerScheduleNodeData(**data)

    def _get_error_strategy(self) -> Optional[ErrorStrategy]:
        return self._node_data.error_strategy

    def _get_retry_config(self) -> RetryConfig:
        return self._node_data.retry_config

    def _get_title(self) -> str:
        return self._node_data.title

    def _get_description(self) -> Optional[str]:
        return self._node_data.desc

    def _get_default_value_dict(self) -> dict[str, Any]:
        return self._node_data.default_value_dict

    def get_base_node_data(self) -> BaseNodeData:
        return self._node_data

    @classmethod
    def version(cls) -> str:
        return "1"

    def _run(self) -> NodeRunResult:
        """Emit inputs and schedule metadata as this node's outputs."""
        pool = self.graph_runtime_state.variable_pool
        outputs: dict[str, Any] = dict(pool.user_inputs)

        # Re-expose each system variable under its namespaced key
        # (e.g. "sys.<name>") so downstream nodes can reference them.
        for name, value in pool.system_variables.to_dict().items():
            outputs[f"{SYSTEM_VARIABLE_NODE_ID}.{name}"] = value

        # Schedule-specific metadata, stamped at execution time (UTC).
        outputs["triggered_at"] = datetime.now(UTC).isoformat()
        outputs["timezone"] = self._node_data.timezone
        outputs["mode"] = self._node_data.mode
        outputs["enabled"] = self._node_data.enabled

        # Surface the mode-specific configuration details.
        data = self._node_data
        if data.mode == "cron" and data.cron_expression:
            outputs["cron_expression"] = data.cron_expression
        elif data.mode == "visual":
            outputs["frequency"] = data.frequency or "unknown"
            if data.visual_config:
                outputs["visual_config"] = data.visual_config

        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs=outputs,
        )
9 changes: 7 additions & 2 deletions api/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ if [[ "${MODE}" == "worker" ]]; then
if [[ -z "${CELERY_QUEUES}" ]]; then
if [[ "${EDITION}" == "CLOUD" ]]; then
# Cloud edition: separate queues for dataset and trigger tasks
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,workflow_professional,workflow_team,workflow_sandbox"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,workflow_professional,workflow_team,workflow_sandbox,schedule"
else
# Community edition (SELF_HOSTED): dataset and workflow have separate queues
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,workflow"
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,workflow,schedule"
fi
else
DEFAULT_QUEUES="${CELERY_QUEUES}"
Expand Down Expand Up @@ -68,6 +68,11 @@ if [[ "${MODE}" == "worker" ]]; then

elif [[ "${MODE}" == "beat" ]]; then
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
elif [[ "${MODE}" == "workflow-scheduler" ]]; then
echo "Starting Workflow Scheduler..."
exec celery -A app.celery beat \
--scheduler schedule.schedule_dispatch:WorkflowScheduler \
--loglevel ${LOG_LEVEL:-INFO}
else
if [[ "${DEBUG}" == "true" ]]; then
exec flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug
Expand Down
1 change: 1 addition & 0 deletions api/extensions/ext_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:

imports = [
"tasks.async_workflow_tasks", # trigger workers
"tasks.workflow_schedule_tasks", # schedule trigger tasks
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""add workflow_schedule_plan

Revision ID: 1e06b2654c6c
Revises: 4558cfabe44e
Create Date: 2025-08-24 13:13:23.495063

"""
from alembic import op
import models as models
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '1e06b2654c6c'
down_revision = '4558cfabe44e'
branch_labels = None
depends_on = None


def upgrade():
    """Create the workflow_schedule_plans table with its lookup indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('workflow_schedule_plans',
        sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
        sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
        sa.Column('app_id', models.types.StringUUID(), nullable=False),
        sa.Column('workflow_id', models.types.StringUUID(), nullable=False),
        sa.Column('root_node_id', sa.String(length=255), nullable=False),
        sa.Column('cron_expression', sa.String(length=255), nullable=False),
        sa.Column('timezone', sa.String(length=64), nullable=False),
        sa.Column('enabled', sa.Boolean(), nullable=False),
        # Timezone-aware, unlike created_at/updated_at below — the poller
        # compares it against the current instant.
        sa.Column('next_run_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('created_by', models.types.StringUUID(), nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
        sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey')
    )
    with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
        batch_op.create_index('workflow_schedule_plan_app_workflow_idx', ['app_id', 'workflow_id'], unique=False)
        batch_op.create_index('workflow_schedule_plan_created_at_idx', ['created_at'], unique=False)
        # Composite index serving the "due, enabled plans per tenant" scan.
        batch_op.create_index('workflow_schedule_plan_tenant_enabled_next_idx', ['tenant_id', 'enabled', 'next_run_at'], unique=False)

    # ### end Alembic commands ###


def downgrade():
    """Drop the workflow_schedule_plans table (indexes removed first)."""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
        # Indexes are dropped in reverse order of their creation in upgrade().
        batch_op.drop_index('workflow_schedule_plan_tenant_enabled_next_idx')
        batch_op.drop_index('workflow_schedule_plan_created_at_idx')
        batch_op.drop_index('workflow_schedule_plan_app_workflow_idx')

    op.drop_table('workflow_schedule_plans')
    # ### end Alembic commands ###
69 changes: 69 additions & 0 deletions api/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,3 +1383,72 @@ def to_dict(self) -> dict:
"triggered_at": self.triggered_at.isoformat() if self.triggered_at else None,
"finished_at": self.finished_at.isoformat() if self.finished_at else None,
}


class WorkflowSchedulePlan(Base):
    """
    Workflow Schedule Configuration

    Store schedule configurations for time-based workflow triggers.
    Uses cron expressions with timezone support for flexible scheduling.

    Attributes:
        id: UUID primary key
        tenant_id: Workspace ID for multi-tenancy
        app_id: Application ID this schedule belongs to
        workflow_id: Workflow to trigger
        root_node_id: Starting node ID for workflow execution
        cron_expression: Cron expression defining schedule pattern
        timezone: Timezone for cron evaluation (e.g., 'Asia/Shanghai')
        enabled: Whether schedule is active
        next_run_at: Next scheduled execution time (timestamptz)
        created_by: Creator account ID
        created_at: Creation timestamp
        updated_at: Last update timestamp
    """

    __tablename__ = "workflow_schedule_plans"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
        # Composite index for the scheduler's "due, enabled plans per tenant" scan.
        sa.Index("workflow_schedule_plan_tenant_enabled_next_idx", "tenant_id", "enabled", "next_run_at"),
        sa.Index("workflow_schedule_plan_app_workflow_idx", "app_id", "workflow_id"),
        sa.Index("workflow_schedule_plan_created_at_idx", "created_at"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    root_node_id: Mapped[str] = mapped_column(String(255), nullable=False)

    # Schedule configuration
    cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
    timezone: Mapped[str] = mapped_column(String(64), nullable=False)

    # Schedule control
    enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=True)
    # Timezone-aware (timestamptz), unlike the naive audit timestamps below.
    next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)

    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    # NOTE(review): created_at/updated_at are naive DateTime with a DB-side
    # CURRENT_TIMESTAMP default — presumably server-local/UTC time; confirm
    # this matches the codebase's naive_utc_now() convention.
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    def to_dict(self) -> dict:
        """Convert to dictionary representation.

        Datetimes are serialized to ISO-8601 strings; next_run_at is None
        when the plan has no scheduled next execution.
        """
        return {
            "id": self.id,
            "tenant_id": self.tenant_id,
            "app_id": self.app_id,
            "workflow_id": self.workflow_id,
            "root_node_id": self.root_node_id,
            "cron_expression": self.cron_expression,
            "timezone": self.timezone,
            "enabled": self.enabled,
            "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
            "created_by": self.created_by,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
1 change: 1 addition & 0 deletions api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ dependencies = [
"sseclient-py>=1.8.0",
"httpx-sse>=0.4.0",
"sendgrid~=6.12.3",
"croniter>=6.0.0",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.
Expand Down
Loading