diff --git a/api/.env.example b/api/.env.example index d27dbab885d1be..336b1f47bd90eb 100644 --- a/api/.env.example +++ b/api/.env.example @@ -505,6 +505,12 @@ ENABLE_CLEAN_MESSAGES=false ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false ENABLE_DATASETS_QUEUE_MONITOR=false ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true +ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true +# Interval time in minutes for polling scheduled workflows(default: 1 min) +WORKFLOW_SCHEDULE_POLLER_INTERVAL=1 +WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100 +# Maximum number of scheduled workflows to dispatch per tick (0 for unlimited) +WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0 # Position configuration POSITION_TOOL_PINS= diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 13bd935769343a..986c13085cd769 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -882,6 +882,22 @@ class CeleryScheduleTasksConfig(BaseSettings): description="Enable check upgradable plugin task", default=True, ) + ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: bool = Field( + description="Enable workflow schedule poller task", + default=True, + ) + WORKFLOW_SCHEDULE_POLLER_INTERVAL: int = Field( + description="Workflow schedule poller interval in minutes", + default=1, + ) + WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: int = Field( + description="Maximum number of schedules to process in each poll batch", + default=100, + ) + WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: int = Field( + description="Maximum schedules to dispatch per tick (0=unlimited, circuit breaker)", + default=0, + ) class PositionConfig(BaseSettings): diff --git a/api/core/workflow/nodes/node_mapping.py b/api/core/workflow/nodes/node_mapping.py index 372c935a54c0d5..ca7b98098386ae 100644 --- a/api/core/workflow/nodes/node_mapping.py +++ b/api/core/workflow/nodes/node_mapping.py @@ -20,6 +20,7 @@ from core.workflow.nodes.template_transform import TemplateTransformNode from core.workflow.nodes.tool import ToolNode from core.workflow.nodes.trigger_plugin import TriggerPluginNode +from core.workflow.nodes.trigger_schedule import TriggerScheduleNode from core.workflow.nodes.trigger_webhook import TriggerWebhookNode from core.workflow.nodes.variable_aggregator import VariableAggregatorNode from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1 @@ -142,4 +143,8 @@ LATEST_VERSION: TriggerPluginNode, "1": TriggerPluginNode, }, + NodeType.TRIGGER_SCHEDULE: { + LATEST_VERSION: TriggerScheduleNode, + "1": TriggerScheduleNode, + }, } diff --git a/api/core/workflow/nodes/trigger_schedule/__init__.py b/api/core/workflow/nodes/trigger_schedule/__init__.py new file mode 100644 index 00000000000000..6773bae5027235 --- /dev/null +++ b/api/core/workflow/nodes/trigger_schedule/__init__.py @@ -0,0 +1,3 @@ +from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode + +__all__ = ["TriggerScheduleNode"] diff --git a/api/core/workflow/nodes/trigger_schedule/entities.py b/api/core/workflow/nodes/trigger_schedule/entities.py new file mode 100644 index 00000000000000..0fea4dcda359ab --- /dev/null +++ b/api/core/workflow/nodes/trigger_schedule/entities.py @@ -0,0 +1,51 @@ +from typing import Literal, Optional, Union + +from pydantic import BaseModel, Field + +from core.workflow.nodes.base import BaseNodeData + + +class TriggerScheduleNodeData(BaseNodeData): + """ + Trigger Schedule Node Data + """ + + mode: str = Field(default="visual", description="Schedule mode: visual or cron") + frequency: Optional[str] = Field( + default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly" + ) + cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode") + visual_config: Optional[dict] = Field(default=None, description="Visual configuration details") + timezone: str = Field(default="UTC", description="Timezone for schedule execution") + + +class ScheduleConfig(BaseModel): + node_id: str + cron_expression: str + timezone: str = "UTC" + + +class SchedulePlanUpdate(BaseModel): + node_id: Optional[str] = None + cron_expression: Optional[str] = None + timezone: Optional[str] = None + + +class VisualConfig(BaseModel): + """Visual configuration for schedule trigger""" + + # For hourly frequency + on_minute: Optional[int] = Field(default=0, ge=0, le=59, description="Minute of the hour (0-59)") + + # For daily, weekly, monthly frequencies + time: Optional[str] = Field(default="12:00 PM", description="Time in 12-hour format (e.g., '2:30 PM')") + + # For weekly frequency + weekdays: Optional[list[Literal["sun", "mon", "tue", "wed", "thu", "fri", "sat"]]] = Field( + default=None, description="List of weekdays to run on" + ) + + # For monthly frequency + monthly_days: Optional[list[Union[int, Literal["last"]]]] = Field( + default=None, description="Days of month to run on (1-31 or 'last')" + ) diff --git a/api/core/workflow/nodes/trigger_schedule/exc.py b/api/core/workflow/nodes/trigger_schedule/exc.py new file mode 100644 index 00000000000000..2f99880ff138a6 --- /dev/null +++ b/api/core/workflow/nodes/trigger_schedule/exc.py @@ -0,0 +1,31 @@ +from core.workflow.nodes.base.exc import BaseNodeError + + +class ScheduleNodeError(BaseNodeError): + """Base schedule node error.""" + + pass + + +class ScheduleNotFoundError(ScheduleNodeError): + """Schedule not found error.""" + + pass + + +class ScheduleConfigError(ScheduleNodeError): + """Schedule configuration error.""" + + pass + + +class ScheduleExecutionError(ScheduleNodeError): + """Schedule execution error.""" + + pass + + +class TenantOwnerNotFoundError(ScheduleExecutionError): + """Tenant owner not found error for schedule execution.""" + + pass diff --git a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py new file mode 100644 index 00000000000000..056bbc1f98dd8a --- /dev/null +++ b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py @@ -0,0 +1,62 @@ +from collections.abc import Mapping +from datetime import UTC, datetime +from typing import Any, Optional + +from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from core.workflow.nodes.base import BaseNode +from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig +from core.workflow.nodes.enums import ErrorStrategy, NodeType +from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData + + +class TriggerScheduleNode(BaseNode): + _node_type = NodeType.TRIGGER_SCHEDULE + + _node_data: TriggerScheduleNodeData + + def init_node_data(self, data: Mapping[str, Any]) -> None: + self._node_data = TriggerScheduleNodeData(**data) + + def _get_error_strategy(self) -> Optional[ErrorStrategy]: + return self._node_data.error_strategy + + def _get_retry_config(self) -> RetryConfig: + return self._node_data.retry_config + + def _get_title(self) -> str: + return self._node_data.title + + def _get_description(self) -> Optional[str]: + return self._node_data.desc + + def _get_default_value_dict(self) -> dict[str, Any]: + return self._node_data.default_value_dict + + def get_base_node_data(self) -> BaseNodeData: + return self._node_data + + @classmethod + def version(cls) -> str: + return "1" + + @classmethod + def get_default_config(cls, filters: Optional[dict] = None) -> dict: + return { + "type": "trigger-schedule", + "config": { + "mode": "visual", + "frequency": "weekly", + "visual_config": {"time": "11:30 AM", "on_minute": 0, "weekdays": ["sun"], "monthly_days": [1]}, + "timezone": "UTC", + }, + } + + def _run(self) -> NodeRunResult: + current_time = datetime.now(UTC) + node_outputs = {"current_time": current_time.isoformat()} + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs=node_outputs, + ) diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index df96512878e270..25d1a6d34d4838 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -34,10 +34,10 @@ if [[ "${MODE}" == "worker" ]]; then if [[ -z "${CELERY_QUEUES}" ]]; then if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox" + DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor" else # Community edition (SELF_HOSTED): dataset and workflow have separate queues - DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow" + DEFAULT_QUEUES="dataset,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor" fi else DEFAULT_QUEUES="${CELERY_QUEUES}" diff --git a/api/events/event_handlers/__init__.py b/api/events/event_handlers/__init__.py index 4f0a163d893879..76b08ccc79157d 100644 --- a/api/events/event_handlers/__init__.py +++ b/api/events/event_handlers/__init__.py @@ -4,6 +4,7 @@ from .create_installed_app_when_app_created import handle from .create_site_record_when_app_created import handle from .delete_tool_parameters_cache_when_sync_draft_workflow import handle +from .sync_workflow_schedule_when_app_published import handle from .update_app_dataset_join_when_app_model_config_updated import handle from .update_app_dataset_join_when_app_published_workflow_updated import handle from .update_app_triggers_when_app_published_workflow_updated import handle diff --git a/api/events/event_handlers/sync_workflow_schedule_when_app_published.py b/api/events/event_handlers/sync_workflow_schedule_when_app_published.py new file mode 100644 index 00000000000000..928ce60bd28575 --- /dev/null +++ b/api/events/event_handlers/sync_workflow_schedule_when_app_published.py @@ -0,0 +1,86 @@ +import logging +from typing import Optional, cast + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.workflow.nodes.trigger_schedule.entities import SchedulePlanUpdate +from events.app_event import app_published_workflow_was_updated +from extensions.ext_database import db +from models import AppMode, Workflow, WorkflowSchedulePlan +from services.schedule_service import ScheduleService + +logger = logging.getLogger(__name__) + + +@app_published_workflow_was_updated.connect +def handle(sender, **kwargs): + """ + Handle app published workflow update event to sync workflow_schedule_plans table. + + When a workflow is published, this handler will: + 1. Extract schedule trigger nodes from the workflow graph + 2. Compare with existing workflow_schedule_plans records + 3. Create/update/delete schedule plans as needed + """ + app = sender + if app.mode != AppMode.WORKFLOW.value: + return + + published_workflow = kwargs.get("published_workflow") + published_workflow = cast(Workflow, published_workflow) + + sync_schedule_from_workflow(tenant_id=app.tenant_id, app_id=app.id, workflow=published_workflow) + + +def sync_schedule_from_workflow(tenant_id: str, app_id: str, workflow: Workflow) -> Optional[WorkflowSchedulePlan]: + """ + Sync schedule plan from workflow graph configuration. + + Args: + tenant_id: Tenant ID + app_id: App ID + workflow: Published workflow instance + + Returns: + Updated or created WorkflowSchedulePlan, or None if no schedule node + """ + with Session(db.engine) as session: + schedule_config = ScheduleService.extract_schedule_config(workflow) + + existing_plan = session.scalar( + select(WorkflowSchedulePlan).where( + WorkflowSchedulePlan.tenant_id == tenant_id, + WorkflowSchedulePlan.app_id == app_id, + ) + ) + + if not schedule_config: + if existing_plan: + logger.info("No schedule node in workflow for app %s, removing schedule plan", app_id) + ScheduleService.delete_schedule(session=session, schedule_id=existing_plan.id) + session.commit() + return None + + if existing_plan: + updates = SchedulePlanUpdate( + node_id=schedule_config.node_id, + cron_expression=schedule_config.cron_expression, + timezone=schedule_config.timezone, + ) + updated_plan = ScheduleService.update_schedule( + session=session, + schedule_id=existing_plan.id, + updates=updates, + ) + session.commit() + return updated_plan + else: + new_plan = ScheduleService.create_schedule( + session=session, + tenant_id=tenant_id, + app_id=app_id, + config=schedule_config, + ) + session.commit() + return new_plan diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 48b03731c71759..50fac304f51de8 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -160,6 +160,12 @@ def __call__(self, *args: object, **kwargs: object) -> object: "task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise", "schedule": crontab(minute="0", hour="2"), } + if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: + imports.append("schedule.workflow_schedule_task") + beat_schedule["workflow_schedule_task"] = { + "task": "schedule.workflow_schedule_task.poll_workflow_schedules", + "schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL), + } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) return celery_app diff --git a/api/libs/schedule_utils.py b/api/libs/schedule_utils.py new file mode 100644 index 00000000000000..635550dd7fc28b --- /dev/null +++ b/api/libs/schedule_utils.py @@ -0,0 +1,97 @@ +from datetime import UTC, datetime +from typing import Optional + +import pytz +from croniter import croniter + + +def calculate_next_run_at( + cron_expression: str, + timezone: str, + base_time: Optional[datetime] = None, +) -> datetime: + """ + Calculate the next run time for a cron expression in a specific timezone. + + Args: + cron_expression: Cron expression string (supports croniter extensions like 'L') + timezone: Timezone string (e.g., 'UTC', 'America/New_York') + base_time: Base time to calculate from (defaults to current UTC time) + + Returns: + Next run time in UTC + + Note: + Supports croniter's extended syntax including: + - 'L' for last day of month + - Standard 5-field cron expressions + """ + + tz = pytz.timezone(timezone) + + if base_time is None: + base_time = datetime.now(UTC) + + base_time_tz = base_time.astimezone(tz) + cron = croniter(cron_expression, base_time_tz) + next_run_tz = cron.get_next(datetime) + next_run_utc = next_run_tz.astimezone(UTC) + + return next_run_utc + + +def convert_12h_to_24h(time_str: str) -> tuple[int, int]: + """ + Parse 12-hour time format to 24-hour format for cron compatibility. + + Args: + time_str: Time string in format "HH:MM AM/PM" (e.g., "12:30 PM") + + Returns: + Tuple of (hour, minute) in 24-hour format + + Raises: + ValueError: If time string format is invalid or values are out of range + + Examples: + - "12:00 AM" -> (0, 0) # Midnight + - "12:00 PM" -> (12, 0) # Noon + - "1:30 PM" -> (13, 30) + - "11:59 PM" -> (23, 59) + """ + if not time_str or not time_str.strip(): + raise ValueError("Time string cannot be empty") + + parts = time_str.strip().split() + if len(parts) != 2: + raise ValueError(f"Invalid time format: '{time_str}'. Expected 'HH:MM AM/PM'") + + time_part, period = parts + period = period.upper() + + if period not in ["AM", "PM"]: + raise ValueError(f"Invalid period: '{period}'. Must be 'AM' or 'PM'") + + time_parts = time_part.split(":") + if len(time_parts) != 2: + raise ValueError(f"Invalid time format: '{time_part}'. Expected 'HH:MM'") + + try: + hour = int(time_parts[0]) + minute = int(time_parts[1]) + except ValueError as e: + raise ValueError(f"Invalid time values: {e}") + + if hour < 1 or hour > 12: + raise ValueError(f"Invalid hour: {hour}. Must be between 1 and 12") + + if minute < 0 or minute > 59: + raise ValueError(f"Invalid minute: {minute}. Must be between 0 and 59") + + # Handle 12-hour to 24-hour edge cases + if period == "PM" and hour != 12: + hour += 12 + elif period == "AM" and hour == 12: + hour = 0 + + return hour, minute diff --git a/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py b/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py index 4a4f5e8ab6ac95..205d7aea827e60 100644 --- a/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py +++ b/api/migrations/versions/2025_08_23_2038-4558cfabe44e_add_workflow_trigger_logs.py @@ -1,4 +1,4 @@ -"""empty message +"""Add workflow trigger logs table Revision ID: 4558cfabe44e Revises: 0e154742a5fa diff --git a/api/migrations/versions/2025_08_28_2052-c19938f630b6_.py b/api/migrations/versions/2025_08_28_2052-c19938f630b6_.py new file mode 100644 index 00000000000000..621dcc8a270c08 --- /dev/null +++ b/api/migrations/versions/2025_08_28_2052-c19938f630b6_.py @@ -0,0 +1,47 @@ +"""Add workflow schedule plan table + +Revision ID: c19938f630b6 +Revises: 9ee7d347f4c1 +Create Date: 2025-08-28 20:52:41.300028 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c19938f630b6' +down_revision = '9ee7d347f4c1' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('workflow_schedule_plans', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('cron_expression', sa.String(length=255), nullable=False), + sa.Column('timezone', sa.String(length=64), nullable=False), + sa.Column('next_run_at', sa.DateTime(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey'), + sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node') + ) + with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: + batch_op.create_index('workflow_schedule_plan_next_idx', ['next_run_at'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: + batch_op.drop_index('workflow_schedule_plan_next_idx') + + op.drop_table('workflow_schedule_plans') + # ### end Alembic commands ### diff --git a/api/models/__init__.py b/api/models/__init__.py index 42c2fb6eccac73..7df4790fe50f0d 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -92,6 +92,7 @@ WorkflowNodeExecutionModel, WorkflowNodeExecutionTriggeredFrom, WorkflowRun, + WorkflowSchedulePlan, WorkflowType, ) @@ -185,6 +186,7 @@ "WorkflowNodeExecutionTriggeredFrom", "WorkflowRun", "WorkflowRunTriggeredFrom", + "WorkflowSchedulePlan", "WorkflowToolProvider", "WorkflowType", "db", diff --git a/api/models/workflow.py b/api/models/workflow.py index 6dfb83510b0f7a..10b1837a34da77 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -1525,3 +1525,60 @@ class AppTrigger(Base): default=naive_utc_now(), server_onupdate=func.current_timestamp(), ) + + +class WorkflowSchedulePlan(Base): + """ + Workflow Schedule Configuration + + Store schedule configurations for time-based workflow triggers. + Uses cron expressions with timezone support for flexible scheduling. + + Attributes: + - id (uuid) Primary key + - app_id (uuid) App ID to bind to a specific app + - node_id (varchar) Starting node ID for workflow execution + - tenant_id (uuid) Workspace ID for multi-tenancy + - cron_expression (varchar) Cron expression defining schedule pattern + - timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai') + - next_run_at (timestamp) Next scheduled execution time + - created_at (timestamp) Creation timestamp + - updated_at (timestamp) Last update timestamp + """ + + __tablename__ = "workflow_schedule_plans" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"), + sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"), + sa.Index("workflow_schedule_plan_next_idx", "next_run_at"), + ) + + id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()")) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(64), nullable=False) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + + # Schedule configuration + cron_expression: Mapped[str] = mapped_column(String(255), nullable=False) + timezone: Mapped[str] = mapped_column(String(64), nullable=False) + + # Schedule control + next_run_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + ) + + def to_dict(self) -> dict: + """Convert to dictionary representation""" + return { + "id": self.id, + "app_id": self.app_id, + "node_id": self.node_id, + "tenant_id": self.tenant_id, + "cron_expression": self.cron_expression, + "timezone": self.timezone, + "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + } diff --git a/api/pyproject.toml b/api/pyproject.toml index 3078202498a980..afaf84dd57be2e 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -88,6 +88,7 @@ dependencies = [ "httpx-sse>=0.4.0", "sendgrid~=6.12.3", "flask-restx>=1.3.0", + "croniter>=6.0.0", ] # Before adding new dependency, consider place it in # alphabet order (a-z) and suitable group. diff --git a/api/schedule/workflow_schedule_task.py b/api/schedule/workflow_schedule_task.py new file mode 100644 index 00000000000000..30e00ee27cb97f --- /dev/null +++ b/api/schedule/workflow_schedule_task.py @@ -0,0 +1,127 @@ +import logging + +from celery import group, shared_task +from sqlalchemy import and_, select +from sqlalchemy.orm import Session, sessionmaker + +from configs import dify_config +from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now +from libs.schedule_utils import calculate_next_run_at +from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowSchedulePlan +from services.workflow.queue_dispatcher import QueueDispatcherManager +from tasks.workflow_schedule_tasks import run_schedule_trigger + +logger = logging.getLogger(__name__) + + +@shared_task(queue="schedule_poller") +def poll_workflow_schedules() -> None: + """ + Poll and process due workflow schedules. + + Streaming flow: + 1. Fetch due schedules in batches + 2. Process each batch until all due schedules are handled + 3. Optional: Limit total dispatches per tick as a circuit breaker + """ + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + + with session_factory() as session: + total_dispatched = 0 + total_rate_limited = 0 + + # Process in batches until we've handled all due schedules or hit the limit + while True: + due_schedules = _fetch_due_schedules(session) + + if not due_schedules: + break + + dispatched_count, rate_limited_count = _process_schedules(session, due_schedules) + total_dispatched += dispatched_count + total_rate_limited += rate_limited_count + + logger.debug("Batch processed: %d dispatched, %d rate limited", dispatched_count, rate_limited_count) + + # Circuit breaker: check if we've hit the per-tick limit (if enabled) + if ( + dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK > 0 + and total_dispatched >= dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK + ): + logger.warning( + "Circuit breaker activated: reached dispatch limit (%d), will continue next tick", + dify_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK, + ) + break + + if total_dispatched > 0 or total_rate_limited > 0: + logger.info("Total processed: %d dispatched, %d rate limited", total_dispatched, total_rate_limited) + + +def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]: + """ + Fetch a batch of due schedules, sorted by most overdue first. + + Returns up to WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE schedules per call. + Used in a loop to progressively process all due schedules. + """ + now = naive_utc_now() + + due_schedules = session.scalars( + ( + select(WorkflowSchedulePlan) + .join( + AppTrigger, + and_( + AppTrigger.app_id == WorkflowSchedulePlan.app_id, + AppTrigger.node_id == WorkflowSchedulePlan.node_id, + AppTrigger.trigger_type == AppTriggerType.TRIGGER_SCHEDULE, + ), + ) + .where( + WorkflowSchedulePlan.next_run_at <= now, + WorkflowSchedulePlan.next_run_at.isnot(None), + AppTrigger.status == AppTriggerStatus.ENABLED, + ) + ) + .order_by(WorkflowSchedulePlan.next_run_at.asc()) + .with_for_update(skip_locked=True) + .limit(dify_config.WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE) + ) + + return list(due_schedules) + + +def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> tuple[int, int]: + """Process schedules: check quota, update next run time and dispatch to Celery in parallel.""" + if not schedules: + return 0, 0 + + dispatcher_manager = QueueDispatcherManager() + tasks_to_dispatch = [] + rate_limited_count = 0 + + for schedule in schedules: + next_run_at = calculate_next_run_at( + schedule.cron_expression, + schedule.timezone, + ) + schedule.next_run_at = next_run_at + + dispatcher = dispatcher_manager.get_dispatcher(schedule.tenant_id) + if not dispatcher.check_daily_quota(schedule.tenant_id): + logger.info("Tenant %s rate limited, skipping schedule_plan %s", schedule.tenant_id, schedule.id) + rate_limited_count += 1 + else: + tasks_to_dispatch.append(schedule.id) + + if tasks_to_dispatch: + job = group(run_schedule_trigger.s(schedule_id) for schedule_id in tasks_to_dispatch) + job.apply_async() + + logger.debug("Dispatched %d tasks in parallel", len(tasks_to_dispatch)) + + session.commit() + + return len(tasks_to_dispatch), rate_limited_count diff --git a/api/services/schedule_service.py b/api/services/schedule_service.py new file mode 100644 index 00000000000000..333eeb2cc46e43 --- /dev/null +++ b/api/services/schedule_service.py @@ -0,0 +1,274 @@ +import json +import logging +from datetime import datetime +from typing import Optional + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.workflow.nodes import NodeType +from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig +from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError +from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h +from models.account import Account, TenantAccountJoin +from models.workflow import Workflow, WorkflowSchedulePlan + +logger = logging.getLogger(__name__) + + +class ScheduleService: + @staticmethod + def create_schedule( + session: Session, + tenant_id: str, + app_id: str, + config: ScheduleConfig, + ) -> WorkflowSchedulePlan: + """ + Create a new schedule with validated configuration. + + Args: + session: Database session + tenant_id: Tenant ID + app_id: Application ID + config: Validated schedule configuration + + Returns: + Created WorkflowSchedulePlan instance + """ + next_run_at = calculate_next_run_at( + config.cron_expression, + config.timezone, + ) + + schedule = WorkflowSchedulePlan( + tenant_id=tenant_id, + app_id=app_id, + node_id=config.node_id, + cron_expression=config.cron_expression, + timezone=config.timezone, + next_run_at=next_run_at, + ) + + session.add(schedule) + session.flush() + + return schedule + + @staticmethod + def update_schedule( + session: Session, + schedule_id: str, + updates: SchedulePlanUpdate, + ) -> WorkflowSchedulePlan: + """ + Update an existing schedule with validated configuration. + + Args: + session: Database session + schedule_id: Schedule ID to update + updates: Validated update configuration + + Raises: + ScheduleNotFoundError: If schedule not found + + Returns: + Updated WorkflowSchedulePlan instance + """ + schedule = session.get(WorkflowSchedulePlan, schedule_id) + if not schedule: + raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}") + + # If time-related fields are updated, synchronously update the next_run_at. + time_fields_updated = False + + if updates.node_id is not None: + schedule.node_id = updates.node_id + + if updates.cron_expression is not None: + schedule.cron_expression = updates.cron_expression + time_fields_updated = True + + if updates.timezone is not None: + schedule.timezone = updates.timezone + time_fields_updated = True + + if time_fields_updated: + schedule.next_run_at = calculate_next_run_at( + schedule.cron_expression, + schedule.timezone, + ) + + session.flush() + return schedule + + @staticmethod + def delete_schedule( + session: Session, + schedule_id: str, + ) -> None: + """ + Delete a schedule plan. + + Args: + session: Database session + schedule_id: Schedule ID to delete + """ + schedule = session.get(WorkflowSchedulePlan, schedule_id) + if not schedule: + raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}") + + session.delete(schedule) + session.flush() + + @staticmethod + def get_tenant_owner(session: Session, tenant_id: str) -> Optional[Account]: + """ + Returns an account to execute scheduled workflows on behalf of the tenant. + Prioritizes owner over admin to ensure proper authorization hierarchy. + """ + result = session.execute( + select(TenantAccountJoin) + .where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "owner") + .limit(1) + ).scalar_one_or_none() + + if not result: + # Owner may not exist in some tenant configurations, fallback to admin + result = session.execute( + select(TenantAccountJoin) + .where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "admin") + .limit(1) + ).scalar_one_or_none() + + if result: + return session.get(Account, result.account_id) + + @staticmethod + def update_next_run_at( + session: Session, + schedule_id: str, + ) -> datetime: + """ + Advances the schedule to its next execution time after a successful trigger. + Uses current time as base to prevent missing executions during delays. + """ + schedule = session.get(WorkflowSchedulePlan, schedule_id) + if not schedule: + raise ScheduleNotFoundError(f"Schedule not found: {schedule_id}") + + # Base on current time to handle execution delays gracefully + next_run_at = calculate_next_run_at( + schedule.cron_expression, + schedule.timezone, + ) + + schedule.next_run_at = next_run_at + session.flush() + return next_run_at + + @staticmethod + def extract_schedule_config(workflow: Workflow) -> Optional[ScheduleConfig]: + """ + Extracts schedule configuration from workflow graph. + + Searches for the first schedule trigger node in the workflow and converts + its configuration (either visual or cron mode) into a unified ScheduleConfig. + + Args: + workflow: The workflow containing the graph definition + + Returns: + ScheduleConfig if a valid schedule node is found, None if no schedule node exists + + Raises: + ScheduleConfigError: If graph parsing fails or schedule configuration is invalid + + Note: + Currently only returns the first schedule node found. + Multiple schedule nodes in the same workflow are not supported. + """ + try: + graph_data = workflow.graph_dict + except (json.JSONDecodeError, TypeError, AttributeError) as e: + raise ScheduleConfigError(f"Failed to parse workflow graph: {e}") + + if not graph_data: + raise ScheduleConfigError("Workflow graph is empty") + + nodes = graph_data.get("nodes", []) + for node in nodes: + node_data = node.get("data", {}) + + if node_data.get("type") != NodeType.TRIGGER_SCHEDULE.value: + continue + + mode = node_data.get("mode", "visual") + timezone = node_data.get("timezone", "UTC") + node_id = node.get("id", "start") + + cron_expression = None + if mode == "cron": + cron_expression = node_data.get("cron_expression") + if not cron_expression: + raise ScheduleConfigError("Cron expression is required for cron mode") + elif mode == "visual": + frequency = node_data.get("frequency") + visual_config_dict = node_data.get("visual_config", {}) + visual_config = VisualConfig(**visual_config_dict) + cron_expression = ScheduleService.visual_to_cron(frequency, visual_config) + else: + raise ScheduleConfigError(f"Invalid schedule mode: {mode}") + + return ScheduleConfig(node_id=node_id, cron_expression=cron_expression, timezone=timezone) + + @staticmethod + def visual_to_cron(frequency: str, visual_config: VisualConfig) -> str: + """ + Converts user-friendly visual schedule settings to cron expression. + Maintains consistency with frontend UI expectations while supporting croniter's extended syntax. + """ + if frequency == "hourly": + if visual_config.on_minute is None: + raise ScheduleConfigError("on_minute is required for hourly schedules") + return f"{visual_config.on_minute} * * * *" + + elif frequency == "daily": + if not visual_config.time: + raise ScheduleConfigError("time is required for daily schedules") + hour, minute = convert_12h_to_24h(visual_config.time) + return f"{minute} {hour} * * *" + + elif frequency == "weekly": + if not visual_config.time: + raise ScheduleConfigError("time is required for weekly schedules") + if not visual_config.weekdays: + raise ScheduleConfigError("Weekdays are required for weekly schedules") + hour, minute = convert_12h_to_24h(visual_config.time) + weekday_map = {"sun": "0", "mon": "1", "tue": "2", "wed": "3", "thu": "4", "fri": "5", "sat": "6"} + cron_weekdays = [weekday_map[day] for day in visual_config.weekdays] + return f"{minute} {hour} * * {','.join(sorted(cron_weekdays))}" + + elif frequency == "monthly": + if not visual_config.time: + raise ScheduleConfigError("time is required for monthly schedules") + if not visual_config.monthly_days: + raise ScheduleConfigError("Monthly days are required for monthly schedules") + hour, minute = convert_12h_to_24h(visual_config.time) + + numeric_days = [] + has_last = False + for day in visual_config.monthly_days: + if day == "last": + has_last = True + else: + numeric_days.append(day) + + result_days = [str(d) for d in sorted(set(numeric_days))] + if has_last: + result_days.append("L") + + return f"{minute} {hour} {','.join(result_days)} * *" + + else: + raise ScheduleConfigError(f"Unsupported frequency: {frequency}") diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py new file mode 100644 index 00000000000000..17f4e0751c9eee --- /dev/null +++ b/api/tasks/workflow_schedule_tasks.py @@ -0,0 +1,69 @@ +import logging +from datetime import UTC, datetime +from zoneinfo import ZoneInfo + +from celery import shared_task +from sqlalchemy.orm import sessionmaker + +from core.workflow.nodes.trigger_schedule.exc import ( + ScheduleExecutionError, + ScheduleNotFoundError, + TenantOwnerNotFoundError, +) +from extensions.ext_database import db +from models.enums import WorkflowRunTriggeredFrom +from models.workflow import WorkflowSchedulePlan +from services.async_workflow_service import AsyncWorkflowService +from services.schedule_service import ScheduleService +from services.workflow.entities import TriggerData + +logger = logging.getLogger(__name__) + + +@shared_task(queue="schedule_executor") +def run_schedule_trigger(schedule_id: str) -> None: + """ + Execute a scheduled workflow trigger. + + Note: No retry logic needed as schedules will run again at next interval. + The execution result is tracked via WorkflowTriggerLog. + + Raises: + ScheduleNotFoundError: If schedule doesn't exist + TenantOwnerNotFoundError: If no owner/admin for tenant + ScheduleExecutionError: If workflow trigger fails + """ + session_factory = sessionmaker(bind=db.engine, expire_on_commit=False) + + with session_factory() as session: + schedule = session.get(WorkflowSchedulePlan, schedule_id) + if not schedule: + raise ScheduleNotFoundError(f"Schedule {schedule_id} not found") + + tenant_owner = ScheduleService.get_tenant_owner(session, schedule.tenant_id) + if not tenant_owner: + raise TenantOwnerNotFoundError(f"No owner or admin found for tenant {schedule.tenant_id}") + + try: + current_utc = datetime.now(UTC) + schedule_tz = ZoneInfo(schedule.timezone) if schedule.timezone else UTC + current_in_tz = current_utc.astimezone(schedule_tz) + inputs = {"current_time": current_in_tz.isoformat()} + + response = AsyncWorkflowService.trigger_workflow_async( + session=session, + user=tenant_owner, + trigger_data=TriggerData( + app_id=schedule.app_id, + root_node_id=schedule.node_id, + trigger_type=WorkflowRunTriggeredFrom.SCHEDULE, + inputs=inputs, + tenant_id=schedule.tenant_id, + ), + ) + logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id) + + except Exception as e: + raise ScheduleExecutionError( + f"Failed to trigger workflow for schedule {schedule_id}, app {schedule.app_id}" + ) from e diff --git a/api/tests/unit_tests/extensions/test_celery_ssl.py b/api/tests/unit_tests/extensions/test_celery_ssl.py index bc46fe8322a4a6..d33b7eaf23f1bf 100644 --- a/api/tests/unit_tests/extensions/test_celery_ssl.py +++ b/api/tests/unit_tests/extensions/test_celery_ssl.py @@ -131,6 +131,10 @@ def test_celery_init_applies_ssl_to_broker_and_backend(self): mock_config.ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK = False mock_config.ENABLE_DATASETS_QUEUE_MONITOR = False mock_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK = False + mock_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK = False + mock_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL = 1 + mock_config.WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE = 100 + mock_config.WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK = 0 with patch("extensions.ext_celery.dify_config", mock_config): from dify_app import DifyApp diff --git a/api/tests/unit_tests/services/test_schedule_service.py b/api/tests/unit_tests/services/test_schedule_service.py new file mode 100644 index 00000000000000..59849bc3f4abfd --- /dev/null +++ b/api/tests/unit_tests/services/test_schedule_service.py @@ -0,0 +1,780 @@ +import unittest +from datetime import UTC, datetime +from unittest.mock import MagicMock, Mock, patch + +import pytest +from sqlalchemy.orm import Session + +from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig +from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError +from events.event_handlers.sync_workflow_schedule_when_app_published import ( + sync_schedule_from_workflow, +) +from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h +from models.account import Account, TenantAccountJoin +from models.workflow import Workflow, WorkflowSchedulePlan +from services.schedule_service import ScheduleService + + +class TestScheduleService(unittest.TestCase): + """Test cases for ScheduleService class.""" + + def test_calculate_next_run_at_valid_cron(self): + """Test calculating next run time with valid cron expression.""" + # Test daily cron at 10:30 AM + cron_expr = "30 10 * * *" + timezone = "UTC" + base_time = datetime(2025, 8, 29, 9, 0, 0, tzinfo=UTC) + + next_run = calculate_next_run_at(cron_expr, timezone, base_time) + + assert next_run is not None + assert next_run.hour == 10 + assert next_run.minute == 30 + assert next_run.day == 29 + + def test_calculate_next_run_at_with_timezone(self): + """Test calculating next run time with different timezone.""" + cron_expr = "0 9 * * *" # 9:00 AM + timezone = "America/New_York" + base_time = datetime(2025, 8, 29, 12, 0, 0, tzinfo=UTC) # 8:00 AM EDT + + next_run = calculate_next_run_at(cron_expr, timezone, base_time) + + assert next_run is not None + # 9:00 AM EDT = 13:00 UTC (during EDT) + expected_utc_hour = 13 + assert next_run.hour == expected_utc_hour + + def test_calculate_next_run_at_with_last_day_of_month(self): + """Test calculating next run time with 'L' (last day) syntax.""" + cron_expr = "0 10 L * *" # 10:00 AM on last day of month + timezone = "UTC" + base_time = datetime(2025, 2, 15, 9, 0, 0, tzinfo=UTC) + + next_run = calculate_next_run_at(cron_expr, timezone, base_time) + + assert next_run is not None + # February 2025 has 28 days + assert next_run.day == 28 + assert next_run.month == 2 + + def test_calculate_next_run_at_invalid_cron(self): + """Test calculating next run time with invalid cron expression.""" + from croniter import CroniterBadCronError + + cron_expr = "invalid cron" + timezone = "UTC" + + with pytest.raises(CroniterBadCronError): + calculate_next_run_at(cron_expr, timezone) + + def test_calculate_next_run_at_invalid_timezone(self): + """Test calculating next run time with invalid timezone.""" + from pytz import UnknownTimeZoneError + + cron_expr = "30 10 * * *" + timezone = "Invalid/Timezone" + + with pytest.raises(UnknownTimeZoneError): + calculate_next_run_at(cron_expr, timezone) + + @patch("libs.schedule_utils.calculate_next_run_at") + def test_create_schedule(self, mock_calculate_next_run): + """Test creating a new schedule.""" + mock_session = MagicMock(spec=Session) + mock_calculate_next_run.return_value = datetime(2025, 8, 30, 10, 30, 0, tzinfo=UTC) + + config = ScheduleConfig( + node_id="start", + cron_expression="30 10 * * *", + timezone="UTC", + ) + + schedule = ScheduleService.create_schedule( + session=mock_session, + tenant_id="test-tenant", + app_id="test-app", + config=config, + ) + + assert schedule is not None + assert schedule.tenant_id == "test-tenant" + assert schedule.app_id == "test-app" + assert schedule.node_id == "start" + assert schedule.cron_expression == "30 10 * * *" + assert schedule.timezone == "UTC" + assert schedule.next_run_at is not None + mock_session.add.assert_called_once() + mock_session.flush.assert_called_once() + + @patch("services.schedule_service.calculate_next_run_at") + def test_update_schedule(self, mock_calculate_next_run): + """Test updating an existing schedule.""" + mock_session = MagicMock(spec=Session) + mock_schedule = Mock(spec=WorkflowSchedulePlan) + mock_schedule.cron_expression = "0 12 * * *" + mock_schedule.timezone = "America/New_York" + mock_session.get.return_value = mock_schedule + mock_calculate_next_run.return_value = datetime(2025, 8, 30, 12, 0, 0, tzinfo=UTC) + + updates = SchedulePlanUpdate( + cron_expression="0 12 * * *", + timezone="America/New_York", + ) + + result = ScheduleService.update_schedule( + session=mock_session, + schedule_id="test-schedule-id", + updates=updates, + ) + + assert result is not None + assert result.cron_expression == "0 12 * * *" + assert result.timezone == "America/New_York" + mock_calculate_next_run.assert_called_once() + mock_session.flush.assert_called_once() + + def test_update_schedule_not_found(self): + """Test updating a non-existent schedule raises exception.""" + from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError + + mock_session = MagicMock(spec=Session) + mock_session.get.return_value = None + + updates = SchedulePlanUpdate( + cron_expression="0 12 * * *", + ) + + with pytest.raises(ScheduleNotFoundError) as context: + ScheduleService.update_schedule( + session=mock_session, + schedule_id="non-existent-id", + updates=updates, + ) + + assert "Schedule not found: non-existent-id" in str(context.value) + mock_session.flush.assert_not_called() + + def test_delete_schedule(self): + """Test deleting a schedule.""" + mock_session = MagicMock(spec=Session) + mock_schedule = Mock(spec=WorkflowSchedulePlan) + mock_session.get.return_value = mock_schedule + + # Should not raise exception and complete successfully + ScheduleService.delete_schedule( + session=mock_session, + schedule_id="test-schedule-id", + ) + + mock_session.delete.assert_called_once_with(mock_schedule) + mock_session.flush.assert_called_once() + + def test_delete_schedule_not_found(self): + """Test deleting a non-existent schedule raises exception.""" + from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError + + mock_session = MagicMock(spec=Session) + mock_session.get.return_value = None + + # Should raise ScheduleNotFoundError + with pytest.raises(ScheduleNotFoundError) as context: + ScheduleService.delete_schedule( + session=mock_session, + schedule_id="non-existent-id", + ) + + assert "Schedule not found: non-existent-id" in str(context.value) + mock_session.delete.assert_not_called() + + @patch("services.schedule_service.select") + def test_get_tenant_owner(self, mock_select): + """Test getting tenant owner account.""" + mock_session = MagicMock(spec=Session) + mock_account = Mock(spec=Account) + mock_account.id = "owner-account-id" + + # Mock owner query + mock_owner_result = Mock(spec=TenantAccountJoin) + mock_owner_result.account_id = "owner-account-id" + + mock_session.execute.return_value.scalar_one_or_none.return_value = mock_owner_result + mock_session.get.return_value = mock_account + + result = ScheduleService.get_tenant_owner( + session=mock_session, + tenant_id="test-tenant", + ) + + assert result is not None + assert result.id == "owner-account-id" + + @patch("services.schedule_service.select") + def test_get_tenant_owner_fallback_to_admin(self, mock_select): + """Test getting tenant owner falls back to admin if no owner.""" + mock_session = MagicMock(spec=Session) + mock_account = Mock(spec=Account) + mock_account.id = "admin-account-id" + + # Mock admin query (owner returns None) + mock_admin_result = Mock(spec=TenantAccountJoin) + mock_admin_result.account_id = "admin-account-id" + + mock_session.execute.return_value.scalar_one_or_none.side_effect = [None, mock_admin_result] + mock_session.get.return_value = mock_account + + result = ScheduleService.get_tenant_owner( + session=mock_session, + tenant_id="test-tenant", + ) + + assert result is not None + assert result.id == "admin-account-id" + + @patch("services.schedule_service.calculate_next_run_at") + def test_update_next_run_at(self, mock_calculate_next_run): + """Test updating next run time after schedule triggered.""" + mock_session = MagicMock(spec=Session) + mock_schedule = Mock(spec=WorkflowSchedulePlan) + mock_schedule.cron_expression = "30 10 * * *" + mock_schedule.timezone = "UTC" + mock_session.get.return_value = mock_schedule + + next_time = datetime(2025, 8, 31, 10, 30, 0, tzinfo=UTC) + mock_calculate_next_run.return_value = next_time + + result = ScheduleService.update_next_run_at( + session=mock_session, + schedule_id="test-schedule-id", + ) + + assert result == next_time + assert mock_schedule.next_run_at == next_time + mock_session.flush.assert_called_once() + + +class TestVisualToCron(unittest.TestCase): + """Test cases for visual configuration to cron conversion.""" + + def test_visual_to_cron_hourly(self): + """Test converting hourly visual config to cron.""" + visual_config = VisualConfig(on_minute=15) + result = ScheduleService.visual_to_cron("hourly", visual_config) + assert result == "15 * * * *" + + def test_visual_to_cron_daily(self): + """Test converting daily visual config to cron.""" + visual_config = VisualConfig(time="2:30 PM") + result = ScheduleService.visual_to_cron("daily", visual_config) + assert result == "30 14 * * *" + + def test_visual_to_cron_weekly(self): + """Test converting weekly visual config to cron.""" + visual_config = VisualConfig( + time="10:00 AM", + weekdays=["mon", "wed", "fri"], + ) + result = ScheduleService.visual_to_cron("weekly", visual_config) + assert result == "0 10 * * 1,3,5" + + def test_visual_to_cron_monthly_with_specific_days(self): + """Test converting monthly visual config with specific days.""" + visual_config = VisualConfig( + time="11:30 AM", + monthly_days=[1, 15], + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "30 11 1,15 * *" + + def test_visual_to_cron_monthly_with_last_day(self): + """Test converting monthly visual config with last day using 'L' syntax.""" + visual_config = VisualConfig( + time="11:30 AM", + monthly_days=[1, "last"], + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "30 11 1,L * *" + + def test_visual_to_cron_monthly_only_last_day(self): + """Test converting monthly visual config with only last day.""" + visual_config = VisualConfig( + time="9:00 PM", + monthly_days=["last"], + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "0 21 L * *" + + def test_visual_to_cron_monthly_with_end_days_and_last(self): + """Test converting monthly visual config with days 29, 30, 31 and 'last'.""" + visual_config = VisualConfig( + time="3:45 PM", + monthly_days=[29, 30, 31, "last"], + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + # Should have 29,30,31,L - the L handles all possible last days + assert result == "45 15 29,30,31,L * *" + + def test_visual_to_cron_invalid_frequency(self): + """Test converting with invalid frequency.""" + with pytest.raises(ScheduleConfigError, match="Unsupported frequency: invalid"): + ScheduleService.visual_to_cron("invalid", VisualConfig()) + + def test_visual_to_cron_weekly_no_weekdays(self): + """Test converting weekly with no weekdays specified.""" + visual_config = VisualConfig(time="10:00 AM") + with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"): + ScheduleService.visual_to_cron("weekly", visual_config) + + def test_visual_to_cron_hourly_no_minute(self): + """Test converting hourly with no on_minute specified.""" + visual_config = VisualConfig() # on_minute defaults to 0 + result = ScheduleService.visual_to_cron("hourly", visual_config) + assert result == "0 * * * *" # Should use default value 0 + + def test_visual_to_cron_daily_no_time(self): + """Test converting daily with no time specified.""" + visual_config = VisualConfig(time=None) + with pytest.raises(ScheduleConfigError, match="time is required for daily schedules"): + ScheduleService.visual_to_cron("daily", visual_config) + + def test_visual_to_cron_weekly_no_time(self): + """Test converting weekly with no time specified.""" + visual_config = VisualConfig(weekdays=["mon"]) + visual_config.time = None # Override default + with pytest.raises(ScheduleConfigError, match="time is required for weekly schedules"): + ScheduleService.visual_to_cron("weekly", visual_config) + + def test_visual_to_cron_monthly_no_time(self): + """Test converting monthly with no time specified.""" + visual_config = VisualConfig(monthly_days=[1]) + visual_config.time = None # Override default + with pytest.raises(ScheduleConfigError, match="time is required for monthly schedules"): + ScheduleService.visual_to_cron("monthly", visual_config) + + def test_visual_to_cron_monthly_duplicate_days(self): + """Test monthly with duplicate days should be deduplicated.""" + visual_config = VisualConfig( + time="10:00 AM", + monthly_days=[1, 15, 1, 15, 31], # Duplicates + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "0 10 1,15,31 * *" # Should be deduplicated + + def test_visual_to_cron_monthly_unsorted_days(self): + """Test monthly with unsorted days should be sorted.""" + visual_config = VisualConfig( + time="2:30 PM", + monthly_days=[20, 5, 15, 1, 10], # Unsorted + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "30 14 1,5,10,15,20 * *" # Should be sorted + + def test_visual_to_cron_weekly_all_weekdays(self): + """Test weekly with all weekdays.""" + visual_config = VisualConfig( + time="8:00 AM", + weekdays=["sun", "mon", "tue", "wed", "thu", "fri", "sat"], + ) + result = ScheduleService.visual_to_cron("weekly", visual_config) + assert result == "0 8 * * 0,1,2,3,4,5,6" + + def test_visual_to_cron_hourly_boundary_values(self): + """Test hourly with boundary minute values.""" + # Minimum value + visual_config = VisualConfig(on_minute=0) + result = ScheduleService.visual_to_cron("hourly", visual_config) + assert result == "0 * * * *" + + # Maximum value + visual_config = VisualConfig(on_minute=59) + result = ScheduleService.visual_to_cron("hourly", visual_config) + assert result == "59 * * * *" + + def test_visual_to_cron_daily_midnight_noon(self): + """Test daily at special times (midnight and noon).""" + # Midnight + visual_config = VisualConfig(time="12:00 AM") + result = ScheduleService.visual_to_cron("daily", visual_config) + assert result == "0 0 * * *" + + # Noon + visual_config = VisualConfig(time="12:00 PM") + result = ScheduleService.visual_to_cron("daily", visual_config) + assert result == "0 12 * * *" + + def test_visual_to_cron_monthly_mixed_with_last_and_duplicates(self): + """Test monthly with mixed days, 'last', and duplicates.""" + visual_config = VisualConfig( + time="11:45 PM", + monthly_days=[15, 1, "last", 15, 30, 1, "last"], # Mixed with duplicates + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + assert result == "45 23 1,15,30,L * *" # Deduplicated and sorted with L at end + + def test_visual_to_cron_weekly_single_day(self): + """Test weekly with single weekday.""" + visual_config = VisualConfig( + time="6:30 PM", + weekdays=["sun"], + ) + result = ScheduleService.visual_to_cron("weekly", visual_config) + assert result == "30 18 * * 0" + + def test_visual_to_cron_monthly_all_possible_days(self): + """Test monthly with all 31 days plus 'last'.""" + all_days = list(range(1, 32)) + ["last"] + visual_config = VisualConfig( + time="12:01 AM", + monthly_days=all_days, + ) + result = ScheduleService.visual_to_cron("monthly", visual_config) + expected_days = ','.join([str(i) for i in range(1, 32)]) + ',L' + assert result == f"1 0 {expected_days} * *" + + def test_visual_to_cron_monthly_no_days(self): + """Test monthly without any days specified should raise error.""" + visual_config = VisualConfig(time="10:00 AM", monthly_days=[]) + with pytest.raises(ScheduleConfigError, match="Monthly days are required for monthly schedules"): + ScheduleService.visual_to_cron("monthly", visual_config) + + def test_visual_to_cron_weekly_empty_weekdays_list(self): + """Test weekly with empty weekdays list should raise error.""" + visual_config = VisualConfig(time="10:00 AM", weekdays=[]) + with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"): + ScheduleService.visual_to_cron("weekly", visual_config) + + +class TestParseTime(unittest.TestCase): + """Test cases for time parsing function.""" + + def test_parse_time_am(self): + """Test parsing AM time.""" + hour, minute = convert_12h_to_24h("9:30 AM") + assert hour == 9 + assert minute == 30 + + def test_parse_time_pm(self): + """Test parsing PM time.""" + hour, minute = convert_12h_to_24h("2:45 PM") + assert hour == 14 + assert minute == 45 + + def test_parse_time_noon(self): + """Test parsing 12:00 PM (noon).""" + hour, minute = convert_12h_to_24h("12:00 PM") + assert hour == 12 + assert minute == 0 + + def test_parse_time_midnight(self): + """Test parsing 12:00 AM (midnight).""" + hour, minute = convert_12h_to_24h("12:00 AM") + assert hour == 0 + assert minute == 0 + + def test_parse_time_invalid_format(self): + """Test parsing invalid time format.""" + with pytest.raises(ValueError, match="Invalid time format"): + convert_12h_to_24h("25:00") + + def test_parse_time_invalid_hour(self): + """Test parsing invalid hour.""" + with pytest.raises(ValueError, match="Invalid hour: 13"): + convert_12h_to_24h("13:00 PM") + + def test_parse_time_invalid_minute(self): + """Test parsing invalid minute.""" + with pytest.raises(ValueError, match="Invalid minute: 60"): + convert_12h_to_24h("10:60 AM") + + def test_parse_time_empty_string(self): + """Test parsing empty string.""" + with pytest.raises(ValueError, match="Time string cannot be empty"): + convert_12h_to_24h("") + + def test_parse_time_invalid_period(self): + """Test parsing invalid period.""" + with pytest.raises(ValueError, match="Invalid period"): + convert_12h_to_24h("10:30 XM") + + +class TestExtractScheduleConfig(unittest.TestCase): + """Test cases for extracting schedule configuration from workflow.""" + + def test_extract_schedule_config_with_cron_mode(self): + """Test extracting schedule config in cron mode.""" + workflow = Mock(spec=Workflow) + workflow.graph_dict = { + "nodes": [ + { + "id": "schedule-node", + "data": { + "type": "trigger-schedule", + "mode": "cron", + "cron_expression": "0 10 * * *", + "timezone": "America/New_York", + }, + } + ] + } + + config = ScheduleService.extract_schedule_config(workflow) + + assert config is not None + assert config.node_id == "schedule-node" + assert config.cron_expression == "0 10 * * *" + assert config.timezone == "America/New_York" + + def test_extract_schedule_config_with_visual_mode(self): + """Test extracting schedule config in visual mode.""" + workflow = Mock(spec=Workflow) + workflow.graph_dict = { + "nodes": [ + { + "id": "schedule-node", + "data": { + "type": "trigger-schedule", + "mode": "visual", + "frequency": "daily", + "visual_config": {"time": "10:30 AM"}, + "timezone": "UTC", + }, + } + ] + } + + config = ScheduleService.extract_schedule_config(workflow) + + assert config is not None + assert config.node_id == "schedule-node" + assert config.cron_expression == "30 10 * * *" + assert config.timezone == "UTC" + + def test_extract_schedule_config_no_schedule_node(self): + """Test extracting config when no schedule node exists.""" + workflow = Mock(spec=Workflow) + workflow.graph_dict = { + "nodes": [ + { + "id": "other-node", + "data": {"type": "llm"}, + } + ] + } + + config = ScheduleService.extract_schedule_config(workflow) + assert config is None + + def test_extract_schedule_config_invalid_graph(self): + """Test extracting config with invalid graph data.""" + workflow = Mock(spec=Workflow) + workflow.graph_dict = None + + with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"): + ScheduleService.extract_schedule_config(workflow) + + +class TestScheduleWithTimezone(unittest.TestCase): + """Test cases for schedule with timezone handling.""" + + def test_visual_schedule_with_timezone_integration(self): + """Test complete flow: visual config → cron → execution in different timezones. + + This test verifies that when a user in Shanghai sets a schedule for 10:30 AM, + it runs at 10:30 AM Shanghai time, not 10:30 AM UTC. + """ + # User in Shanghai wants to run a task at 10:30 AM local time + visual_config = VisualConfig( + time="10:30 AM", # This is Shanghai time + monthly_days=[1], + ) + + # Convert to cron expression + cron_expr = ScheduleService.visual_to_cron("monthly", visual_config) + assert cron_expr is not None + + assert cron_expr == "30 10 1 * *" # Direct conversion + + # Now test execution with Shanghai timezone + shanghai_tz = "Asia/Shanghai" + # Base time: 2025-01-01 00:00:00 UTC (08:00:00 Shanghai) + base_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=UTC) + + next_run = calculate_next_run_at(cron_expr, shanghai_tz, base_time) + + assert next_run is not None + + # Should run at 10:30 AM Shanghai time on Jan 1 + # 10:30 AM Shanghai = 02:30 AM UTC (Shanghai is UTC+8) + assert next_run.year == 2025 + assert next_run.month == 1 + assert next_run.day == 1 + assert next_run.hour == 2 # 02:30 UTC + assert next_run.minute == 30 + + def test_visual_schedule_different_timezones_same_local_time(self): + """Test that same visual config in different timezones runs at different UTC times. + + This verifies that a schedule set for "9:00 AM" runs at 9 AM local time + regardless of the timezone. + """ + visual_config = VisualConfig( + time="9:00 AM", + weekdays=["mon"], + ) + + cron_expr = ScheduleService.visual_to_cron("weekly", visual_config) + assert cron_expr is not None + assert cron_expr == "0 9 * * 1" + + # Base time: Sunday 2025-01-05 12:00:00 UTC + base_time = datetime(2025, 1, 5, 12, 0, 0, tzinfo=UTC) + + # Test New York (UTC-5 in January) + ny_next = calculate_next_run_at(cron_expr, "America/New_York", base_time) + assert ny_next is not None + # Monday 9 AM EST = Monday 14:00 UTC + assert ny_next.day == 6 + assert ny_next.hour == 14 # 9 AM EST = 2 PM UTC + + # Test Tokyo (UTC+9) + tokyo_next = calculate_next_run_at(cron_expr, "Asia/Tokyo", base_time) + assert tokyo_next is not None + # Monday 9 AM JST = Monday 00:00 UTC + assert tokyo_next.day == 6 + assert tokyo_next.hour == 0 # 9 AM JST = 0 AM UTC + + def test_visual_schedule_daily_across_dst_change(self): + """Test that daily schedules adjust correctly during DST changes. + + A schedule set for "10:00 AM" should always run at 10 AM local time, + even when DST changes. + """ + visual_config = VisualConfig( + time="10:00 AM", + ) + + cron_expr = ScheduleService.visual_to_cron("daily", visual_config) + assert cron_expr is not None + + assert cron_expr == "0 10 * * *" + + # Test before DST (EST - UTC-5) + winter_base = datetime(2025, 2, 1, 0, 0, 0, tzinfo=UTC) + winter_next = calculate_next_run_at(cron_expr, "America/New_York", winter_base) + assert winter_next is not None + # 10 AM EST = 15:00 UTC + assert winter_next.hour == 15 + + # Test during DST (EDT - UTC-4) + summer_base = datetime(2025, 6, 1, 0, 0, 0, tzinfo=UTC) + summer_next = calculate_next_run_at(cron_expr, "America/New_York", summer_base) + assert summer_next is not None + # 10 AM EDT = 14:00 UTC + assert summer_next.hour == 14 + + +class TestSyncScheduleFromWorkflow(unittest.TestCase): + """Test cases for syncing schedule from workflow.""" + + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") + def test_sync_schedule_create_new(self, mock_select, mock_service, mock_db): + """Test creating new schedule when none exists.""" + mock_session = MagicMock() + mock_db.engine = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=None) + Session = MagicMock(return_value=mock_session) + with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session): + mock_session.scalar.return_value = None # No existing plan + + # Mock extract_schedule_config to return a ScheduleConfig object + mock_config = Mock(spec=ScheduleConfig) + mock_config.node_id = "start" + mock_config.cron_expression = "30 10 * * *" + mock_config.timezone = "UTC" + mock_service.extract_schedule_config.return_value = mock_config + + mock_new_plan = Mock(spec=WorkflowSchedulePlan) + mock_service.create_schedule.return_value = mock_new_plan + + workflow = Mock(spec=Workflow) + result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) + + assert result == mock_new_plan + mock_service.create_schedule.assert_called_once() + mock_session.commit.assert_called_once() + + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") + def test_sync_schedule_update_existing(self, mock_select, mock_service, mock_db): + """Test updating existing schedule.""" + mock_session = MagicMock() + mock_db.engine = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=None) + Session = MagicMock(return_value=mock_session) + + with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session): + mock_existing_plan = Mock(spec=WorkflowSchedulePlan) + mock_existing_plan.id = "existing-plan-id" + mock_session.scalar.return_value = mock_existing_plan + + # Mock extract_schedule_config to return a ScheduleConfig object + mock_config = Mock(spec=ScheduleConfig) + mock_config.node_id = "start" + mock_config.cron_expression = "0 12 * * *" + mock_config.timezone = "America/New_York" + mock_service.extract_schedule_config.return_value = mock_config + + mock_updated_plan = Mock(spec=WorkflowSchedulePlan) + mock_service.update_schedule.return_value = mock_updated_plan + + workflow = Mock(spec=Workflow) + result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) + + assert result == mock_updated_plan + mock_service.update_schedule.assert_called_once() + # Verify the arguments passed to update_schedule + call_args = mock_service.update_schedule.call_args + assert call_args.kwargs["session"] == mock_session + assert call_args.kwargs["schedule_id"] == "existing-plan-id" + updates_obj = call_args.kwargs["updates"] + assert isinstance(updates_obj, SchedulePlanUpdate) + assert updates_obj.node_id == "start" + assert updates_obj.cron_expression == "0 12 * * *" + assert updates_obj.timezone == "America/New_York" + mock_session.commit.assert_called_once() + + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.db") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService") + @patch("events.event_handlers.sync_workflow_schedule_when_app_published.select") + def test_sync_schedule_remove_when_no_config(self, mock_select, mock_service, mock_db): + """Test removing schedule when no schedule config in workflow.""" + mock_session = MagicMock() + mock_db.engine = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=None) + Session = MagicMock(return_value=mock_session) + + with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session): + mock_existing_plan = Mock(spec=WorkflowSchedulePlan) + mock_existing_plan.id = "existing-plan-id" + mock_session.scalar.return_value = mock_existing_plan + + mock_service.extract_schedule_config.return_value = None # No schedule config + + workflow = Mock(spec=Workflow) + result = sync_schedule_from_workflow("tenant-id", "app-id", workflow) + + assert result is None + # Now using ScheduleService.delete_schedule instead of session.delete + mock_service.delete_schedule.assert_called_once_with(session=mock_session, schedule_id="existing-plan-id") + mock_session.commit.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/api/uv.lock b/api/uv.lock index 144174600dc1b9..c7b9fce0391191 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1161,6 +1161,19 @@ version = "1.7" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/6b/b0/e595ce2a2527e169c3bcd6c33d2473c1918e0b7f6826a043ca1245dd4e5b/crcmod-1.7.tar.gz", hash = "sha256:dc7051a0db5f2bd48665a990d3ec1cc305a466a77358ca4492826f41f283601e", size = 89670 } +[[package]] +name = "croniter" +version = "6.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, + { name = "pytz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" }, +] + [[package]] name = "cryptography" version = "45.0.6" @@ -1271,6 +1284,7 @@ dependencies = [ { name = "cachetools" }, { name = "celery" }, { name = "chardet" }, + { name = "croniter" }, { name = "flask" }, { name = "flask-compress" }, { name = "flask-cors" }, @@ -1460,6 +1474,7 @@ requires-dist = [ { name = "cachetools", specifier = "~=5.3.0" }, { name = "celery", specifier = "~=5.5.2" }, { name = "chardet", specifier = "~=5.1.0" }, + { name = "croniter", specifier = ">=6.0.0" }, { name = "flask", specifier = "~=3.1.2" }, { name = "flask-compress", specifier = "~=1.17" }, { name = "flask-cors", specifier = "~=6.0.0" }, diff --git a/dev/start-beat b/dev/start-beat new file mode 100755 index 00000000000000..e417874b258443 --- /dev/null +++ b/dev/start-beat @@ -0,0 +1,60 @@ +#!/bin/bash + +set -x + +# Help function +show_help() { + echo "Usage: $0 [OPTIONS]" + echo "" + echo "Options:" + echo " --loglevel LEVEL Log level (default: INFO)" + echo " --scheduler SCHEDULER Scheduler class (default: celery.beat:PersistentScheduler)" + echo " -h, --help Show this help message" + echo "" + echo "Examples:" + echo " $0" + echo " $0 --loglevel DEBUG" + echo " $0 --scheduler django_celery_beat.schedulers:DatabaseScheduler" + echo "" + echo "Description:" + echo " Starts Celery Beat scheduler for periodic task execution." + echo " Beat sends scheduled tasks to worker queues at specified intervals." +} + +# Parse command line arguments +LOGLEVEL="INFO" +SCHEDULER="celery.beat:PersistentScheduler" + +while [[ $# -gt 0 ]]; do + case $1 in + --loglevel) + LOGLEVEL="$2" + shift 2 + ;; + --scheduler) + SCHEDULER="$2" + shift 2 + ;; + -h|--help) + show_help + exit 0 + ;; + *) + echo "Unknown option: $1" + show_help + exit 1 + ;; + esac +done + +SCRIPT_DIR="$(dirname "$(realpath "$0")")" +cd "$SCRIPT_DIR/.." + +echo "Starting Celery Beat with:" +echo " Log Level: ${LOGLEVEL}" +echo " Scheduler: ${SCHEDULER}" + +uv --directory api run \ + celery -A app.celery beat \ + --loglevel ${LOGLEVEL} \ + --scheduler ${SCHEDULER} \ No newline at end of file diff --git a/dev/start-worker b/dev/start-worker index 5b1b2cbef21c4d..37dd62ea0b4ba4 100755 --- a/dev/start-worker +++ b/dev/start-worker @@ -24,6 +24,8 @@ show_help() { echo " workflow_professional - Professional tier workflows (cloud edition)" echo " workflow_team - Team tier workflows (cloud edition)" echo " workflow_sandbox - Sandbox tier workflows (cloud edition)" + echo " schedule_poller - Schedule polling tasks" + echo " schedule_executor - Schedule execution tasks" echo " generation - Content generation tasks" echo " mail - Email notifications" echo " ops_trace - Operations tracing" @@ -79,10 +81,10 @@ if [[ -z "${QUEUES}" ]]; then # Configure queues based on edition if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox" + QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor" else # Community edition (SELF_HOSTED): dataset and workflow have separate queues - QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow" + QUEUES="dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor" fi echo "No queues specified, using edition-based defaults: ${QUEUES}" diff --git a/docker/.env.example b/docker/.env.example index 1144339fa13fe5..cb1cac78416972 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -1272,3 +1272,7 @@ ENABLE_CLEAN_MESSAGES=false ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false ENABLE_DATASETS_QUEUE_MONITOR=false ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true +ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true +WORKFLOW_SCHEDULE_POLLER_INTERVAL=1 +WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100 +WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 7d5132794e3a14..b2529fdbef811c 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -579,6 +579,10 @@ x-shared-env: &shared-api-worker-env ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: ${ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK:-false} ENABLE_DATASETS_QUEUE_MONITOR: ${ENABLE_DATASETS_QUEUE_MONITOR:-false} ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: ${ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK:-true} + ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: ${ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:-true} + WORKFLOW_SCHEDULE_POLLER_INTERVAL: ${WORKFLOW_SCHEDULE_POLLER_INTERVAL:-1} + WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: ${WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE:-100} + WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: ${WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK:-0} services: # API service