-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdemo_3controls.py
199 lines (176 loc) · 8.76 KB
/
demo_3controls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
Comprehensive Control Flow Demonstration in ProActive Scheduler
This script presents an advanced demonstration of utilizing various control flows within a single ProActive Scheduler job. The job, named "demo_3controls_job," integrates conditional branching, task replication, and loop control to execute a complex workflow. The process begins with a start task, proceeds through conditional branching (IF/ELSE), continues to task replication, and concludes with a looping mechanism that potentially repeats the entire sequence.
Workflow Overview:
- Initiation of the ProActive Scheduler gateway and job creation with the title "demo_3controls_job."
- Execution of a start task to initiate the workflow.
- Implementation of a conditional flow with a condition task that dictates the subsequent path (IF or ELSE), followed by a continuation task.
- Incorporation of a split task that triggers task replication, creating multiple instances of a process task based on predefined criteria.
- Merging of replicated tasks' outcomes and transitioning into a merge task.
- Finalization with an end task that includes a loop back to the start under certain conditions, allowing for the repetition of the entire workflow.
Key Features:
- Demonstrates the application of ProactiveFlowBlock for defining start and end points within the job.
- Utilizes the ProactiveScriptLanguage for scripting flow controls.
- Integrates a branch flow script to direct workflow based on conditions.
- Employs replication through a replicate flow script, enhancing parallel task execution.
- Establishes a loop control to potentially repeat the workflow, demonstrating dynamic job execution based on runtime data.
This script exemplifies the ProActive Scheduler's capabilities to orchestrate complex job flows, combining multiple control mechanisms to achieve sophisticated task coordination and execution strategies.
+-------------+         +----------------+
|             |         |                |
| Start Task  |-------->| Condition Task |---------------------+
|             |         |                |                     |
+-------------+         +----------------+                     |
       ^                        |                              |
       |                        v                              v
       |                +------+-------+               +---------------+
       |                | IF   | ELSE  |               | Continuation  |
       |                +------+-------+               |     Task      |
       |                                               +---------------+
       |                                                       |
       |                                                       v
       |                                                +-------------+         +----------------+
       |                                                |             |         |                |
       |                                                | Split Task  |-------->|  Process Task  |
       |                                                |             |         |      (x3)      |
       |                                                +-------------+         +----------------+
       |                                                                                |
       |                                                                                v
       |                                                                         +-------------+
       |                                                                         |    Merge    |
       |                                                                         |    Task     |
       |                                                                         +-------------+
       |                                                                                |
       |                                                                                v
       |                                    +-------------+                      +-------------+
       |                                    |             |                      |             |
       +------------------------------------|    Loop     |<---------------------|  End Task   |
                                            |   Control   |                      |             |
                                            +-------------+                      +-------------+
"""
from proactive import getProActiveGateway, ProactiveFlowBlock, ProactiveScriptLanguage
# Connect to the ProActive scheduler
gateway = getProActiveGateway()

# Every task created in this section runs the same one-line Python script,
# which prints the name of the task that is executing it.
_HELLO_SCRIPT = '\nprint("Hello from " + variables.get("PA_TASK_NAME"))\n'


def _new_hello_task(task_name):
    """Return a new Python task named *task_name* that runs the greeting script."""
    hello_task = gateway.createPythonTask(task_name)
    hello_task.setTaskImplementation(_HELLO_SCRIPT)
    return hello_task


# Job that will hold every task of the workflow
print("Creating a proactive job...")
job = gateway.createJob("demo_3controls_job")

# Start task: flagged as the start of a flow block
print("Creating the start task...")
task_start = _new_hello_task("task_start")
task_start.setFlowBlock(ProactiveFlowBlock().start())

# Condition task: depends on the start task
print("Creating the condition task...")
task_condition = _new_hello_task("task_condition")
task_condition.addDependency(task_start)

# IF task (no explicit dependency is declared on it here)
print("Creating the IF task...")
task_if = _new_hello_task("task_IF")

# ELSE task (no explicit dependency is declared on it here)
print("Creating the ELSE task...")
task_else = _new_hello_task("task_ELSE")

# Continuation task
# The continuation task will always be executed regardless of the branch criteria
print("Creating the continuation task...")
task_continuation = _new_hello_task("task_continuation")
# Define the branch flow script.
# The script is plain Python source handed to the scheduler as a string, so the
# bodies of `if`/`else` must be indented inside the literal; without the
# indentation the script is not valid Python. It assigns `branch` to "if" or
# "else" to pick which of the two branch tasks is selected.
branch_script = """
# Always select the "IF" branch
if True:
    branch = "if"
