[CoreEngine] update the workflow examples.
fedml-alex committed Feb 4, 2024
1 parent c66dcf8 commit 5ce74ea
Showing 18 changed files with 131 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -275,3 +275,5 @@ python/tests/test_model_cli/llm_deploy/src/constants/prompt_template.py
python/examples/launch/hello_world/bootstrap.bat
python/examples/launch/hello_world/fedml_job_entry_pack.bat
**mpi_host_file
+/python/fedml/workflow/driver_example/customized_job_example/train_job/bootstrap.bat
+/python/fedml/workflow/driver_example/customized_job_example/train_job/fedml_job_entry_pack.bat
2 changes: 1 addition & 1 deletion python/fedml/__init__.py
@@ -34,7 +34,7 @@
_global_training_type = None
_global_comm_backend = None

__version__ = "0.8.18.dev14"
__version__ = "0.8.19.dev1"


# This is the deployment environment used for different roles (RD/PM/BD/Public Developers). Potential VALUE: local, dev, test, release
10 changes: 0 additions & 10 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -760,15 +760,5 @@ def monitor_endpoint_logs(self):
                with open(log_file_path, "a") as f:
                    f.write(endpoint_logs)

-                if is_job_container_running and not MLOpsRuntimeLogDaemon.get_instance(fedml_args). \
-                        is_log_processor_running(job.job_id, job.edge_id):
-                    setattr(fedml_args, "log_file_dir", os.path.dirname(log_file_path))
-                    MLOpsRuntimeLogDaemon.get_instance(fedml_args).log_file_dir = os.path.dirname(log_file_path)
-                    MLOpsRuntimeLogDaemon.get_instance(fedml_args).start_log_processor(
-                        job.job_id, job.edge_id,
-                        log_source=device_client_constants.ClientConstants.FEDML_LOG_SOURCE_TYPE_MODEL_END_POINT,
-                        log_file_prefix=JobMonitor.ENDPOINT_CONTAINER_LOG_PREFIX
-                    )

        except Exception as e:
            print(f"Exception when syncing endpoint log to MLOps {traceback.format_exc()}.")
@@ -268,10 +268,12 @@ def run(self, process_event, completed_event):
logging.info(f"[endpoint/device][{run_id}/{self.edge_id}] Release gpu resource when the worker deployment stopped.")
self.release_gpu_ids(run_id)
self.reset_devices_status(self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED)
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
except RunnerCompletedError:
logging.info(f"[endpoint/device][{run_id}/{self.edge_id}] Release gpu resource when the worker deployment completed.")
self.release_gpu_ids(run_id)
logging.info("Runner completed.")
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
except Exception as e:
logging.error("Runner exits with exceptions. {}".format(traceback.format_exc()))
self.cleanup_run_when_starting_failed()
@@ -284,7 +286,6 @@ def run(self, process_event, completed_event):
            sys.exit(1)
        finally:
            logging.info("Release resources.")
-            MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, self.edge_id)
            if self.mlops_metrics is not None:
                self.mlops_metrics.stop_sys_perf()
            time.sleep(3)
@@ -932,7 +933,8 @@ def callback_runner_id_status(self, topic, payload):
            status_process.join(15)

        # Stop log processor for current run
-        MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, edge_id)
+        if status != ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED:
+            MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, edge_id)

    def callback_report_current_status(self, topic, payload):
        self.send_agent_active_msg()
7 changes: 6 additions & 1 deletion python/fedml/core/mlops/mlops_device_perfs.py
@@ -166,10 +166,15 @@ def report_gpu_device_info(edge_id, mqtt_mgr=None):
    gpu_cores_available, sent_bytes, recv_bytes, gpu_available_ids = sys_utils.get_sys_realtime_stats(edge_id)

    topic_name = "ml_client/mlops/gpu_device_info"
+    deploy_worker_id_list = list()
+    try:
+        deploy_worker_id_list = json.loads(os.environ.get("FEDML_DEPLOY_WORKER_IDS", "[]"))
+    except Exception as e:
+        pass
    device_info_json = {
        "edgeId": edge_id,
        "deployMasterId": os.environ.get("FEDML_DEPLOY_MASTER_ID", ""),
-        "deployWorkerIds": os.environ.get("FEDML_DEPLOY_WORKER_IDS", "[]"),
+        "deployWorkerIds": deploy_worker_id_list,
        "memoryTotal": round(total_mem * MLOpsUtils.BYTES_TO_GB, 2),
        "memoryAvailable": round(free_mem * MLOpsUtils.BYTES_TO_GB, 2),
        "diskSpaceTotal": round(total_disk_size * MLOpsUtils.BYTES_TO_GB, 2),
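This change matters because the MQTT payload previously embedded the raw environment string under `deployWorkerIds`; parsing it first yields a real JSON array. A minimal sketch of the intended parsing, assuming the env var holds a JSON-encoded list (the example IDs are hypothetical):

```
import json
import os

# Hypothetical input: FEDML_DEPLOY_WORKER_IDS is expected to hold a JSON-encoded list.
os.environ["FEDML_DEPLOY_WORKER_IDS"] = "[101, 102]"

deploy_worker_id_list = []
try:
    deploy_worker_id_list = json.loads(os.environ.get("FEDML_DEPLOY_WORKER_IDS", "[]"))
except Exception:
    pass  # fall back to an empty list on malformed input

# The payload now carries [101, 102] instead of the quoted string "[101, 102]".
print(json.dumps({"deployWorkerIds": deploy_worker_id_list}))
```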
5 changes: 2 additions & 3 deletions python/fedml/workflow/customized_jobs/deploy_job.py
@@ -1,6 +1,5 @@
from fedml.workflow.jobs import JobStatus

from fedml.workflow.customized_jobs.customized_base_job import CustomizedBaseJob
-import fedml


class DeployJob(CustomizedBaseJob):
@@ -14,7 +13,7 @@ def run(self):
        super().run()

    def status(self):
-        super().status()
+        return super().status()

    def kill(self):
        super().kill()
4 changes: 1 addition & 3 deletions python/fedml/workflow/customized_jobs/train_job.py
@@ -1,7 +1,5 @@

from fedml.workflow.jobs import JobStatus
from fedml.workflow.customized_jobs.customized_base_job import CustomizedBaseJob
-import fedml


class TrainJob(CustomizedBaseJob):
@@ -15,7 +13,7 @@ def run(self):
        super().run()

    def status(self):
-        super().status()
+        return super().status()

    def kill(self):
        super().kill()
@@ -0,0 +1,51 @@

## Make your own workflow with multiple jobs
### Define the job YAML
```
import os

working_directory = os.path.dirname(os.path.abspath(__file__))
deploy_image_job_yaml = os.path.join(working_directory, "deploy_image_job.yaml")
deploy_3d_job_yaml = os.path.join(working_directory, "deploy_3d_job.yaml")
train_job_yaml = os.path.join(working_directory, "train_job.yaml")
```

### If needed, load the job YAML and change config items
```
deploy_image_job_yaml_obj = DeployImageJob.load_yaml_config(deploy_image_job_yaml)
deploy_3d_job_yaml_obj = DeployImageJob.load_yaml_config(deploy_3d_job_yaml)
train_job_yaml_obj = DeployImageJob.load_yaml_config(train_job_yaml)
# deploy_image_job_yaml_obj["computing"]["resource_type"] = "A100-80GB-SXM"
# deploy_image_job_yaml_obj["computing"]["device_type"] = "GPU"
# DeployImageJob.generate_yaml_doc(deploy_image_job_yaml_obj, deploy_image_job_yaml)
```

### Generate the job objects
```
deploy_image_job = DeployImageJob(name="deploy_image_job", job_yaml_absolute_path=deploy_image_job_yaml)
deploy_3d_job = Deploy3DJob(name="deploy_3d_job", job_yaml_absolute_path=deploy_3d_job_yaml)
train_job = TrainJob(name="train_job", job_yaml_absolute_path=train_job_yaml)
```

### Define the workflow
```
workflow = Workflow(name="workflow_with_multi_jobs", loop=False)
```

### Add the jobs to the workflow and set their dependencies (DAG-based)
```
workflow.add_job(deploy_image_job)
# workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
workflow.add_job(train_job, dependencies=[deploy_image_job])
```
```

### Run the workflow
```
workflow.run()
```

### After the workflow has finished, print the graph, nodes, and topological order
```
print("graph", workflow.metadata.graph)
print("nodes", workflow.metadata.nodes)
print("topological_order", workflow.metadata.topological_order)
print("loop", workflow.loop)
```
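The README stops at inspecting the workflow metadata. A natural follow-up, not part of this README, is to poll the per-job status after `workflow.run()` returns; a minimal sketch reusing only the names defined above, plus `JobStatus` as imported in the driver script that follows (the polling loop and interval are assumptions):

```
import time

# Hypothetical status polling; JobStatus comes from fedml.workflow.workflow.
for label, job in (("deploy_image_job", deploy_image_job), ("train_job", train_job)):
    while True:
        current_status = job.status()
        print(f"{label}: {current_status}")
        if current_status in (JobStatus.FINISHED, JobStatus.FAILED):
            break
        time.sleep(10)  # assumed polling interval
```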
@@ -1,13 +1,14 @@
import os

-from fedml.workflow.workflow import Workflow
+from fedml.workflow.workflow import JobStatus, Workflow
from fedml.workflow.customized_jobs.deploy_job import DeployJob
from fedml.workflow.customized_jobs.train_job import TrainJob

CURRENT_CONFIG_VERSION = "release"
CURRENT_ON_PREM_LOCAL_HOST = "localhost"
CURRENT_ON_PREM_LOCAL_PORT = 18080
MY_API_KEY = "1316b93c82da40ce90113a2ed12f0b14"
MY_API_KEY = "" # Here you need to set your API key from nexus.fedml.ai


class DeployImageJob(DeployJob):
    def __init__(self, name, job_yaml_absolute_path=None):
@@ -20,7 +21,17 @@ def run(self):
        super().run()

    def status(self):
-        super().status()
+        current_status = super().status()
+        if current_status == JobStatus.FINISHED:
+            pass
+        elif current_status == JobStatus.FAILED:
+            pass
+        elif current_status == JobStatus.RUNNING:
+            pass
+        elif current_status == JobStatus.PROVISIONING:
+            pass
+
+        return current_status

    def kill(self):
        super().kill()
@@ -38,7 +49,17 @@ def run(self):
        super().run()

    def status(self):
-        super().status()
+        current_status = super().status()
+        if current_status == JobStatus.FINISHED:
+            pass
+        elif current_status == JobStatus.FAILED:
+            pass
+        elif current_status == JobStatus.RUNNING:
+            pass
+        elif current_status == JobStatus.PROVISIONING:
+            pass
+
+        return current_status

    def kill(self):
        super().kill()
@@ -56,7 +77,17 @@ def run(self):
        super().run()

    def status(self):
-        super().status()
+        current_status = super().status()
+        if current_status == JobStatus.FINISHED:
+            pass
+        elif current_status == JobStatus.FAILED:
+            pass
+        elif current_status == JobStatus.RUNNING:
+            pass
+        elif current_status == JobStatus.PROVISIONING:
+            pass
+
+        return current_status

    def kill(self):
        super().kill()
@@ -87,8 +118,8 @@ def kill(self):

# Add the job object to workflow and set the dependency (DAG based).
workflow.add_job(deploy_image_job)
-workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
-workflow.add_job(train_job, dependencies=[deploy_3d_job])
+# workflow.add_job(deploy_3d_job, dependencies=[deploy_image_job])
+workflow.add_job(train_job, dependencies=[deploy_image_job])

# Run workflow
workflow.run()
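Note that the new `status()` overrides above are deliberate scaffolding: every branch is a `pass`, so they only forward the value of `super().status()`. A sketch of how a branch might be filled in, for example with logging (the subclass and log messages are hypothetical, not part of this commit):

```
import logging

class LoggingTrainJob(TrainJob):
    # Hypothetical variant of the skeleton above with the branches filled in.
    def status(self):
        current_status = super().status()
        if current_status == JobStatus.FINISHED:
            logging.info("train job finished")
        elif current_status == JobStatus.FAILED:
            logging.error("train job failed; check the run logs on nexus.fedml.ai")
        elif current_status in (JobStatus.RUNNING, JobStatus.PROVISIONING):
            logging.debug("train job still in progress: %s", current_status)
        return current_status
```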
@@ -22,7 +22,7 @@ bootstrap: |
echo "Bootstrap finished."
computing:
resource_type: A100-80GB-SXM # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
resource_type: RTX-3090 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $10 # max cost per hour of all machines for your job
# device_type: GPU # GPU or CPU
@@ -6,7 +6,7 @@ server_external_port: 20202
server_internal_port: 2203

environment_variables:
NEXUS_API_KEY: ""
NEXUS_API_KEY: "" # Here you need to set your API key from nexus.fedml.ai

bootstrap: |
echo "Bootstrap start..."
@@ -64,14 +64,15 @@ def predict(self, request: dict):
        response_text = self.chatbot.predict(instruction=question)

        try:
-            unique_id = str(uuid.uuid4())
+            unique_id = "3D_MODEL_KEY"
            with open(f"{unique_id}.txt", "w") as f:
                f.write(question)
                f.write("\n\n")
                f.write(response_text)
                f.write("\n\n")
-            fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
+            response = fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
                             api_key=os.environ.get("NEXUS_API_KEY", None), metadata={"type": "chatbot"})
+            print(f"upload response {response}")
        except Exception as e:
            pass

@@ -22,7 +22,7 @@ bootstrap: |
echo "Bootstrap finished."
computing:
resource_type: A100-80GB-SXM # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
resource_type: RTX-3090 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $10 # max cost per hour of all machines for your job
# device_type: GPU # GPU or CPU
@@ -6,7 +6,7 @@ server_external_port: 20202
server_internal_port: 2203

environment_variables:
NEXUS_API_KEY: ""
NEXUS_API_KEY: "" # Here you need to set your API key from nexus.fedml.ai

bootstrap: |
echo "Bootstrap start..."
@@ -15,7 +15,7 @@
import uuid


-class Chatbot(FedMLPredictor): # Inherit FedMLClientPredictor
+class Chatbot(FedMLPredictor):  # Inherit FedMLClientPredictor
    def __init__(self):
        super().__init__()
        PROMPT_FOR_GENERATION_FORMAT = f""""Below is an instruction that describes a task. Write a response that appropriately completes the request."
@@ -34,7 +34,7 @@ def __init__(self):
        config = AutoConfig.from_pretrained("EleutherAI/pythia-70m")
        model = AutoModelForCausalLM.from_pretrained(
            "EleutherAI/pythia-70m",
-            torch_dtype=torch.float32, # float 16 not supported on CPU
+            torch_dtype=torch.float32,  # float 16 not supported on CPU
            trust_remote_code=True,
            device_map="auto"
        )
@@ -64,14 +64,15 @@ def predict(self, request: dict):
        response_text = self.chatbot.predict(instruction=question)

        try:
-            unique_id = str(uuid.uuid4())
+            unique_id = "IMAGE_MODEL_KEY"
            with open(f"{unique_id}.txt", "w") as f:
                f.write(question)
                f.write("\n\n")
                f.write(response_text)
                f.write("\n\n")
-            fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
-                             api_key=os.environ.get("NEXUS_API_KEY", None), metadata={"type": "chatbot"})
+            response = fedml.api.upload(data_path=f"{unique_id}.txt", name=unique_id,
+                                        api_key=os.environ.get("NEXUS_API_KEY", None), metadata={"type": "chatbot"})
+            print(f"upload response: code {response.code}, message {response.message}, data {response.data}")
        except Exception as e:
            pass
@@ -26,7 +26,7 @@ bootstrap: |
echo "Bootstrap finished."
computing:
resource_type: A100-80GB-SXM # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
resource_type: RTX-3090 # e.g., A100-80G, please check the resource type list by "fedml show-resource-type" or visiting URL: https://open.fedml.ai/accelerator_resource_type
minimum_num_gpus: 1 # minimum # of GPUs to provision
maximum_cost_per_hour: $10 # max cost per hour of all machines for your job
# device_type: GPU # GPU or CPU
@@ -1,5 +1,6 @@
import os
import time
+import traceback

import fedml

@@ -11,6 +12,16 @@
run_id = os.getenv('FEDML_CURRENT_RUN_ID', 0)
edge_id = os.getenv('FEDML_CURRENT_EDGE_ID', 0)

+try:
+    unique_id = "IMAGE_MODEL_KEY"
+    my_api_key = ""  # Set your API key from nexus.fedml.ai
+    response = fedml.api.download(unique_id,
+                                  api_key=my_api_key, dest_path=f"{unique_id}")
+    print(f"download response: code {response.code}, message {response.message}, data {response.data}")
+except Exception as e:
+    print(f"download exception {traceback.format_exc()}")
+    pass

artifact = fedml.mlops.Artifact(name=f"general-file@{run_id}-{edge_id}", type=fedml.mlops.ARTIFACT_TYPE_NAME_GENERAL)
artifact.add_file("./requirements.txt")
artifact.add_dir("./config")
2 changes: 1 addition & 1 deletion python/setup.py
@@ -114,7 +114,7 @@ def finalize_options(self):

setup(
name="fedml",
version="0.8.18.dev14",
version="0.8.19.dev1",
author="FedML Team",
author_email="[email protected]",
description="A research and production integrated edge-cloud library for "
