Skip to content

Commit

Permalink
fix job names and checks
Browse files Browse the repository at this point in the history
  • Loading branch information
asauray committed Oct 17, 2019
1 parent 75bbd22 commit 2a3b9de
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 42 deletions.
13 changes: 4 additions & 9 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,9 @@ def build_model(self,
context_file.seek(0)
image = "{cluster}-{name}:{version}".format(
cluster=self.cm.cluster_identifier, name=name, version=version)

if container_registry is not None:
image = "{reg}/{image}".format(
reg=container_registry, image=image)
docker_client = docker.from_env()
self.logger.info(
"Building model Docker image with model data from {}".format(
Expand All @@ -536,19 +538,12 @@ def build_model(self,
if 'stream' in b and b['stream'] != '\n': #log build steps only
self.logger.info(b['stream'].rstrip())

tagged_image = image
if container_registry is not None:
tagged_image = "{reg}/{image}".format(reg=container_registry, image=image)
else:
self.logger.info('Container registry is not set')
docker_client.images.tag(image, tagged_image)

self.logger.info("Pushing model Docker image to {}".format(image))

@retry((docker.errors.APIError, TimeoutError, Timeout),
tries=5, logger=self.logger)
def _push_model():
for line in docker_client.images.push(repository=tagged_image, stream=True):
for line in docker_client.images.push(repository=image, stream=True):
self.logger.debug(line)
_push_model()

Expand Down
6 changes: 2 additions & 4 deletions clipper_admin/clipper_admin/nomad/mgmt_deployment.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from .utils import nomad_job_prefix
from .utils import nomad_job_prefix, mgmt_job_prefix, mgmt_check
import os

def mgmt_job_prefix(cluster_name):
return '{}-mgmt'.format(nomad_job_prefix(cluster_name))

""" Nomad payload to deploy a new mgmt """
def mgmt_deployment(job_id, datacenters, cluster_name, image, redis_ip, redis_port, num_replicas):
Expand Down Expand Up @@ -41,7 +39,7 @@ def mgmt_deployment(job_id, datacenters, cluster_name, image, redis_ip, redis_po
},
'Services': [
{
'Name': '{}-mgmt'.format(nomad_job_prefix(cluster_name)),
'Name': mgmt_check(cluster_name),
'Tags': ['machine-learning', 'model', 'clipper', 'mgmt'],
'PortLabel': 'http',
'Checks': [
Expand Down
12 changes: 1 addition & 11 deletions clipper_admin/clipper_admin/nomad/model_deployment.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@

from .utils import nomad_job_prefix

def model_job_prefix(cluster_name):
return '{}-model'.format(nomad_job_prefix(cluster_name))

def generate_model_job_name(cluster_name, model_name, model_version):
return '{}-{}-{}'.format(model_job_prefix(cluster_name), model_name, model_version)

def model_check_name(cluster_name, name, version):
return '{}-model-{}-{}'.format(nomad_job_prefix(cluster_name), name, version)
from .utils import nomad_job_prefix, model_job_prefix, generate_model_job_name, model_check_name

""" Nomad payload to deploy a new model """
def model_deployment(
Expand Down
44 changes: 29 additions & 15 deletions clipper_admin/clipper_admin/nomad/nomad_container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from .query_frontend_deployment import query_frontend_deployment

from .utils import nomad_job_prefix, query_frontend_job_prefix, query_frontend_service_check, query_frontend_rpc_check
from .utils import mgmt_job_prefix, mgmt_check
from .utils import redis_job_prefix, redis_check

from dns.resolver import NXDOMAIN

Expand Down Expand Up @@ -147,15 +149,16 @@ def start_clipper(self,
def _start_redis(self, sleep_time=5):
# If an existing Redis service isn't provided, start one
if self.redis_ip is None:
job_id = 'redis'
job_id = redis_job_prefix(self.cluster_name)
self.nomad.job.register_job(job_id, redis_deployment(job_id, self.datacenters, self.cluster_name))


# Wait for max 10 minutes
wait_count = 0
redis_ip = None
redis_port = None
check_name = '{}-redis'.format(nomad_job_prefix(self.cluster_name))
check_name = redis_check(self.cluster_name)

while redis_ip is None:
time.sleep(3)
wait_count += 3
Expand All @@ -167,17 +170,32 @@ def _start_redis(self, sleep_time=5):
self.logger.info('Redis is at {}:{}'.format(redis_ip, redis_port))
except NXDOMAIN as err:
self.logger.warning('DNS query failed: {}'.format(err))

self.redis_ip = redis_ip
self.redis_port = redis_port

def _start_mgmt(self, mgmt_image, num_replicas):
job_id = 'clipper-mgmt'
self.nomad.job.register_job(job_id, mgmt_deployment(job_id, self.datacenters, self.cluster_name, mgmt_image, self.redis_ip, self.redis_port, num_replicas))
job_id = mgmt_job_prefix(self.cluster_name),
self.nomad.job.register_job(
job_id,
mgmt_deployment(
job_id,
self.datacenters,
self.cluster_name,
mgmt_image,
self.redis_ip,
self.redis_port,
num_replicas
)
)

wait_count = 0

mgmt_ip = None
mgmt_port = None
check_name = '{}-mgmt'.format(nomad_job_prefix(self.cluster_name))

check_name = mgmt_check(self.cluster_name)

while mgmt_ip is None:
time.sleep(3)
wait_count += 3
Expand All @@ -195,7 +213,7 @@ def _start_mgmt(self, mgmt_image, num_replicas):
def _start_query(self, query_image, frontend_exporter_image, cache_size,
qf_http_thread_pool_size, qf_http_timeout_request,
qf_http_timeout_content, num_replicas):
job_id = 'clipper-query-frontend'
job_id = query_frontend_job_prefix(self.cluster_name)
self.nomad.job.register_job(
job_id,
query_frontend_deployment(
Expand All @@ -217,6 +235,7 @@ def _start_query(self, query_image, frontend_exporter_image, cache_size,
wait_count = 0
query_ip = None
query_port = None

while query_ip is None:
time.sleep(3)
wait_count += 3
Expand All @@ -232,9 +251,7 @@ def _start_query(self, query_image, frontend_exporter_image, cache_size,
self.query_port = query_port

def _resolve_query_ip(self):
check_name = 'check-service-query-frontend'
query_ip, query_port = self.dns.resolveSRV(check_name)
return (query_ip, query_port)
return self._resolve_query_frontend_service()

"""
This function queries the DNS server with a SRV request to get ip and port for the query frontend service (REST API)
Expand All @@ -259,9 +276,7 @@ def connect(self):
pass

def deploy_model(self, name, version, input_type, image, num_replicas=1):
check_name = model_check_name(self.cluster_name, name, version)
job_id = '{}-{}-{}'.format(model_job_prefix(self.cluster_name), name, version)

job_id = generate_model_job_name(self.cluster_name, name, version)

if self.load_balancer != None:
query_frontend_ip = self.load_balancer.ip
Expand Down Expand Up @@ -349,13 +364,12 @@ def stop_all(self, graceful=True):


def get_admin_addr(self):
check_name = '{}-mgmt'.format(nomad_job_prefix(self.cluster_name))
check_name = mgmt_check(self.cluster_name)
mgmt_ip, mgmt_port = self.dns.resolveSRV(check_name)
return '{}:{}'.format(mgmt_ip, mgmt_port)

def get_query_addr(self):

check_name = 'check-service-query-frontend'
check_name = query_frontend_service_check(self.cluster_name)
try:
query_ip, query_port= self.dns.resolveSRV(check_name)
self.query_ip = query_ip
Expand Down
6 changes: 3 additions & 3 deletions clipper_admin/clipper_admin/nomad/redis_deployment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .utils import nomad_job_prefix
from .utils import nomad_job_prefix, redis_job_prefix, redis_check

""" Nomad payload to deploy Redis """
def redis_deployment(job_id, datacenters, cluster_name):
Expand All @@ -20,7 +20,7 @@ def redis_deployment(job_id, datacenters, cluster_name):
},
'Tasks': [
{
'Name': '{}-redis'.format(nomad_job_prefix(cluster_name)),
'Name': redis_job_prefix(cluster_name),
'Driver': 'docker',
'Config': {
'image': 'redis:alpine',
Expand All @@ -39,7 +39,7 @@ def redis_deployment(job_id, datacenters, cluster_name):
},
'Services': [
{
'Name': '{}-redis'.format(nomad_job_prefix(cluster_name)),
'Name': redis_check(cluster_name),
'Tags': ['global', 'cache'],
'PortLabel': 'db',
'Checks': [
Expand Down
29 changes: 29 additions & 0 deletions clipper_admin/clipper_admin/nomad/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
def nomad_job_prefix(cluster_name):
return 'clipper-{}'.format(cluster_name)


"""
redis
"""
def redis_job_prefix(cluster_name):
return '{}-redis'.format(nomad_job_prefix(cluster_name))
def redis_check(cluster_name):
return '{}-db'.format(mgmt_check(cluster_name))

"""
query frontend
"""
Expand All @@ -11,3 +20,23 @@ def query_frontend_service_check(cluster_name):
return '{}-service'.format(query_frontend_job_prefix(cluster_name))
def query_frontend_rpc_check(cluster_name):
return '{}-rpc'.format(query_frontend_job_prefix(cluster_name))

"""
mgmt
"""
def mgmt_job_prefix(cluster_name):
return '{}-mgmt'.format(nomad_job_prefix(cluster_name))
def mgmt_check(cluster_name):
return '{}-http'.format(mgmt_check(cluster_name))

"""
model
"""
def model_job_prefix(cluster_name):
return '{}-model'.format(nomad_job_prefix(cluster_name))

def generate_model_job_name(cluster_name, model_name, model_version):
return '{}-{}-{}'.format(model_job_prefix(cluster_name), model_name, model_version)

def model_check_name(cluster_name, model_name, model_version):
return '{}-model-{}-{}'.format(nomad_job_prefix(cluster_name), model_name, model_version)

0 comments on commit 2a3b9de

Please sign in to comment.