From 57bbe71b9b277efdf91da31f40a748a900aac7be Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Mon, 16 May 2022 18:05:31 -0700 Subject: [PATCH] fix: Stop StatusWatcher on Forbidden API error - This matches previous StatusPoller behavior which would error and exit if there was a 403 Forbidden error from the apiserver. - Handle status error before synchronization with immediate exit --- pkg/apply/taskrunner/runner.go | 6 + .../watcher/dynamic_informer_factory.go | 35 +++++ .../watcher/dynamic_informer_factory_test.go | 120 +++++++++++++++--- pkg/kstatus/watcher/event_funnel.go | 3 + pkg/kstatus/watcher/object_status_reporter.go | 100 ++++++++------- 5 files changed, 199 insertions(+), 65 deletions(-) diff --git a/pkg/apply/taskrunner/runner.go b/pkg/apply/taskrunner/runner.go index 6fc18b0c..9ce537d7 100644 --- a/pkg/apply/taskrunner/runner.go +++ b/pkg/apply/taskrunner/runner.go @@ -121,6 +121,9 @@ func (tsr *TaskStatusRunner) Run( statusEvent.Error) if currentTask != nil { currentTask.Cancel(taskContext) + } else { + // tasks not started yet - abort now + return complete(abortReason) } continue } @@ -207,6 +210,9 @@ func (tsr *TaskStatusRunner) Run( klog.V(7).Infof("Runner aborting: %v", abortReason) if currentTask != nil { currentTask.Cancel(taskContext) + } else { + // tasks not started yet - abort now + return complete(abortReason) } } } diff --git a/pkg/kstatus/watcher/dynamic_informer_factory.go b/pkg/kstatus/watcher/dynamic_informer_factory.go index 27810d60..cd58e93c 100644 --- a/pkg/kstatus/watcher/dynamic_informer_factory.go +++ b/pkg/kstatus/watcher/dynamic_informer_factory.go @@ -5,6 +5,8 @@ package watcher import ( "context" + "regexp" + "strings" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -55,3 +57,36 @@ func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta. f.Indexers, ) } + +// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound. 
+// This is necessary because the Informer doesn't properly wrap list errors. +// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325 +// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L448 +// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076 +const resourceNotFoundMessage = "the server could not find the requested resource" + +// containsNotFoundMessage checks if the error string contains the message for +// StatusReasonNotFound. +func containsNotFoundMessage(err error) bool { + return strings.Contains(err.Error(), resourceNotFoundMessage) +} + +// resourceForbiddenMessagePattern is a regex pattern to match the condition +// message for metav1.StatusForbidden. +// This is necessary because the Informer doesn't properly wrap list errors. +// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325 +// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L458 +// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L208 +// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51 +// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076 +const resourceForbiddenMessagePattern = `(.+) is forbidden: User "(.*)" cannot (.+) resource "(.*)" in API group "(.*)"` + +// resourceForbiddenMessageRegexp is the pre-compiled Regexp of +// resourceForbiddenMessagePattern. +var resourceForbiddenMessageRegexp = regexp.MustCompile(resourceForbiddenMessagePattern) + +// containsForbiddenMessage checks if the error string contains the message for +// StatusForbidden. 
+func containsForbiddenMessage(err error) bool { + return resourceForbiddenMessageRegexp.Match([]byte(err.Error())) +} diff --git a/pkg/kstatus/watcher/dynamic_informer_factory_test.go b/pkg/kstatus/watcher/dynamic_informer_factory_test.go index 3d479e6c..9d368803 100644 --- a/pkg/kstatus/watcher/dynamic_informer_factory_test.go +++ b/pkg/kstatus/watcher/dynamic_informer_factory_test.go @@ -5,7 +5,7 @@ package watcher import ( "context" - "net/http" + "fmt" "testing" "time" @@ -54,13 +54,7 @@ func TestResourceNotFoundError(t *testing.T) { // dynamicClient converts Status objects from the apiserver into errors. // So we can just return the right error here to simulate an error from // the apiserver. - name := "" // unused by LIST requests - // The apisevrer confusingly does not return apierrors.NewNotFound, - // which has a nice constant for its error message. - // err = apierrors.NewNotFound(exampleGR, name) - // Instead it uses apierrors.NewGenericServerResponse, which uses - // a hard-coded error message. - err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false) + err = newGenericServerResponse(action, newNotFoundResourceStatusError(action)) return true, nil, err }) }, @@ -88,13 +82,7 @@ func TestResourceNotFoundError(t *testing.T) { // dynamicClient converts Status objects from the apiserver into errors. // So we can just return the right error here to simulate an error from // the apiserver. - name := "" // unused by LIST requests - // The apisevrer confusingly does not return apierrors.NewNotFound, - // which has a nice constant for its error message. - // err = apierrors.NewNotFound(exampleGR, name) - // Instead it uses apierrors.NewGenericServerResponse, which uses - // a hard-coded error message. 
- err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false) + err = newGenericServerResponse(action, newNotFoundResourceStatusError(action)) return true, nil, err }) }, @@ -110,7 +98,67 @@ func TestResourceNotFoundError(t *testing.T) { t.Errorf("Expected typed NotFound error, but got untyped NotFound error: %v", err) default: // If we got this error, the test is probably broken. - t.Errorf("Expected untyped NotFound error, but got a different error: %v", err) + t.Errorf("Expected typed NotFound error, but got a different error: %v", err) + } + }, + }, + { + name: "List resource forbidden error", + setup: func(fakeClient *dynamicfake.FakeDynamicClient) { + fakeClient.PrependReactor("list", exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { + listAction := action.(clienttesting.ListAction) + if listAction.GetNamespace() != namespace { + assert.Fail(t, "Received unexpected LIST namespace: %s", listAction.GetNamespace()) + return false, nil, nil + } + // dynamicClient converts Status objects from the apiserver into errors. + // So we can just return the right error here to simulate an error from + // the apiserver. + err = newGenericServerResponse(action, newForbiddenResourceStatusError(action)) + return true, nil, err + }) + }, + errorHandler: func(t *testing.T, err error) { + switch { + case apierrors.IsForbidden(err): + // If we got this error, something changed in the apiserver or + // client. If the client changed, it might be safe to stop parsing + // the error string. + t.Errorf("Expected untyped Forbidden error, but got typed Forbidden error: %v", err) + case containsForbiddenMessage(err): + // This is the expected hack, because the Informer/Reflector + // doesn't wrap the error with "%w". + t.Logf("Received expected untyped Forbidden error: %v", err) + default: + // If we got this error, the test is probably broken. 
+ t.Errorf("Expected untyped Forbidden error, but got a different error: %v", err) + } + }, + }, + { + name: "Watch resource forbidden error", + setup: func(fakeClient *dynamicfake.FakeDynamicClient) { + fakeClient.PrependWatchReactor(exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { + // dynamicClient converts Status objects from the apiserver into errors. + // So we can just return the right error here to simulate an error from + // the apiserver. + err = newGenericServerResponse(action, newForbiddenResourceStatusError(action)) + return true, nil, err + }) + }, + errorHandler: func(t *testing.T, err error) { + switch { + case apierrors.IsForbidden(err): + // This is the expected behavior, because the + // Informer/Reflector DOES wrap watch errors + t.Logf("Received expected untyped Forbidden error: %v", err) + case containsForbiddenMessage(err): + // If this happens, there was a regression. + // Watch errors are expected to be wrapped with "%w" + t.Errorf("Expected typed Forbidden error, but got untyped Forbidden error: %v", err) + default: + // If we got this error, the test is probably broken. + t.Errorf("Expected typed Forbidden error, but got a different error: %v", err) } }, }, @@ -164,3 +212,43 @@ func TestResourceNotFoundError(t *testing.T) { }) } } + +// newForbiddenResourceStatusError emulates a Forbidden error from the apiserver +// for a namespace-scoped resource. 
+// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L36 +func newForbiddenResourceStatusError(action clienttesting.Action) *apierrors.StatusError { + username := "unused" + verb := action.GetVerb() + resource := action.GetResource().Resource + if subresource := action.GetSubresource(); len(subresource) > 0 { + resource = resource + "/" + subresource + } + apiGroup := action.GetResource().Group + namespace := action.GetNamespace() + + // https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51 + err := fmt.Errorf("User %q cannot %s resource %q in API group %q in the namespace %q", + username, verb, resource, apiGroup, namespace) + + qualifiedResource := action.GetResource().GroupResource() + name := "" // unused by ListAndWatch + return apierrors.NewForbidden(qualifiedResource, name, err) +} + +// newNotFoundResourceStatusError emulates a NotFound error from the apiserver +// for a resource (not an object). +func newNotFoundResourceStatusError(action clienttesting.Action) *apierrors.StatusError { + qualifiedResource := action.GetResource().GroupResource() + name := "" // unused by ListAndWatch + return apierrors.NewNotFound(qualifiedResource, name) +} + +// newGenericServerResponse emulates a StatusError from the apiserver.
+func newGenericServerResponse(action clienttesting.Action, statusError *apierrors.StatusError) *apierrors.StatusError { + errorCode := int(statusError.ErrStatus.Code) + verb := action.GetVerb() + qualifiedResource := action.GetResource().GroupResource() + name := statusError.ErrStatus.Details.Name + // https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L435 + return apierrors.NewGenericServerResponse(errorCode, verb, qualifiedResource, name, statusError.Error(), -1, false) +} diff --git a/pkg/kstatus/watcher/event_funnel.go b/pkg/kstatus/watcher/event_funnel.go index 3dcfa1f8..d94a050b 100644 --- a/pkg/kstatus/watcher/event_funnel.go +++ b/pkg/kstatus/watcher/event_funnel.go @@ -7,6 +7,7 @@ import ( "context" "fmt" + "k8s.io/klog/v2" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" ) @@ -37,6 +38,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel { go func() { defer func() { // Don't close counterCh, otherwise AddInputChannel may panic. + klog.V(5).Info("Closing funnel") close(funnel.outCh) close(funnel.doneCh) }() @@ -48,6 +50,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel { select { case delta := <-funnel.counterCh: inputs += delta + klog.V(5).Infof("Funnel input channels (%+d): %d", delta, inputs) case <-ctxDoneCh: // Stop waiting for context closure. // Nil channel avoids busy waiting. diff --git a/pkg/kstatus/watcher/object_status_reporter.go b/pkg/kstatus/watcher/object_status_reporter.go index 244dc48d..febad50c 100644 --- a/pkg/kstatus/watcher/object_status_reporter.go +++ b/pkg/kstatus/watcher/object_status_reporter.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "strings" "sync" "time" @@ -92,7 +91,9 @@ type ObjectStatusReporter struct { // ObjectFilter is used to decide which objects to ingore. ObjectFilter ObjectFilter - // TODO: handle automatic? + // RESTScope specifies whether to ListAndWatch resources at the namespace + // or cluster (root) level. 
Using root scope is more efficient, but + // namespace scope may require fewer permissions. RESTScope meta.RESTScope // lock guards modification of the subsequent stateful fields @@ -220,10 +221,15 @@ func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event { return w.funnel.OutputChannel() } +// Stop triggers the cancellation of the reporter context, and closure of the +// event channel without sending an error event. +func (w *ObjectStatusReporter) Stop() { + klog.V(4).Info("Stopping reporter") + w.cancel() +} + // HasSynced returns true if all the started informers have been synced. // -// TODO: provide a callback function, channel, or event to avoid needing to block with a rety loop. -// // Use the following to block waiting for synchronization: // synced := cache.WaitForCacheSync(stopCh, informer.HasSynced) func (w *ObjectStatusReporter) HasSynced() bool { @@ -322,17 +328,6 @@ func (w *ObjectStatusReporter) startInformerNow( informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace) - // Handler called when ListAndWatch errors. - // Custom handler stops the informer if the resource is NotFound (CRD deleted). - err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { - w.watchErrorHandler(gkn, err) - }) - if err != nil { - // Should never happen. - // Informer can't have started yet. We just created it. - return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err) - } - w.informerRefs[gkn].SetInformer(informer) eventCh := make(chan event.Event) @@ -344,6 +339,17 @@ func (w *ObjectStatusReporter) startInformerNow( return fmt.Errorf("informer failed to build event handler: %w", err) } + // Handler called when ListAndWatch errors. + // Custom handler stops the informer if the resource is NotFound (CRD deleted). + err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + w.watchErrorHandler(gkn, eventCh, err) + }) + if err != nil { + // Should never happen. 
+ // Informer can't have started yet. We just created it. + return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err) + } + informer.AddEventHandler(w.eventHandler(ctx, eventCh)) // Start the informer in the background. @@ -699,63 +705,59 @@ func (w *ObjectStatusReporter) newStatusCheckTaskFunc( } func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) { + klog.V(5).Infof("Reporter error: %v", err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - klog.V(5).Infof("Watch closed: %v", err) return } eventCh <- event.Event{ Type: event.ErrorEvent, Error: err, } - // Terminate the reporter. - w.cancel() + w.Stop() } // watchErrorHandler logs errors and cancels the informer for this GroupKind // if the NotFound error is received, which usually means the CRD was deleted. // Based on DefaultWatchErrorHandler from k8s.io/client-go@v0.23.2/tools/cache/reflector.go -func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, err error) { - // Note: Informers use a stop channel, not a Context, so we don't expect - // Canceled or DeadlineExceeded here. 
+func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, eventCh chan<- event.Event, err error) { switch { + // Stop channel closed case err == io.EOF: - // Stop channel closed - klog.V(5).Infof("Watch closed: %v: %v", gkn, err) + klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err) + + // Watch connection closed case err == io.ErrUnexpectedEOF: - // Keep retrying - klog.V(1).Infof("Watch closed: %v: %v", gkn, err) + klog.V(1).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err) + + // Context done case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded): - // Context cancelled - klog.V(5).Infof("Watch closed: %v: %v", gkn, err) - case apierrors.IsResourceExpired(err): // resourceVersion too old - // Keep retrying - klog.V(5).Infof("Watch closed: %v: %v", gkn, err) - case apierrors.IsGone(err): // DEPRECATED + klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err) + + // resourceVersion too old + case apierrors.IsResourceExpired(err): // Keep retrying - klog.V(5).Infof("Watch closed: %v: %v", gkn, err) - case apierrors.IsNotFound(err) || containsNotFoundMessage(err): // CRD deleted or not created - // Stop watching this resource - klog.V(3).Infof("Watch error: %v: stopping all watchers for this GroupKind: %v", gkn, err) + klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err) + + // Resource unregistered (DEPRECATED, see NotFound) + case apierrors.IsGone(err): + klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err) - // Stop all informers for this GK + // Resource not registered + case apierrors.IsNotFound(err) || containsNotFoundMessage(err): + klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers for this GroupKind: %v", gkn, err) w.forEachTargetWithGroupKind(gkn.GroupKind(), func(gkn GroupKindNamespace) { w.stopInformer(gkn) }) - default: - // Keep retrying - klog.Warningf("Watch error (will 
retry): %v: %v", gkn, err) - } -} -// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound. -const resourceNotFoundMessage = "the server could not find the requested resource" + // Insufficient permissions + case apierrors.IsForbidden(err) || containsForbiddenMessage(err): + klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers: %v", gkn, err) + w.handleFatalError(eventCh, err) -// containsNotFoundMessage checks if the error string contains the message for -// StatusReasonNotFound. -// See k8s.io/apimachinery@v0.23.2/pkg/api/errors/errors.go -// This is necessary because the Informer doesn't properly wrap errors. -func containsNotFoundMessage(err error) bool { - return strings.Contains(err.Error(), resourceNotFoundMessage) + // Unexpected error + default: + klog.Warningf("ListAndWatch error (retry expected): %v: %v", gkn, err) + } } // informerReference tracks informer lifecycle.