Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: update watcher API as per gRFC A88 #7977

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 10 additions & 25 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,37 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

Expand All @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

Expand Down
15 changes: 7 additions & 8 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (b *cdsBalancer) ResolverError(err error) {
if b.lbCfg != nil {
root = b.lbCfg.ClusterName
}
b.onClusterError(root, err)
b.onClusterAmbientError(root, err)
})
}

Expand Down Expand Up @@ -428,20 +428,20 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
b.onClusterAmbientError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
return
}
}

clustersSeen := make(map[string]bool)
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
if err != nil {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
return
}
if ok {
if len(dms) == 0 {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
return
}
// Child policy is built the first time we resolve the cluster graph.
Expand Down Expand Up @@ -501,7 +501,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterError(name string, err error) {
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)

if b.childLB != nil {
Expand All @@ -525,15 +525,14 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment at the top of this method needs to be updated. This method now handles not only resource-not-found errors, but other types of errors as well that instruct the LB policy to stop using the previously received resource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Also, updated the onClusterAmbient error to not stop using the previously seen resource

if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
// If child balancer was never created, fail the RPCs with errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)),
easwars marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
Expand Down
19 changes: 10 additions & 9 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,22 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, u.Err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
return
}
update := u.Data.(*xdsresource.ClusterResourceData)
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, update.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
type watcherState struct {
watcher *clusterWatcher // The underlying watcher.
Expand Down
52 changes: 26 additions & 26 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,43 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if update.Err != nil {
if er.logger.V(2) {
if xdsresource.ErrType(update.Err) == xdsresource.ErrorTypeResourceNotFound {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
} else {
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err)
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
return
}

er.mu.Lock()
er.update = &update.Resource
u := update.Data.(*xdsresource.EndpointsResourceData)
er.update = &u.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
Expand Down Expand Up @@ -119,26 +142,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
}

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}
38 changes: 20 additions & 18 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,22 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if update.Err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(update.Err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
return
}
u := update.Data.(*xdsresource.ListenerResourceData)
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(u.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
Expand All @@ -68,24 +69,25 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, u.Err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
update := u.Data.(*xdsresource.RouteConfigResourceData)
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
Expand Down
20 changes: 14 additions & 6 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,17 +518,21 @@ func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
}

func (r *xdsResolver) onListenerResourceError(err error) {
func (r *xdsResolver) onListenerResourceAmbientError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceNotFound() {
func (r *xdsResolver) onListenerResourceChangedError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
} else {
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

r.listenerUpdateRecvd = false
Expand Down Expand Up @@ -559,17 +563,21 @@ func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresourc
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
} else {
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

if r.rdsResourceName != name {
Expand Down
Loading
Loading