Skip to content

Commit

Permalink
Fix for final inventory update
Browse files Browse the repository at this point in the history
  • Loading branch information
seans3 committed Jan 26, 2021
1 parent b62349a commit 6db68fd
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 132 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,6 @@ sigs.k8s.io/controller-runtime v0.6.0 h1:Fzna3DY7c4BIP6KwfSlrfnj20DJ+SeMBK8HSFvO
sigs.k8s.io/controller-runtime v0.6.0/go.mod h1:CpYf5pdNY/B352A1TFLAS2JVSlnGQ5O2cftPHndTroo=
sigs.k8s.io/kustomize v2.0.3+incompatible h1:JUufWFNlI44MdtnjUqVnvh29rR37PQFzPbLXqhyOyX0=
sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU=
sigs.k8s.io/kustomize/kyaml v0.10.5 h1:PbJcsZsEM7O3hHtUWTR+4WkHVbQRW9crSy75or1gRbI=
sigs.k8s.io/kustomize/kyaml v0.10.5/go.mod h1:P6Oy/ah/GZMKzJMIJA2a3/bc8YrBkuL5kJji13PSIzY=
sigs.k8s.io/kustomize/kyaml v0.10.6 h1:xUJxc/k8JoWqHUahaB8DTqY0KwEPxTbTGStvW8TOcDc=
sigs.k8s.io/kustomize/kyaml v0.10.6/go.mod h1:K9yg1k/HB/6xNOf5VH3LhTo1DK9/5ykSZO5uIv+Y/1k=
sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw=
Expand Down
10 changes: 9 additions & 1 deletion pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
Expand Down Expand Up @@ -89,6 +90,7 @@ func (a *Applier) Initialize() error {
// resources for the subsequent apply. Returns the sorted resources to
// apply as well as the objects for the prune, or an error if one occurred.
func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*unstructured.Unstructured) (*ResourceObjects, error) {
klog.V(4).Infof("applier preparing %d objects", len(localObjs))
if localInv == nil {
return nil, fmt.Errorf("the local inventory can't be nil")
}
Expand All @@ -97,20 +99,22 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
}
// Ensures the namespace exists before applying the inventory object into it.
if invNamespace := inventoryNamespaceInSet(localInv, localObjs); invNamespace != nil {
klog.V(4).Infof("applier prepareObjects applying namespace %s", invNamespace.GetName())
if err := a.invClient.ApplyInventoryNamespace(invNamespace); err != nil {
return nil, err
}
}

klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
currentObjs := object.UnstructuredsToObjMetas(localObjs)
// returns the objects (pruneIds) to prune after apply. The prune
// algorithm requires stopping if the merge is not successful. Otherwise,
// the stored objects in inventory could become inconsistent.
pruneIds, err := a.invClient.Merge(localInv, currentObjs)

if err != nil {
return nil, err
}
klog.V(4).Infof("after inventory merge; %d objects to prune", len(pruneIds))
// Sort order for applied resources.
sort.Sort(ordering.SortableUnstructureds(localObjs))

Expand Down Expand Up @@ -173,6 +177,7 @@ func (r *ResourceObjects) AllIds() []object.ObjMetadata {
// cancellation or timeout will only affect how long we Wait for the
// resources to become current.
func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, objects []*unstructured.Unstructured, options Options) <-chan event.Event {
klog.V(4).Infof("apply run for %d objects", len(objects))
eventChannel := make(chan event.Event)
setDefaults(&options)
a.invClient.SetDryRunStrategy(options.DryRunStrategy) // client shared with prune, so sets dry-run for prune too.
Expand All @@ -195,6 +200,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}

// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
taskQueue := (&solver.TaskQueueSolver{
PruneOptions: a.PruneOptions,
Factory: a.provider.Factory(),
Expand Down Expand Up @@ -229,7 +235,9 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
}

// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
runner := taskrunner.NewTaskStatusRunner(resourceObjects.AllIds(), a.StatusPoller)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskQueue, eventChannel, taskrunner.Options{
PollInterval: options.PollInterval,
UseCache: true,
Expand Down
184 changes: 122 additions & 62 deletions pkg/apply/prune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,114 +83,174 @@ type Options struct {
}

// Prune deletes the set of resources which were previously applied
// (retrieved from previous inventory objects) but omitted in
// the current apply. Prune also delete all previous inventory
// objects. Returns an error if there was a problem.
func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*unstructured.Unstructured, currentUIDs sets.String,
taskContext *taskrunner.TaskContext, o Options) error {
// but omitted in the current apply. Calculates the set of objects
// to prune by removing the currently applied objects from the union
// set of the previously applied objects and currently applied objects
// stored in the cluster inventory. As a final step, stores the current
// inventory which is all the successfully applied objects and the
// prune failures. Does not stop when encountering prune failures.
// Returns an error for unrecoverable errors.
//
// Parameters:
// localInv - locally read inventory object
// localObjs - locally read, currently applied (attempted) objects
// currentUIDs - UIDs for successfully applied objects
// taskContext - task for apply/prune
func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
localObjs []*unstructured.Unstructured,
currentUIDs sets.String,
taskContext *taskrunner.TaskContext,
o Options) error {
// Validate parameters
if localInv == nil {
return fmt.Errorf("the local inventory object can't be nil")
}
invNamespace := localInv.Namespace()
klog.V(4).Infof("prune local inventory object: %s/%s", invNamespace, localInv.Name())
// Get the list of Object Meta from the local objects.
localIds := object.UnstructuredsToObjMetas(localObjs)
// Create the set of namespaces for currently (locally) applied objects, including
// the namespace the inventory object lives in (if it's not cluster-scoped). When
// pruning, check this set of namespaces to ensure these namespaces are not deleted.
localNamespaces := mergeObjNamespaces(localObjs)
if invNamespace != "" {
localNamespaces.Insert(invNamespace)
}
clusterObjs, err := po.InvClient.GetClusterObjs(localInv)
localNamespaces := localNamespaces(localInv, localIds)
clusterInv, err := po.InvClient.GetClusterObjs(localInv)
if err != nil {
return err
}
klog.V(4).Infof("prune %d currently applied objects", len(currentUIDs))
klog.V(4).Infof("prune %d previously applied objects", len(clusterObjs))
klog.V(4).Infof("prune: %d objects attempted to apply", len(localIds))
klog.V(4).Infof("prune: %d objects successfully applied", len(currentUIDs))
klog.V(4).Infof("prune: %d union objects stored in cluster inventory", len(clusterInv))
pruneObjs := object.SetDiff(clusterInv, localIds)
klog.V(4).Infof("prune: %d objects to prune (clusterInv - localIds)", len(pruneObjs))
// Sort the resources in reverse order using the same rules as is
// used for apply.
sort.Sort(sort.Reverse(ordering.SortableMetas(clusterObjs)))
for _, clusterObj := range clusterObjs {
mapping, err := po.mapper.RESTMapping(clusterObj.GroupKind)
if err != nil {
localIds = append(localIds, clusterObj)
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
taskContext.CaptureResourceFailure(clusterObj)
continue
}
namespacedClient := po.client.Resource(mapping.Resource).Namespace(clusterObj.Namespace)
obj, err := namespacedClient.Get(context.TODO(), clusterObj.Name, metav1.GetOptions{})
sort.Sort(sort.Reverse(ordering.SortableMetas(pruneObjs)))
// Store prune failures to ensure they remain in the inventory.
pruneFailures := []object.ObjMetadata{}
for _, pruneObj := range pruneObjs {
klog.V(5).Infof("attempting prune: %s", pruneObj)
obj, err := po.getObject(pruneObj)
if err != nil {
// Object not found in cluster, so no need to delete it; skip to next object.
if apierrors.IsNotFound(err) {
klog.V(5).Infof("%s/%s not found in cluster--skipping",
pruneObj.Namespace, pruneObj.Name)
continue
}
localIds = append(localIds, clusterObj)
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
taskContext.CaptureResourceFailure(clusterObj)
continue
}
metadata, err := meta.Accessor(obj)
if err != nil {
localIds = append(localIds, clusterObj)
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
taskContext.CaptureResourceFailure(clusterObj)
if klog.V(5) {
klog.Errorf("prune obj (%s/%s) UID retrival error: %s",
pruneObj.Namespace, pruneObj.Name, err)
}
pruneFailures = append(pruneFailures, pruneObj)
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
taskContext.CaptureResourceFailure(pruneObj)
continue
}
// If this cluster object is not also a currently applied
// object, then it has been omitted--prune it. If the cluster
// object is part of the local apply set, skip it.
uid := string(metadata.GetUID())
klog.V(7).Infof("prune previously applied object UID: %s", uid)
// Do not prune objects that are in set of currently applied objects.
uid := string(obj.GetUID())
if currentUIDs.Has(uid) {
klog.V(7).Infof("prune object in current apply; do not prune: %s", uid)
klog.V(5).Infof("prune object in current apply; do not prune: %s", uid)
continue
}
// Handle lifecycle directive preventing deletion.
if !canPrune(localInv, obj, o.InventoryPolicy, uid) {
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
localIds = append(localIds, clusterObj)
klog.V(4).Infof("skip prune for lifecycle directive %s/%s", pruneObj.Namespace, pruneObj.Name)
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.PruneSkipped)
pruneFailures = append(pruneFailures, pruneObj)
continue
}
// If regular pruning (not destroying), skip deleting namespace containing
// currently applied objects.
if !po.Destroy {
if clusterObj.GroupKind == object.CoreV1Namespace.GroupKind() &&
localNamespaces.Has(clusterObj.Name) {
klog.V(4).Infof("skip pruning namespace: %s", clusterObj.Name)
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
localIds = append(localIds, clusterObj)
taskContext.CaptureResourceFailure(clusterObj)
if pruneObj.GroupKind == object.CoreV1Namespace.GroupKind() &&
localNamespaces.Has(pruneObj.Name) {
klog.V(4).Infof("skip pruning namespace: %s", pruneObj.Name)
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.PruneSkipped)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
continue
}
}
if !o.DryRunStrategy.ClientOrServerDryRun() {
klog.V(4).Infof("prune object delete: %s/%s", clusterObj.Namespace, clusterObj.Name)
err = namespacedClient.Delete(context.TODO(), clusterObj.Name, metav1.DeleteOptions{})
klog.V(4).Infof("prune object delete: %s/%s", pruneObj.Namespace, pruneObj.Name)
namespacedClient, err := po.namespacedClient(pruneObj)
if err != nil {
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
localIds = append(localIds, clusterObj)
taskContext.CaptureResourceFailure(clusterObj)
if klog.V(4) {
klog.Errorf("prune failed for %s/%s (%s)", pruneObj.Namespace, pruneObj.Name, err)
}
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
continue
}
err = namespacedClient.Delete(context.TODO(), pruneObj.Name, metav1.DeleteOptions{})
if err != nil {
if klog.V(4) {
klog.Errorf("prune failed for %s/%s (%s)", pruneObj.Namespace, pruneObj.Name, err)
}
taskContext.EventChannel() <- createPruneFailedEvent(pruneObj, err)
pruneFailures = append(pruneFailures, pruneObj)
taskContext.CaptureResourceFailure(pruneObj)
continue
}
}
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
}
// Calculate final inventory items, ensuring only successfully applied
// objects are in the inventory along with prune failures.
finalInventory := []object.ObjMetadata{}
for _, localObj := range localIds {
obj, err := po.getObject(localObj)
if err != nil {
if klog.V(4) {
klog.Errorf("error retrieving object for inventory determination: %s", err)
}
continue
}
uid := string(obj.GetUID())
if currentUIDs.Has(uid) {
klog.V(5).Infof("adding final inventory object %s/%s", localObj.Namespace, localObj.Name)
finalInventory = append(finalInventory, localObj)
} else {
klog.V(5).Infof("uid not found (%s); not adding final inventory obj %s/%s",
uid, localObj.Namespace, localObj.Name)
}
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.Pruned)
}
return po.InvClient.Replace(localInv, localIds)
klog.V(4).Infof("final inventory %d successfully applied objects", len(finalInventory))
finalInventory = append(finalInventory, pruneFailures...)
klog.V(4).Infof("final inventory %d objects after appending prune failures", len(finalInventory))
return po.InvClient.Replace(localInv, finalInventory)
}

