From a48b5bb2dbdf08f9eb23f37088ba9e4e9f537a6f Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 3 Jan 2025 02:39:37 +0530 Subject: [PATCH 1/7] xdsclient: update watcher API as per gRFC A88 --- xds/csds/csds_e2e_test.go | 35 ++++-------- .../balancer/cdsbalancer/cdsbalancer.go | 15 +++-- .../balancer/cdsbalancer/cluster_watcher.go | 16 +++--- .../clusterresolver/resource_resolver_eds.go | 49 ++++++++-------- xds/internal/resolver/watch_service.go | 32 +++++------ xds/internal/resolver/xds_resolver.go | 20 +++++-- xds/internal/server/listener_wrapper.go | 36 ++++++------ xds/internal/server/rds_handler.go | 29 ++++------ xds/internal/testutils/resource_watcher.go | 38 +++++++------ xds/internal/xdsclient/authority.go | 53 +++++++++-------- xds/internal/xdsclient/clientimpl_watchers.go | 4 +- .../tests/ads_stream_flow_control_test.go | 38 ++++++++----- .../xdsclient/tests/cds_watchers_test.go | 21 +++---- .../xdsclient/tests/eds_watchers_test.go | 18 +++--- .../xdsclient/tests/lds_watchers_test.go | 35 ++++++------ .../xdsclient/tests/misc_watchers_test.go | 14 ++--- .../xdsclient/tests/rds_watchers_test.go | 21 +++---- .../xdsclient/tests/resource_update_test.go | 24 ++++---- .../xdsresource/cluster_resource_type.go | 49 ++++++++-------- .../xdsresource/endpoints_resource_type.go | 49 ++++++++-------- .../xdsresource/listener_resource_type.go | 49 ++++++++-------- .../xdsclient/xdsresource/resource_type.go | 57 +++++++++++++------ .../xdsresource/route_config_resource_type.go | 49 ++++++++-------- 23 files changed, 385 insertions(+), 366 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 3c838afb67fc..bd11580bb640 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -70,49 +70,37 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopClusterWatcher struct{} -func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { - writeOnDone(w.testCtxDone, w.onDoneCh, onDone) -} -func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } -func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 9a112e276977..6254833bd100 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -342,7 +342,7 @@ func (b *cdsBalancer) ResolverError(err error) { if b.lbCfg != nil { root = b.lbCfg.ClusterName } - b.onClusterError(root, err) + b.onClusterAmbientError(root, err) }) } @@ -428,7 +428,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd // If the security config is invalid, for example, if the provider // instance is not found in the bootstrap config, we need to put the // channel in transient failure. - b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err)) + b.onClusterAmbientError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err)) return } } @@ -436,12 +436,12 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd clustersSeen := make(map[string]bool) dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen) if err != nil { - b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err)) + b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err)) return } if ok { if len(dms) == 0 { - b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters")) + b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters")) return } // Child policy is built the first time we resolve the cluster graph. @@ -501,7 +501,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd // TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. -func (b *cdsBalancer) onClusterError(name string, err error) { +func (b *cdsBalancer) onClusterAmbientError(name string, err error) { b.logger.Warningf("Cluster resource %q received error update: %v", name, err) if b.childLB != nil { @@ -525,15 +525,14 @@ func (b *cdsBalancer) onClusterError(name string, err error) { // TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. -func (b *cdsBalancer) onClusterResourceNotFound(name string) { - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name) +func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) { if b.childLB != nil { b.childLB.ResolverError(err) } else { // If child balancer was never created, fail the RPCs with errors. b.ccw.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, - Picker: base.NewErrPicker(err), + Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)), }) } } diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index 835461d0997b..e6d6c6d0d34a 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,21 +32,21 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() } + cw.parent.serializer.ScheduleOr(handleError, onDone) + return + } handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() } +func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() } cw.parent.serializer.ScheduleOr(handleError, onDone) } -func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() } - cw.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - // watcherState groups the state associated with a clusterWatcher. type watcherState struct { watcher *clusterWatcher // The underlying watcher. diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index ddb949019ee5..7ad3628ccc22 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,12 +76,34 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return } + if err != nil { + if er.logger.V(2) { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) + } else { + er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err) + } + } + // Report an empty update that would result in no priority child being + // created for this discovery mechanism. This would result in the priority + // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or + // localities) if this was the only discovery mechanism, or would result in + // the priority LB policy using a lower priority discovery mechanism when + // that becomes available. + er.mu.Lock() + er.update = &xdsresource.EndpointsUpdate{} + er.mu.Unlock() + + er.topLevelResolver.onUpdate(onDone) + return + } + er.mu.Lock() er.update = &update.Resource er.mu.Unlock() @@ -89,7 +111,7 @@ func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceD er.topLevelResolver.onUpdate(onDone) } -func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return @@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun er.topLevelResolver.onUpdate(onDone) } - -func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - if er.stopped.HasFired() { - onDone() - return - } - - if er.logger.V(2) { - er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) - } - - // Report an empty update that would result in no priority child being - // created for this discovery mechanism. This would result in the priority - // LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or - // localities) if this was the only discovery mechanism, or would result in - // the priority LB policy using a lower priority discovery mechanism when - // that becomes available. - er.mu.Lock() - er.update = &xdsresource.EndpointsUpdate{} - er.mu.Unlock() - - er.topLevelResolver.onUpdate(onDone) -} diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 0de6604484b1..6b716ea08b4c 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,21 +36,21 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() } + l.parent.serializer.ScheduleOr(handleError, onDone) + return + } handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } +func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) } -func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() } - l.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - func (l *listenerWatcher) stop() { l.cancel() l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) @@ -68,7 +68,12 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() } + r.parent.serializer.ScheduleOr(handleError, onDone) + return + } handleUpdate := func(context.Context) { r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) onDone() @@ -76,16 +81,11 @@ func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, on r.parent.serializer.ScheduleOr(handleUpdate, onDone) } -func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } +func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { + handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) } -func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() } - r.parent.serializer.ScheduleOr(handleNotFound, onDone) -} - func (r *routeConfigWatcher) stop() { r.cancel() r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 1ba6c001d93d..ea6a6bc62d4f 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -518,7 +518,7 @@ func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r) } -func (r *xdsResolver) onListenerResourceError(err error) { +func (r *xdsResolver) onListenerResourceAmbientError(err error) { if r.logger.V(2) { r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err) } @@ -526,9 +526,13 @@ func (r *xdsResolver) onListenerResourceError(err error) { } // Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceNotFound() { +func (r *xdsResolver) onListenerResourceChangedError(err error) { if r.logger.V(2) { - r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName) + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName) + } else { + r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err) + } } r.listenerUpdateRecvd = false @@ -559,7 +563,7 @@ func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresourc } // Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceError(name string, err error) { +func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) { if r.logger.V(2) { r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err) } @@ -567,9 +571,13 @@ func (r *xdsResolver) onRouteConfigResourceError(name string, err error) { } // Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceNotFound(name string) { +func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) { if r.logger.V(2) { - r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name) + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name) + } else { + r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err) + } } if r.rdsResourceName != name { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 09d320018aee..a820a921afa1 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -397,7 +397,7 @@ func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err } } -func (l *listenerWrapper) onLDSResourceDoesNotExist(err error) { +func (l *listenerWrapper) onLDSResourceChangedError(err error) { l.mu.Lock() defer l.mu.Unlock() l.switchModeLocked(connectivity.ServingModeNotServing, err) @@ -414,19 +414,31 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { - lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + if err != nil { + lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err) + } else { + lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + } return } if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + if err != nil { + lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err) + } else { + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + } + } + if err != nil { + lw.parent.onLDSResourceChangedError(err) + return } lw.parent.handleLDSUpdate(update.Resource) } -func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err) @@ -438,17 +450,3 @@ func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { // For errors which are anything other than "resource-not-found", we // continue to use the old configuration. } - -func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - defer onDone() - if lw.parent.closed.HasFired() { - lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name) - return - } - if lw.logger.V(2) { - lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error", lw.name) - } - - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name) - lw.parent.onLDSResourceDoesNotExist(err) -} diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index bcd3938e6f1a..998145b32767 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,7 +147,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -156,26 +156,20 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDo } rw.mu.Unlock() if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + if err != nil { + rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err) + } else { + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + } } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) -} - -func (rw *rdsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - defer onDone() - rw.mu.Lock() - if rw.canceled { - rw.mu.Unlock() + if err != nil { + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) return } - rw.mu.Unlock() - if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err) - } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) } -func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -184,8 +178,7 @@ func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { } rw.mu.Unlock() if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName) + rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err) } - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName) rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) } diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index dae72e2a7733..522b5d9f37a9 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -35,10 +35,27 @@ type TestResourceWatcher struct { ResourceDoesNotExistCh chan struct{} } -// OnUpdate is invoked by the xDS client to report the latest update on the resource -// being watched. -func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xdsresource.OnDoneFunc) { +// OnResourceChanged is invoked by the xDS client to report the latest update +// or an error on the resource being watched. +func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, err error, onDone xdsresource.OnDoneFunc) { defer onDone() + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + select { + case <-w.ResourceDoesNotExistCh: + default: + } + w.ResourceDoesNotExistCh <- struct{}{} + return + } + select { + case <-w.ErrorCh: + default: + } + w.ErrorCh <- err + return + + } select { case <-w.UpdateCh: default: @@ -46,8 +63,8 @@ func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData, onDone xds w.UpdateCh <- &data } -// OnError is invoked by the xDS client to report the latest error. -func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +// OnAmbientError is invoked by the xDS client to report the latest error. +func (w *TestResourceWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { defer onDone() select { case <-w.ErrorCh: @@ -56,17 +73,6 @@ func (w *TestResourceWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) w.ErrorCh <- err } -// OnResourceDoesNotExist is used by the xDS client to report that the resource -// being watched no longer exists. -func (w *TestResourceWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - defer onDone() - select { - case <-w.ResourceDoesNotExistCh: - default: - } - w.ResourceDoesNotExistCh <- struct{}{} -} - // NewTestResourceWatcher returns a TestResourceWatcher to watch for resources // via the xDS client. func NewTestResourceWatcher() *TestResourceWatcher { diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index f81685a45e69..4e324208e0f8 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -177,24 +177,13 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, a.logger.Infof("Connection to server %s failed with error: %v", serverConfig, err) } - // We do not consider it an error if the ADS stream was closed after having - // received a response on the stream. This is because there are legitimate - // reasons why the server may need to close the stream during normal - // operations, such as needing to rebalance load or the underlying - // connection hitting its max connection age limit. See gRFC A57 for more - // details. - if xdsresource.ErrType(err) == xdsresource.ErrTypeStreamFailedAfterRecv { - a.logger.Warningf("Watchers not notified since ADS stream failed after having received at least one response: %v", err) - return - } - // Propagate the connection error from the transport layer to all watchers. for _, rType := range a.resources { for _, state := range rType { for watcher := range state.watchers { watcher := watcher a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {}) + watcher.OnAmbientError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {}) }) } } @@ -363,7 +352,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher err := uErr.Err watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnError(err, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnAmbientError(err, done) }) } continue } @@ -388,7 +377,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher resource := uErr.Resource watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnUpdate(resource, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceChanged(resource, nil, done) }) } } @@ -436,9 +425,15 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig } if state.md.Status == xdsresource.ServiceStatusNotExist { // The metadata status is set to "ServiceStatusNotExist" if a - // previous update deleted this resource, in which case we do not - // want to repeatedly call the watch callbacks with a - // "resource-not-found" error. + // previous update deleted this resource, in which case we + // want to send an ambient error. + for watcher := range state.watchers { + watcher := watcher + watcherCnt.Add(1) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnAmbientError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: previous update deleted this resource"), done) + }) + } continue } if serverConfig.ServerFeaturesIgnoreResourceDeletion() { @@ -455,17 +450,17 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig continue } - // If we get here, it means that the resource exists in cache, but not - // in the new update. Delete the resource from cache, and send a - // resource not found error to indicate that the resource has been - // removed. Metadata for the resource is still maintained, as this is - // required by CSDS. + // If we get here, it means that the resource exists in cache, but + // not in the new update. Delete the resource from cache. Metadata + // for the resource is still maintained, as this is required by CSDS. state.cache = nil state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed"), done) + }) } } } @@ -507,7 +502,9 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist} for watcher := range state.watchers { watcher := watcher - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + }) } } @@ -643,7 +640,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the cached // resource here for watchCallbackSerializer. resource := state.cache - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(resource, nil, func() {}) }) } // If last update was NACK'd, notify the new watcher of error // immediately as well. @@ -655,12 +652,14 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the error // here for watchCallbackSerializer. err := state.md.ErrState.Err - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnAmbientError(err, func() {}) }) } // If the metadata field is updated to indicate that the management // server does not have this resource, notify the new watcher. if state.md.Status == xdsresource.ServiceStatusNotExist { - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + }) } cleanup = a.unwatchResource(rType, resourceName, watcher) }, func() { diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index ed4ee360fb7d..b21f89131296 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -45,7 +45,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) }) + c.serializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(nil, err, func() {}) }) return func() {} } @@ -54,7 +54,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) c.serializer.TrySchedule(func(context.Context) { - watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + watcher.OnResourceChanged(nil, fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) }) return func() {} } diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index ff0243f3d462..ffa6cdf09d03 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -60,7 +60,28 @@ func newBLockingListenerWatcher() *blockingListenerWatcher { } } -func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, done xdsresource.OnDoneFunc) { +func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, done xdsresource.OnDoneFunc) { + if err != nil { + if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + // Notify receipt of resource not found. + select { + case lw.notFoundCh <- struct{}{}: + default: + } + } else { + select { + case lw.errorCh <- struct{}{}: + default: + } + } + + select { + case lw.doneNotifierCh <- done: + default: + } + + return + } // Notify receipt of the update. select { case lw.updateCh <- struct{}{}: @@ -73,7 +94,7 @@ func (lw *blockingListenerWatcher) OnUpdate(update *xdsresource.ListenerResource } } -func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.OnDoneFunc) { +func (lw *blockingListenerWatcher) OnAmbientError(err error, done xdsresource.OnDoneFunc) { // Notify receipt of an error. select { case lw.errorCh <- struct{}{}: @@ -86,19 +107,6 @@ func (lw *blockingListenerWatcher) OnError(err error, done xdsresource.OnDoneFun } } -func (lw *blockingListenerWatcher) OnResourceDoesNotExist(done xdsresource.OnDoneFunc) { - // Notify receipt of resource not found. - select { - case lw.notFoundCh <- struct{}{}: - default: - } - - select { - case lw.doneNotifierCh <- done: - default: - } -} - type wrappedADSStream struct { v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient recvCh chan struct{} diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index f8cd6dac7691..165ca0057b6b 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -44,13 +44,10 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopClusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -67,12 +64,17 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) + onDone() + return + } cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) onDone() } -func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -81,11 +83,6 @@ func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) - onDone() -} - // badClusterResource returns a cluster resource for the given name which // contains a config_source_specifier for the `lrs_server` field which is not // set to `self`, and hence is expected to be NACKed by the client. diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 21021b8992bb..c6506ddf408a 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,10 +53,10 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopEndpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } func (noopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { @@ -76,12 +76,17 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) { +func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) + onDone() + return + } ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) onDone() } -func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (ew *endpointsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -90,11 +95,6 @@ func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) - onDone() -} - // badEndpointsResource returns a endpoints resource for the given // edsServiceName which contains an endpoint with a load_balancing weight of // `0`. This is expected to be NACK'ed by the xDS client. diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index b05b9caf4adc..b03e296e207e 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,13 +48,10 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopListenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -71,12 +68,17 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) + onDone() + return + } lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -85,11 +87,6 @@ func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { onDone() } -func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - onDone() -} - type listenerWatcherMultiple struct { updateCh *testutils.Channel } @@ -100,21 +97,21 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + lw.updateCh.Send(listenerUpdateErrTuple{err: err}) + onDone() + return + } lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) onDone() } -func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (lw *listenerWatcherMultiple) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { lw.updateCh.Send(listenerUpdateErrTuple{err: err}) onDone() } -func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) - onDone() -} - // badListenerResource returns a listener resource for the given name which does // not contain the `RouteSpecifier` field in the HTTPConnectionManager, and // hence is expected to be NACKed by the client. diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 6b8152620231..76e764421730 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,7 +69,12 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + onDone() + return + } rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) @@ -77,7 +82,7 @@ func (rw *testRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResour onDone() } -func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (rw *testRouteConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -86,11 +91,6 @@ func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFu onDone() } -func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - onDone() -} - func (rw *testRouteConfigWatcher) cancel() { rw.cancel1() rw.cancel2() diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index b8dd1c72f465..dfb161bb69a5 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,13 +43,10 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { onDone() } -func (noopRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { - onDone() -} -func (noopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { onDone() } @@ -66,12 +63,17 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) { +func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { + if err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) + onDone() + return + } rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) onDone() } -func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { +func (rw *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { // When used with a go-control-plane management server that continuously // resends resources which are NACKed by the xDS client, using a `Replace()` // here and in OnResourceDoesNotExist() simplifies tests which will have @@ -80,11 +82,6 @@ func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) onDone() } -func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) - onDone() -} - // badRouteConfigResource returns a RouteConfiguration resource for the given // routeName which contains a retry config with num_retries set to `0`. This is // expected to be NACK'ed by the xDS client. diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 0460385d0fb7..67681a1ce641 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -161,7 +161,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -177,7 +177,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", VersionInfo: "1", }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -194,7 +194,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3routepb.RouteConfiguration{})}, }, - wantErr: "Listener not found in received response", + wantErr: "xds: resource ListenerResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -418,7 +418,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -434,7 +434,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", VersionInfo: "1", }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -451,7 +451,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3clusterpb.Cluster{})}, }, - wantErr: "RouteConfiguration not found in received response", + wantErr: "xds: resource RouteConfigResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -667,7 +667,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -683,7 +683,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", VersionInfo: "1", }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -700,7 +700,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3endpointpb.ClusterLoadAssignment{})}, }, - wantErr: "Cluster not found in received response", + wantErr: "xds: resource ClusterResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -974,7 +974,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -990,7 +990,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", VersionInfo: "1", }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -1007,7 +1007,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3listenerpb.Listener{})}, }, - wantErr: "Endpoints not found in received response", + wantErr: "xds: resource EndpointsResource does not exist", wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 8e9375fcbbec..0e43f0261cd4 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -110,39 +110,42 @@ func (c *ClusterResourceData) Raw() *anypb.Any { // ClusterWatcher wraps the callbacks to be invoked for different events // corresponding to the cluster resource being watched. type ClusterWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ClusterResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*ClusterResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingClusterWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } c := data.(*ClusterResourceData) - d.watcher.OnUpdate(c, onDone) -} - -func (d *delegatingClusterWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(c, nil, onDone) } -func (d *delegatingClusterWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingClusterWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchCluster uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 94c03d0c5228..2f0faf5b70aa 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -106,39 +106,42 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { // EndpointsWatcher wraps the callbacks to be invoked for different // events corresponding to the endpoints resource being watched. type EndpointsWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*EndpointsResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*EndpointsResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingEndpointsWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } e := data.(*EndpointsResourceData) - d.watcher.OnUpdate(e, onDone) -} - -func (d *delegatingEndpointsWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(e, nil, onDone) } -func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingEndpointsWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchEndpoints uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index e3ca1134a07b..07ddd5ae1bfc 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -143,39 +143,42 @@ func (l *ListenerResourceData) Raw() *anypb.Any { // ListenerWatcher wraps the callbacks to be invoked for different // events corresponding to the listener resource being watched. type ListenerWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*ListenerResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*ListenerResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingListenerWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } l := data.(*ListenerResourceData) - d.watcher.OnUpdate(l, onDone) -} - -func (d *delegatingListenerWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(l, nil, onDone) } -func (d *delegatingListenerWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingListenerWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchListener uses xDS to discover the configuration associated with the diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index e14f56f781d1..55b5f4a88430 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -58,27 +58,48 @@ type Producer interface { // from the xDS server. type OnDoneFunc func() -// ResourceWatcher wraps the callbacks to be invoked for different events -// corresponding to the resource being watched. +// ResourceWatcher is an interface that can to be implemented to wrap the +// callbacks to be invoked for different events corresponding to the resource +// being watched. type ResourceWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // // The ResourceData parameter needs to be type asserted to the appropriate - // type for the resource being watched. - OnUpdate(ResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // type for the resource being watched. In case of error, the ResourceData + // is nil otherwise its not nil and error is nil but both will never be nil + // together. + // + // Watcher is expected to use the most recent value passed to + // OnResourceChanged(), regardless of whether that's a resource or an error + // i.e., if the watcher is given an error via OnResourceChanged(), that + // means it should stop using any previously delivered resource. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(ResourceData, error, OnDoneFunc) + + // OnAmbientError is invoked to notify the watcher of an error that occurs + // after a resource has been received (i.e. we already have a cached + // resource) that should not modify the watcher’s use of that resource but + // that may be useful information about the ambient state of the XdsClient. + // In particular, the watcher should NOT stop using the previously seen + // resource, and the XdsClient will NOT remove the resource from its cache. + // However, the error message may be useful as additional context to + // include in errors that are being generated for other reasons. + // + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } // TODO: Once the implementation is complete, rename this interface as diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 98ac313288a2..25576903d96d 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -107,39 +107,42 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { // RouteConfigWatcher wraps the callbacks to be invoked for different // events corresponding to the route configuration resource being watched. type RouteConfigWatcher interface { - // OnUpdate is invoked to report an update for the resource being watched. - OnUpdate(*RouteConfigResourceData, OnDoneFunc) - - // OnError is invoked under different error conditions including but not + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource cannot be obtained. + // + // It is invoked under different error conditions including but not // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource deserialization error - // - resource validation error - // - ADS stream failure - // - connection failure - OnError(error, OnDoneFunc) - - // OnResourceDoesNotExist is invoked for a specific error condition where - // the requested resource is not found on the xDS management server. - OnResourceDoesNotExist(OnDoneFunc) + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(*RouteConfigResourceData, error, OnDoneFunc) + + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnDoneFunc) } type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData, onDone OnDoneFunc) { +func (d *delegatingRouteConfigWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { + if err != nil { + d.watcher.OnResourceChanged(nil, err, onDone) + return + } rc := data.(*RouteConfigResourceData) - d.watcher.OnUpdate(rc, onDone) -} - -func (d *delegatingRouteConfigWatcher) OnError(err error, onDone OnDoneFunc) { - d.watcher.OnError(err, onDone) + d.watcher.OnResourceChanged(rc, nil, onDone) } -func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist(onDone OnDoneFunc) { - d.watcher.OnResourceDoesNotExist(onDone) +func (d *delegatingRouteConfigWatcher) OnAmbientError(err error, onDone OnDoneFunc) { + d.watcher.OnAmbientError(err, onDone) } // WatchRouteConfig uses xDS to discover the configuration associated with the From 89caa23f7f363031d8efde8a89b7598cdf0ea294 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Wed, 8 Jan 2025 22:52:54 +0530 Subject: [PATCH 2/7] revert notififying watchers on stream close if one response is received --- xds/internal/xdsclient/authority.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 4e324208e0f8..c0723686b3bf 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -177,6 +177,17 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig, a.logger.Infof("Connection to server %s failed with error: %v", serverConfig, err) } + // We do not consider it an error if the ADS stream was closed after having + // received a response on the stream. This is because there are legitimate + // reasons why the server may need to close the stream during normal + // operations, such as needing to rebalance load or the underlying + // connection hitting its max connection age limit. See gRFC A57 for more + // details. + if xdsresource.ErrType(err) == xdsresource.ErrTypeStreamFailedAfterRecv { + a.logger.Warningf("Watchers not notified since ADS stream failed after having received at least one response: %v", err) + return + } + // Propagate the connection error from the transport layer to all watchers. for _, rType := range a.resources { for _, state := range rType { From 25dce25c7ea0dc68c388c2acf652dafd130e051e Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Wed, 8 Jan 2025 23:28:47 +0530 Subject: [PATCH 3/7] revert notifying watcher if previous updated deleted it --- xds/internal/xdsclient/authority.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index c0723686b3bf..3313b9e45df8 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -436,15 +436,9 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig } if state.md.Status == xdsresource.ServiceStatusNotExist { // The metadata status is set to "ServiceStatusNotExist" if a - // previous update deleted this resource, in which case we - // want to send an ambient error. - for watcher := range state.watchers { - watcher := watcher - watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { - watcher.OnAmbientError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: previous update deleted this resource"), done) - }) - } + // previous update deleted this resource, in which case we do not + // want to repeatedly call the watch callbacks with a + // "resource-not-found" error. continue } if serverConfig.ServerFeaturesIgnoreResourceDeletion() { From 4c7a2043f83da7fa2f4d4e205cd4506871047142 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 10 Jan 2025 21:55:51 +0530 Subject: [PATCH 4/7] update OnResourceChanged() param to ResourceDataOrErr --- xds/csds/csds_e2e_test.go | 10 ++++---- .../balancer/cdsbalancer/cluster_watcher.go | 9 ++++--- .../clusterresolver/resource_resolver_eds.go | 11 ++++---- xds/internal/resolver/watch_service.go | 18 +++++++------ xds/internal/server/listener_wrapper.go | 20 ++++++++------- xds/internal/server/rds_handler.go | 16 ++++++------ xds/internal/testutils/resource_watcher.go | 10 ++++---- xds/internal/xdsclient/authority.go | 14 +++++++---- xds/internal/xdsclient/clientimpl_watchers.go | 6 +++-- .../tests/ads_stream_flow_control_test.go | 6 ++--- .../xdsclient/tests/cds_watchers_test.go | 11 ++++---- .../xdsclient/tests/eds_watchers_test.go | 11 ++++---- .../xdsclient/tests/lds_watchers_test.go | 20 ++++++++------- .../xdsclient/tests/misc_watchers_test.go | 9 ++++--- .../xdsclient/tests/rds_watchers_test.go | 11 ++++---- .../xdsresource/cluster_resource_type.go | 12 ++++----- .../xdsresource/endpoints_resource_type.go | 12 ++++----- .../xdsresource/listener_resource_type.go | 12 ++++----- .../xdsclient/xdsresource/resource_type.go | 25 ++++++++++++------- .../xdsresource/route_config_resource_type.go | 12 ++++----- 20 files changed, 141 insertions(+), 114 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index bd11580bb640..7d75a2f83339 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -70,7 +70,7 @@ func Test(t *testing.T) { type nopListenerWatcher struct{} -func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -79,7 +79,7 @@ func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -88,7 +88,7 @@ func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFu type nopClusterWatcher struct{} -func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -97,7 +97,7 @@ func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -125,7 +125,7 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa } } -func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { writeOnDone(w.testCtxDone, w.onDoneCh, onDone) } func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/balancer/cdsbalancer/cluster_watcher.go b/xds/internal/balancer/cdsbalancer/cluster_watcher.go index e6d6c6d0d34a..f6aeff1f7ef0 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_watcher.go +++ b/xds/internal/balancer/cdsbalancer/cluster_watcher.go @@ -32,13 +32,14 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() } +func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if u.Err != nil { + handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, u.Err); onDone() } cw.parent.serializer.ScheduleOr(handleError, onDone) return } - handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } + update := u.Data.(*xdsresource.ClusterResourceData) + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, update.Resource); onDone() } cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 7ad3628ccc22..9bd551331a31 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -76,18 +76,18 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // OnUpdate is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { if er.stopped.HasFired() { onDone() return } - if err != nil { + if update.Err != nil { if er.logger.V(2) { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) } else { - er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err) + er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err) } } // Report an empty update that would result in no priority child being @@ -105,7 +105,8 @@ func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.Endpoints } er.mu.Lock() - er.update = &update.Resource + u := update.Data.(*xdsresource.EndpointsResourceData) + er.update = &u.Resource er.mu.Unlock() er.topLevelResolver.onUpdate(onDone) diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 6b716ea08b4c..cddd571cf4f3 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,13 +36,14 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch return lw } -func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() } +func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + handleError := func(context.Context) { l.parent.onListenerResourceChangedError(update.Err); onDone() } l.parent.serializer.ScheduleOr(handleError, onDone) return } - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() } + u := update.Data.(*xdsresource.ListenerResourceData) + handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(u.Resource); onDone() } l.parent.serializer.ScheduleOr(handleUpdate, onDone) } @@ -68,14 +69,15 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() } +func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if u.Err != nil { + handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, u.Err); onDone() } r.parent.serializer.ScheduleOr(handleError, onDone) return } handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) + update := u.Data.(*xdsresource.RouteConfigResourceData) + r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource) onDone() } r.parent.serializer.ScheduleOr(handleUpdate, onDone) diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index a820a921afa1..19fe2acfe957 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -414,28 +414,30 @@ type ldsWatcher struct { name string } -func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() if lw.parent.closed.HasFired() { - if err != nil { - lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err) + if update.Err != nil { + lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, update.Err) } else { lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) } return } if lw.logger.V(2) { - if err != nil { - lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err) + if update.Err != nil { + lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, update.Err) } else { - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, u.Resource) } } - if err != nil { - lw.parent.onLDSResourceChangedError(err) + if update.Err != nil { + lw.parent.onLDSResourceChangedError(update.Err) return } - lw.parent.handleLDSUpdate(update.Resource) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.parent.handleLDSUpdate(u.Resource) } func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 998145b32767..90ba071ec226 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -147,7 +147,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -156,17 +156,19 @@ func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceD } rw.mu.Unlock() if rw.logger.V(2) { - if err != nil { - rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err) + if update.Err != nil { + rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, update.Err) } else { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + u := update.Data.(*xdsresource.RouteConfigResourceData) + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource) } } - if err != nil { - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err}) + if update.Err != nil { + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: update.Err}) return } - rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource}) + u := update.Data.(*xdsresource.RouteConfigResourceData) + rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &u.Resource}) } func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) { diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index 522b5d9f37a9..1c9fa5143d43 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -37,10 +37,10 @@ type TestResourceWatcher struct { // OnResourceChanged is invoked by the xDS client to report the latest update // or an error on the resource being watched. -func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, err error, onDone xdsresource.OnDoneFunc) { +func (w *TestResourceWatcher) OnResourceChanged(update xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { defer onDone() - if err != nil { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { + if update.Err != nil { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { select { case <-w.ResourceDoesNotExistCh: default: @@ -52,7 +52,7 @@ func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, e case <-w.ErrorCh: default: } - w.ErrorCh <- err + w.ErrorCh <- update.Err return } @@ -60,7 +60,7 @@ func (w *TestResourceWatcher) OnResourceChanged(data xdsresource.ResourceData, e case <-w.UpdateCh: default: } - w.UpdateCh <- &data + w.UpdateCh <- &update.Data } // OnAmbientError is invoked by the xDS client to report the latest error. diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 3313b9e45df8..977cdfa71e78 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -388,7 +388,9 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher resource := uErr.Resource watcherCnt.Add(1) - funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceChanged(resource, nil, done) }) + funcsToSchedule = append(funcsToSchedule, func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, done) + }) } } @@ -464,7 +466,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher watcherCnt.Add(1) funcsToSchedule = append(funcsToSchedule, func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed"), done) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed")}, done) }) } } @@ -508,7 +510,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour for watcher := range state.watchers { watcher := watcher a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) }) } } @@ -645,7 +647,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // xdsClientSerializer callback. Hence making a copy of the cached // resource here for watchCallbackSerializer. resource := state.cache - a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(resource, nil, func() {}) }) + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Data: resource}, func() {}) + }) } // If last update was NACK'd, notify the new watcher of error // immediately as well. @@ -663,7 +667,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // server does not have this resource, notify the new watcher. if state.md.Status == xdsresource.ServiceStatusNotExist { a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName()), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) }) } cleanup = a.unwatchResource(rType, resourceName, watcher) diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index b21f89131296..22292dbbdc7d 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -45,7 +45,9 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if err := c.resourceTypes.maybeRegister(rType); err != nil { logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName) - c.serializer.TrySchedule(func(context.Context) { watcher.OnResourceChanged(nil, err, func() {}) }) + c.serializer.TrySchedule(func(context.Context) { + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: err}, func() {}) + }) return func() {} } @@ -54,7 +56,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority) c.serializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(nil, fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName)}, func() {}) }) return func() {} } diff --git a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go index ffa6cdf09d03..ee6b610ecb55 100644 --- a/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_flow_control_test.go @@ -60,9 +60,9 @@ func newBLockingListenerWatcher() *blockingListenerWatcher { } } -func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, done xdsresource.OnDoneFunc) { - if err != nil { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { +func (lw *blockingListenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, done xdsresource.OnDoneFunc) { + if update.Err != nil { + if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { // Notify receipt of resource not found. select { case lw.notFoundCh <- struct{}{}: diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 165ca0057b6b..229e821d9d5a 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -44,7 +44,7 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -64,13 +64,14 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - cw.updateCh.Replace(clusterUpdateErrTuple{err: err}) +func (cw *clusterWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + cw.updateCh.Replace(clusterUpdateErrTuple{err: update.Err}) onDone() return } - cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ClusterResourceData) + cw.updateCh.Send(clusterUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index c6506ddf408a..12b9b004b76d 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -53,7 +53,7 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -76,13 +76,14 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) +func (ew *endpointsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + ew.updateCh.Replace(endpointsUpdateErrTuple{err: update.Err}) onDone() return } - ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.EndpointsResourceData) + ew.updateCh.Send(endpointsUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index b03e296e207e..ac913e01512f 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -48,7 +48,7 @@ import ( type noopListenerWatcher struct{} -func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -68,13 +68,14 @@ func newListenerWatcher() *listenerWatcher { return &listenerWatcher{updateCh: testutils.NewChannel()} } -func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - lw.updateCh.Replace(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + lw.updateCh.Replace(listenerUpdateErrTuple{err: update.Err}) onDone() return } - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: u.Resource}) onDone() } @@ -97,13 +98,14 @@ func newListenerWatcherMultiple(size int) *listenerWatcherMultiple { return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)} } -func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - lw.updateCh.Send(listenerUpdateErrTuple{err: err}) +func (lw *listenerWatcherMultiple) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + lw.updateCh.Send(listenerUpdateErrTuple{err: update.Err}) onDone() return } - lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource}) + u := update.Data.(*xdsresource.ListenerResourceData) + lw.updateCh.Send(listenerUpdateErrTuple{update: u.Resource}) onDone() } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 76e764421730..a77b59ae490a 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -69,13 +69,14 @@ func newTestRouteConfigWatcher(client xdsclient.XDSClient, name1, name2 string) } } -func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +func (rw *testRouteConfigWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: update.Err}) onDone() return } - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + rc := update.Data.(*xdsresource.RouteConfigResourceData) + rw.updateCh.Send(routeConfigUpdateErrTuple{update: rc.Resource}) rw.cancel1 = xdsresource.WatchRouteConfig(rw.client, rw.name1, rw.rcw1) rw.cancel2 = xdsresource.WatchRouteConfig(rw.client, rw.name2, rw.rcw2) diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index dfb161bb69a5..1facba7afbce 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -43,7 +43,7 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) { +func (noopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { onDone() } func (noopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) { @@ -63,13 +63,14 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) { - if err != nil { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: err}) +func (rw *routeConfigWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) { + if update.Err != nil { + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: update.Err}) onDone() return } - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) + rc := update.Data.(*xdsresource.RouteConfigResourceData) + rw.updateCh.Send(routeConfigUpdateErrTuple{update: rc.Resource}) onDone() } diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 0e43f0261cd4..1c45867a419e 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -121,7 +121,7 @@ type ClusterWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*ClusterResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -135,13 +135,13 @@ type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingClusterWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - c := data.(*ClusterResourceData) - d.watcher.OnResourceChanged(c, nil, onDone) + c := update.Data.(*ClusterResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: c}, onDone) } func (d *delegatingClusterWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 2f0faf5b70aa..f92dc1a2734b 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -117,7 +117,7 @@ type EndpointsWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*EndpointsResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -131,13 +131,13 @@ type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingEndpointsWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - e := data.(*EndpointsResourceData) - d.watcher.OnResourceChanged(e, nil, onDone) + e := update.Data.(*EndpointsResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: e}, onDone) } func (d *delegatingEndpointsWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 07ddd5ae1bfc..af5a8564924d 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -154,7 +154,7 @@ type ListenerWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*ListenerResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -168,13 +168,13 @@ type delegatingListenerWatcher struct { watcher ListenerWatcher } -func (d *delegatingListenerWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingListenerWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - l := data.(*ListenerResourceData) - d.watcher.OnResourceChanged(l, nil, onDone) + l := update.Data.(*ListenerResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: l}, onDone) } func (d *delegatingListenerWatcher) OnAmbientError(err error, onDone OnDoneFunc) { diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index 55b5f4a88430..19542b8c1096 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -58,18 +58,25 @@ type Producer interface { // from the xDS server. type OnDoneFunc func() -// ResourceWatcher is an interface that can to be implemented to wrap the -// callbacks to be invoked for different events corresponding to the resource -// being watched. +// ResourceDataOrError is a struct that contains either ResourceData or error. +// It is used to represent the result of an xDS resource update. Exactly one of +// Data or Err will be non-nil. +type ResourceDataOrError struct { + Data ResourceData + Err error +} + +// ResourceWatcher wraps the callbacks to be invoked for different events +// corresponding to the resource being watched. type ResourceWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the - // reason why the resource cannot be obtained. + // reason why the resource could not be obtained. // - // The ResourceData parameter needs to be type asserted to the appropriate - // type for the resource being watched. In case of error, the ResourceData - // is nil otherwise its not nil and error is nil but both will never be nil - // together. + // The ResourceData of the ResourceDataOrError needs to be type asserted to + // the appropriate type for the resource being watched. In case of error, + // the ResourceData is nil otherwise its not nil and error is nil but both + // will never be nil together. // // Watcher is expected to use the most recent value passed to // OnResourceChanged(), regardless of whether that's a resource or an error @@ -83,7 +90,7 @@ type ResourceWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(ResourceData, error, OnDoneFunc) + OnResourceChanged(ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked to notify the watcher of an error that occurs // after a resource has been received (i.e. we already have a cached diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 25576903d96d..2569e2b62a3a 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -118,7 +118,7 @@ type RouteConfigWatcher interface { // - resource validation error (if resource is not cached) // - ADS stream failure (if resource is not cached) // - connection failure (if resource is not cached) - OnResourceChanged(*RouteConfigResourceData, error, OnDoneFunc) + OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // If resource is already cached, it is invoked under different error // conditions including but not limited to the following: @@ -132,13 +132,13 @@ type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) OnResourceChanged(data ResourceData, err error, onDone OnDoneFunc) { - if err != nil { - d.watcher.OnResourceChanged(nil, err, onDone) +func (d *delegatingRouteConfigWatcher) OnResourceChanged(update ResourceDataOrError, onDone OnDoneFunc) { + if update.Err != nil { + d.watcher.OnResourceChanged(&ResourceDataOrError{Err: update.Err}, onDone) return } - rc := data.(*RouteConfigResourceData) - d.watcher.OnResourceChanged(rc, nil, onDone) + rc := update.Data.(*RouteConfigResourceData) + d.watcher.OnResourceChanged(&ResourceDataOrError{Data: rc}, onDone) } func (d *delegatingRouteConfigWatcher) OnAmbientError(err error, onDone OnDoneFunc) { From 1fdeb0da8576fc8697c6b94b68f1e49afff5f5c1 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 14 Jan 2025 01:31:29 +0530 Subject: [PATCH 5/7] easwars review 2 --- .../balancer/cdsbalancer/cdsbalancer.go | 2 ++ .../clusterresolver/resource_resolver_eds.go | 6 +---- xds/internal/resolver/xds_resolver.go | 12 ++-------- xds/internal/server/listener_wrapper.go | 14 +++++------ xds/internal/server/rds_handler.go | 14 +++++------ xds/internal/testutils/resource_watcher.go | 1 - xds/internal/xdsclient/authority.go | 6 ++--- .../xdsclient/tests/resource_update_test.go | 24 +++++++++---------- .../xdsresource/cluster_resource_type.go | 4 ++-- .../xdsresource/endpoints_resource_type.go | 4 ++-- .../xdsresource/listener_resource_type.go | 4 ++-- .../xdsclient/xdsresource/resource_type.go | 17 +++++++------ .../xdsresource/route_config_resource_type.go | 4 ++-- 13 files changed, 50 insertions(+), 62 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 6254833bd100..d72f365f8202 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -526,6 +526,8 @@ func (b *cdsBalancer) onClusterAmbientError(name string, err error) { // // Only executed in the context of a serializer callback. func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) { + b.logger.Warningf("Cluster resource %q received error update: %v", name, err) + if b.childLB != nil { b.childLB.ResolverError(err) } else { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go index 9bd551331a31..3aa757437d9b 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_eds.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_eds.go @@ -84,11 +84,7 @@ func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceD if update.Err != nil { if er.logger.V(2) { - if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound { - er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch) - } else { - er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err) - } + er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err) } // Report an empty update that would result in no priority child being // created for this discovery mechanism. This would result in the priority diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index ea6a6bc62d4f..3795d625d526 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -528,11 +528,7 @@ func (r *xdsResolver) onListenerResourceAmbientError(err error) { // Only executed in the context of a serializer callback. func (r *xdsResolver) onListenerResourceChangedError(err error) { if r.logger.V(2) { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { - r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName) - } else { - r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err) - } + r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err) } r.listenerUpdateRecvd = false @@ -573,11 +569,7 @@ func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) // Only executed in the context of a serializer callback. func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) { if r.logger.V(2) { - if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { - r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name) - } else { - r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err) - } + r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err) } if r.rdsResourceName != name { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 19fe2acfe957..b8c2fa477666 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -424,19 +424,17 @@ func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, } return } - if lw.logger.V(2) { - if update.Err != nil { - lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, update.Err) - } else { - u := update.Data.(*xdsresource.ListenerResourceData) - lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, u.Resource) - } - } if update.Err != nil { + if lw.logger.V(2) { + lw.logger.Infof("LDS watch for resource %q received error: %v", lw.name, update.Err) + } lw.parent.onLDSResourceChangedError(update.Err) return } u := update.Data.(*xdsresource.ListenerResourceData) + if update.Err != nil { + lw.logger.Infof("LDS watch for resource %q received update: %v", lw.name, u.Resource) + } lw.parent.handleLDSUpdate(u.Resource) } diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 90ba071ec226..7999bcbc7140 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -155,19 +155,17 @@ func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, return } rw.mu.Unlock() - if rw.logger.V(2) { - if update.Err != nil { - rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, update.Err) - } else { - u := update.Data.(*xdsresource.RouteConfigResourceData) - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource) - } - } if update.Err != nil { + if rw.logger.V(2) { + rw.logger.Infof("RDS watch for resource %q received error: %v", rw.routeName, update.Err) + } rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: update.Err}) return } u := update.Data.(*xdsresource.RouteConfigResourceData) + if rw.logger.V(2) { + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, u.Resource) + } rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &u.Resource}) } diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go index 1c9fa5143d43..50db5d63554a 100644 --- a/xds/internal/testutils/resource_watcher.go +++ b/xds/internal/testutils/resource_watcher.go @@ -54,7 +54,6 @@ func (w *TestResourceWatcher) OnResourceChanged(update xdsresource.ResourceDataO } w.ErrorCh <- update.Err return - } select { case <-w.UpdateCh: diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 977cdfa71e78..ed4540c4f5ed 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -466,7 +466,7 @@ func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig watcher := watcher watcherCnt.Add(1) funcsToSchedule = append(funcsToSchedule, func(context.Context) { - watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource has been removed")}, done) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q has been removed", name, rType.TypeName())}, done) }) } } @@ -510,7 +510,7 @@ func (a *authority) handleADSResourceDoesNotExist(rType xdsresource.Type, resour for watcher := range state.watchers { watcher := watcher a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q does not exist", resourceName, rType.TypeName())}, func() {}) }) } } @@ -667,7 +667,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w // server does not have this resource, notify the new watcher. if state.md.Status == xdsresource.ServiceStatusNotExist { a.watcherCallbackSerializer.TrySchedule(func(context.Context) { - watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %s does not exist", rType.TypeName())}, func() {}) + watcher.OnResourceChanged(xdsresource.ResourceDataOrError{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds: resource %q of type %q does not exist", resourceName, rType.TypeName())}, func() {}) }) } cleanup = a.unwatchResource(rType, resourceName, watcher) diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 67681a1ce641..b66c66c35c69 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -161,7 +161,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "xds: resource ListenerResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -177,7 +177,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", VersionInfo: "1", }, - wantErr: "xds: resource ListenerResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -194,7 +194,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3routepb.RouteConfiguration{})}, }, - wantErr: "xds: resource ListenerResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ListenerResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", @@ -418,7 +418,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "xds: resource RouteConfigResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -434,7 +434,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", VersionInfo: "1", }, - wantErr: "xds: resource RouteConfigResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -451,7 +451,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3clusterpb.Cluster{})}, }, - wantErr: "xds: resource RouteConfigResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"RouteConfigResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", @@ -667,7 +667,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "xds: resource ClusterResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -683,7 +683,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", VersionInfo: "1", }, - wantErr: "xds: resource ClusterResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -700,7 +700,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3endpointpb.ClusterLoadAssignment{})}, }, - wantErr: "xds: resource ClusterResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"ClusterResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", @@ -974,7 +974,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { Value: []byte{1, 2, 3, 4}, }}, }, - wantErr: "xds: resource EndpointsResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -990,7 +990,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", VersionInfo: "1", }, - wantErr: "xds: resource EndpointsResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", @@ -1007,7 +1007,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { VersionInfo: "1", Resources: []*anypb.Any{testutils.MarshalAny(t, &v3listenerpb.Listener{})}, }, - wantErr: "xds: resource EndpointsResource does not exist", + wantErr: fmt.Sprintf("xds: resource \"%v\" of type \"EndpointsResource\" does not exist", resourceName1), wantGenericXDSConfig: []*v3statuspb.ClientConfig_GenericXdsConfig{ { TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 1c45867a419e..dbc107519b28 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -123,8 +123,8 @@ type ClusterWatcher interface { // - connection failure (if resource is not cached) OnResourceChanged(*ResourceDataOrError, OnDoneFunc) - // If resource is already cached, it is invoked under different error - // conditions including but not limited to the following: + // OnAmbientError is invoked if resource is already cached under different + // error conditions including but not limited to the following: // - resource validation error // - ADS stream failure // - connection failure diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index f92dc1a2734b..1e21426e9ef3 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -119,8 +119,8 @@ type EndpointsWatcher interface { // - connection failure (if resource is not cached) OnResourceChanged(*ResourceDataOrError, OnDoneFunc) - // If resource is already cached, it is invoked under different error - // conditions including but not limited to the following: + // OnAmbientError is invoked if resource is already cached under different + // error conditions including but not limited to the following: // - resource validation error // - ADS stream failure // - connection failure diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index af5a8564924d..5946082cfb66 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -156,8 +156,8 @@ type ListenerWatcher interface { // - connection failure (if resource is not cached) OnResourceChanged(*ResourceDataOrError, OnDoneFunc) - // If resource is already cached, it is invoked under different error - // conditions including but not limited to the following: + // OnAmbientError is invoked if resource is already cached under different + // error conditions including but not limited to the following: // - resource validation error // - ADS stream failure // - connection failure diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index 19542b8c1096..6d79053b544f 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -73,15 +73,18 @@ type ResourceWatcher interface { // the resource received from the xDS server or an error indicating the // reason why the resource could not be obtained. // - // The ResourceData of the ResourceDataOrError needs to be type asserted to - // the appropriate type for the resource being watched. In case of error, - // the ResourceData is nil otherwise its not nil and error is nil but both - // will never be nil together. + // In the former case, this callback will be invoked with a non-nil + // ResourceData in ResourceDataOrError. The ResourceData of the + // ResourceDataOrError needs to be type asserted to the appropriate type + // for the resource being watched. + // + // In the latter case, this callback will be invoked with a non-nil error + // value in ResourceDataOrError. // // Watcher is expected to use the most recent value passed to - // OnResourceChanged(), regardless of whether that's a resource or an error - // i.e., if the watcher is given an error via OnResourceChanged(), that - // means it should stop using any previously delivered resource. + // OnResourceChanged(), regardless of whether that's a ResourceData or an + // error i.e., if the watcher is given an error via OnResourceChanged(), + // that means it should stop using any previously delivered resource. // // It is invoked under different error conditions including but not // limited to the following: diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 2569e2b62a3a..8077748201ba 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -120,8 +120,8 @@ type RouteConfigWatcher interface { // - connection failure (if resource is not cached) OnResourceChanged(*ResourceDataOrError, OnDoneFunc) - // If resource is already cached, it is invoked under different error - // conditions including but not limited to the following: + // OnAmbientError is invoked if resource is already cached under different + // error conditions including but not limited to the following: // - resource validation error // - ADS stream failure // - connection failure From 516a01f07db624e15c793ec7124d7bc9964fb98b Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Fri, 31 Jan 2025 16:43:15 +0530 Subject: [PATCH 6/7] update ResourceWatcher documentation --- .../xdsresource/cluster_resource_type.go | 21 ++++---- .../xdsresource/endpoints_resource_type.go | 21 ++++---- .../xdsresource/listener_resource_type.go | 21 ++++---- .../xdsclient/xdsresource/resource_type.go | 51 +++++-------------- .../xdsresource/route_config_resource_type.go | 22 ++++---- 5 files changed, 50 insertions(+), 86 deletions(-) diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index dbc107519b28..4c16900af86c 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -108,26 +108,23 @@ func (c *ClusterResourceData) Raw() *anypb.Any { } // ClusterWatcher wraps the callbacks to be invoked for different events -// corresponding to the cluster resource being watched. +// corresponding to the cluster resource being watched. gRFC A88 contains an +// exhaustive list of what method is invoked under what conditions. type ClusterWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the // reason why the resource cannot be obtained. // - // It is invoked under different error conditions including but not - // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource validation error (if resource is not cached) - // - ADS stream failure (if resource is not cached) - // - connection failure (if resource is not cached) + // Upon receiving this, in case of an error, the watcher should + // stop using any previously seen resource. xDS client will remove the + // resource from its cache. OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked if resource is already cached under different - // error conditions including but not limited to the following: - // - resource validation error - // - ADS stream failure - // - connection failure + // error conditions. + // + // Upon receiving this, the watcher may continue using the previously seen + // resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index 1e21426e9ef3..c74e26abc113 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -104,26 +104,23 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { } // EndpointsWatcher wraps the callbacks to be invoked for different -// events corresponding to the endpoints resource being watched. +// events corresponding to the endpoints resource being watched. gRFC A88 +// contains an exhaustive list of what method is invoked under what conditions. type EndpointsWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the // reason why the resource cannot be obtained. // - // It is invoked under different error conditions including but not - // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource validation error (if resource is not cached) - // - ADS stream failure (if resource is not cached) - // - connection failure (if resource is not cached) + // Upon receiving this, in case of an error, the watcher should + // stop using any previously seen resource. xDS client will remove the + // resource from its cache. OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked if resource is already cached under different - // error conditions including but not limited to the following: - // - resource validation error - // - ADS stream failure - // - connection failure + // error conditions. + // + // Upon receiving this, the watcher may continue using the previously seen + // resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 5946082cfb66..8cc6a307d8eb 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -141,26 +141,23 @@ func (l *ListenerResourceData) Raw() *anypb.Any { } // ListenerWatcher wraps the callbacks to be invoked for different -// events corresponding to the listener resource being watched. +// events corresponding to the listener resource being watched. gRFC A88 +// contains an exhaustive list of what method is invoked under what conditions. type ListenerWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the // reason why the resource cannot be obtained. // - // It is invoked under different error conditions including but not - // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource validation error (if resource is not cached) - // - ADS stream failure (if resource is not cached) - // - connection failure (if resource is not cached) + // Upon receiving this, in case of an error, the watcher should + // stop using any previously seen resource. xDS client will remove the + // resource from its cache. OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked if resource is already cached under different - // error conditions including but not limited to the following: - // - resource validation error - // - ADS stream failure - // - connection failure + // error conditions. + // + // Upon receiving this, the watcher may continue using the previously seen + // resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index 6d79053b544f..fcc85e2f699a 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -58,57 +58,32 @@ type Producer interface { // from the xDS server. type OnDoneFunc func() -// ResourceDataOrError is a struct that contains either ResourceData or error. -// It is used to represent the result of an xDS resource update. Exactly one of -// Data or Err will be non-nil. +// ResourceDataOrError contains either ResourceData or error. It is used to +// represent the result of an xDS resource update. Exactly one of Data or Err +// will be non-nil. type ResourceDataOrError struct { Data ResourceData Err error } // ResourceWatcher wraps the callbacks to be invoked for different events -// corresponding to the resource being watched. +// corresponding to the resource being watched. gRFC A88 contains an exhaustive +// list of what method is invoked under what conditions. type ResourceWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the - // reason why the resource could not be obtained. + // reason why the resource cannot be obtained. // - // In the former case, this callback will be invoked with a non-nil - // ResourceData in ResourceDataOrError. The ResourceData of the - // ResourceDataOrError needs to be type asserted to the appropriate type - // for the resource being watched. - // - // In the latter case, this callback will be invoked with a non-nil error - // value in ResourceDataOrError. - // - // Watcher is expected to use the most recent value passed to - // OnResourceChanged(), regardless of whether that's a ResourceData or an - // error i.e., if the watcher is given an error via OnResourceChanged(), - // that means it should stop using any previously delivered resource. - // - // It is invoked under different error conditions including but not - // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource validation error (if resource is not cached) - // - ADS stream failure (if resource is not cached) - // - connection failure (if resource is not cached) + // Upon receiving this, in case of an error, the watcher should + // stop using any previously seen resource. xDS client will remove the + // resource from its cache. OnResourceChanged(ResourceDataOrError, OnDoneFunc) - // OnAmbientError is invoked to notify the watcher of an error that occurs - // after a resource has been received (i.e. we already have a cached - // resource) that should not modify the watcher’s use of that resource but - // that may be useful information about the ambient state of the XdsClient. - // In particular, the watcher should NOT stop using the previously seen - // resource, and the XdsClient will NOT remove the resource from its cache. - // However, the error message may be useful as additional context to - // include in errors that are being generated for other reasons. + // OnAmbientError is invoked if resource is already cached under different + // error conditions. // - // If resource is already cached, it is invoked under different error - // conditions including but not limited to the following: - // - resource validation error - // - ADS stream failure - // - connection failure + // Upon receiving this, the watcher may continue using the previously seen + // resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 8077748201ba..60cddd9ba37e 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -105,26 +105,24 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { } // RouteConfigWatcher wraps the callbacks to be invoked for different -// events corresponding to the route configuration resource being watched. +// events corresponding to the route configuration resource being watched. gRFC +// A88 contains an exhaustive list of what method is invoked under what +// conditions. type RouteConfigWatcher interface { // OnResourceChanged is invoked to notify the watcher of a new version of // the resource received from the xDS server or an error indicating the // reason why the resource cannot be obtained. // - // It is invoked under different error conditions including but not - // limited to the following: - // - authority mentioned in the resource is not found - // - resource name parsing error - // - resource validation error (if resource is not cached) - // - ADS stream failure (if resource is not cached) - // - connection failure (if resource is not cached) + // Upon receiving this, in case of an error, the watcher should + // stop using any previously seen resource. xDS client will remove the + // resource from its cache. OnResourceChanged(*ResourceDataOrError, OnDoneFunc) // OnAmbientError is invoked if resource is already cached under different - // error conditions including but not limited to the following: - // - resource validation error - // - ADS stream failure - // - connection failure + // error conditions. + // + // Upon receiving this, the watcher may continue using the previously seen + // resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } From b9d2a920326072460d62453b6cf165067d317a30 Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 4 Feb 2025 16:54:21 +0530 Subject: [PATCH 7/7] update watcher documentation to clarify stop/not stop using cached resource --- xds/internal/balancer/cdsbalancer/cdsbalancer.go | 12 ++++++------ .../xdsclient/xdsresource/cluster_resource_type.go | 4 ++-- .../xdsclient/xdsresource/endpoints_resource_type.go | 4 ++-- .../xdsclient/xdsresource/listener_resource_type.go | 4 ++-- xds/internal/xdsclient/xdsresource/resource_type.go | 4 ++-- .../xdsresource/route_config_resource_type.go | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index d72f365f8202..43b2d88b2a4c 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -496,9 +496,9 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd } } -// Handles an error Cluster update from the xDS client. Propagates the error -// down to the child policy if one exists, or puts the channel in -// TRANSIENT_FAILURE. +// Handles an error Cluster update from the xDS client to not stop using the +// previously seen resource. Propagates the error down to the child policy +// if one exists, or puts the channel in TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. func (b *cdsBalancer) onClusterAmbientError(name string, err error) { @@ -520,9 +520,9 @@ func (b *cdsBalancer) onClusterAmbientError(name string, err error) { } } -// Handles a resource-not-found error from the xDS client. Propagates the error -// down to the child policy if one exists, or puts the channel in -// TRANSIENT_FAILURE. +// Handles an error Cluster update from the xDS client to stop using the +// previously seen resource. Propagates the error down to the child policy +// if one exists, or puts the channel in TRANSIENT_FAILURE. // // Only executed in the context of a serializer callback. func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) { diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 4c16900af86c..e28c5768fe3d 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -123,8 +123,8 @@ type ClusterWatcher interface { // OnAmbientError is invoked if resource is already cached under different // error conditions. // - // Upon receiving this, the watcher may continue using the previously seen - // resource. xDS client will not remove the resource from its cache. + // Upon receiving this, the watcher should not stop using the previously + // seen resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go index c74e26abc113..8854bb590060 100644 --- a/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go @@ -119,8 +119,8 @@ type EndpointsWatcher interface { // OnAmbientError is invoked if resource is already cached under different // error conditions. // - // Upon receiving this, the watcher may continue using the previously seen - // resource. xDS client will not remove the resource from its cache. + // Upon receiving this, the watcher should not stop using the previously + // seen resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/listener_resource_type.go b/xds/internal/xdsclient/xdsresource/listener_resource_type.go index 8cc6a307d8eb..89146bfe8ec4 100644 --- a/xds/internal/xdsclient/xdsresource/listener_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/listener_resource_type.go @@ -156,8 +156,8 @@ type ListenerWatcher interface { // OnAmbientError is invoked if resource is already cached under different // error conditions. // - // Upon receiving this, the watcher may continue using the previously seen - // resource. xDS client will not remove the resource from its cache. + // Upon receiving this, the watcher should not stop using the previously + // seen resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index fcc85e2f699a..2ed3cbe58d9b 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -82,8 +82,8 @@ type ResourceWatcher interface { // OnAmbientError is invoked if resource is already cached under different // error conditions. // - // Upon receiving this, the watcher may continue using the previously seen - // resource. xDS client will not remove the resource from its cache. + // Upon receiving this, the watcher should not stop using the previously + // seen resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) } diff --git a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go index 60cddd9ba37e..45cb4db9b096 100644 --- a/xds/internal/xdsclient/xdsresource/route_config_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/route_config_resource_type.go @@ -121,8 +121,8 @@ type RouteConfigWatcher interface { // OnAmbientError is invoked if resource is already cached under different // error conditions. // - // Upon receiving this, the watcher may continue using the previously seen - // resource. xDS client will not remove the resource from its cache. + // Upon receiving this, the watcher should not stop using the previously + // seen resource. xDS client will not remove the resource from its cache. OnAmbientError(error, OnDoneFunc) }