Skip to content

Commit

Permalink
add load balancer support (Fabio)
Browse files Browse the repository at this point in the history
  • Loading branch information
asauray committed Oct 17, 2019
1 parent 525f066 commit 75bbd22
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 14 deletions.
22 changes: 22 additions & 0 deletions clipper_admin/clipper_admin/nomad/fabio_load_balancer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import abc
from abc import abstractmethod
from load_balancer import LoadBalancer

"""
"""
class FabioLoadBalancer(LoadBalancer):

"""
Parameters
----------
address: str
The address at which the load balancer is located. For instance fabio.service.consul
port: str
The port on which the TCP proxy listens, this is not the http port on which fabio proxy http requests !
https://fabiolb.net/feature/tcp-proxy/
"""
def __init__(self, address, port):
self.address = address
self.port = port
12 changes: 12 additions & 0 deletions clipper_admin/clipper_admin/nomad/load_balancer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import abc
from abc import abstractmethod

class LoadBalancer(abc.ABC):

@abstractmethod
def tcp(self, address):
pass
@abstractmethod
def http(self, address):
pass

33 changes: 23 additions & 10 deletions clipper_admin/clipper_admin/nomad/model_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,20 @@ def model_check_name(cluster_name, name, version):
return '{}-model-{}-{}'.format(nomad_job_prefix(cluster_name), name, version)

""" Nomad payload to deploy a new model """
def model_deployment(job_id, datacenters, cluster_name, name, version, input_type, image, num_replicas):
def model_deployment(
job_id,
datacenters,
cluster_name,
model_name,
model_version,
input_type,
image,
num_replicas,
query_frontend_ip,
query_frontend_port,
health_check_interval=3000000000,
health_check_timeout=2000000000
):
job = {
'Job': {
'ID': job_id,
Expand All @@ -23,13 +36,13 @@ def model_deployment(job_id, datacenters, cluster_name, name, version, input_typ
'Count': num_replicas,
'Tasks': [
{
'Name': generate_model_job_name(cluster_name, name, version),
'Name': generate_model_job_name(cluster_name, model_name, model_version),
'Driver': 'docker',
'Env': {
'CLIPPER_MODEL_NAME': name,
'CLIPPER_MODEL_VERSION': version,
'CLIPPER_IP': 'fabio.service.consul',
'CLIPPER_PORT': '7000',
'CLIPPER_MODEL_NAME': model_name,
'CLIPPER_MODEL_VERSION': model_version,
'CLIPPER_IP': query_frontend_ip,
'CLIPPER_PORT': query_frontend_port,
'CLIPPER_INPUT_TYPE': input_type
},
'Config': {
Expand All @@ -52,15 +65,15 @@ def model_deployment(job_id, datacenters, cluster_name, name, version, input_typ
},
'Services': [
{
'Name': model_check_name(cluster_name, name, version),
'Tags': ['machine-learning', 'model', 'clipper', name],
'Name': model_check_name(cluster_name, model_name, model_version),
'Tags': ['machine-learning', 'model', 'clipper', model_name],
'PortLabel': 'zeromq',
'Checks': [
{
'Name': 'alive',
'Type': 'tcp',
'interval': 3000000000,
'timeout': 2000000000
'interval': health_check_interval,
'timeout': health_check_timeout
}
]
}
Expand Down
51 changes: 47 additions & 4 deletions clipper_admin/clipper_admin/nomad/nomad_container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from .model_deployment import model_deployment, generate_model_job_name, model_job_prefix, model_check_name
from .mgmt_deployment import mgmt_deployment
from .query_frontend_deployment import query_frontend_deployment
from .utils import nomad_job_prefix

from .utils import nomad_job_prefix, query_frontend_job_prefix, query_frontend_service_check, query_frontend_rpc_check

from dns.resolver import NXDOMAIN

Expand Down Expand Up @@ -46,6 +47,7 @@ class NomadContainerManager(ContainerManager):
def __init__(self,
nomad_ip,
dns,
load_balancer=None,
cluster_name="default-cluster",
datacenters=["dc1"],
redis_ip=None,
Expand All @@ -61,7 +63,8 @@ def __init__(self,
The ip of Nomad
dns: DNS
The DNS service that you used with Nomad. Consul is the most popular option.
load_balancer: str
The Load Balancer used with Nomad. If you dont have one or dont want to use it, leave it None
cluster_name: str
A unique name for this Clipper cluster. This can be used to run multiple Clipper
clusters on the same Kubernetes cluster without interfering with each other.
Expand Down Expand Up @@ -98,6 +101,7 @@ def __init__(self,
self.cluster_name = cluster_name

self.dns = dns
self.load_balancer = load_balancer
self.datacenters = datacenters

self.redis_ip = redis_ip
Expand All @@ -108,7 +112,6 @@ def __init__(self,

# connect to nomad cluster
self.nomad = nomad.Nomad(host=nomad_ip, timeout=5)
#namespaces = self.nomad.namespaces.get_namespaces()
namespaces = []

if namespace in namespaces:
Expand Down Expand Up @@ -233,6 +236,22 @@ def _resolve_query_ip(self):
query_ip, query_port = self.dns.resolveSRV(check_name)
return (query_ip, query_port)

"""
This function queries the DNS server with a SRV request to get ip and port for the query frontend service (REST API)
"""
def _resolve_query_frontend_service(self):
check_name = query_frontend_service_check(self.cluster_name)
query_ip, query_port = self.dns.resolveSRV(check_name)
return (query_ip, query_port)

"""
This function queries the DNS server with a SRV request to get ip and port for the query frontend rpc
"""
def _resolve_query_frontend_rpc(self):
check_name = query_frontend_rpc_check(self.cluster_name)
query_ip, query_port = self.dns.resolveSRV(check_name)
return (query_ip, query_port)

def _start_prometheus(self):
pass

Expand All @@ -242,9 +261,33 @@ def connect(self):
def deploy_model(self, name, version, input_type, image, num_replicas=1):
check_name = model_check_name(self.cluster_name, name, version)
job_id = '{}-{}-{}'.format(model_job_prefix(self.cluster_name), name, version)


if self.load_balancer != None:
query_frontend_ip = self.load_balancer.ip
query_frontend_port = self.load_balancer.port
else:
self.logger.warning('''
You did not set a load balancer, this is potentially dangerous because ip and ports may change over time
and not be updated on the model sides, prefer using a load balancer like Fabio
''')
query_frontend_ip, query_frontend_port = self._resolve_query_frontend_rpc()

self.nomad.job.register_job(
job_id,
model_deployment(job_id, self.datacenters, self.cluster_name, name, version, input_type, image, num_replicas)
model_deployment(
job_id,
self.datacenters,
self.cluster_name,
name,
version,
input_type,
image,
num_replicas,
query_frontend_ip,
query_frontend_port

)
)

# Wait for max 10 minutes
Expand Down

0 comments on commit 75bbd22

Please sign in to comment.