// mergeObjNamespaces returns a set of strings of all the namespaces
// for non cluster-scoped objects. These namespaces are forced to
// lower-case.
func mergeObjNamespaces(objs []*unstructured.Unstructured) sets.String {
func (po *PruneOptions) namespacedClient(obj object.ObjMetadata) (dynamic.ResourceInterface, error) {
mapping, err := po.mapper.RESTMapping(obj.GroupKind)
if err != nil {
return nil, err
}
return po.client.Resource(mapping.Resource).Namespace(obj.Namespace), nil
}

func (po *PruneOptions) getObject(obj object.ObjMetadata) (*unstructured.Unstructured, error) {
namespacedClient, err := po.namespacedClient(obj)
if err != nil {
return nil, err
}
return namespacedClient.Get(context.TODO(), obj.Name, metav1.GetOptions{})
}

// localNamespaces returns a set of strings of all the namespaces
// for the passed non cluster-scoped localObjs, plus the namespace
// of the passed inventory object.
func localNamespaces(localInv inventory.InventoryInfo, localObjs []object.ObjMetadata) sets.String {
namespaces := sets.NewString()
for _, obj := range objs {
namespace := strings.TrimSpace(strings.ToLower(obj.GetNamespace()))
for _, obj := range localObjs {
namespace := strings.TrimSpace(strings.ToLower(obj.Namespace))
if namespace != "" {
namespaces.Insert(namespace)
}
}
invNamespace := strings.TrimSpace(strings.ToLower(localInv.Namespace()))
if invNamespace != "" {
namespaces.Insert(invNamespace)
}
return namespaces
}

Expand Down
Loading

0 comments on commit 6db68fd

Please sign in to comment.