Skip to content

Commit

Permalink
xdsclient: update watcher API as per gRFC A88
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Jan 6, 2025
1 parent 724f450 commit 57dbf23
Show file tree
Hide file tree
Showing 22 changed files with 380 additions and 352 deletions.
35 changes: 10 additions & 25 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,37 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

Expand All @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

Expand Down
19 changes: 12 additions & 7 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,26 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
} else {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)

Check warning on line 42 in xds/internal/balancer/cdsbalancer/cluster_watcher.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/cdsbalancer/cluster_watcher.go#L40-L42

Added lines #L40 - L42 were not covered by tests
}
return
}
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
type watcherState struct {
watcher *clusterWatcher // The underlying watcher.
Expand Down
49 changes: 24 additions & 25 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,42 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if err != nil {
if er.logger.V(2) {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
} else {
er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
}

Check warning on line 91 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L87-L91

Added lines #L87 - L91 were not covered by tests
}
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
return
}

er.mu.Lock()
er.update = &update.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
Expand Down Expand Up @@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
}

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}
35 changes: 20 additions & 15 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
Expand All @@ -68,24 +68,29 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
var handleError func(context.Context)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
handleError = func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }

Check warning on line 75 in xds/internal/resolver/watch_service.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/watch_service.go#L75

Added line #L75 was not covered by tests
} else {
handleError = func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
}
r.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
Expand Down
10 changes: 7 additions & 3 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,17 +518,21 @@ func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
}

func (r *xdsResolver) onListenerResourceError(err error) {
func (r *xdsResolver) onListenerResourceAmbientError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceNotFound() {
func (r *xdsResolver) onListenerResourceChangedError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
} else {
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
}

Check warning on line 535 in xds/internal/resolver/xds_resolver.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/xds_resolver.go#L531-L535

Added lines #L531 - L535 were not covered by tests
}

r.listenerUpdateRecvd = false
Expand Down
36 changes: 18 additions & 18 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,19 +414,33 @@ type ldsWatcher struct {
name string
}

func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (lw *ldsWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
if err != nil {
lw.logger.Warningf("Resource %q received err: %#v after listener was closed", lw.name, err)
} else {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
}

Check warning on line 424 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L420-L424

Added lines #L420 - L424 were not covered by tests
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
if err != nil {
lw.logger.Infof("LDS watch for resource %q received error: %#v", lw.name, err)
} else {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
}

Check warning on line 432 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L428-L432

Added lines #L428 - L432 were not covered by tests
}
if err != nil {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
lw.parent.onLDSResourceDoesNotExist(err)
}
return
}
lw.parent.handleLDSUpdate(update.Resource)
}

func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (lw *ldsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
Expand All @@ -438,17 +452,3 @@ func (lw *ldsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
}

func (lw *ldsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received resource-does-not-exist error after listener was closed", lw.name)
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error", lw.name)
}

err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
lw.parent.onLDSResourceDoesNotExist(err)
}
29 changes: 11 additions & 18 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type rdsWatcher struct {
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (rw *rdsWatcher) OnResourceChanged(update *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
Expand All @@ -156,26 +156,20 @@ func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData, onDo
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
if err != nil {
rw.logger.Infof("RDS watch for resource %q received error: %#v", rw.routeName, err)
} else {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
}
}
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
}

func (rw *rdsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
if err != nil {
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
return
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
}
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
}

func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (rw *rdsWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
defer onDone()
rw.mu.Lock()
if rw.canceled {
Expand All @@ -184,8 +178,7 @@ func (rw *rdsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}
Loading

0 comments on commit 57dbf23

Please sign in to comment.