"""
This script builds and runs a machine learning workflow on the Iris dataset with the ProActive Scheduler. The workflow consists of seven tasks, each representing one step of the machine learning pipeline: data loading, preprocessing (data splitting), model preparation, model training, model download, prediction, and result preview.
Steps:
1. Establishes a connection to the ProActive Scheduler and creates a new job named "demo_ai_workflow".
2. Retrieves the predefined "ai-machine-learning" bucket, which contains task templates for machine learning operations.
3. Creates and configures, one after another, the tasks that load the Iris dataset, split the data, prepare a logistic regression model, train the model, download the trained model, make predictions with it, and preview the results.
4. Adds each task to the job, declaring its dependencies so that the tasks execute in the correct order.
5. Submits the job to the ProActive Scheduler, waits for it to complete, and prints its output.
The script illustrates how the ProActive Scheduler orchestrates a multi-step machine learning workflow: the client only declares the tasks and their dependencies, and the scheduler starts each task once all of the tasks it depends on have finished. Tasks with no dependency between them, such as Load_Iris_Dataset and Logistic_Regression, can run in parallel.

+-------------------+    +------------+    +-------------+    +---------------------+
| Load Iris Dataset |--->| Split Data |--->| Train Model |<---| Logistic Regression |
+-------------------+    +------------+    +-------------+    +---------------------+
                               |             |         |
                               |             |         v
                               |             |   +----------------+
                               |             |   | Download Model |
                               |             |   +----------------+
                               v             v
                         +------------------------+    +-----------------+
                         |     Predict Model      |--->| Preview Results |
                         +------------------------+    +-----------------+
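
The dependency graph above can be checked locally. The sketch below is an illustration only: it re-declares the script's dependencies as a plain dict (the task names mirror the bucket's task names, and DEPENDENCIES and execution_waves are hypothetical helpers, not part of the ProActive client) and uses Python's standard graphlib module (Python 3.9+) to group the tasks into waves, where every task in a wave depends only on tasks from earlier waves. It does not describe how the ProActive Scheduler allocates resources internally.

```python
from graphlib import TopologicalSorter

# Hypothetical local copy of the dependencies declared in this script:
# each task name maps to the set of tasks it must wait for.
DEPENDENCIES = {
    "Load_Iris_Dataset": set(),
    "Split_Data": {"Load_Iris_Dataset"},
    "Logistic_Regression": set(),
    "Train_Model": {"Split_Data", "Logistic_Regression"},
    "Download_Model": {"Train_Model"},
    "Predict_Model": {"Split_Data", "Train_Model"},
    "Preview_Results": {"Predict_Model"},
}


def execution_waves(graph):
    # Group tasks into waves; every task in a wave depends only on earlier waves.
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


for number, wave in enumerate(execution_waves(DEPENDENCIES), start=1):
    print(f"wave {number}: {', '.join(wave)}")
```

Running it prints five waves: Load_Iris_Dataset and Logistic_Regression have no dependencies and can start together, and Download_Model and Predict_Model can run side by side once Train_Model has finished. A cyclic dependency would make prepare() raise graphlib.CycleError instead.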
"""
from proactive import getProActiveGateway

# Connect to the ProActive Scheduler
gateway = getProActiveGateway()

try:
    print("Creating a proactive job...")
    job = gateway.createJob("demo_ai_workflow")

    # The ai-machine-learning bucket provides a task template for each ML step
    print("Getting the ai-machine-learning bucket")
    bucket = gateway.getBucket("ai-machine-learning")

    # Load the Iris dataset (no dependencies, so it can start immediately)
    print("Creating the Load_Iris_Dataset task...")
    load_iris_dataset_task = bucket.create_Load_Iris_Dataset_task()
    job.addTask(load_iris_dataset_task)

    # Split the data once the dataset has been loaded
    print("Creating the Split_Data task...")
    split_data_task = bucket.create_Split_Data_task()
    split_data_task.addDependency(load_iris_dataset_task)
    job.addTask(split_data_task)

    # Prepare the logistic regression model (independent of the data tasks)
    print("Creating the Logistic_Regression task...")
    logistic_regression_task = bucket.create_Logistic_Regression_task()
    job.addTask(logistic_regression_task)

    # Train the model: needs both the split data and the prepared model
    print("Creating the Train_Model task...")
    train_model_task = bucket.create_Train_Model_task()
    train_model_task.addDependency(split_data_task)
    train_model_task.addDependency(logistic_regression_task)
    job.addTask(train_model_task)

    # Download the trained model
    print("Creating the Download_Model task...")
    download_model_task = bucket.create_Download_Model_task()
    download_model_task.addDependency(train_model_task)
    job.addTask(download_model_task)

    # Make predictions with the trained model on the split data
    print("Creating the Predict_Model task...")
    predict_model_task = bucket.create_Predict_Model_task()
    predict_model_task.addDependency(split_data_task)
    predict_model_task.addDependency(train_model_task)
    job.addTask(predict_model_task)

    # Preview the prediction results
    print("Creating the Preview_Results task...")
    preview_results_task = bucket.create_Preview_Results_task()
    preview_results_task.addDependency(predict_model_task)
    job.addTask(preview_results_task)

    # Submit the job, then retrieve and print its output
    print("Submitting the job to the proactive scheduler...")
    job_id = gateway.submitJob(job)
    print("job_id: " + str(job_id))

    print("Getting job output...")
    job_output = gateway.getJobOutput(job_id)
    print(job_output)
finally:
    # Always release the gateway connection, even if a step above fails
    print("Disconnecting")
    gateway.close()
    print("Disconnected and finished.")