Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,16 @@ default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentReso

/**
* If the event logic should parse the resourceVersion to determine the ordering of dependent
* resource events. This is typically not needed.
* resource events.
*
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
* <p>Enabled by default as Kubernetes does support this interpretation of resourceVersions.
* Disable only if your api server provides non comparable resource versions..
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,21 @@ class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
private final GroupVersionKind groupVersionKind;
private final InformerConfiguration<R> informerConfig;
private final KubernetesClient kubernetesClient;
private final boolean comparableResourceVersions;

protected DefaultInformerEventSourceConfiguration(
GroupVersionKind groupVersionKind,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
InformerConfiguration<R> informerConfig,
KubernetesClient kubernetesClient) {
KubernetesClient kubernetesClient,
boolean comparableResourceVersions) {
this.informerConfig = Objects.requireNonNull(informerConfig);
this.groupVersionKind = groupVersionKind;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
this.kubernetesClient = kubernetesClient;
this.comparableResourceVersions = comparableResourceVersions;
}

@Override
Expand Down Expand Up @@ -135,6 +138,11 @@ public Optional<GroupVersionKind> getGroupVersionKind() {
public Optional<KubernetesClient> getKubernetesClient() {
return Optional.ofNullable(kubernetesClient);
}

@Override
public boolean parseResourceVersionsForEventFilteringAndCaching() {
return this.comparableResourceVersions;
}
}

@SuppressWarnings({"unused", "UnusedReturnValue"})
Expand All @@ -148,6 +156,7 @@ class Builder<R extends HasMetadata> {
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private KubernetesClient kubernetesClient;
private boolean comparableResourceVersions = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this also to @KubernetesDependent annotation.


private Builder(Class<R> resourceClass, Class<? extends HasMetadata> primaryResourceClass) {
this(resourceClass, primaryResourceClass, null);
Expand Down Expand Up @@ -285,6 +294,11 @@ public Builder<R> withFieldSelector(FieldSelector fieldSelector) {
return this;
}

public Builder<R> parseResourceVersionsForEventFilteringAndCaching(boolean parse) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comparableResourceVersions we should probably have this also as withComparableResourceVersions(boolean) that is a more generic name.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this to be consistent with the other configuration for now. I can use a different name to start with here, or we can use a follow-up issue on how to approach the renaming in general.

this.comparableResourceVersions = parse;
return this;
}

public void updateFrom(InformerConfiguration<R> informerConfig) {
if (informerConfig != null) {
final var informerConfigName = informerConfig.getName();
Expand Down Expand Up @@ -324,7 +338,10 @@ public InformerEventSourceConfiguration<R> build() {
HasMetadata.getKind(primaryResourceClass),
false)),
config.build(),
kubernetesClient);
kubernetesClient,
comparableResourceVersions);
}
}

boolean parseResourceVersionsForEventFilteringAndCaching();
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
}
}

public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
return compareResourceVersions(
h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
}

public static int compareResourceVersions(String v1, String v2) {
var v1Length = v1.length();
if (v1Length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ public class ControllerEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerEventSource(Controller<T> controller) {
super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
super(
NAME,
controller.getCRClient(),
controller.getConfiguration(),
controller
.getConfiguration()
.getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
this.controller = controller;

final var config = controller.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,11 @@ public InformerEventSource(
this(
configuration,
configuration.getKubernetesClient().orElse(context.getClient()),
context
.getControllerConfiguration()
.getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
configuration.parseResourceVersionsForEventFilteringAndCaching());
}

InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
this(configuration, client, false);
this(configuration, client, true);
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -207,21 +204,8 @@ private synchronized void onAddOrUpdate(
}

private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
var res = temporaryResourceCache.getResourceFromCache(resourceID);
if (res.isEmpty()) {
return isEventKnownFromAnnotation(newObject, oldObject);
}
boolean resVersionsEqual =
newObject
.getMetadata()
.getResourceVersion()
.equals(res.get().getMetadata().getResourceVersion());
log.debug(
"Resource found in temporal cache for id: {} resource versions equal: {}",
resourceID,
resVersionsEqual);
return resVersionsEqual
|| temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
return temporaryResourceCache.canSkipEvent(resourceID, newObject)
|| isEventKnownFromAnnotation(newObject, oldObject);
}

private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
Expand Down Expand Up @@ -301,11 +285,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res

private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
temporaryResourceCache.putResource(
newResource,
Optional.ofNullable(oldResource)
.map(r -> r.getMetadata().getResourceVersion())
.orElse(null));
temporaryResourceCache.putResource(newResource);
}

private boolean useSecondaryToPrimaryIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public Optional<R> get(ResourceID resourceID) {
: r);
}

public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
.map(source -> source.getLastSyncResourceVersion());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be replaced by methode reference

Copy link
Collaborator Author

@shawkins shawkins Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this can be removed - see #3015

}

@Override
public Stream<ResourceID> keys() {
return sources.values().stream().flatMap(Cache::keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public Optional<T> get(ResourceID resourceID) {
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
}

public String getLastSyncResourceVersion() {
return this.informer.lastSyncResourceVersion();
}

private String getKey(ResourceID resourceID) {
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.Informable;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
Expand Down Expand Up @@ -122,30 +123,38 @@ public synchronized void stop() {
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
temporaryResourceCache.putResource(
resource, previousVersionOfResource.getMetadata().getResourceVersion());
temporaryResourceCache.putResource(resource);
}

@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
temporaryResourceCache.putAddedResource(resource);
temporaryResourceCache.putResource(resource);
}

@Override
public Optional<R> get(ResourceID resourceID) {
var res = cache.get(resourceID);
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceID);
if (resource.isPresent()) {
log.debug("Resource found in temporary cache for Resource ID: {}", resourceID);
if (parseResourceVersions
&& resource.isPresent()
&& res.filter(
r ->
PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow())
> 0)
.isEmpty()) {
log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
return resource;
} else {
log.debug(
"Resource not found in temporary cache reading it from informer cache,"
+ " for Resource ID: {}",
resourceID);
var res = cache.get(resourceID);
log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID);
return res;
}
log.debug(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense also to log if parseResourceVersions is true/false? But that might be trivial to see.

"Resource not found, or older, in temporary cache. Found in informer cache {}, for"
+ " Resource ID: {}",
res.isPresent(),
resourceID);
return res;
}

public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
return cache.getLastSyncResourceVersion(namespace);
}

@SuppressWarnings("unused")
Expand Down
Loading