From 0fd75432756cd312dd905308018c38eb264bea80 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Shaw Date: Fri, 28 May 2021 13:24:15 +0530 Subject: [PATCH 1/3] Retry support for ingestion layer Fixes AV-113235 Currently, if AMKO is unable to reach the controller because of a network issue while validating the references in a `GSLBHostRule`, it doesn't retry or reconcile even after the network issue is fixed. A reconcilation may also be required if the user provides a non-existent reference, but the user creates the reference later. This PR introduces a retry queue for the ingestion layer. Currently, it will only be used for `GSLBHostRule` objects. It functions like the slow retry queue, but at the end of the tiner expiry, it makes a decision to call add/update the `GSLBHostRule` based on the key that's pushed. Decision to retry: A retry decision is made for all cases for an object reference check, except for non-federated objects. --- gslb/gslbutils/constants.go | 10 +- gslb/gslbutils/errors.go | 73 +++++++++++ gslb/gslbutils/gslbutils.go | 22 +++- gslb/ingestion/fullsync.go | 4 + gslb/ingestion/gdp_controller.go | 10 +- gslb/ingestion/gslb.go | 8 +- gslb/ingestion/gslb_host_rule.go | 74 ++++++----- gslb/ingestion/ingestion_retry.go | 204 ++++++++++++++++++++++++++++++ 8 files changed, 356 insertions(+), 49 deletions(-) create mode 100644 gslb/gslbutils/errors.go create mode 100644 gslb/ingestion/ingestion_retry.go diff --git a/gslb/gslbutils/constants.go b/gslb/gslbutils/constants.go index bee87cf99..1b2736723 100644 --- a/gslb/gslbutils/constants.go +++ b/gslb/gslbutils/constants.go @@ -40,6 +40,7 @@ const ( PassthroughRoute = "passthrough" ThirdPartyMemberType = "ThirdPartyMember" HostRuleType = "HostRule" + GslbHostRuleType = "GSLBHostRule" // Refresh cycle for AVI cache in seconds DefaultRefreshInterval = 600 @@ -54,10 +55,11 @@ const ( GSFQDNKeyLen = 3 // Default values for Retry Operations - SlowSyncTime = 120 - SlowRetryQueue = "SlowRetry" - FastRetryQueue = "FastRetry" - DefaultRetryCount = 5 + SlowSyncTime = 120 + SlowRetryQueue = "SlowRetry" + FastRetryQueue = "FastRetry" + IngestionRetryQueue = "IngestionRetry" + DefaultRetryCount = 5 // Identify objects created by AMKO AmkoUser = "amko-gslb" diff --git a/gslb/gslbutils/errors.go b/gslb/gslbutils/errors.go new file mode 100644 index 000000000..9ea431636 --- /dev/null +++ b/gslb/gslbutils/errors.go @@ -0,0 +1,73 @@ +/* + * Copyright 2021 VMware, Inc. + * All Rights Reserved. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* http://www.apache.org/licenses/LICENSE-2.0 +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package gslbutils + +// Error codes +const ( + ObjectErrStatus = 1 + ControllerErrStatus = 2 + ResponseParseStatus = 3 + FederatedErrStatus = 4 +) + +type ControllerValidationError struct { + errCode int + msg string +} + +func (vErr ControllerValidationError) Error() string { + if vErr.errCode < 5 && vErr.errCode > 0 { + return vErr.msg + } + return "unknown status code" +} + +func GetIngestionErrorForObjectNotFound(errMsg string) error { + return ControllerValidationError{errCode: ObjectErrStatus, msg: errMsg} +} + +func GetIngestionErrorForController(errMsg string) error { + return ControllerValidationError{errCode: ControllerErrStatus, msg: errMsg} +} + +func GetIngestionErrorForParsing(errMsg string) error { + return ControllerValidationError{errCode: ResponseParseStatus, msg: errMsg} +} + +func GetIngestionErrorForObjectNotFederated(errMsg string) error { + return ControllerValidationError{errCode: FederatedErrStatus, msg: errMsg} +} + +// IsControllerError returns true only if there was an issue in communicating with the controller. +func IsControllerError(err error) bool { + vErr, ok := err.(ControllerValidationError) + if !ok || vErr.errCode != ControllerErrStatus { + return false + } + return true +} + +// IsRetriableOnError returns true only if a retry is required +func IsRetriableOnError(err error) bool { + // For errors other than object not federated, we will retry for everything else + vErr, ok := err.(ControllerValidationError) + if !ok { + return false + } + if vErr.errCode == FederatedErrStatus { + return false + } + return true +} diff --git a/gslb/gslbutils/gslbutils.go b/gslb/gslbutils/gslbutils.go index b9e8cb180..8e0932f4a 100644 --- a/gslb/gslbutils/gslbutils.go +++ b/gslb/gslbutils/gslbutils.go @@ -107,6 +107,14 @@ func ExtractMultiClusterKey(key string) (string, string, string, string, string) return operation, objType, cluster, ns, name } +func ExtractIngestionRetryQueueKey(key string) (string, string, string, string, error) { + segments := strings.Split(key, "/") + if len(segments) != 4 { + return "", "", "", "", fmt.Errorf("unexpected segment length for key %s", key) + } + return segments[0], segments[1], segments[2], segments[3], nil +} + func GetObjectTypeFromKey(key string) (string, error) { segments := strings.Split(key, "/") if len(segments) < 2 { @@ -436,10 +444,11 @@ var waitGroupMap map[string]*sync.WaitGroup var wgSyncOnce sync.Once const ( - WGIngestion = "ingestion" - WGFastRetry = "fastretry" - WGSlowRetry = "slowretry" - WGGraph = "graph" + WGIngestion = "ingestion" + WGFastRetry = "fastretry" + WGSlowRetry = "slowretry" + WGGraph = "graph" + WGIngestionRetry = "ingestionretry" ) func SetWaitGroupMap() { @@ -449,6 +458,7 @@ func SetWaitGroupMap() { waitGroupMap[WGFastRetry] = &sync.WaitGroup{} waitGroupMap[WGGraph] = &sync.WaitGroup{} waitGroupMap[WGSlowRetry] = &sync.WaitGroup{} + waitGroupMap[WGIngestionRetry] = &sync.WaitGroup{} }) } @@ -625,12 +635,12 @@ func GetUriFromAvi(uri string, aviClient *clients.AviClient, infiniteRetry bool) } // For 404, return if aviError.HttpStatusCode == 404 { - return nil, fmt.Errorf("%s", *aviError.Message) + return nil, GetIngestionErrorForObjectNotFound(fmt.Sprintf("object not found for uri: %s", uri)) } // All other errors, retry Errf("uri: %s, aviErr: %v, will retry %d", uri, aviError, i) if !infiniteRetry && i == 2 { - return nil, err + return nil, GetIngestionErrorForController(err.Error()) } time.Sleep(RestSleepTime) } diff --git a/gslb/ingestion/fullsync.go b/gslb/ingestion/fullsync.go index 421369234..de473c358 100644 --- a/gslb/ingestion/fullsync.go +++ b/gslb/ingestion/fullsync.go @@ -145,6 +145,10 @@ func checkGslbHostRulesAndInitialize() error { if err != nil { updateGSLBHR(&gslbHr, err.Error(), GslbHostRuleRejected) gslbutils.Errf("Error in accepting GSLB Host Rule %s : %v", gsFqdn, err) + if gslbutils.IsRetriableOnError(err) { + updateIngestionRetryAddCache(&gslbHr) + publishKeyToIngestionRetry(gslbutils.ObjectAdd, gslbutils.GslbHostRuleType, gslbHr.Namespace, gslbHr.Name) + } continue } gsFqdnHostRule := gsHostRulesList.GetGSHostRulesForFQDN(gsFqdn) diff --git a/gslb/ingestion/gdp_controller.go b/gslb/ingestion/gdp_controller.go index 22c3f4312..787ccee00 100644 --- a/gslb/ingestion/gdp_controller.go +++ b/gslb/ingestion/gdp_controller.go @@ -324,8 +324,9 @@ func GDPSanityChecks(gdp *gdpalphav2.GlobalDeploymentPolicy, fullSync bool) erro // Health monotor validity if len(gdp.Spec.HealthMonitorRefs) != 0 { for _, hmRef := range gdp.Spec.HealthMonitorRefs { - if !isHealthMonitorRefValid(hmRef, true, fullSync) { - return fmt.Errorf("health monitor ref %s is invalid", hmRef) + err := isHealthMonitorRefValid(hmRef, true, fullSync) + if err != nil { + return fmt.Errorf("health monitor ref %s is invalid: %s", hmRef, err.Error()) } } } @@ -339,8 +340,9 @@ func GDPSanityChecks(gdp *gdpalphav2.GlobalDeploymentPolicy, fullSync bool) erro if gdp.Spec.SitePersistenceRef != nil && *gdp.Spec.SitePersistenceRef == "" { return fmt.Errorf("empty string as site persistence reference not supported") } else if gdp.Spec.SitePersistenceRef != nil { - if !isSitePersistenceProfilePresent(*gdp.Spec.SitePersistenceRef, true, fullSync) { - return fmt.Errorf("site persistence ref %s not present", *gdp.Spec.SitePersistenceRef) + err := isSitePersistenceProfilePresent(*gdp.Spec.SitePersistenceRef, true, fullSync) + if err != nil { + return fmt.Errorf("site persistence ref %s is invalid: %s", *gdp.Spec.SitePersistenceRef, err.Error()) } } return nil diff --git a/gslb/ingestion/gslb.go b/gslb/ingestion/gslb.go index 94025c002..58dbac59a 100644 --- a/gslb/ingestion/gslb.go +++ b/gslb/ingestion/gslb.go @@ -680,8 +680,14 @@ func Initialize() { graphQueueParams := utils.WorkerQueue{NumWorkers: gslbutils.NumRestWorkers, WorkqueueName: utils.GraphLayer} slowRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.SlowRetryQueue, SlowSyncTime: gslbutils.SlowSyncTime} fastRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.FastRetryQueue} + ingestionRetryQParams := utils.WorkerQueue{NumWorkers: 1, WorkqueueName: gslbutils.IngestionRetryQueue, SlowSyncTime: gslbutils.SlowSyncTime} - utils.SharedWorkQueue(&ingestionQueueParams, &graphQueueParams, &slowRetryQParams, &fastRetryQParams) + utils.SharedWorkQueue(&ingestionQueueParams, &graphQueueParams, &slowRetryQParams, &fastRetryQParams, &ingestionRetryQParams) + + // Set workers for ingestion queue retry workers + ingestionRetryQueue := utils.SharedWorkQueue().GetQueueByName(gslbutils.IngestionRetryQueue) + ingestionRetryQueue.SyncFunc = IngestionRetryAddUpdate + ingestionRetryQueue.Run(stopCh, gslbutils.GetWaitGroupFromMap(gslbutils.WGIngestionRetry)) // Set workers for layer 3 (REST layer) graphSharedQueue := utils.SharedWorkQueue().GetQueueByName(utils.GraphLayer) diff --git a/gslb/ingestion/gslb_host_rule.go b/gslb/ingestion/gslb_host_rule.go index cacf97e0c..94d05d885 100644 --- a/gslb/ingestion/gslb_host_rule.go +++ b/gslb/ingestion/gslb_host_rule.go @@ -81,10 +81,10 @@ func isSitePersistenceRefPresentInCache(spName string) bool { return present } -func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool) bool { +func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool) error { if fullSync && isSitePersistenceRefPresentInCache(profileName) { gslbutils.Debugf("site persistence ref %s present in site persistence cache", profileName) - return true + return nil } // Check if the profile mentioned in gslbHostRule are present as application persistence profile on the gslb leader aviClient := avictrl.SharedAviClients().AviClient[0] @@ -94,14 +94,13 @@ func isSitePersistenceProfilePresent(profileName string, gdp bool, fullSync bool result, err := gslbutils.GetUriFromAvi(uri, aviClient, gdp) if err != nil { gslbutils.Errf("Error getting uri %s from Avi : %s", uri, err) - return false + return err } if result.Count == 0 { - gslbutils.Errf("Site Persistent Profile %s does not exist", profileName) - return false + return fmt.Errorf("SitePersistence profile %s doesn't exist", profileName) } - return true + return nil } func isFallbackAlgorithmValid(fa *gslbhralphav1.GeoFallback) (bool, error) { @@ -177,10 +176,10 @@ func isHealthMonitorRefPresentInCache(hmName string) bool { // If not, return false. // For non-full sync cases, the hm ref will be fetched from the GSLB leader and verified. The HM // cache won't be checked for such cases. -func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) bool { +func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) error { if fullSync && isHealthMonitorRefPresentInCache(refName) { gslbutils.Debugf("health monitor %s present in hm cache", refName) - return true + return nil } aviClient := avictrl.SharedAviClients().AviClient[0] uri := "api/healthmonitor?name=" + refName @@ -188,62 +187,59 @@ func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) bool { result, err := gslbutils.GetUriFromAvi(uri, aviClient, gdp) if err != nil { gslbutils.Errf("Error in getting uri %s from Avi: %v", uri, err) - return false + return err } if result.Count == 0 { - gslbutils.Errf("Health Monitor %s does not exist", refName) - return false + return gslbutils.GetIngestionErrorForObjectNotFound("healthmonitor " + refName + " doesn't exist") } gslbutils.Logf("health monitor %s fetched from controller", refName) elems := make([]json.RawMessage, result.Count) err = json.Unmarshal(result.Results, &elems) if err != nil { - gslbutils.Errf("failed to unmarshal health monitor data for ref %s: %v", refName, err) - return false + return gslbutils.GetIngestionErrorForParsing(fmt.Sprint("failed to unmarshal health monitor data for ref %s: %v", refName, err)) } hm := models.HealthMonitor{} err = json.Unmarshal(elems[0], &hm) if err != nil { - gslbutils.Errf("failed to unmarshal the first health monitor element: %v", err) - return false + return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal the first health monitor element: %v", err)) } if hm.IsFederated != nil && *hm.IsFederated { - return true + return nil } else { - gslbutils.Errf("health monitor ref %s is not federated, can't add", refName) + errStr := fmt.Sprintf("health monitor ref %s is not federated, can't add", refName) + return gslbutils.GetIngestionErrorForObjectNotFederated(errStr) } - return false } -func isThirdPartyMemberSitePresent(gslbhr *gslbhralphav1.GSLBHostRule, siteName string) bool { +func isThirdPartyMemberSitePresent(gslbhr *gslbhralphav1.GSLBHostRule, siteName string) error { // Verify the presence of the third party member sites on the gslb leader aviClient := avictrl.SharedAviClients().AviClient[0] uri := "api/gslb" result, err := gslbutils.GetUriFromAvi(uri, aviClient, false) if err != nil { gslbutils.Errf("Error in getting uri %s from Avi: %v", uri, err) - return false + return err } elems := make([]json.RawMessage, result.Count) err = json.Unmarshal(result.Results, &elems) if err != nil { - gslbutils.Errf("Failed to unmarshal GSLB data, err: %v", err) + return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal GSLB data, err: %v", err)) } for _, elem := range elems { gslb := models.Gslb{} err = json.Unmarshal(elem, &gslb) if err != nil { gslbutils.Errf("Failed to unmarshal GSLB data, err: %v", err) + continue } tpms := gslb.ThirdPartySites for _, tpm := range tpms { if *tpm.Name == siteName { - return true + return nil } } } - gslbutils.Errf("Third Party Member Site %s does not exist", siteName) - return false + return gslbutils.GetIngestionErrorForObjectNotFound(fmt.Sprintf("third party member site %s doesn't exist", siteName)) } func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) error { @@ -265,9 +261,11 @@ func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) err sitePersistence := gslbhrSpec.SitePersistence if sitePersistence != nil { sitePersistenceProfileName := sitePersistence.ProfileRef - if sitePersistence.Enabled && !isSitePersistenceProfilePresent(sitePersistenceProfileName, false, fullSync) { - errmsg = "SitePersistence Profile " + sitePersistenceProfileName + " error for " + gslbhrName + " GSLBHostRule" - return fmt.Errorf(errmsg) + if sitePersistence.Enabled { + err := isSitePersistenceProfilePresent(sitePersistenceProfileName, false, fullSync) + if err != nil { + return err + } } } @@ -283,17 +281,17 @@ func ValidateGSLBHostRule(gslbhr *gslbhralphav1.GSLBHostRule, fullSync bool) err errmsg := "Invalid VIP for thirdPartyMember site " + tpmember.Site + "," + gslbhrName + " GSLBHostRule (expecting IP address)" return fmt.Errorf(errmsg) } - if !isThirdPartyMemberSitePresent(gslbhr, tpmember.Site) { - errmsg = "ThirdPartyMember site " + tpmember.Site + " does not exist for " + gslbhrName + " GSLBHostRule" - return fmt.Errorf(errmsg) + err := isThirdPartyMemberSitePresent(gslbhr, tpmember.Site) + if err != nil { + return err } } healthMonitorRefs := gslbhrSpec.HealthMonitorRefs for _, ref := range healthMonitorRefs { - if !isHealthMonitorRefValid(ref, false, fullSync) { - errmsg = "Health Monitor Ref " + ref + " error for " + gslbhrName + " GSLBHostRule" - return fmt.Errorf(errmsg) + err := isHealthMonitorRefValid(ref, false, fullSync) + if err != nil { + return err } } return nil @@ -312,6 +310,10 @@ func AddGSLBHostRuleObj(obj interface{}, k8swq []workqueue.RateLimitingInterface if err != nil { updateGSLBHR(gslbhr, err.Error(), GslbHostRuleRejected) gslbutils.Errf("Error in accepting GSLB Host Rule %s : %s", gsFqdn, err.Error()) + if gslbutils.IsRetriableOnError(err) { + updateIngestionRetryAddCache(gslbhr) + publishKeyToIngestionRetry(gslbutils.ObjectAdd, gslbutils.GslbHostRuleType, gslbhr.Namespace, gslbhr.Name) + } return } @@ -386,9 +388,13 @@ func UpdateGSLBHostRuleObj(old, new interface{}, k8swq []workqueue.RateLimitingI gslbutils.Errf("Error in accepting GSLB Host Rule %s : %s", newGslbhr.ObjectMeta.Name, err.Error()) // check if previous GSLB host rule was accepted, if yes, we need to add a delete key if this new // object is rejected - if oldGslbhr.Status.Status == GslbHostRuleAccepted { + if !gslbutils.IsControllerError(err) && (oldGslbhr.Status.Status == GslbHostRuleAccepted) { deleteObjsForGSHostRule(oldGslbhr, k8swq, numWorkers) } + if gslbutils.IsRetriableOnError(err) { + updateIngestionRetryUpdateCache(oldGslbhr, newGslbhr) + publishKeyToIngestionRetry(gslbutils.ObjectUpdate, gslbutils.GslbHostRuleType, newGslbhr.Namespace, newGslbhr.Name) + } return } gsHostRulesList := gslbutils.GetGSHostRulesList() diff --git a/gslb/ingestion/ingestion_retry.go b/gslb/ingestion/ingestion_retry.go new file mode 100644 index 000000000..c473032e4 --- /dev/null +++ b/gslb/ingestion/ingestion_retry.go @@ -0,0 +1,204 @@ +/* + * Copyright 2021 VMware, Inc. + * All Rights Reserved. +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* http://www.apache.org/licenses/LICENSE-2.0 +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package ingestion + +import ( + "context" + "fmt" + "sync" + + "github.com/vmware/global-load-balancing-services-for-kubernetes/gslb/gslbutils" + gslbhralphav1 "github.com/vmware/global-load-balancing-services-for-kubernetes/internal/apis/amko/v1alpha1" + "github.com/vmware/load-balancer-and-ingress-services-for-kubernetes/pkg/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type retryUpdateValues struct { + old *gslbhralphav1.GSLBHostRule + new *gslbhralphav1.GSLBHostRule +} + +type retryUpdateCache struct { + cache map[string]retryUpdateValues + lock sync.RWMutex +} + +var retryUpdateMap *retryUpdateCache +var retryUpdateOnce sync.Once + +func getRetryUpdateCache() *retryUpdateCache { + retryUpdateOnce.Do(func() { + retryUpdateMap = &retryUpdateCache{ + cache: map[string]retryUpdateValues{}, + } + }) + return retryUpdateMap +} + +func (c *retryUpdateCache) writeValueFor(ns, name string, old, new *gslbhralphav1.GSLBHostRule) { + c.lock.Lock() + defer c.lock.Unlock() + + key := ns + "/" + name + val, ok := c.cache[ns+"/"+name] + if !ok { + // value doesn't exist, just write and return + c.cache[key] = retryUpdateValues{ + old: old, + new: new, + } + return + } + // value exists, just update the new value + c.cache[key] = retryUpdateValues{ + old: val.old, + new: new, + } +} + +func (c *retryUpdateCache) readAndDeleteKeyFor(ns, name string) (retryUpdateValues, error) { + c.lock.Lock() + defer c.lock.Unlock() + + val, ok := c.cache[ns+"/"+name] + if !ok { + return retryUpdateValues{}, fmt.Errorf("value not present for namespace %s and name %s", ns, name) + } + delete(c.cache, ns+"/"+name) + return val, nil +} + +type retryAddCache struct { + cache map[string]*gslbhralphav1.GSLBHostRule + lock sync.RWMutex +} + +var retryAddMap *retryAddCache +var retryAddOnce sync.Once + +func getRetryAddCache() *retryAddCache { + retryAddOnce.Do(func() { + retryAddMap = &retryAddCache{ + cache: map[string]*gslbhralphav1.GSLBHostRule{}, + } + }) + return retryAddMap +} + +func (c *retryAddCache) writeValueFor(ns, name string, obj *gslbhralphav1.GSLBHostRule) { + c.lock.Lock() + defer c.lock.Unlock() + + c.cache[ns+"/"+name] = obj +} + +func (c *retryAddCache) readAndDeleteKeyFor(ns, name string) (*gslbhralphav1.GSLBHostRule, error) { + c.lock.Lock() + defer c.lock.Unlock() + + val, ok := c.cache[ns+"/"+name] + if !ok { + return nil, fmt.Errorf("value not present for namespace %s and name %s", ns, name) + } + delete(c.cache, ns+"/"+name) + return val, nil +} + +func publishKeyToIngestionRetry(op, obj, namespace, name string) { + key := op + "/" + obj + "/" + namespace + "/" + name + rq := utils.SharedWorkQueue().GetQueueByName(gslbutils.IngestionRetryQueue) + rq.Workqueue[0].AddRateLimited(key) + gslbutils.Logf("key: %s, msg: published key to ingestion retry queue", key) +} + +func updateIngestionRetryAddCache(obj *gslbhralphav1.GSLBHostRule) { + if obj == nil { + return + } + gslbhr := obj.DeepCopy() + addCache := getRetryAddCache() + addCache.writeValueFor(gslbhr.Namespace, gslbhr.Name, gslbhr) +} + +func updateIngestionRetryUpdateCache(oldObj, newObj *gslbhralphav1.GSLBHostRule) { + if oldObj == nil || newObj == nil { + return + } + old, new := oldObj.DeepCopy(), newObj.DeepCopy() + updateCache := getRetryUpdateCache() + updateCache.writeValueFor(new.Namespace, new.Name, old, new) +} + +func getGslbHostRule(ns, name string) (*gslbhralphav1.GSLBHostRule, error) { + obj, err := gslbutils.GlobalGslbClient.AmkoV1alpha1().GSLBHostRules(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj, nil +} + +func IngestionRetryAddUpdate(key string, wg *sync.WaitGroup) error { + gslbutils.Logf("key: %s, msg: processing key in ingestion retry", key) + op, objType, ns, name, err := gslbutils.ExtractIngestionRetryQueueKey(key) + if err != nil { + gslbutils.Errf("key: %s, msg: error in processing key for ingestion retry: %v", key, err) + return nil + } + + k8sQueue := utils.SharedWorkQueue().GetQueueByName(utils.ObjectIngestionLayer) + switch objType { + case gslbutils.GslbHostRuleType: + ghrObj, err := getGslbHostRule(ns, name) + if err != nil { + gslbutils.Errf("key: %s, msg: error in getting GSLBHostRule object: %v", key, err) + return nil + } + + switch op { + case gslbutils.ObjectAdd: + addCache := getRetryAddCache() + addObj, err := addCache.readAndDeleteKeyFor(ns, name) + if err != nil { + gslbutils.Errf("key: %s, ns: %s, name: %s, msg: object not present in retry add cache", + key, ns, name) + return nil + } + if ghrObj.ResourceVersion > addObj.ResourceVersion { + // a new resource version is available, no point in retrying for the old object + gslbutils.Logf("key: %s, ns: %s, name %s, msg: an object with new resource version available, won't retry", + key, ns, name) + return nil + } + gslbutils.Logf("key: %s, ns: %s, name: %s, msg: will retry adding object", key, ns, name) + AddGSLBHostRuleObj(addObj, k8sQueue.Workqueue, k8sQueue.NumWorkers) + return nil + + case gslbutils.ObjectUpdate: + updateCache := getRetryUpdateCache() + updateObjs, err := updateCache.readAndDeleteKeyFor(ns, name) + if err != nil { + gslbutils.Errf("key: %s, ns: %s, name: %s, msg: objects not present in the update cache", + key, ns, name) + return nil + } + UpdateGSLBHostRuleObj(updateObjs.old, updateObjs.new, k8sQueue.Workqueue, k8sQueue.NumWorkers) + return nil + } + + default: + gslbutils.Errf("key: %s, msg: unsupported object in ingestion retry worker", key) + } + return nil +} From c9443a5bf5939597520cd20de6ad3bdab4ec535e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Shaw Date: Fri, 28 May 2021 15:20:20 +0530 Subject: [PATCH 2/3] Integration test fixes for ref failures --- .../integration/third_party_vips/gslb_hostrule_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gslb/test/integration/third_party_vips/gslb_hostrule_test.go b/gslb/test/integration/third_party_vips/gslb_hostrule_test.go index 2d0a47a6a..fda465cd4 100644 --- a/gslb/test/integration/third_party_vips/gslb_hostrule_test.go +++ b/gslb/test/integration/third_party_vips/gslb_hostrule_test.go @@ -166,7 +166,8 @@ func TestGDPPropertiesForInvalidHealthMonitor(t *testing.T) { {Cluster: K8sContext}, {Cluster: OshiftContext}, } gdpObj.Spec.HealthMonitorRefs = hmRefs - _, err := AddAndVerifyTestGDPStatus(t, gdpObj, "health monitor ref my-hm3 is invalid") + _, err := AddAndVerifyTestGDPStatus(t, gdpObj, + "health monitor ref my-hm3 is invalid: health monitor ref my-hm3 is not federated, can't add") t.Cleanup(func() { DeleteTestGDP(t, gdpObj.Namespace, gdpObj.Name) }) @@ -489,7 +490,7 @@ func TestGSLBHostRuleCreateInvalidHM(t *testing.T) { gslbHRHmRefs := []string{"my-hm3"} gslbHRTTL := 20 addGSLBHostRule(t, gslbHRName, gslbutils.AVISystem, hostName, gslbHRHmRefs, nil, &gslbHRTTL, - ingestion.GslbHostRuleRejected, "Health Monitor Ref my-hm3 error for test-gslb-hr GSLBHostRule") + ingestion.GslbHostRuleRejected, "health monitor ref my-hm3 is not federated, can't add") g.Eventually(func() bool { // All fields remain unchanged because of the invalid GSLBHostRule return verifyGSMembers(t, expectedMembers, routeObj.Spec.Host, utils.ADMIN_NS, hmRefs, @@ -517,7 +518,7 @@ func TestGDPPropertiesForInvalidHealthMonitorUpdate(t *testing.T) { // update GDP with valid and invalid hm refs currGDP := getTestGDP(t, oldGDP.Name, oldGDP.Namespace) currGDP.Spec.HealthMonitorRefs = []string{"System-GSLB-Ping", "System-GSLB-HTTP", "System-Ping"} - gdp2 := updateTestGDPFailure(t, currGDP, "health monitor ref System-Ping is invalid") + gdp2 := updateTestGDPFailure(t, currGDP, "health monitor ref System-Ping is invalid: health monitor ref System-Ping is not federated, can't add") g.Eventually(func() bool { // member properties should remain unchanged return verifyGSMembers(t, expectedMembers, routeObj.Spec.Host, utils.ADMIN_NS, nil, nil, nil, nil) From 41435686cbfe7cd1c55d586fcdf753e5e990d0a8 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Shaw Date: Fri, 28 May 2021 16:06:52 +0530 Subject: [PATCH 3/3] Fix Sprint to Sprintf --- gslb/ingestion/gslb_host_rule.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gslb/ingestion/gslb_host_rule.go b/gslb/ingestion/gslb_host_rule.go index 94d05d885..b77896488 100644 --- a/gslb/ingestion/gslb_host_rule.go +++ b/gslb/ingestion/gslb_host_rule.go @@ -196,7 +196,7 @@ func isHealthMonitorRefValid(refName string, gdp bool, fullSync bool) error { elems := make([]json.RawMessage, result.Count) err = json.Unmarshal(result.Results, &elems) if err != nil { - return gslbutils.GetIngestionErrorForParsing(fmt.Sprint("failed to unmarshal health monitor data for ref %s: %v", refName, err)) + return gslbutils.GetIngestionErrorForParsing(fmt.Sprintf("failed to unmarshal health monitor data for ref %s: %v", refName, err)) } hm := models.HealthMonitor{} err = json.Unmarshal(elems[0], &hm)