Skip to content

Commit 4a743e6

Browse files
feat: add workflow schedule trigger support (#24428)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 07dda61 commit 4a743e6

27 files changed

+1815
-5
lines changed

api/.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,12 @@ ENABLE_CLEAN_MESSAGES=false
505505
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
506506
ENABLE_DATASETS_QUEUE_MONITOR=false
507507
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
508+
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true
509+
# Interval time in minutes for polling scheduled workflows(default: 1 min)
510+
WORKFLOW_SCHEDULE_POLLER_INTERVAL=1
511+
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100
512+
# Maximum number of scheduled workflows to dispatch per tick (0 for unlimited)
513+
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0
508514

509515
# Position configuration
510516
POSITION_TOOL_PINS=

api/configs/feature/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,22 @@ class CeleryScheduleTasksConfig(BaseSettings):
882882
description="Enable check upgradable plugin task",
883883
default=True,
884884
)
885+
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: bool = Field(
886+
description="Enable workflow schedule poller task",
887+
default=True,
888+
)
889+
WORKFLOW_SCHEDULE_POLLER_INTERVAL: int = Field(
890+
description="Workflow schedule poller interval in minutes",
891+
default=1,
892+
)
893+
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: int = Field(
894+
description="Maximum number of schedules to process in each poll batch",
895+
default=100,
896+
)
897+
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: int = Field(
898+
description="Maximum schedules to dispatch per tick (0=unlimited, circuit breaker)",
899+
default=0,
900+
)
885901

886902

887903
class PositionConfig(BaseSettings):

