Skip to content

Commit

Permalink
Merge pull request #322 from seans3/final-inventory-update
Browse files Browse the repository at this point in the history
Better final inventory calculation during errors
  • Loading branch information
k8s-ci-robot authored Feb 5, 2021
2 parents dfab5b4 + 185aa3d commit 7992542
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 56 deletions.
15 changes: 15 additions & 0 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
return nil, err
}
}
// Retrieve previous inventory objects. Must happen before inventory client merge.
prevInv, err := a.invClient.GetClusterObjs(localInv)
if err != nil {
return nil, err
}
klog.V(4).Infof("%d previous inventory objects in cluster", len(prevInv))

klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
currentObjs := object.UnstructuredsToObjMetas(localObjs)
Expand All @@ -122,6 +128,7 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
LocalInv: localInv,
Resources: localObjs,
PruneIds: pruneIds,
PrevInv: prevInv,
}, nil
}

Expand All @@ -132,6 +139,7 @@ type ResourceObjects struct {
LocalInv inventory.InventoryInfo
Resources []*unstructured.Unstructured
PruneIds []object.ObjMetadata
PrevInv []object.ObjMetadata
}

// ObjsForApply returns the unstructured representation for all the resources
Expand Down Expand Up @@ -162,6 +170,13 @@ func (r *ResourceObjects) IdsForPrune() []object.ObjMetadata {
return r.PruneIds
}

// IdsForPrevInv returns the Ids for the previous inventory. These
// Ids reference the objects managed by the inventory object which
// are already in the cluster.
func (r *ResourceObjects) IdsForPrevInv() []object.ObjMetadata {
return r.PrevInv
}

// AllIds returns the Ids for all resources that are relevant. This
// includes resources that will be applied or pruned.
func (r *ResourceObjects) AllIds() []object.ObjMetadata {
Expand Down
26 changes: 3 additions & 23 deletions pkg/apply/prune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,29 +194,9 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
}
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
}
// Calculate final inventory items, ensuring only successfully applied
// objects are in the inventory along with prune failures.
finalInventory := []object.ObjMetadata{}
for _, localObj := range localIds {
obj, err := po.getObject(localObj)
if err != nil {
if klog.V(4) {
klog.Errorf("error retrieving object for inventory determination: %s", err)
}
continue
}
uid := string(obj.GetUID())
if currentUIDs.Has(uid) {
klog.V(5).Infof("adding final inventory object %s/%s", localObj.Namespace, localObj.Name)
finalInventory = append(finalInventory, localObj)
} else {
klog.V(5).Infof("uid not found (%s); not adding final inventory obj %s/%s",
uid, localObj.Namespace, localObj.Name)
}
}
klog.V(4).Infof("final inventory %d successfully applied objects", len(finalInventory))
finalInventory = append(finalInventory, pruneFailures...)
klog.V(4).Infof("final inventory %d objects after appending prune failures", len(finalInventory))
// Final inventory equals applied objects and prune failures.
appliedResources := taskContext.AppliedResources()
finalInventory := append(appliedResources, pruneFailures...)
return po.InvClient.Replace(localInv, finalInventory)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/apply/prune/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func TestPrune(t *testing.T) {
// the events that can be put on it.
eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
taskContext := taskrunner.NewTaskContext(eventChannel)
for _, u := range tc.currentObjs {
o := object.UnstructuredToObjMeta(u)
uid := u.GetUID()
taskContext.ResourceApplied(o, uid, 0)
}
err := func() error {
defer close(eventChannel)
// Run the prune and validate.
Expand Down
9 changes: 9 additions & 0 deletions pkg/apply/solver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type resourceObjects interface {
Inventory() inventory.InventoryInfo
IdsForApply() []object.ObjMetadata
IdsForPrune() []object.ObjMetadata
IdsForPrevInv() []object.ObjMetadata
}

// BuildTaskQueue takes a set of resources in the form of info objects
Expand All @@ -65,12 +66,19 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
o Options) chan taskrunner.Task {
var tasks []taskrunner.Task
remainingInfos := ro.ObjsForApply()
// Convert slice of previous inventory objects into a map.
prevInvSlice := ro.IdsForPrevInv()
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvSlice))
for _, prevInvObj := range prevInvSlice {
prevInventory[prevInvObj] = true
}

