Skip to content

Conversation

ACAne0320
Copy link
Contributor

@ACAne0320 ACAne0320 commented Aug 24, 2025

Important

  1. Make sure you have read our contribution guidelines
  2. Ensure there is an associated issue and you have been assigned to it
  3. Use the correct syntax to link this PR: Fixes #<issue number>.

Summary

part of #23981
close #24021

This PR implements workflow schedule trigger support for Dify, enabling users to execute workflows automatically based on cron expressions with timezone support.

Features Implemented

1. Schedule Trigger Node

  • New trigger node type trigger-schedule for workflow configuration
  • Support for standard cron expressions (e.g., 0 9 * * 1-5 for weekdays at 9 AM)
  • Passes current time as input to triggered workflows

2. Custom Celery Beat Scheduler

  • Database-driven dynamic scheduler replacing default Beat scheduler
  • High availability support using PostgreSQL's SELECT FOR UPDATE SKIP LOCKED
  • Configurable tick intervals and batch processing

3. Automatic Schedule Management

  • Schedules plan automatically created/updated when publishing workflows
  • Schedules plan delete when workflow has no schedule trigger node
  • One-to-one mapping between app and schedule plan

4. Performance Optimizations

  • Batch database updates to reduce round trips

How to Test

1. Migrate database

 uv run flask db upgrade

2. Setup Schedule Trigger in Workflow:

  • Create a new workflow
  • Add a "Schedule Trigger" node as the start node
  • Configure cron expression
  • Add workflow logic after the trigger
  • Publish the workflow

3. Start the Scheduler Service:

# Start celery worker
dev/start-worker

# Start celery beat
dev/start-beat

3. Verify Execution:

  • Check workflow run history in the UI

TODO

Schedule Trigger

  • Use UUIDv7 as PK for workflow_schedule_plans table
  • Use beat_schedule instead of Custom Scheduler
  • Adjust workflow_schedule_plans table structure
  • Stop dispatching tasks when the daily limit is reached
  • Add a check when exporting DSL files
  • Add a failure log when the daily limit is reached

Test

  • Unit Tests
  • Integration Tests

Checklist

  • This change requires a documentation update, included: Dify Document
  • I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
  • I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • I've updated the documentation accordingly.
  • I ran dev/reformat(backend) and cd web && npx lint-staged(frontend) to appease the lint gods

@Copilot Copilot AI review requested due to automatic review settings August 24, 2025 23:20
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. 💪 enhancement New feature or request 📚 documentation Improvements or additions to documentation labels Aug 24, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements workflow schedule trigger support for Dify, enabling automatic workflow execution based on cron expressions with timezone support. The implementation includes a new trigger node type, custom Celery Beat scheduler, and automatic schedule management.

  • Database-driven dynamic scheduler using PostgreSQL for real-time schedule updates and high availability
  • New schedule trigger node supporting both visual and cron-based scheduling configurations
  • Automatic schedule lifecycle management tied to workflow publishing

Reviewed Changes

Copilot reviewed 18 out of 19 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
docker/docker-compose.yaml Adds workflow_scheduler service configuration
dev/start-workflow-scheduler Development script for starting the workflow scheduler
dev/start-worker Updates worker queues to include schedule queue
api/tasks/workflow_schedule_tasks.py Celery task for executing scheduled workflow triggers
api/services/workflow_service.py Integrates schedule sync during workflow publishing
api/services/workflow/schedule_sync.py Service for syncing schedule configuration from workflow graph
api/services/workflow/schedule_manager.py Core schedule management service with CRUD operations
api/schedule/schedule_dispatch.py Custom Celery Beat scheduler with database-driven scheduling
api/models/workflow.py WorkflowSchedulePlan model definition
api/migrations/versions/2025_08_24_1313-1e06b2654c6c_add_workflow_schedule_plan.py Database migration for schedule plan table
api/core/workflow/nodes/trigger_schedule/ Trigger schedule node implementation
api/core/workflow/nodes/node_mapping.py Registers new trigger schedule node type
api/core/workflow/nodes/enums.py Adds TRIGGER_SCHEDULE node type enum

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@ACAne0320 ACAne0320 marked this pull request as draft August 27, 2025 16:25
@ACAne0320 ACAne0320 marked this pull request as ready for review August 29, 2025 14:53
@ACAne0320 ACAne0320 requested a review from Yeuoly August 29, 2025 14:53
@dosubot dosubot bot added the 🌊 feat:workflow Workflow related stuff. label Aug 29, 2025
echo "Examples:"
echo " $0"
echo " $0 --loglevel DEBUG"
echo " $0 --scheduler django_celery_beat.schedulers:DatabaseScheduler"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is django_celery

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This script doesn't use django_celery.
This example follows the official Celery documentation's instructions for custom scheduler classes:
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#using-custom-scheduler-classes

