Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-capture requirements #4896

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions albatross_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import sys
print(sys.path)
sys.path.append("/home/upravali/telemetry/sagemaker-python-sdk/src/sagemaker")
sys.path.append('/home/upravali/langchain/langchain-aws/libs/aws/')
print("Updated sys.path: ", sys.path)

import json
import os
import time

from sagemaker.serve.builder.model_builder import ModelBuilder
from sagemaker.serve.builder.schema_builder import SchemaBuilder
from sagemaker.serve.spec.inference_spec import InferenceSpec
import langchain_aws
import langchain_core
from langchain_aws import ChatBedrockConverse
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

INPUTS = {
'CPU': {
'INFERENCE_IMAGE': '763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference:2.4.0-cpu-py311-ubuntu22.04-sagemaker',
'INSTANCE_TYPE': 'ml.m5.xlarge'
},
'GPU': {
'INFERENCE_IMAGE': '763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference:2.4.0-gpu-py311-cu124-ubuntu22.04-sagemaker',
'INSTANCE_TYPE': 'ml.g5.xlarge'
},
'SERVICE': {
'ROLE': 'arn:aws:iam::971812153697:role/upravali-test-role'
}
}

def deploy(device):

class CustomerInferenceSpec(InferenceSpec):

def load(self, model_dir):
from langchain_aws import ChatBedrockConverse
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
return \
ChatPromptTemplate.from_messages(
[
(
"system",
"You are a verbose assistant that gives long-winded responses at least 500 words long for every comment/question.",
),
("human", "{input}"),
]
) | \
ChatBedrockConverse(
model = 'anthropic.claude-3-sonnet-20240229-v1:0',
temperature = 0,
region_name = 'us-west-2'
) | \
StrOutputParser()

def invoke(self, x, model):
return model.invoke({'input': x['input']}) if x['stream'].lower() != 'true' \
else model.stream({'input': x['input']})



model = ModelBuilder(
##################################################################
# can be service or customer who defines these
##################################################################
name = f'model-{int(time.time())}',

##################################################################
# service should define these
##################################################################
image_uri = INPUTS[device]['INFERENCE_IMAGE'],
env_vars = {
'TS_DISABLE_TOKEN_AUTHORIZATION' : 'true' # ABSOLUTELY NECESSARY
},

##################################################################
# customer should define these
##################################################################
schema_builder = SchemaBuilder(
json.dumps({
'stream': 'true',
'input': 'hello'
}),
"<EOF>"
),
inference_spec = CustomerInferenceSpec(), # Won't be pickled correctly if Python version locally and DLC don't match
dependencies = {
"auto": True,
# 'requirements' : './inference/code/requirements2.txt'
},
role_arn = INPUTS['SERVICE']['ROLE']
).build()
endpoint = model.deploy(
initial_instance_count = 1,
instance_type = INPUTS[device]['INSTANCE_TYPE'],
)
return (model, endpoint)


###################################################################################################
#
#
# PoC DEMO CODE ONLY
#
# Note: invoke vs invoke_stream matters
###################################################################################################
def invoke(endpoint, x):
res = endpoint.predict(x)
return res

def invoke_stream(endpoint, x):
res = endpoint.predict_stream(x)
print(str(res)) # Generator
return res

def clean(model, endpoint):
try:
endpoint.delete_endpoint()
except Exception as e:
print(e)
pass

try:
model.delete_model()
except Exception as e:
print(e)
pass

def main(device):
print("before deploying")
model, endpoint = deploy(device)
print("after deploying")

while True:
x = input(f">>> ")
if x == 'exit':
break
try:
if json.loads(x)['stream'].lower() == 'true':
for chunk in invoke_stream(endpoint, x):
print(
str(chunk, encoding = 'utf-8'),
end = "",
flush = True
)
print()
else:
print(invoke(endpoint, x))
except Exception as e:
print(e)

clean(model, endpoint)

if __name__ == '__main__':
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
main('CPU')
75 changes: 40 additions & 35 deletions src/sagemaker/serve/detector/dependency_manager.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,19 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""SageMaker model builder dependency managing module.

This must be kept independent of SageMaker PySDK
"""

from __future__ import absolute_import

from pathlib import Path
import logging
import subprocess
import sys
import re
from pathlib import Path

_SUPPORTED_SUFFIXES = [".txt"]
# TODO : Move PKL_FILE_NAME to common location
PKL_FILE_NAME = "serve.pkl"

logger = logging.getLogger(__name__)


def capture_dependencies(dependencies: dict, work_dir: Path, capture_all: bool = False):
"""Placeholder docstring"""
"""Capture dependencies and print output."""
print(f"Capturing dependencies: {dependencies}, work_dir: {work_dir}, capture_all: {capture_all}")

path = work_dir.joinpath("requirements.txt")
if "auto" in dependencies and dependencies["auto"]:
command = [
Expand All @@ -45,6 +27,8 @@ def capture_dependencies(dependencies: dict, work_dir: Path, capture_all: bool =

if capture_all:
command.append("--capture_all")

print(f"Running subprocess with command: {command}")

subprocess.run(
command,
Expand All @@ -55,62 +39,83 @@ def capture_dependencies(dependencies: dict, work_dir: Path, capture_all: bool =
with open(path, "r") as f:
autodetect_depedencies = f.read().splitlines()
autodetect_depedencies.append("sagemaker[huggingface]>=2.199")
print(f"Auto-detected dependencies: {autodetect_depedencies}")
else:
autodetect_depedencies = ["sagemaker[huggingface]>=2.199"]
print(f"No auto-detection, using default dependencies: {autodetect_depedencies}")

module_version_dict = _parse_dependency_list(autodetect_depedencies)
print(f"Parsed auto-detected dependencies: {module_version_dict}")

if "requirements" in dependencies:
module_version_dict = _process_customer_provided_requirements(
requirements_file=dependencies["requirements"], module_version_dict=module_version_dict
)
print(f"After processing customer-provided requirements: {module_version_dict}")

if "custom" in dependencies:
module_version_dict = _process_custom_dependencies(
custom_dependencies=dependencies.get("custom"), module_version_dict=module_version_dict
)
print(f"After processing custom dependencies: {module_version_dict}")

with open(path, "w") as f:
for module, version in module_version_dict.items():
f.write(f"{module}{version}\n")
print(f"Final dependencies written to {path}")


def _process_custom_dependencies(custom_dependencies: list, module_version_dict: dict):
"""Placeholder docstring"""
"""Process custom dependencies and print output."""
print(f"Processing custom dependencies: {custom_dependencies}")

custom_module_version_dict = _parse_dependency_list(custom_dependencies)
print(f"Parsed custom dependencies: {custom_module_version_dict}")

module_version_dict.update(custom_module_version_dict)
print(f"Updated module_version_dict with custom dependencies: {module_version_dict}")

return module_version_dict


def _process_customer_provided_requirements(requirements_file: str, module_version_dict: dict):
"""Placeholder docstring"""
"""Process customer-provided requirements and print output."""
print(f"Processing customer-provided requirements from file: {requirements_file}")

requirements_file = Path(requirements_file)
if not requirements_file.is_file() or not _is_valid_requirement_file(requirements_file):
raise Exception(f"Path: {requirements_file} to requirements.txt doesn't exist")

logger.debug("Packaging provided requirements.txt from %s", requirements_file)
with open(requirements_file, "r") as f:
custom_dependencies = f.read().splitlines()

print(f"Customer-provided dependencies: {custom_dependencies}")

module_version_dict.update(_parse_dependency_list(custom_dependencies))
print(f"Updated module_version_dict with customer-provided requirements: {module_version_dict}")

return module_version_dict


def _is_valid_requirement_file(path):
"""Placeholder docstring"""
# In the future, we can also check the if the content of customer provided file has valid format
"""Check if the requirements file is valid and print result."""
print(f"Validating requirement file: {path}")

for suffix in _SUPPORTED_SUFFIXES:
if path.name.endswith(suffix):
print(f"File {path} is valid with suffix {suffix}")
return True

print(f"File {path} is not valid")
return False


def _parse_dependency_list(depedency_list: list) -> dict:
"""Placeholder docstring"""

# Divide a string into 2 part, first part is the module name
# and second part is its version constraint or the url
# checkout tests/unit/sagemaker/serve/detector/test_dependency_manager.py
# for examples
"""Parse the dependency list and print output."""
print(f"Parsing dependency list: {depedency_list}")

pattern = r"^([\w.-]+)(@[^,\n]+|((?:[<>=!~]=?[\w.*-]+,?)+)?)$"

module_version_dict = {}

for dependency in depedency_list:
Expand All @@ -119,10 +124,10 @@ def _parse_dependency_list(depedency_list: list) -> dict:
match = re.match(pattern, dependency)
if match:
package = match.group(1)
# Group 2 is either a URL or version constraint, if present
url_or_version = match.group(2) if match.group(2) else ""
module_version_dict.update({package: url_or_version})
else:
module_version_dict.update({dependency: ""})


print(f"Parsed module_version_dict: {module_version_dict}")
return module_version_dict
Loading
Loading