Skip to content

Commit fd5a12c

Browse files
committed
removing tombstones
1 parent 4a02911 commit fd5a12c

File tree

4 files changed

+25
-52
lines changed

4 files changed

+25
-52
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ public Optional<R> get(ResourceID resourceID) {
220220
? controllerConfiguration.getConfigurationService().getResourceCloner().clone(r)
221221
: r);
222222
}
223+
224+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
225+
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
226+
.map(source -> source.getLastSyncResourceVersion());
227+
}
223228

224229
@Override
225230
public Stream<ResourceID> keys() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ public void stop() throws OperatorException {
155155
public Optional<T> get(ResourceID resourceID) {
156156
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
157157
}
158+
159+
public String getLastSyncResourceVersion() {
160+
return this.informer.lastSyncResourceVersion();
161+
}
158162

159163
private String getKey(ResourceID resourceID) {
160164
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,11 @@ public Optional<R> get(ResourceID resourceID) {
147147
return res;
148148
}
149149
}
150-
150+
151+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
152+
return cache.getLastSyncResourceVersion(namespace);
153+
}
154+
151155
@SuppressWarnings("unused")
152156
public Optional<R> getCachedValue(ResourceID resourceID) {
153157
return get(resourceID);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18-
import java.util.LinkedHashMap;
1918
import java.util.Map;
2019
import java.util.Optional;
2120
import java.util.concurrent.ConcurrentHashMap;
@@ -46,53 +45,11 @@
4645
*/
4746
public class TemporaryResourceCache<T extends HasMetadata> {
4847

49-
static class ExpirationCache<K> {
50-
private final LinkedHashMap<K, Long> cache;
51-
private final int ttlMs;
52-
53-
public ExpirationCache(int maxEntries, int ttlMs) {
54-
this.ttlMs = ttlMs;
55-
this.cache =
56-
new LinkedHashMap<>() {
57-
@Override
58-
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
59-
return size() > maxEntries;
60-
}
61-
};
62-
}
63-
64-
public void add(K key) {
65-
clean();
66-
cache.putIfAbsent(key, System.currentTimeMillis());
67-
}
68-
69-
public boolean contains(K key) {
70-
clean();
71-
return cache.get(key) != null;
72-
}
73-
74-
void clean() {
75-
if (!cache.isEmpty()) {
76-
long currentTimeMillis = System.currentTimeMillis();
77-
var iter = cache.entrySet().iterator();
78-
// the order will already be from oldest to newest, clean a fixed number of entries to
79-
// amortize the cost amongst multiple calls
80-
for (int i = 0; i < 10 && iter.hasNext(); i++) {
81-
var entry = iter.next();
82-
if (currentTimeMillis - entry.getValue() > ttlMs) {
83-
iter.remove();
84-
}
85-
}
86-
}
87-
}
88-
}
89-
9048
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
9149

9250
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
9351

9452
// keep up to the last million deletions for up to 10 minutes
95-
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
9653
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
9754
private final boolean parseResourceVersions;
9855

@@ -104,7 +61,6 @@ public TemporaryResourceCache(
10461
}
10562

10663
public synchronized void onDeleteEvent(T resource, boolean unknownState) {
107-
tombstones.add(resource.getMetadata().getUid());
10864
onEvent(resource, unknownState);
10965
}
11066

@@ -131,17 +87,21 @@ public synchronized void putAddedResource(T newResource) {
13187
*/
13288
public synchronized void putResource(T newResource, String previousResourceVersion) {
13389
var resourceId = ResourceID.fromResource(newResource);
90+
13491
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
13592

13693
boolean moveAhead = false;
94+
String latest = managedInformerEventSource.getLastSyncResourceVersion(resourceId.getNamespace()).orElse(null);
95+
if (latest != null && latest > newResource.getMetadata().getResourceVersion()) {
96+
log.debug(
97+
"Won't move backwards for {}: resourceVersion {}, latest {}",
98+
resourceId,
99+
newResource.getMetadata().getResourceVersion(),
100+
latest);
101+
return;
102+
}
103+
137104
if (previousResourceVersion == null && cachedResource == null) {
138-
if (tombstones.contains(newResource.getMetadata().getUid())) {
139-
log.debug(
140-
"Won't resurrect uid {} for resource id: {}",
141-
newResource.getMetadata().getUid(),
142-
resourceId);
143-
return;
144-
}
145105
// we can skip further checks as this is a simple add and there's no previous entry to
146106
// consider
147107
moveAhead = true;

0 commit comments

Comments
 (0)