else:
    branch = "else"
"""
# Build the branch flow from the script and the names of the IF, ELSE and
# continuation tasks, then attach it to the condition task.
flow_script = gateway.createBranchFlowScript(
    branch_script,
    task_if.getTaskName(),
    task_else.getTaskName(),
    task_continuation.getTaskName(),
    script_language=ProactiveScriptLanguage().python()
)
task_condition.setFlowScript(flow_script)
# Script body shared by the four tasks below: each one only prints its own name.
_GREETING_SCRIPT = '\nprint("Hello from " + variables.get("PA_TASK_NAME"))\n'

# Split task: depends on the continuation task and is flagged as the start of a flow block
print("Creating the split task...")
task_split = gateway.createPythonTask("task_split")
task_split.setTaskImplementation(_GREETING_SCRIPT)
task_split.setFlowBlock(ProactiveFlowBlock().start())
task_split.addDependency(task_continuation)

# Replicate criteria script, attached to the split task as its flow script
replicate_script = "\nruns = 3 # Trigger 3 parallel instances of the process task\n"
flow_script = gateway.createReplicateFlowScript(
    replicate_script,
    script_language=ProactiveScriptLanguage().python(),
)
task_split.setFlowScript(flow_script)

# Process task: the task to be replicated, depends on the split task
print("Creating the process task...")
task_process = gateway.createPythonTask("task_process")
task_process.setTaskImplementation(_GREETING_SCRIPT)
task_process.addDependency(task_split)

# Merge task: depends on the process task and is flagged as the end of a flow block
print("Creating the merge task...")
task_merge = gateway.createPythonTask("task_merge")
task_merge.setTaskImplementation(_GREETING_SCRIPT)
task_merge.setFlowBlock(ProactiveFlowBlock().end())
task_merge.addDependency(task_process)

# End task: depends on the merge task and is flagged as the end of a flow block
print("Creating the end task...")
task_end = gateway.createPythonTask("task_end")
task_end.setTaskImplementation(_GREETING_SCRIPT)
task_end.setFlowBlock(ProactiveFlowBlock().end())
task_end.addDependency(task_merge)
# Define the loop criteria script.
# The script is plain Python source handed to the scheduler as a string, so the
# bodies of `if`/`else` must be indented inside the literal; without the
# indentation the script is not valid Python. It reads the PA_TASK_ITERATION
# variable and sets `loop` to True only while that value is below 1.
loop_script = """
i = int(variables.get('PA_TASK_ITERATION'))
if i < 1:
    loop = True
else:
    loop = False
"""
# Create the loop flow between the start and end tasks: the loop targets the
# start task by name.
flow_script = gateway.createLoopFlowScript(
    loop_script,
    task_start.getTaskName(),
    script_language=ProactiveScriptLanguage().python()
)
# Associate the loop flow script to the end task
task_end.setFlowScript(flow_script)
# Register every task with the job, in workflow order
print("Adding proactive tasks to the proactive job...")
for _workflow_task in (
    task_start,
    task_condition,
    task_if,
    task_else,
    task_continuation,
    task_split,
    task_process,
    task_merge,
    task_end,
):
    job.addTask(_workflow_task)
# Submit the job, print its output, and always release the gateway.
# The submission and output retrieval talk to the scheduler and may raise;
# the try/finally guarantees gateway.close() runs in that case too, so the
# connection is not left open when the script aborts.
try:
    # Job submission
    print("Submitting the job to the proactive scheduler...")
    job_id = gateway.submitJob(job)
    print("job_id: " + str(job_id))
    # Retrieve job output
    print("Getting job output...")
    job_output = gateway.getJobOutput(job_id)
    print(job_output)
finally:
    # Cleanup
    gateway.close()
# Only reached when submission and output retrieval both succeeded.
print("Disconnected and finished.")