@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. and removed size:XXL This PR changes 1000+ lines, ignoring generated files. labels Sep 2, 2025
@ACAne0320 ACAne0320 force-pushed the feat/schedule-trigger branch from b1ce7cc to 5ccca1e Compare September 2, 2025 04:13
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Sep 2, 2025
@ACAne0320 ACAne0320 requested a review from Yeuoly September 2, 2025 17:20
)

if next_run_at:
schedule.next_run_at = next_run_at

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we take this operation after the execution of the scheduled async task?

Consider the following scenario for reference:

  1. The scheduled poller runs per 1min.
  2. The scheduled trigger of a certain workflow is set to every 5min.
  3. Due to some non-declarative reasons, the workflow task takes 15 minutes to complete within a certain period.

There will be two different outcomes depending on where this code is placed:

  1. If we reset next_run_at before the task execution, the task will run at least 3 times within 15 minutes, with 1 run completing successfully and the other 2 remaining in a running state.
  2. If we reset next_run_at after the task execution, the task will only run once within 15 minutes.

In combination with the business scenario, I recommend adopting the second outcome. By the way, the above case isn't the worst-case scenario. The workflow task might also hang for an extended period. If this happens, the more times the task runs, the higher load of the service will be, which could eventually lead to the whole service unhealthy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for raising this question.

Under the current architecture, regardless of where next_run_at is updated, the task will execute 3 times within 15 minutes.
The reason for this is that the execution of trigger_workflow_async() is non-blocking.

Our scheduling system is intentionally designed to be fully decoupled:

  1. Scheduler: Responsible for triggering tasks based on cron expressions
  2. Executor: Handles workflow execution asyn via Unified Trigger Entry
  3. Non-blocking: trigger_workflow_async() immediately returns after successfully entering the queue without waiting for completion

While your concern about multiple executions is valid from a scheduling perspective, our architecture has a critical safety mechanism that prevents the system overload you're worried about: the workflow execution queue.

When trigger_workflow_async() is called, it doesn't immediately execute the workflow. Instead:

  1. Creates a trigger log entry with status QUEUED
  2. Dispatches to tier-specific queues (professional/team/sandbox)
  3. Returns immediately without waiting

The actual workflow execution happens in dedicated worker pools with concurrency limits per workspace.

like this:
Schedule Trigger → Unified Trigger Entry → Workflow Queue → [Concurrency Control] → Actual Execution

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds interesting. With this solution, whether tasks of a specified workflow can run in parallel (with multiple instances over a period) is controlled by the workflow itself—specifically through the "Parallel Setting" in the trigger, which wasn't mentioned in this feature scenario.

This is acceptable for now. However, looking ahead, I strongly recommend adding a Parallel Setting to the trigger node with three modes:

  1. Parallel – this is the current solution
  2. Serial Waiting – only one task instance exists per workflow; new tasks wait if another is already running
  3. Serial Rejection – new tasks get rejected if one is already running, ensuring only one instance at a time

Following the design approach you described, if we considered this solution, this check could be implemented before trigger_workflow_async within run_schedule_trigger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm glad you were able to provide this solution.
Maybe we need to discuss with the Dify team whether we should add this feature to the schedule trigger.
I'll get back to you if there's any result.

current_utc = datetime.now(UTC)
schedule_tz = ZoneInfo(schedule.timezone) if schedule.timezone else UTC
current_in_tz = current_utc.astimezone(schedule_tz)
inputs = {"current_time": current_in_tz.isoformat()}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, I have a question regarding current_time—the only parameter that the trigger node passes to the workflow. This seems more like a discussion at the requirement level rather than a technical communication. If this is not the appropriate place to discuss this, please point me to the right channel.

I’m curious about how we handle the input variables defined in the original Start node. Given that the trigger does not add these user inputs to the variable_pool, should the subsequent nodes that depend on these inputs for execution throw an error, or should they not depend on these input variables in the first place?

Furthermore, if the bussiness scenario must set default values for the variables defined in the "Start" node during scheduled tasks, how should I implement this under the current requirement design? Are there any practices or solutions I can refer to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schedule features an entry-type node, rather than a time-based call to Start node, additionally, each entry-type branch can only have a single entry node, these kind of nodes we now called it start kind, which means the original Start node was now renamed to User Input node.

The variables you defined in User Input will never occur in the same entry branch, so NVMD about it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your explaination, and sorry for the late reply.

This is acceptable from the perspective of decoupled design. However, what if I want to reuse existing workflows that depend on the variables from User Input node? Specifically, how can I enable the auto-schedule trigger feature for those existing workflows created prior to this version? What is the recommendation solution about this scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might be able to achieve this by adding some Environment Variables.
image

@ACAne0320 ACAne0320 requested a review from Yeuoly September 8, 2025 19:17
Copy link
Contributor

@Yeuoly Yeuoly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Sep 10, 2025
@Yeuoly Yeuoly merged commit 4a743e6 into langgenius:feat/trigger Sep 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
📚 documentation Improvements or additions to documentation 💪 enhancement New feature or request 🌊 feat:workflow Workflow related stuff. lgtm This PR has been approved by a maintainer size:XXL This PR changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants