diff --git a/clipper_admin/clipper_admin/__init__.py b/clipper_admin/clipper_admin/__init__.py
index 821a0a428..8b4395ccd 100644
--- a/clipper_admin/clipper_admin/__init__.py
+++ b/clipper_admin/clipper_admin/__init__.py
@@ -2,6 +2,8 @@
 from .docker.docker_container_manager import DockerContainerManager
 from .kubernetes.kubernetes_container_manager import KubernetesContainerManager
+from .nomad.nomad_container_manager import NomadContainerManager
+from .nomad.consul_dns import ConsulDNS
 from .clipper_admin import *
 from . import deployers
 from .version import __version__, __registry__
diff --git a/clipper_admin/clipper_admin/nomad/consul_dns.py b/clipper_admin/clipper_admin/nomad/consul_dns.py
new file mode 100644
index 000000000..3408a7d1e
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/consul_dns.py
@@ -0,0 +1,26 @@
+from .dns import DNS
+import dns.resolver
+import socket
+
+
+class ConsulDNS(DNS):
+    """Consul is a service-networking solution used to connect and secure
+    services across any runtime platform and public or private cloud.
+    """
+
+    def resolveSRV(self, check_name):
+        """Resolve an IP and port with a DNS SRV request.
+
+        Parameters
+        ----------
+        check_name : str
+            The domain to resolve; in Consul this corresponds to the
+            health-check (service) name.
+        """
+        addr = '{}.service.consul'.format(check_name)
+        srv_records = dns.resolver.query(addr, 'SRV')
+        # Consul returns one SRV record per healthy instance; any of them is
+        # a valid endpoint, so keep the first one.
+        srv = next(iter(srv_records))
+        host = str(srv.target).rstrip('.')
+        port = srv.port
+        return (socket.gethostbyname(host), port)
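+# Usage sketch (hypothetical values; assumes the host's resolver can reach
+# Consul's DNS interface and that a service named
+# 'clipper-default-cluster-redis-db' has a passing health check):
+#
+#   dns_service = ConsulDNS()
+#   ip, port = dns_service.resolveSRV('clipper-default-cluster-redis-db')
+#   # -> e.g. ('10.0.0.5', 21234)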
diff --git a/clipper_admin/clipper_admin/nomad/dns.py b/clipper_admin/clipper_admin/nomad/dns.py
new file mode 100644
index 000000000..8138bca74
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/dns.py
@@ -0,0 +1,14 @@
+import abc
+from abc import abstractmethod
+
+
+class DNS(abc.ABC):
+
+    @abstractmethod
+    def resolveSRV(self, domain):
+        """Resolve an IP and port with a DNS SRV request.
+
+        Parameters
+        ----------
+        domain : str
+            The domain to resolve.
+        """
+        pass
diff --git a/clipper_admin/clipper_admin/nomad/fabio_load_balancer.py b/clipper_admin/clipper_admin/nomad/fabio_load_balancer.py
new file mode 100644
index 000000000..dab80bb41
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/fabio_load_balancer.py
@@ -0,0 +1,22 @@
+from .load_balancer import LoadBalancer
+
+
+class FabioLoadBalancer(LoadBalancer):
+    """Fabio is a zero-conf load-balancing HTTP(S) and TCP router for
+    services managed by Consul.
+
+    Parameters
+    ----------
+    address : str
+        The address at which the load balancer is located,
+        e.g. fabio.service.consul
+    port : str
+        The port on which the TCP proxy listens. Note that this is not the
+        HTTP port on which Fabio proxies HTTP requests!
+        https://fabiolb.net/feature/tcp-proxy/
+    """
+
+    def __init__(self, address, port):
+        self.address = address
+        self.port = port
+
+    def tcp(self, address):
+        # LoadBalancer declares tcp() and http() abstract, so they must be
+        # overridden for this class to be instantiable. Fabio's TCP proxy
+        # listens on a single (address, port) pair, so the upstream address
+        # is not needed to build the endpoint.
+        return (self.address, self.port)
+
+    def http(self, address):
+        # Only the TCP proxy port is configured on this class; resolving an
+        # HTTP route would require Fabio's HTTP listener port.
+        raise NotImplementedError(
+            'FabioLoadBalancer only exposes its TCP proxy endpoint')
diff --git a/clipper_admin/clipper_admin/nomad/load_balancer.py b/clipper_admin/clipper_admin/nomad/load_balancer.py
new file mode 100644
index 000000000..8235d35ec
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/load_balancer.py
@@ -0,0 +1,12 @@
+import abc
+from abc import abstractmethod
+
+
+class LoadBalancer(abc.ABC):
+
+    @abstractmethod
+    def tcp(self, address):
+        pass
+
+    @abstractmethod
+    def http(self, address):
+        pass
diff --git a/clipper_admin/clipper_admin/nomad/mgmt_deployment.py b/clipper_admin/clipper_admin/nomad/mgmt_deployment.py
new file mode 100644
index 000000000..d4048f786
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/mgmt_deployment.py
@@ -0,0 +1,74 @@
+from .utils import nomad_job_prefix, mgmt_job_prefix, mgmt_check
+import os
+
+
+def mgmt_deployment(job_id,
+                    datacenters,
+                    cluster_name,
+                    image,
+                    redis_ip,
+                    redis_port,
+                    num_replicas,
+                    cpu=500,
+                    memory=256,
+                    health_check_interval=3000000000,
+                    health_check_timeout=2000000000):
+    """Nomad payload to deploy a new mgmt frontend."""
+    job = {
+        'Job': {
+            'ID': job_id,
+            'Datacenters': datacenters,
+            'Type': 'service',
+            'TaskGroups': [{
+                'Name': nomad_job_prefix(cluster_name),
+                'Count': num_replicas,
+                'Tasks': [{
+                    'Name': mgmt_job_prefix(cluster_name),
+                    'Driver': 'docker',
+                    'Config': {
+                        'args': [
+                            # If redis_ip is None, fall back to the env var
+                            '--redis_ip={}'.format(
+                                redis_ip or os.environ.get('REDIS_SERVICE_IP')),
+                            '--redis_port={}'.format(
+                                redis_port or os.environ.get('REDIS_SERVICE_PORT'))
+                        ],
+                        'image': image,
+                        'port_map': [{'http': 1338}]
+                    },
+                    'Resources': {
+                        'CPU': cpu,
+                        'MemoryMB': memory,
+                        'Networks': [{
+                            'DynamicPorts': [{'Label': 'http', 'Value': 1338}]
+                        }]
+                    },
+                    'Services': [{
+                        'Name': mgmt_check(cluster_name),
+                        'Tags': ['machine-learning', 'model', 'clipper', 'mgmt'],
+                        'PortLabel': 'http',
+                        'Checks': [{
+                            'Name': 'alive',
+                            'Type': 'tcp',
+                            'Interval': health_check_interval,
+                            'Timeout': health_check_timeout
+                        }]
+                    }]
+                }]
+            }]
+        }
+    }
+    return job
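+# Sketch of how this payload is consumed (host, image, and addresses are
+# illustrative):
+#
+#   import nomad
+#   client = nomad.Nomad(host='10.0.0.1', timeout=5)
+#   job_id = 'clipper-default-cluster-mgmt'
+#   client.job.register_job(job_id, mgmt_deployment(
+#       job_id, ['dc1'], 'default-cluster',
+#       'clipper/management_frontend:latest',
+#       redis_ip='10.0.0.2', redis_port=6379, num_replicas=1))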
diff --git a/clipper_admin/clipper_admin/nomad/model_deployment.py b/clipper_admin/clipper_admin/nomad/model_deployment.py
new file mode 100644
index 000000000..abc442581
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/model_deployment.py
@@ -0,0 +1,80 @@
+from .utils import nomad_job_prefix, model_job_prefix, generate_model_job_name, model_check_name
+
+
+def model_deployment(job_id,
+                     datacenters,
+                     cluster_name,
+                     model_name,
+                     model_version,
+                     input_type,
+                     image,
+                     num_replicas,
+                     query_frontend_ip,
+                     query_frontend_port,
+                     cpu=500,
+                     memory=256,
+                     health_check_interval=3000000000,
+                     health_check_timeout=2000000000):
+    """Nomad payload to deploy a new model."""
+    job = {
+        'Job': {
+            'ID': job_id,
+            'Datacenters': datacenters,
+            'Type': 'service',
+            'TaskGroups': [{
+                'Name': nomad_job_prefix(cluster_name),
+                'Count': num_replicas,
+                'Tasks': [{
+                    'Name': generate_model_job_name(cluster_name, model_name, model_version),
+                    'Driver': 'docker',
+                    'Env': {
+                        'CLIPPER_MODEL_NAME': model_name,
+                        'CLIPPER_MODEL_VERSION': model_version,
+                        'CLIPPER_IP': query_frontend_ip,
+                        'CLIPPER_PORT': query_frontend_port,
+                        'CLIPPER_INPUT_TYPE': input_type
+                    },
+                    'Config': {
+                        'image': image,
+                        'port_map': [{'zeromq': 1390}],
+                        'dns_servers': ["${attr.unique.network.ip-address}"]
+                    },
+                    'Resources': {
+                        'CPU': cpu,
+                        'MemoryMB': memory,
+                        'Networks': [{
+                            'DynamicPorts': [{'Label': 'zeromq', 'Value': 1390}]
+                        }]
+                    },
+                    'Services': [{
+                        'Name': model_check_name(cluster_name, model_name, model_version),
+                        'Tags': ['machine-learning', 'model', 'clipper', model_name],
+                        'PortLabel': 'zeromq',
+                        'Checks': [{
+                            'Name': 'alive',
+                            'Type': 'tcp',
+                            'Interval': health_check_interval,
+                            'Timeout': health_check_timeout
+                        }]
+                    }]
+                }]
+            }]
+        }
+    }
+    return job
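+# Sketch of a payload for one replica of a hypothetical 'digit-classifier'
+# model (all values illustrative):
+#
+#   payload = model_deployment(
+#       'clipper-default-cluster-model-digit-classifier-1', ['dc1'],
+#       'default-cluster', 'digit-classifier', '1', 'doubles',
+#       'clipper/noop-container:latest', num_replicas=1,
+#       query_frontend_ip='10.0.0.3', query_frontend_port=7000)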
diff --git a/clipper_admin/clipper_admin/nomad/nomad_container_manager.py b/clipper_admin/clipper_admin/nomad/nomad_container_manager.py
new file mode 100644
index 000000000..c4075e1a2
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/nomad_container_manager.py
@@ -0,0 +1,412 @@
+from __future__ import absolute_import, division, print_function
+from ..container_manager import (
+    create_model_container_label, ContainerManager, CLIPPER_DOCKER_LABEL,
+    CLIPPER_MODEL_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT,
+    CLIPPER_INTERNAL_MANAGEMENT_PORT, CLIPPER_INTERNAL_QUERY_PORT,
+    CLIPPER_INTERNAL_METRIC_PORT, CLIPPER_NAME_LABEL, ClusterAdapter)
+from ..exceptions import ClipperException
+
+import nomad
+
+from .redis_deployment import redis_deployment
+from .model_deployment import model_deployment
+from .mgmt_deployment import mgmt_deployment
+from .query_frontend_deployment import query_frontend_deployment
+
+from .utils import nomad_job_prefix, query_frontend_job_prefix, query_frontend_service_check, query_frontend_rpc_check
+from .utils import mgmt_job_prefix, mgmt_check
+from .utils import redis_job_prefix, redis_check
+from .utils import model_job_prefix, generate_model_job_name, model_check_name
+
+from dns.resolver import NXDOMAIN
+
+import logging
+import os
+import time
+
+logger = logging.getLogger(__name__)
+cur_dir = os.path.dirname(os.path.abspath(__file__))
+
+
+class NomadContainerManager(ContainerManager):
+    def __init__(self,
+                 nomad_ip,
+                 dns,
+                 load_balancer=None,
+                 cluster_name="default-cluster",
+                 datacenters=["dc1"],
+                 redis_ip=None,
+                 redis_port=6379,
+                 namespace=None,
+                 create_namespace_if_not_exists=False,
+                 default_cpu=500,
+                 default_memory=256):
+        """
+        Parameters
+        ----------
+        nomad_ip : str
+            The IP of the Nomad server.
+        dns : DNS
+            The DNS service that you use with Nomad. Consul is the most
+            popular option.
+        load_balancer : LoadBalancer, optional
+            The load balancer used with Nomad. If you don't have one or
+            don't want to use one, leave it as None.
+        cluster_name : str
+            A unique name for this Clipper cluster. This can be used to run
+            multiple Clipper clusters on the same Nomad cluster without
+            interfering with each other.
+        datacenters : list(str), optional
+            The names of your Nomad datacenters.
+        redis_ip : str, optional
+            The address of a running Redis cluster. If set to None, Clipper
+            will start a Redis deployment for you.
+        redis_port : int, optional
+            The Redis port. If ``redis_ip`` is set to None, Clipper will
+            start Redis on this port. If ``redis_ip`` is provided, Clipper
+            will connect to Redis on this port.
+        namespace : str, optional
+            The Nomad namespace to use. If this argument is provided, all
+            Clipper artifacts and resources will be created in this Nomad
+            namespace. If not, the default namespace is used.
+        create_namespace_if_not_exists : bool, optional
+            Create a Nomad namespace if it doesn't already exist. Namespaces
+            are only available in Nomad Enterprise Edition.
+        default_cpu : int
+            The CPU frequency allocated to Nomad jobs by default, in MHz.
+        default_memory : int
+            The amount of memory allocated to Nomad jobs by default, in MB.
+
+        Note
+        ----
+        Clipper stores all persistent configuration state (such as
+        application and model information) in Redis. If you want Clipper to
+        be durable and able to recover from failures, we recommend
+        configuring your own persistent and replicated Redis cluster rather
+        than letting Clipper launch one for you.
+        """
+        self.cluster_name = cluster_name
+        self.dns = dns
+        self.load_balancer = load_balancer
+        self.datacenters = datacenters
+        self.default_cpu = default_cpu
+        self.default_memory = default_memory
+
+        self.redis_ip = redis_ip
+        self.redis_port = redis_port
+
+        self.query_ip = None
+        self.query_port = None
+
+        # connect to the Nomad cluster
+        self.nomad = nomad.Nomad(host=nomad_ip, timeout=5)
+
+        # Namespace handling (a Nomad Enterprise feature) is not implemented
+        # yet; the arguments are accepted for forward compatibility.
+
+        self.cluster_identifier = self.cluster_name
+        self.logger = ClusterAdapter(logger, {
+            'cluster_name': self.cluster_identifier
+        })
+
+    def start_clipper(self,
+                      query_frontend_image,
+                      mgmt_frontend_image,
+                      frontend_exporter_image,
+                      cache_size,
+                      qf_http_thread_pool_size,
+                      qf_http_timeout_request,
+                      qf_http_timeout_content,
+                      num_frontend_replicas=1):
+        self._start_redis()
+        self._start_mgmt(mgmt_frontend_image, num_replicas=1)
+        self.num_frontend_replicas = num_frontend_replicas
+        self._start_query(query_frontend_image, frontend_exporter_image,
+                          cache_size, qf_http_thread_pool_size,
+                          qf_http_timeout_request, qf_http_timeout_content,
+                          num_frontend_replicas)
+        # self._start_prometheus()
+        # self.connect()
+
+    def _start_redis(self, sleep_time=5):
+        # If an existing Redis service isn't provided, start one
+        if self.redis_ip is None:
+            job_id = redis_job_prefix(self.cluster_name)
+            self.nomad.job.register_job(
+                job_id,
+                redis_deployment(
+                    job_id,
+                    self.datacenters,
+                    self.cluster_name,
+                    cpu=self.default_cpu,
+                    memory=self.default_memory))
+
+            # Wait for max 10 minutes
+            wait_count = 0
+            redis_ip = None
+            redis_port = None
+            check_name = redis_check(self.cluster_name)
+
+            while redis_ip is None:
+                time.sleep(3)
+                wait_count += 3
+                if wait_count > 600:
+                    raise ClipperException(
+                        "Could not create a Nomad deployment: {}".format(job_id))
+                try:
+                    redis_ip, redis_port = self.dns.resolveSRV(check_name)
+                    self.logger.info('Redis is at {}:{}'.format(redis_ip, redis_port))
+                except NXDOMAIN as err:
+                    self.logger.warning('DNS query failed: {}'.format(err))
+
+            self.redis_ip = redis_ip
+            self.redis_port = redis_port
    def _start_mgmt(self, mgmt_image, num_replicas):
+        job_id = mgmt_job_prefix(self.cluster_name)
+        self.nomad.job.register_job(
+            job_id,
+            mgmt_deployment(
+                job_id,
+                self.datacenters,
+                self.cluster_name,
+                mgmt_image,
+                self.redis_ip,
+                self.redis_port,
+                num_replicas,
+                cpu=self.default_cpu,
+                memory=self.default_memory))
+
+        # Wait for max 10 minutes
+        wait_count = 0
+        mgmt_ip = None
+        mgmt_port = None
+        check_name = mgmt_check(self.cluster_name)
+
+        while mgmt_ip is None:
+            time.sleep(3)
+            wait_count += 3
+            if wait_count > 600:
+                raise ClipperException(
+                    "Could not create a Nomad deployment: {}".format(job_id))
+            try:
+                mgmt_ip, mgmt_port = self.dns.resolveSRV(check_name)
+                self.logger.info('Clipper mgmt is at {}:{}'.format(mgmt_ip, mgmt_port))
+            except NXDOMAIN as err:
+                self.logger.warning('DNS query failed: {}'.format(err))
+
+        self.mgmt_ip = mgmt_ip
+        self.mgmt_port = mgmt_port
+
+    def _start_query(self, query_image, frontend_exporter_image, cache_size,
+                     qf_http_thread_pool_size, qf_http_timeout_request,
+                     qf_http_timeout_content, num_replicas):
+        job_id = query_frontend_job_prefix(self.cluster_name)
+        self.nomad.job.register_job(
+            job_id,
+            query_frontend_deployment(
+                job_id,
+                self.datacenters,
+                self.cluster_name,
+                query_image,
+                self.redis_ip,
+                self.redis_port,
+                num_replicas,
+                cache_size,
+                qf_http_thread_pool_size,
+                qf_http_timeout_request,
+                qf_http_timeout_content,
+                cpu=self.default_cpu,
+                memory=self.default_memory))
+
+        # Wait for max 10 minutes
+        wait_count = 0
+        query_ip = None
+        query_port = None
+
+        while query_ip is None:
+            time.sleep(3)
+            wait_count += 3
+            if wait_count > 600:
+                raise ClipperException(
+                    "Could not create a Nomad deployment: {}".format(job_id))
+            try:
+                query_ip, query_port = self._resolve_query_ip()
+                self.logger.info('Clipper query is at {}:{}'.format(query_ip, query_port))
+            except NXDOMAIN as err:
+                self.logger.warning('DNS query failed: {}'.format(err))
+
+        self.query_ip = query_ip
+        self.query_port = query_port
+
+    def _resolve_query_ip(self):
+        return self._resolve_query_frontend_service()
+
+    def _resolve_query_frontend_service(self):
+        """Query the DNS server with a SRV request to get the IP and port of
+        the query frontend service (REST API)."""
+        check_name = query_frontend_service_check(self.cluster_name)
+        query_ip, query_port = self.dns.resolveSRV(check_name)
+        return (query_ip, query_port)
+
+    def _resolve_query_frontend_rpc(self):
+        """Query the DNS server with a SRV request to get the IP and port of
+        the query frontend RPC service."""
+        check_name = query_frontend_rpc_check(self.cluster_name)
+        query_ip, query_port = self.dns.resolveSRV(check_name)
+        return (query_ip, query_port)
+
+    def _start_prometheus(self):
+        pass
+
+    def connect(self):
+        pass
    def deploy_model(self, name, version, input_type, image, num_replicas=1):
+        job_id = generate_model_job_name(self.cluster_name, name, version)
+
+        if self.load_balancer is not None:
+            query_frontend_ip = self.load_balancer.address
+            query_frontend_port = self.load_balancer.port
+        else:
+            self.logger.warning(
+                'You did not set a load balancer. This is potentially '
+                'dangerous because IPs and ports may change over time without '
+                'the models being updated; prefer using a load balancer such '
+                'as Fabio.')
+            query_frontend_ip, query_frontend_port = self._resolve_query_frontend_rpc()
+
+        self.nomad.job.register_job(
+            job_id,
+            model_deployment(
+                job_id,
+                self.datacenters,
+                self.cluster_name,
+                name,
+                version,
+                input_type,
+                image,
+                num_replicas,
+                query_frontend_ip,
+                query_frontend_port,
+                cpu=self.default_cpu,
+                memory=self.default_memory))
+
+        # Wait for max 10 minutes
+        wait_count = 0
+        model_ip = None
+        model_port = None
+        check_name = model_check_name(self.cluster_name, name, version)
+        while model_ip is None:
+            time.sleep(3)
+            wait_count += 3
+            if wait_count > 600:
+                raise ClipperException(
+                    "Could not create a Nomad deployment: {}".format(job_id))
+            try:
+                model_ip, model_port = self.dns.resolveSRV(check_name)
+                self.logger.info('Clipper model is at {}:{}'.format(model_ip, model_port))
+            except NXDOMAIN as err:
+                self.logger.warning('DNS query failed: {}'.format(err))
+
+    def get_num_replicas(self, name, version):
+        self.logger.warning('get_num_replicas is not supported with Nomad')
+        return 0
+
+    def set_num_replicas(self, name, version, input_type, image, num_replicas):
+        self.logger.warning('set_num_replicas is not supported with Nomad')
+
+    def get_logs(self, logging_dir):
+        self.logger.warning('get_logs is not supported with Nomad')
+
+    def stop_models(self, models):
+        # Stops all deployments of containers running Clipper models with the
+        # specified names and versions.
+        try:
+            for m in models:
+                for v in models[m]:
+                    job_name = generate_model_job_name(self.cluster_name, m, v)
+                    self.nomad.job.deregister_job(job_name)
+        except Exception as e:
+            self.logger.warning(
+                "Exception deleting Nomad deployments: {}".format(e))
+            raise e
+
+    def stop_all_model_containers(self):
+        jobs = self.nomad.jobs.get_jobs(prefix=model_job_prefix(self.cluster_name))
+        for job in jobs:
+            self.logger.info('Deregistering Nomad job: {}'.format(job['Name']))
+            self.nomad.job.deregister_job(job['Name'])
+
+    def stop_all(self, graceful=True):
+        self.logger.info("Stopping all running Clipper resources")
+        jobs = self.nomad.jobs.get_jobs(prefix=nomad_job_prefix(self.cluster_name))
+        for job in jobs:
+            self.logger.info('Deregistering Nomad job: {}'.format(job['Name']))
+            self.nomad.job.deregister_job(job['Name'])
+
+    def get_admin_addr(self):
+        check_name = mgmt_check(self.cluster_name)
+        mgmt_ip, mgmt_port = self.dns.resolveSRV(check_name)
+        return '{}:{}'.format(mgmt_ip, mgmt_port)
+
+    def get_query_addr(self):
+        check_name = query_frontend_service_check(self.cluster_name)
+        try:
+            query_ip, query_port = self.dns.resolveSRV(check_name)
+            self.query_ip = query_ip
+            self.query_port = query_port
+            return '{}:{}'.format(query_ip, query_port)
+        except NXDOMAIN:
+            return ''
+
+    def get_metric_addr(self):
+        self.logger.warning("get_metric_addr is not supported with Nomad")
+
+
+def get_model_deployment_name(name, version, query_frontend_id, cluster_name):
+    return "{name}-{version}-deployment-at-{query_frontend_id}-at-{cluster_name}".format(
+        name=name,
+        version=version,
+        query_frontend_id=query_frontend_id,
+        cluster_name=cluster_name)
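+# End-to-end usage sketch (IPs and images are illustrative; ConsulDNS is the
+# DNS implementation shipped in this package):
+#
+#   from clipper_admin import NomadContainerManager, ConsulDNS
+#   manager = NomadContainerManager(nomad_ip='10.0.0.1', dns=ConsulDNS())
+#   manager.start_clipper(...)  # images and frontend settings omitted here
+#   manager.deploy_model('digit-classifier', '1', 'doubles',
+#                        'clipper/noop-container:latest', num_replicas=1)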
diff --git a/clipper_admin/clipper_admin/nomad/query_frontend_deployment.py b/clipper_admin/clipper_admin/nomad/query_frontend_deployment.py
new file mode 100644
index 000000000..8d8816a69
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/query_frontend_deployment.py
@@ -0,0 +1,98 @@
+from .utils import nomad_job_prefix, query_frontend_job_prefix, query_frontend_service_check, query_frontend_rpc_check
+import os
+
+
+def query_frontend_deployment(job_id,
+                              datacenters,
+                              cluster_name,
+                              image,
+                              redis_ip,
+                              redis_port,
+                              num_replicas,
+                              cache_size,
+                              thread_pool_size,
+                              timeout_request,
+                              timeout_content,
+                              cpu=500,
+                              memory=256,
+                              health_check_interval=3000000000,
+                              health_check_timeout=2000000000):
+    """Nomad payload to deploy a new query frontend."""
+    job = {
+        'Job': {
+            'ID': job_id,
+            'Datacenters': datacenters,
+            'Type': 'service',
+            'TaskGroups': [{
+                'Name': nomad_job_prefix(cluster_name),
+                'Count': num_replicas,
+                'Tasks': [{
+                    'Name': query_frontend_job_prefix(cluster_name),
+                    'Driver': 'docker',
+                    'Config': {
+                        'args': [
+                            # If redis_ip is None, fall back to the env var
+                            '--redis_ip={}'.format(
+                                redis_ip or os.environ.get('REDIS_SERVICE_IP')),
+                            '--redis_port={}'.format(
+                                redis_port or os.environ.get('REDIS_SERVICE_PORT')),
+                            '--prediction_cache_size={}'.format(cache_size),
+                            '--thread_pool_size={}'.format(thread_pool_size),
+                            '--timeout_request={}'.format(timeout_request),
+                            '--timeout_content={}'.format(timeout_content)
+                        ],
+                        'image': image,
+                        'port_map': [
+                            {'rpc': 7000},
+                            {'service': 1337}
+                        ]
+                    },
+                    'Resources': {
+                        'CPU': cpu,
+                        'MemoryMB': memory,
+                        'Networks': [{
+                            'DynamicPorts': [
+                                {'Label': 'rpc', 'Value': 7000},
+                                {'Label': 'service', 'Value': 1337}
+                            ]
+                        }]
+                    },
+                    'Services': [
+                        {
+                            'Name': query_frontend_service_check(cluster_name),
+                            'Tags': ['machine-learning', 'clipper', 'query-frontend',
+                                     'urlprefix-/clipper strip=/clipper'],
+                            'PortLabel': 'service',
+                            'Checks': [{
+                                'Name': 'alive',
+                                'Type': 'tcp',
+                                'Interval': health_check_interval,
+                                'Timeout': health_check_timeout
+                            }]
+                        },
+                        {
+                            'Name': query_frontend_rpc_check(cluster_name),
+                            'Tags': ['machine-learning', 'clipper', 'query-frontend',
+                                     'urlprefix-:7000 proto=tcp'],
+                            'PortLabel': 'rpc',
+                            'Checks': [{
+                                'Name': 'alive',
+                                'Type': 'tcp',
+                                'Interval': health_check_interval,
+                                'Timeout': health_check_timeout
+                            }]
+                        }
+                    ]
+                }]
+            }]
+        }
+    }
+    return job
diff --git a/clipper_admin/clipper_admin/nomad/redis_deployment.py b/clipper_admin/clipper_admin/nomad/redis_deployment.py
new file mode 100644
index 000000000..6d1e3ac8e
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/redis_deployment.py
@@ -0,0 +1,70 @@
+from .utils import nomad_job_prefix, redis_job_prefix, redis_check
+
+
+def redis_deployment(job_id,
+                     datacenters,
+                     cluster_name,
+                     cpu=500,
+                     memory=256,
+                     health_check_interval=3000000000,
+                     health_check_timeout=2000000000):
+    """Nomad payload to deploy Redis."""
+    job = {
+        'Job': {
+            'ID': job_id,
+            'Datacenters': datacenters,
+            'Type': 'service',
+            'Update': {
+                'MaxParallel': 1,
+                'AutoRevert': False,
+                'Canary': 0
+            },
+            'TaskGroups': [{
+                'Name': nomad_job_prefix(cluster_name),
+                'Count': 1,
+                'EphemeralDisk': {
+                    'Size': 300
+                },
+                'Tasks': [{
+                    'Name': redis_job_prefix(cluster_name),
+                    'Driver': 'docker',
+                    'Config': {
+                        'image': 'redis:alpine',
+                        'port_map': [{'db': 6379}]
+                    },
+                    'Resources': {
+                        'CPU': cpu,
+                        'MemoryMB': memory,
+                        'Networks': [{
+                            'DynamicPorts': [{'Label': 'db', 'Value': 6379}]
+                        }]
+                    },
+                    'Services': [{
+                        'Name': redis_check(cluster_name),
+                        'Tags': ['global', 'cache'],
+                        'PortLabel': 'db',
+                        'Checks': [{
+                            'Name': 'alive',
+                            'Type': 'tcp',
+                            'Interval': health_check_interval,
+                            'Timeout': health_check_timeout
+                        }]
+                    }]
+                }]
+            }]
+        }
+    }
+    return job
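+# The 'Name' registered under 'Services' above becomes a Consul service; with
+# ConsulDNS it is resolvable as, e.g.,
+# 'clipper-default-cluster-redis-db.service.consul' (cluster name
+# illustrative), which is how _start_redis discovers the dynamically
+# assigned port.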
diff --git a/clipper_admin/clipper_admin/nomad/utils.py b/clipper_admin/clipper_admin/nomad/utils.py
new file mode 100644
index 000000000..b3c12446e
--- /dev/null
+++ b/clipper_admin/clipper_admin/nomad/utils.py
@@ -0,0 +1,42 @@
+def nomad_job_prefix(cluster_name):
+    return 'clipper-{}'.format(cluster_name)
+
+
+""" redis """
+def redis_job_prefix(cluster_name):
+    return '{}-redis'.format(nomad_job_prefix(cluster_name))
+
+def redis_check(cluster_name):
+    return '{}-db'.format(redis_job_prefix(cluster_name))
+
+
+""" query frontend """
+def query_frontend_job_prefix(cluster_name):
+    return '{}-query-frontend'.format(nomad_job_prefix(cluster_name))
+
+def query_frontend_service_check(cluster_name):
+    return '{}-service'.format(query_frontend_job_prefix(cluster_name))
+
+def query_frontend_rpc_check(cluster_name):
+    return '{}-rpc'.format(query_frontend_job_prefix(cluster_name))
+
+
+""" mgmt """
+def mgmt_job_prefix(cluster_name):
+    return '{}-mgmt'.format(nomad_job_prefix(cluster_name))
+
+def mgmt_check(cluster_name):
+    return '{}-http'.format(mgmt_job_prefix(cluster_name))
+
+
+""" model """
+def model_job_prefix(cluster_name):
+    return '{}-model'.format(nomad_job_prefix(cluster_name))
+
+def generate_model_job_name(cluster_name, model_name, model_version):
+    return '{}-{}-{}'.format(model_job_prefix(cluster_name), model_name, model_version)
+
+def model_check_name(cluster_name, model_name, model_version):
+    return '{}-model-{}-{}'.format(nomad_job_prefix(cluster_name), model_name, model_version)
diff --git a/clipper_admin/requirements.txt b/clipper_admin/requirements.txt
index a3678fdbe..d9f9f7ff2 100644
--- a/clipper_admin/requirements.txt
+++ b/clipper_admin/requirements.txt
@@ -9,3 +9,5 @@ redis
 # https://github.com/ucbrise/clipper/issues/678.
 # It should be removed once urllib3>=1.25 is compatible with requests library
 urllib3<1.25
+dnspython==1.16.0
+python-nomad==1.1.0