diff --git a/examples/echo-app.yaml b/examples/echo-app.yaml new file mode 100644 index 0000000..d400440 --- /dev/null +++ b/examples/echo-app.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + name: echo-rc +spec: + replicas: 1 + template: + metadata: + labels: + app: echo + spec: + containers: + - name: echo + image: gcr.io/google_containers/echoserver:1.4 + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + name: echo-svc + labels: + app: echo +spec: + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + name: echoheaders-x \ No newline at end of file diff --git a/examples/echo-ingress.yaml b/examples/echo-ingress.yaml new file mode 100644 index 0000000..9c43cf1 --- /dev/null +++ b/examples/echo-ingress.yaml @@ -0,0 +1,10 @@ +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: ingress-echo-svc + labels: + app: loadbalancer +spec: + backend: + serviceName: echo-svc + servicePort: 80 \ No newline at end of file diff --git a/loadbalancer/backend/backends/daemon/loadbalancer.go b/loadbalancer/backend/backends/daemon/loadbalancer.go index 655530b..d8b7e9f 100644 --- a/loadbalancer/backend/backends/daemon/loadbalancer.go +++ b/loadbalancer/backend/backends/daemon/loadbalancer.go @@ -28,6 +28,7 @@ import ( "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/controllers" "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/unversioned" ) @@ -138,7 +139,7 @@ func (lbControl *LoadbalancerDaemonController) HandleConfigMapCreate(configMap * } //generate Virtual IP - bindIP, err := lbControl.ipManager.GenerateVirtualIP(configMap) + bindIP, err := lbControl.ipManager.GenerateVirtualIP(configMap, nil) if err != nil { err = fmt.Errorf("Error generating Virtual IP - %v", err) return err @@ -233,3 +234,50 @@ func (lbControl *LoadbalancerDaemonController) getDaemonConfigMap() *api.ConfigM } return cm } + +// HandleIngressCreate a new loadbalancer resource +func (lbControl *LoadbalancerDaemonController) HandleIngressCreate(ingress *extensions.Ingress) error { + // Block execution until the ip config map gets updated + configMapMutex.Lock() + defer configMapMutex.Unlock() + + name := ingress.Namespace + "-" + ingress.Name + glog.Infof("Adding group %v to daemon configmap", name) + + daemonCM := lbControl.getDaemonConfigMap() + daemonData := daemonCM.Data + + namespace := ingress.Namespace + serviceName := ingress.Spec.Backend.ServiceName + + serviceObj, err := lbControl.kubeClient.Services(namespace).Get(serviceName) + if err != nil { + err = fmt.Errorf("Error getting service object %v/%v. %v", namespace, serviceName, err) + return err + } + + //generate Virtual IP + bindIP, err := lbControl.ipManager.GenerateVirtualIP(nil, ingress) + if err != nil { + err = fmt.Errorf("Error generating Virtual IP - %v", err) + return err + } + + servicePorts := serviceObj.Spec.Ports + if len(servicePorts) == 0 { + err = fmt.Errorf("Could not find any port from service %v.", serviceName) + return err + } + daemonData[name+".namespace"] = namespace + daemonData[name+".bind-ip"] = bindIP + daemonData[name+".target-service-name"] = serviceName + daemonData[name+".target-ip"] = serviceObj.Spec.ClusterIP + port := servicePorts[0] + daemonData[name+".port"] = strconv.Itoa(int(port.Port)) + + _, err = lbControl.kubeClient.ConfigMaps(lbControl.namespace).Update(daemonCM) + if err != nil { + glog.Infof("Error updating daemon configmap %v: %v", daemonCM.Name, err) + } + return nil +} diff --git a/loadbalancer/backend/backends/f5/loadbalancer.go b/loadbalancer/backend/backends/f5/loadbalancer.go index cb7e9a1..a71eaba 100644 --- a/loadbalancer/backend/backends/f5/loadbalancer.go +++ b/loadbalancer/backend/backends/f5/loadbalancer.go @@ -30,6 +30,7 @@ import ( "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" ) const ( @@ -129,7 +130,7 @@ func (ctr *F5Controller) HandleConfigMapCreate(configMap *api.ConfigMap) error { } //generate Virtual IP - bindIP, err := ctr.ipManager.GenerateVirtualIP(configMap) + bindIP, err := ctr.ipManager.GenerateVirtualIP(configMap, nil) if err != nil { err = fmt.Errorf("Error generating Virtual IP - %v", err) return err @@ -452,3 +453,9 @@ func (ctr *F5Controller) deletePreviouslyCreatedF5Resources(portNameList []strin monitorName := utils.GetResourceName(monitorResource, name) ctr.deleteF5Resource(monitorName, monitorResource) } + + +// HandleIngressCreate creates a new F5 pool, nodes, monitor and virtual server to provide loadbalancing to the app defined in the ingress resource +func (ctr *F5Controller) HandleIngressCreate(ingress *extensions.Ingress) error { + return nil +} diff --git a/loadbalancer/backend/backends/openstack/lbaasv2/loadbalancer.go b/loadbalancer/backend/backends/openstack/lbaasv2/loadbalancer.go index d0369a9..892e73c 100644 --- a/loadbalancer/backend/backends/openstack/lbaasv2/loadbalancer.go +++ b/loadbalancer/backend/backends/openstack/lbaasv2/loadbalancer.go @@ -23,6 +23,8 @@ import ( "time" "github.com/golang/glog" + "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/backend" + "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "github.com/rackspace/gophercloud" openstack_lib "github.com/rackspace/gophercloud/openstack" "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners" @@ -31,9 +33,8 @@ import ( "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools" "github.com/rackspace/gophercloud/openstack/networking/v2/subnets" "github.com/rackspace/gophercloud/pagination" - "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/backend" - "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/wait" ) @@ -686,3 +687,8 @@ func (lbaas *LBaaSController) getReadyNodeIPs() ([]string, error) { } return nodeIPs, nil } + +// HandleIngressCreate creates a new lbaas loadbalancer resource +func (lbaas *LBaaSController) HandleIngressCreate(ingress *extensions.Ingress) error { + return nil +} diff --git a/loadbalancer/backend/factory.go b/loadbalancer/backend/factory.go index 124e990..2d85ed0 100644 --- a/loadbalancer/backend/factory.go +++ b/loadbalancer/backend/factory.go @@ -22,6 +22,7 @@ import ( "sync" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/unversioned" "github.com/golang/glog" @@ -36,6 +37,7 @@ type BackendController interface { HandleNodeCreate(node *api.Node) HandleNodeDelete(node *api.Node) HandleNodeUpdate(oldNode *api.Node, curNode *api.Node) + HandleIngressCreate(ingress *extensions.Ingress) error } // BackendControllerFactory Factory for Backend controllers diff --git a/loadbalancer/controllers/controller.go b/loadbalancer/controllers/controller.go index 445179d..63923cd 100644 --- a/loadbalancer/controllers/controller.go +++ b/loadbalancer/controllers/controller.go @@ -18,6 +18,7 @@ package controllers import ( "fmt" + "reflect" "strings" "time" @@ -25,6 +26,7 @@ import ( "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/backend" "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" @@ -34,15 +36,18 @@ import ( "k8s.io/kubernetes/pkg/watch" ) -// LoadBalancerController watches Kubernetes API for ConfigMap and node changes +// LoadBalancerController watches Kubernetes API for ConfigMap, Ingress and node changes // and reconfigures backend when needed type LoadBalancerController struct { client *client.Client configMapController *framework.Controller configMapLister StoreToConfigMapLister + ingController *framework.Controller + ingLister StoreToIngressLister nodeController *framework.Controller nodeLister cache.StoreToNodeLister configMapQueue *taskQueue + ingQueue *taskQueue stopCh chan struct{} backendController backend.BackendController } @@ -52,6 +57,11 @@ type StoreToConfigMapLister struct { cache.Store } +// StoreToIngressLister makes a Store that lists Ingress. +type StoreToIngressLister struct { + cache.Store +} + var keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc // NewLoadBalancerController creates a controller @@ -62,6 +72,7 @@ func NewLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura backendController: controller, } lbController.configMapQueue = NewTaskQueue(lbController.syncConfigMap) + lbController.ingQueue = NewTaskQueue(lbController.syncIngressStatus) configMapHandlers := framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -136,17 +147,54 @@ func NewLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura }, &api.Node{}, 0, nodeHandlers) + ingEventHandler := framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + lbController.ingQueue.enqueue(obj) + }, + DeleteFunc: func(obj interface{}) { + lbController.ingQueue.enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + lbController.ingQueue.enqueue(cur) + } + }, + } + + lbController.ingLister.Store, lbController.ingController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: ingressListFunc(kubeClient, namespace), + WatchFunc: ingressWatchFunc(kubeClient, namespace), + }, + &extensions.Ingress{}, resyncPeriod, ingEventHandler) + return &lbController, nil } -// Run starts the configmap controller +func ingressListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + return c.Extensions().Ingress(ns).List(opts) + } +} + +func ingressWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + return c.Extensions().Ingress(ns).Watch(options) + } +} + +// Run starts the configmap and ingress controller func (lbController *LoadBalancerController) Run() { go lbController.nodeController.Run(lbController.stopCh) // Sleep for 3 seconds to give some times for service and node lister to be synced time.Sleep(time.Second * 3) go lbController.configMapController.Run(lbController.stopCh) + go lbController.ingController.Run(lbController.stopCh) + go lbController.configMapQueue.run(time.Second, lbController.stopCh) + go lbController.ingQueue.run(time.Second, lbController.stopCh) + <-lbController.stopCh } @@ -217,3 +265,25 @@ func (lbController *LoadBalancerController) updateConfigMapStatusBindIP(errMessa glog.Errorf("Error updating ConfigMap Status : %v", err) } } + +func (lbController *LoadBalancerController) syncIngressStatus(key string) { + glog.Infof("Syncing Ingress Status %v", key) + + obj, ingExists, err := lbController.ingLister.Store.GetByKey(key) + if err != nil { + lbController.ingQueue.requeue(key, err) + return + } + + if !ingExists { + return + } + go func() { + ing := obj.(*extensions.Ingress) + err := lbController.backendController.HandleIngressCreate(ing) + if err != nil { + glog.Errorf("Error creating loadbalancer: %v", err) + return + } + }() +} diff --git a/loadbalancer/controllers/ipmanager.go b/loadbalancer/controllers/ipmanager.go index 23b2c57..db89061 100644 --- a/loadbalancer/controllers/ipmanager.go +++ b/loadbalancer/controllers/ipmanager.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" "github.com/hpcloud/kubernetes-service-loadbalancer/loadbalancer/utils" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/unversioned" ) @@ -130,15 +131,20 @@ func (ipManager *IPManager) checkConfigMap(cmName string) (bool, string) { } // GenerateVirtualIP gets a VIP for the configmap passed and allocates to be used for loadbalancer -func (ipManager *IPManager) GenerateVirtualIP(configMap *api.ConfigMap) (string, error) { +func (ipManager *IPManager) GenerateVirtualIP(configMap *api.ConfigMap, ingress *extensions.Ingress) (string, error) { // Block execution until the ip config map gets updated with the new virtual IP ipConfigMutex.Lock() defer ipConfigMutex.Unlock() + resourceName := "" + if configMap == nil { + resourceName = ingress.Namespace + "-" + ingress.Name + } else { + resourceName = configMap.Namespace + "-" + configMap.Name + } //check if the user configmap entry already exists in ip configmap - cmName := configMap.Namespace + "-" + configMap.Name - if ok, vip := ipManager.checkConfigMap(cmName); ok { + if ok, vip := ipManager.checkConfigMap(resourceName); ok { return vip, nil } @@ -150,8 +156,8 @@ func (ipManager *IPManager) GenerateVirtualIP(configMap *api.ConfigMap) (string, //update ipConfigMap to add new configMap entry ipConfigMap := ipManager.getIPConfigMap() ipConfigMapData := ipConfigMap.Data - name := configMap.Namespace + "-" + configMap.Name - ipConfigMapData[virtualIP] = name + // name := configMap.Namespace + "-" + configMap.Name + ipConfigMapData[virtualIP] = resourceName err = ipManager.updateIPConfigMap(ipConfigMap) if err != nil {