-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdemo_job_internal_endpoint.py
74 lines (56 loc) · 2.2 KB
/
demo_job_internal_endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
ProActive Job Submission with Internal Endpoint Demo
This script demonstrates the process of job submission and monitoring using the ProActive Python SDK,
with a focus on managing internal endpoints within a job.
It covers:
- Creating and submitting a ProActive job with a Python task
- Adding an internal endpoint URL (Google) during task execution
- Monitoring the job status until completion
- Removing the internal endpoint before task completion
- Retrieving the job's output
The script showcases how to dynamically manage internal endpoints during job execution using the schedulerapi.
"""
import time
from proactive import getProActiveGateway
gateway = getProActiveGateway()
# Create and configure a ProActive job and task
print("Creating a proactive job...")
job = gateway.createJob("demo_job_endpoint")
print("Creating a proactive task...")
task = gateway.createPythonTask("demo_job_endpoint_task")
task.setTaskImplementation('''
import time
schedulerapi.connect()
# Add an external endpoint URL to the job inside the task
schedulerapi.addExternalEndpointUrl(variables.get("PA_JOB_ID"), "google", "https://www.google.com/", "https://cdn-icons-png.flaticon.com/128/2504/2504914.png")
print("Task is running for 15s...")
time.sleep(15)
# Remove an endpoint
schedulerapi.removeExternalEndpointUrl(variables.get("PA_JOB_ID"), "google")
print("Execution completed")
''')
print("Adding proactive task to the proactive job...")
job.addTask(task)
# Submit the job to the ProActive scheduler
print("Submitting the job to the proactive scheduler...")
job_id = gateway.submitJob(job)
print(f"Job submitted with ID: {job_id}")
# Monitor job status
is_finished = False
while not is_finished:
# Get the current state of the job
job_status = gateway.getJobStatus(job_id)
# Print the current job status
print(f"Current job status: {job_status}")
# Check if the job has finished
if job_status.upper() in ["FINISHED", "CANCELED", "FAILED"]:
is_finished = True
else:
# Wait for a few seconds before checking again
time.sleep(.5)
# Retrieve and print job results
print("Job output:")
print(gateway.getJobOutput(job_id))
# Cleanup
gateway.close()
print("Disconnected and finished.")