Skip to content

feat: cache update on update control #2789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions docs/content/en/docs/documentation/reconciler.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,39 @@ These utility methods come in two flavors:

#### Using internal cache

In almost all cases for this purpose, you can use internal caches:
In almost all cases for this purpose, you can use internal caches. You can turn on this functionality for
`UpdateControl`, thus all updates for `UpdateControl` will be guaranteed to be present in the next reconciliation:

```java
@Override
Operator operator = new Operator(o -> o
.withParseResourceVersions(true)
.withGuaranteeUpdatedPrimaryIsAvailableForNextReconciliation(true));
```

With this setup the following trivial code will cache the resource for the next reconciliation:

```java

@Override
public UpdateControl<UpdateControlPrimaryCacheCustomResource> reconcile(
UpdateControlPrimaryCacheCustomResource resource,
Context<UpdateControlPrimaryCacheCustomResource> context) {

// your logic omitted
var freshCopy = createFreshCopy(resource); // create fresh copy for SSA
freshCopy.getStatus().setValue(statusWithState());

return UpdateControl.patchStatus(freshCopy);
}
```
See related [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/updatecontrol/UpdateControlPrimaryCacheCustomResource.java).

If you need to do update of the resource during the reconciliation you can just use `PrimaryUpdateAndCacheUtils`.
(The code below is equivalent to the one above.)
Note that this utility does not require to set `withGuaranteeUpdatedPrimaryIsAvailableForNextReconciliation` to true.

```java
@Override
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {

Expand All @@ -207,10 +236,12 @@ public UpdateControl<StatusPatchCacheCustomResource> reconcile(
}
```