crdSplitRes, hasCRDs := splitAfterCRDs(remainingInfos)
if hasCRDs {
tasks = append(tasks, &task.ApplyTask{
Objects: append(crdSplitRes.before, crdSplitRes.crds...),
CRDs: crdSplitRes.crds,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
InfoHelper: t.InfoHelper,
Expand All @@ -96,6 +104,7 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
&task.ApplyTask{
Objects: remainingInfos,
CRDs: crdSplitRes.crds,
PrevInventory: prevInventory,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
InfoHelper: t.InfoHelper,
Expand Down
13 changes: 9 additions & 4 deletions pkg/apply/solver/solver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,11 @@ func getType(task taskrunner.Task) reflect.Type {
}

type fakeResourceObjects struct {
objsForApply []*unstructured.Unstructured
inventory inventory.InventoryInfo
idsForApply []object.ObjMetadata
idsForPrune []object.ObjMetadata
objsForApply []*unstructured.Unstructured
inventory inventory.InventoryInfo
idsForApply []object.ObjMetadata
idsForPrune []object.ObjMetadata
idsForPrevInv []object.ObjMetadata
}

func (f *fakeResourceObjects) ObjsForApply() []*unstructured.Unstructured {
Expand All @@ -331,6 +332,10 @@ func (f *fakeResourceObjects) IdsForPrune() []object.ObjMetadata {
return f.idsForPrune
}

func (f *fakeResourceObjects) IdsForPrevInv() []object.ObjMetadata {
return f.idsForPrevInv
}

func ignoreErrInfoToObjMeta(info *unstructured.Unstructured) object.ObjMetadata {
objMeta := object.UnstructuredToObjMeta(info)
return objMeta
Expand Down
78 changes: 50 additions & 28 deletions pkg/apply/task/apply_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/klog"
"k8s.io/kubectl/pkg/cmd/apply"
"k8s.io/kubectl/pkg/cmd/delete"
cmddelete "k8s.io/kubectl/pkg/cmd/delete"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/slice"
applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
Expand Down Expand Up @@ -45,11 +45,13 @@ type applyOptions interface {
// ApplyTask applies the given Objects to the cluster
// by using the ApplyOptions.
type ApplyTask struct {
Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
CRDs []*unstructured.Unstructured
Factory util.Factory
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects []*unstructured.Unstructured
CRDs []*unstructured.Unstructured
// Used for determining inventory during errors
PrevInventory map[object.ObjMetadata]bool
DryRunStrategy common.DryRunStrategy
ServerSideOptions common.ServerSideOptions
InventoryPolicy inventory.InventoryPolicy
Expand Down Expand Up @@ -80,6 +82,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
// we have a CRD and a CR in the same resource set, but the CRD
// will not actually have been applied when we reach the CR.
if a.DryRunStrategy.ClientOrServerDryRun() {
klog.V(4).Infof("dry-run filtering custom resources...")
// Find all resources in the set that doesn't exist in the
// RESTMapper, but where we do have the CRD for the type in
// the resource set.
Expand All @@ -97,6 +100,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
taskContext.EventChannel() <- createApplyEvent(object.UnstructuredToObjMeta(obj), event.Created, nil)
}
// Update the resource set to no longer include the CRs.
klog.V(4).Infof("after dry-run filtering custom resources, %d objects left", len(objs))
objects = objs
}

Expand All @@ -123,7 +127,8 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
}

klog.V(4).Infof("attempting to apply %d remaining objects", len(objects))
var infos []*resource.Info
// invInfos stores the objects which should be stored in the final inventory.
invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects))
for _, obj := range objects {
// Set the client and mapping fields on the provided
// info so they can be applied to the cluster.
Expand All @@ -144,18 +149,25 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
if err != nil {
if !apierrors.IsNotFound(err) {
if klog.V(4) {
klog.Errorf("error retrieving %s/%s from cluster--continue",
klog.Errorf("error (%s) retrieving %s/%s from cluster--continue",
err, info.Namespace, info.Name)
}
op := event.Failed
if a.objInCluster(id) {
// Object in cluster stays in the inventory.
klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory",
info.Namespace, info.Name)
invInfos[id] = info
op = event.Unchanged
}
taskContext.EventChannel() <- createApplyEvent(
id,
event.Unchanged,
err)
taskContext.EventChannel() <- createApplyEvent(id, op, err)
taskContext.CaptureResourceFailure(id)
continue
}
}
infos = append(infos, info)
// At this point the object was either 1) successfully retrieved from the cluster, or
// 2) returned "Not Found" error (meaning first-time creation). Add to final inventory.
invInfos[id] = info
canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
if !canApply {
klog.V(5).Infof("can not apply %s/%s--continue",
Expand All @@ -176,30 +188,30 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
if klog.V(4) {
klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err)
}
// If apply failed and the object is not in the cluster, remove
// it from the final inventory.
if !a.objInCluster(id) {
klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory",
info.Namespace, info.Name)
delete(invInfos, id)
}
taskContext.EventChannel() <- createApplyEvent(
id, event.Failed, applyerror.NewApplyRunError(err))
taskContext.CaptureResourceFailure(id)
}
}

// Fetch the Generation from all Infos after they have been
// applied.
for _, inf := range infos {
id, err := object.InfoToObjMeta(inf)
if err != nil {
continue
}
if inf.Object != nil {
acc, err := meta.Accessor(inf.Object)
// Store objects (and some obj metadata) in the task context
// for the final inventory.
for id, info := range invInfos {
if info.Object != nil {
acc, err := meta.Accessor(info.Object)
if err != nil {
continue
}
// Only add a resource if it successfully applied.
uid := acc.GetUID()
if string(uid) != "" {
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
gen := acc.GetGeneration()
taskContext.ResourceApplied(id, uid, gen)
}
}
a.sendTaskResult(taskContext)
Expand Down Expand Up @@ -232,7 +244,7 @@ func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.Ser
},
// FilenameOptions are not needed since we don't use the ApplyOptions
// to read manifests.
DeleteOptions: &delete.DeleteOptions{},
DeleteOptions: &cmddelete.DeleteOptions{},
PrintFlags: &genericclioptions.PrintFlags{
OutputFormat: &emptyString,
},
Expand Down Expand Up @@ -292,6 +304,16 @@ func (a *ApplyTask) filterCRsWithCRDInSet(objects []*unstructured.Unstructured)
return objs, objsWithCRD, nil
}

// objInCluster returns true if the passed object is in the slice of
// previous inventory, because an object in the previous inventory
// exists in the cluster.
func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool {
if _, found := a.PrevInventory[obj]; found {
return true
}
return false
}

type crdsInfo struct {
crds []crdInfo
}
Expand Down
Loading

0 comments on commit 7992542

Please sign in to comment.