api/core/workflow/nodes/node_mapping.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from core.workflow.nodes.template_transform import TemplateTransformNode
2121
from core.workflow.nodes.tool import ToolNode
2222
from core.workflow.nodes.trigger_plugin import TriggerPluginNode
23+
from core.workflow.nodes.trigger_schedule import TriggerScheduleNode
2324
from core.workflow.nodes.trigger_webhook import TriggerWebhookNode
2425
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
2526
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
@@ -142,4 +143,8 @@
142143
LATEST_VERSION: TriggerPluginNode,
143144
"1": TriggerPluginNode,
144145
},
146+
NodeType.TRIGGER_SCHEDULE: {
147+
LATEST_VERSION: TriggerScheduleNode,
148+
"1": TriggerScheduleNode,
149+
},
145150
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode
2+
3+
__all__ = ["TriggerScheduleNode"]
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from typing import Literal, Optional, Union
2+
3+
from pydantic import BaseModel, Field
4+
5+
from core.workflow.nodes.base import BaseNodeData
6+
7+
8+
class TriggerScheduleNodeData(BaseNodeData):
9+
"""
10+
Trigger Schedule Node Data
11+
"""
12+
13+
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
14+
frequency: Optional[str] = Field(
15+
default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly"
16+
)
17+
cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode")
18+
visual_config: Optional[dict] = Field(default=None, description="Visual configuration details")
19+
timezone: str = Field(default="UTC", description="Timezone for schedule execution")
20+
21+
22+
class ScheduleConfig(BaseModel):
23+
node_id: str
24+
cron_expression: str
25+
timezone: str = "UTC"
26+
27+
28+
class SchedulePlanUpdate(BaseModel):
29+
node_id: Optional[str] = None
30+
cron_expression: Optional[str] = None
31+
timezone: Optional[str] = None
32+
33+
34+
class VisualConfig(BaseModel):
35+
"""Visual configuration for schedule trigger"""
36+
37+
# For hourly frequency
38+
on_minute: Optional[int] = Field(default=0, ge=0, le=59, description="Minute of the hour (0-59)")
39+
40+
# For daily, weekly, monthly frequencies
41+
time: Optional[str] = Field(default="12:00 PM", description="Time in 12-hour format (e.g., '2:30 PM')")
42+
43+
# For weekly frequency
44+
weekdays: Optional[list[Literal["sun", "mon", "tue", "wed", "thu", "fri", "sat"]]] = Field(
45+
default=None, description="List of weekdays to run on"
46+
)
47+
48+
# For monthly frequency
49+
monthly_days: Optional[list[Union[int, Literal["last"]]]] = Field(
50+
default=None, description="Days of month to run on (1-31 or 'last')"
51+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from core.workflow.nodes.base.exc import BaseNodeError
2+
3+
4+
class ScheduleNodeError(BaseNodeError):
5+
"""Base schedule node error."""
6+
7+
pass
8+
9+
10+
class ScheduleNotFoundError(ScheduleNodeError):
11+
"""Schedule not found error."""
12+
13+
pass
14+
15+
16+
class ScheduleConfigError(ScheduleNodeError):
17+
"""Schedule configuration error."""
18+
19+
pass
20+
21+
22+
class ScheduleExecutionError(ScheduleNodeError):
23+
"""Schedule execution error."""
24+
25+
pass
26+
27+
28+
class TenantOwnerNotFoundError(ScheduleExecutionError):
29+
"""Tenant owner not found error for schedule execution."""
30+
31+
pass
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from collections.abc import Mapping
2+
from datetime import UTC, datetime
3+
from typing import Any, Optional
4+
5+
from core.workflow.entities.node_entities import NodeRunResult
6+
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
7+
from core.workflow.nodes.base import BaseNode
8+
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
9+
from core.workflow.nodes.enums import ErrorStrategy, NodeType
10+
from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData
11+
12+
13+
class TriggerScheduleNode(BaseNode):
14+
_node_type = NodeType.TRIGGER_SCHEDULE
15+
16+
_node_data: TriggerScheduleNodeData
17+
18+
def init_node_data(self, data: Mapping[str, Any]) -> None:
19+
self._node_data = TriggerScheduleNodeData(**data)
20+
21+
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
22+
return self._node_data.error_strategy
23+
24+
def _get_retry_config(self) -> RetryConfig:
25+
return self._node_data.retry_config
26+
27+
def _get_title(self) -> str:
28+
return self._node_data.title
29+
30+
def _get_description(self) -> Optional[str]:
31+
return self._node_data.desc
32+
33+
def _get_default_value_dict(self) -> dict[str, Any]:
34+
return self._node_data.default_value_dict
35+
36+
def get_base_node_data(self) -> BaseNodeData:
37+
return self._node_data
38+
39+
@classmethod
40+
def version(cls) -> str:
41+
return "1"
42+
43+
@classmethod
44+
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
45+
return {
46+
"type": "trigger-schedule",
47+
"config": {
48+
"mode": "visual",
49+
"frequency": "weekly",
50+
"visual_config": {"time": "11:30 AM", "on_minute": 0, "weekdays": ["sun"], "monthly_days": [1]},
51+
"timezone": "UTC",
52+
},
53+
}
54+
55+
def _run(self) -> NodeRunResult:
56+
current_time = datetime.now(UTC)
57+
node_outputs = {"current_time": current_time.isoformat()}
58+
59+
return NodeRunResult(
60+
status=WorkflowNodeExecutionStatus.SUCCEEDED,
61+
outputs=node_outputs,
62+
)

api/docker/entrypoint.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ if [[ "${MODE}" == "worker" ]]; then
3434
if [[ -z "${CELERY_QUEUES}" ]]; then
3535
if [[ "${EDITION}" == "CLOUD" ]]; then
3636
# Cloud edition: separate queues for dataset and trigger tasks
37-
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox"
37+
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor"
3838
else
3939
# Community edition (SELF_HOSTED): dataset and workflow have separate queues
40-
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow"
40+
DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor"
4141
fi
4242
else
4343
DEFAULT_QUEUES="${CELERY_QUEUES}"

api/events/event_handlers/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .create_installed_app_when_app_created import handle
55
from .create_site_record_when_app_created import handle
66
from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
7+
from .sync_workflow_schedule_when_app_published import handle
78
from .update_app_dataset_join_when_app_model_config_updated import handle
89
from .update_app_dataset_join_when_app_published_workflow_updated import handle
910
from .update_app_triggers_when_app_published_workflow_updated import handle
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import logging
2+
from typing import Optional, cast
3+
4+
from sqlalchemy import select
5+
from sqlalchemy.orm import Session
6+
7+
from core.workflow.nodes.trigger_schedule.entities import SchedulePlanUpdate
8+
from events.app_event import app_published_workflow_was_updated
9+
from extensions.ext_database import db
10+
from models import AppMode, Workflow, WorkflowSchedulePlan
11+
from services.schedule_service import ScheduleService
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
@app_published_workflow_was_updated.connect
17+
def handle(sender, **kwargs):
18+
"""
19+
Handle app published workflow update event to sync workflow_schedule_plans table.
20+
21+
When a workflow is published, this handler will:
22+
1. Extract schedule trigger nodes from the workflow graph
23+
2. Compare with existing workflow_schedule_plans records
24+
3. Create/update/delete schedule plans as needed
25+
"""
26+
app = sender
27+
if app.mode != AppMode.WORKFLOW.value:
28+
return
29+
30+
published_workflow = kwargs.get("published_workflow")
31+
published_workflow = cast(Workflow, published_workflow)
32+
33+
sync_schedule_from_workflow(tenant_id=app.tenant_id, app_id=app.id, workflow=published_workflow)
34+
35+
36+
def sync_schedule_from_workflow(tenant_id: str, app_id: str, workflow: Workflow) -> Optional[WorkflowSchedulePlan]:
37+
"""
38+
Sync schedule plan from workflow graph configuration.
39+
40+
Args:
41+
tenant_id: Tenant ID
42+
app_id: App ID
43+
workflow: Published workflow instance
44+
45+
Returns:
46+
Updated or created WorkflowSchedulePlan, or None if no schedule node
47+
"""
48+
with Session(db.engine) as session:
49+
schedule_config = ScheduleService.extract_schedule_config(workflow)
50+
51+
existing_plan = session.scalar(
52+
select(WorkflowSchedulePlan).where(
53+
WorkflowSchedulePlan.tenant_id == tenant_id,
54+
WorkflowSchedulePlan.app_id == app_id,
55+
)
56+
)
57+
58+
if not schedule_config:
59+
if existing_plan:
60+
logger.info("No schedule node in workflow for app %s, removing schedule plan", app_id)
61+
ScheduleService.delete_schedule(session=session, schedule_id=existing_plan.id)
62+
session.commit()
63+
return None
64+
65+
if existing_plan:
66+
updates = SchedulePlanUpdate(
67+
node_id=schedule_config.node_id,
68+
cron_expression=schedule_config.cron_expression,
69+
timezone=schedule_config.timezone,
70+
)
71+
updated_plan = ScheduleService.update_schedule(
72+
session=session,
73+
schedule_id=existing_plan.id,
74+
updates=updates,
75+
)
76+
session.commit()
77+
return updated_plan
78+
else:
79+
new_plan = ScheduleService.create_schedule(
80+
session=session,
81+
tenant_id=tenant_id,
82+
app_id=app_id,
83+
config=schedule_config,
84+
)
85+
session.commit()
86+
return new_plan

0 commit comments

Comments
 (0)