In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal
cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it
is not necessarily the version of the resource you got as response from the update, it can be newer since other parties
can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status.
In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` and also the mechanism behind `UpdateControl.patchStatus`
puts the result of the update into an internal cache and will make sure that the next reconciliation will contain the
most recent version of the resource.
Note that it is not necessarily the version of the resource you got as a response
from the update, it can be more recent since other parties can do additional updates meanwhile.
But if not explicitly modified by another party, it will contain the up-to-date status.

See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,39 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() {
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
* @return if resource version should be parsed (as integer)
* @return if resourceVersion should be parsed (as integer)
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
}

/**
* Guarantees that primary resource updated by {@link
* io.javaoperatorsdk.operator.api.reconciler.UpdateControl} will be available for next
* reconciliation.
*
* <p>If by default you update the primary resource from reconciler regardless if you use {@link
* io.javaoperatorsdk.operator.api.reconciler.UpdateControl} or directly the Kubernetes client, it
* is not guaranteed that for the next reconciliation will receive the most up-to-date resource.
* This is a consequence of how the informers work in Operator pattern.
*
* <p>However, the framework can be smart about it and make such guarantees, but this can be done
* absolutely reliably only if it bends some rules of the Kubernetes API contract. Thus, if we
* parse the resourceVersion of a primary resource as an integer and compare it with the version
* of the resource in the cache. Note that this is a gray area, since with default setup Etcd
* guarantees such monotonically increasing resource versions. But since it is still bending the
* rules by default, this feature is turned off, and requires {@link
* #parseResourceVersionsForEventFilteringAndCaching} is also set to true.
*
* <p>See also {@link io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils}
*
* @return if the framework should cache the updated resource for next reconciliation
*/
default boolean guaranteeUpdatedPrimaryIsAvailableForNextReconciliation() {
return false;
}

/**
* {@link io.javaoperatorsdk.operator.api.reconciler.UpdateControl} patch resource or status can
* either use simple patches or SSA. Setting this to {@code true}, controllers will use SSA for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ConfigurationServiceOverrider {
private Boolean parseResourceVersions;
private Boolean useSSAToPatchPrimaryResource;
private Boolean cloneSecondaryResourcesWhenGettingFromCache;
private Boolean cacheUpdatedResourcesViaUpdateControl;

@SuppressWarnings("rawtypes")
private DependentResourceFactory dependentResourceFactory;
Expand Down Expand Up @@ -188,6 +189,12 @@ public ConfigurationServiceOverrider withCloneSecondaryResourcesWhenGettingFromC
return this;
}

public ConfigurationServiceOverrider withGuaranteeUpdatedPrimaryIsAvailableForNextReconciliation(
boolean value) {
this.cacheUpdatedResourcesViaUpdateControl = value;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, client) {
@Override
Expand Down Expand Up @@ -328,6 +335,13 @@ public boolean cloneSecondaryResourcesWhenGettingFromCache() {
cloneSecondaryResourcesWhenGettingFromCache,
ConfigurationService::cloneSecondaryResourcesWhenGettingFromCache);
}

@Override
public boolean guaranteeUpdatedPrimaryIsAvailableForNextReconciliation() {
return overriddenValueOrDefault(
cacheUpdatedResourcesViaUpdateControl,
ConfigurationService::guaranteeUpdatedPrimaryIsAvailableForNextReconciliation);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*;

Expand Down Expand Up @@ -60,7 +61,8 @@ public ReconciliationDispatcher(Controller<P> controller) {
new CustomResourceFacade<>(
controller.getCRClient(),
controller.getConfiguration(),
controller.getConfiguration().getConfigurationService().getResourceCloner()));
controller.getConfiguration().getConfigurationService().getResourceCloner(),
controller.getEventSourceManager().getControllerEventSource()));
}

public PostExecutionControl<P> handleExecution(ExecutionScope<P> executionScope) {
Expand Down Expand Up @@ -175,7 +177,7 @@ private PostExecutionControl<P> reconcileExecution(
}

if (updateControl.isPatchStatus()) {
customResourceFacade.patchStatus(toUpdate, originalResource);
updatedCustomResource = customResourceFacade.patchStatus(toUpdate, originalResource);
}
return createPostExecutionControl(updatedCustomResource, updateControl);
}
Expand Down Expand Up @@ -315,7 +317,7 @@ private P addFinalizerWithSSA(P originalResource) {
objectMeta.setNamespace(originalResource.getMetadata().getNamespace());
resource.setMetadata(objectMeta);
resource.addFinalizer(configuration().getFinalizerName());
return customResourceFacade.patchResourceWithSSA(resource);
return customResourceFacade.patchResourceWithSSA(resource, originalResource);
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
Expand Down Expand Up @@ -414,15 +416,42 @@ static class CustomResourceFacade<R extends HasMetadata> {
private final boolean useSSA;
private final String fieldManager;
private final Cloner cloner;
private final boolean cacheUpdatedResources;

private final ControllerEventSource<R> controllerEventSource;

public CustomResourceFacade(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> resourceOperation,
ControllerConfiguration<R> configuration,
Cloner cloner) {
Cloner cloner,
ControllerEventSource<R> controllerEventSource) {
this.resourceOperation = resourceOperation;
this.useSSA = configuration.getConfigurationService().useSSAToPatchPrimaryResource();
this.fieldManager = configuration.fieldManager();
this.cacheUpdatedResources =
configuration
.getConfigurationService()
.guaranteeUpdatedPrimaryIsAvailableForNextReconciliation();

this.cloner = cloner;
this.controllerEventSource = controllerEventSource;

if (cacheUpdatedResources
&& !configuration
.getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching()) {
throw new OperatorException(
"guaranteeUpdatedPrimaryIsAvailableForNextReconciliation is set to true, but"
+ " previousAnnotationForDependentResourcesEventFiltering is set to false, set this"
+ " flag also to true if you want to use this feature");
}
}

private void cachePrimaryResource(R updatedResource, R previousVersionOfResource) {
if (cacheUpdatedResources) {
controllerEventSource.handleRecentResourceUpdate(
ResourceID.fromResource(updatedResource), updatedResource, previousVersionOfResource);
}
}

public R getResource(String namespace, String name) {
Expand All @@ -434,7 +463,9 @@ public R getResource(String namespace, String name) {
}

public R patchResourceWithoutSSA(R resource, R originalResource) {
return resource(originalResource).edit(r -> resource);
R updated = resource(originalResource).edit(r -> resource);
cachePrimaryResource(updated, originalResource);
return updated;
}

public R patchResource(R resource, R originalResource) {
Expand All @@ -444,32 +475,43 @@ public R patchResource(R resource, R originalResource) {
ResourceID.fromResource(resource),
resource.getMetadata().getResourceVersion());
}
R updated;
if (useSSA) {
return patchResourceWithSSA(resource);
return patchResourceWithSSA(resource, originalResource);
} else {
return resource(originalResource).edit(r -> resource);
updated = resource(originalResource).edit(r -> resource);
cachePrimaryResource(updated, originalResource);
return updated;
}
}

public R patchStatus(R resource, R originalResource) {
log.trace("Patching status for resource: {} with ssa: {}", resource, useSSA);
if (useSSA) {
R updated = null;
var managedFields = resource.getMetadata().getManagedFields();
try {
resource.getMetadata().setManagedFields(null);
var res = resource(resource);
return res.subresource("status")
.patch(
new PatchContext.Builder()
.withFieldManager(fieldManager)
.withForce(true)
.withPatchType(PatchType.SERVER_SIDE_APPLY)
.build());
updated =
res.subresource("status")
.patch(
new PatchContext.Builder()
.withFieldManager(fieldManager)
.withForce(true)
.withPatchType(PatchType.SERVER_SIDE_APPLY)
.build());
return updated;
} finally {
resource.getMetadata().setManagedFields(managedFields);
if (updated != null) {
cachePrimaryResource(updated, originalResource);
}
}
} else {
return editStatus(resource, originalResource);
R updated = editStatus(resource, originalResource);
cachePrimaryResource(updated, originalResource);
return updated;
}
}

Expand All @@ -490,14 +532,17 @@ private R editStatus(R resource, R originalResource) {
}
}

public R patchResourceWithSSA(R resource) {
return resource(resource)
.patch(
new PatchContext.Builder()
.withFieldManager(fieldManager)
.withForce(true)
.withPatchType(PatchType.SERVER_SIDE_APPLY)
.build());
public R patchResourceWithSSA(R resource, R originalResource) {
R updated =
resource(resource)
.patch(
new PatchContext.Builder()
.withFieldManager(fieldManager)
.withForce(true)
.withPatchType(PatchType.SERVER_SIDE_APPLY)
.build());
cachePrimaryResource(updated, originalResource);
return updated;
}

private Resource<R> resource(R resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ void addFinalizerOnNewResource() {
verify(reconciler, never()).reconcile(ArgumentMatchers.eq(testCustomResource), any());
verify(customResourceFacade, times(1))
.patchResourceWithSSA(
argThat(testCustomResource -> testCustomResource.hasFinalizer(DEFAULT_FINALIZER)));
argThat(testCustomResource -> testCustomResource.hasFinalizer(DEFAULT_FINALIZER)),
any());
}

@Test
Expand Down Expand Up @@ -357,13 +358,13 @@ void doesNotUpdateTheResourceIfNoUpdateUpdateControlIfFinalizerSet() {
void addsFinalizerIfNotMarkedForDeletionAndEmptyCustomResourceReturned() {
removeFinalizers(testCustomResource);
reconciler.reconcile = (r, c) -> UpdateControl.noUpdate();
when(customResourceFacade.patchResourceWithSSA(any())).thenReturn(testCustomResource);
when(customResourceFacade.patchResourceWithSSA(any(), any())).thenReturn(testCustomResource);

var postExecControl =
reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource));

verify(customResourceFacade, times(1))
.patchResourceWithSSA(argThat(a -> !a.getMetadata().getFinalizers().isEmpty()));
.patchResourceWithSSA(argThat(a -> !a.getMetadata().getFinalizers().isEmpty()), any());
assertThat(postExecControl.updateIsStatusPatch()).isFalse();
assertThat(postExecControl.getUpdatedCustomResource()).isPresent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ public TestController(
reconciler,
new TestConfiguration(true, onAddFilter, onUpdateFilter, genericFilter),
MockKubernetesClient.client(TestCustomResource.class));

when(eventSourceManager.getControllerEventSource())
.thenReturn(mock(ControllerEventSource.class));
}

public TestController(boolean generationAware) {
Expand All @@ -176,6 +179,9 @@ public TestController(boolean generationAware) {

@Override
public EventSourceManager<TestCustomResource> getEventSourceManager() {
if (eventSourceManager == null) {
return super.getEventSourceManager();
}
return eventSourceManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.statuscache.updatecontrol;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("ucpc")
public class UpdateControlPrimaryCacheCustomResource
extends CustomResource<UpdateControlPrimaryCacheSpec, UpdateControlPrimaryCacheStatus>
implements Namespaced {}
Loading