From 155fb3d60a829fec9cb78d791b7c7f72187a051c Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Thu, 21 Sep 2023 14:11:15 +0200 Subject: [PATCH 1/8] Reapply: PR #3170 Fixes #3169 : Track event listeners keyed by type to allow earlier event firing veto #3170 --- .../core/events/EventListenerWrapper.java | 5 +- .../impl/events/CacheEventDispatcherImpl.java | 99 +++++++++++-------- .../impl/events/EventDispatchTask.java | 10 +- 3 files changed, 67 insertions(+), 47 deletions(-) diff --git a/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java b/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java index 2076b68d81..ff71ab6526 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java +++ b/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java @@ -23,6 +23,7 @@ import org.ehcache.event.EventType; import java.util.EnumSet; +import java.util.Set; /** * Internal wrapper for {@link CacheEventListener} and their configuration. @@ -86,8 +87,8 @@ public void onEvent(CacheEvent event) { return listener; } - public boolean isForEventType(EventType type) { - return forEvents.contains(type); + public Set getEventTypes() { + return forEvents; } public boolean isOrdered() { diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index 283f074797..a7ef7c32e4 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -34,11 +34,21 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumMap; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Stream; + +import static java.util.Collections.unmodifiableMap; +import static java.util.EnumSet.allOf; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Stream.concat; /** * Per-cache component that manages cache event listener registrations, and provides event delivery based on desired @@ -55,10 +65,12 @@ public class CacheEventDispatcherImpl implements CacheEventDispatcher> syncListenersList = new CopyOnWriteArrayList<>(); - private final List> aSyncListenersList = new CopyOnWriteArrayList<>(); + + private final Map>> syncListenersList = unmodifiableMap(allOf(EventType.class).stream() + .collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); + private final Map>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream() + .collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); + private final StoreEventListener eventListener = new StoreListener(); private volatile Cache listenerSource; @@ -94,69 +106,76 @@ public void registerCacheEventListener(CacheEventListener * @param wrapper the listener wrapper to register */ private synchronized void registerCacheEventListener(EventListenerWrapper wrapper) { - if(aSyncListenersList.contains(wrapper) || syncListenersList.contains(wrapper)) { + + if(allListeners().anyMatch(wrapper::equals)) { throw new IllegalStateException("Cache Event Listener already registered: " + wrapper.getListener()); } - if (wrapper.isOrdered() && orderedListenerCount++ == 0) { + boolean firstListener = !allListeners().findAny().isPresent(); + + if (wrapper.isOrdered() && (firstListener || allListeners().noneMatch(EventListenerWrapper::isOrdered))) { storeEventSource.setEventOrdering(true); } switch (wrapper.getFiringMode()) { case ASYNCHRONOUS: - aSyncListenersList.add(wrapper); + wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper)); break; case SYNCHRONOUS: - if (syncListenersList.isEmpty()) { + if (!syncListeners().findAny().isPresent()) { storeEventSource.setSynchronous(true); } - syncListenersList.add(wrapper); + wrapper.getEventTypes().forEach(type -> syncListenersList.get(type).add(wrapper)); break; default: throw new AssertionError("Unhandled EventFiring value: " + wrapper.getFiringMode()); } - if (listenersCount++ == 0) { + if (firstListener) { storeEventSource.addEventListener(eventListener); } } + private Stream> allListeners() { + return concat(asyncListeners(), syncListeners()); + } + + private Stream> syncListeners() { + return syncListenersList.values().stream().flatMap(Collection::stream); + } + + private Stream> asyncListeners() { + return asyncListenersList.values().stream().flatMap(Collection::stream); + } + /** * {@inheritDoc} */ @Override - public void deregisterCacheEventListener(CacheEventListener listener) { + public synchronized void deregisterCacheEventListener(CacheEventListener listener) { EventListenerWrapper wrapper = new EventListenerWrapper<>(listener); - if (!removeWrapperFromList(wrapper, aSyncListenersList)) { - if (!removeWrapperFromList(wrapper, syncListenersList)) { - throw new IllegalStateException("Unknown cache event listener: " + listener); - } + boolean removed = Stream.of(asyncListenersList, syncListenersList) + .flatMap(list -> list.values().stream()) + .map(list -> list.remove(wrapper)) + .reduce((a, b) -> a || b).orElse(false); + + if (!removed) { + throw new IllegalStateException("Unknown cache event listener: " + listener); } - } - /** - * Synchronized to make sure listener removal is atomic - * - * @param wrapper the listener wrapper to unregister - * @param listenersList the listener list to remove from - */ - private synchronized boolean removeWrapperFromList(EventListenerWrapper wrapper, List> listenersList) { - int index = listenersList.indexOf(wrapper); - if (index != -1) { - EventListenerWrapper containedWrapper = listenersList.remove(index); - if(containedWrapper.isOrdered() && --orderedListenerCount == 0) { + if (!allListeners().findAny().isPresent()) { + storeEventSource.setSynchronous(false); + storeEventSource.setEventOrdering(false); + storeEventSource.removeEventListener(eventListener); + } else { + if (allListeners().noneMatch(EventListenerWrapper::isOrdered)) { storeEventSource.setEventOrdering(false); } - if (--listenersCount == 0) { - storeEventSource.removeEventListener(eventListener); - } - if (syncListenersList.isEmpty()) { + if (!syncListeners().findAny().isPresent()) { storeEventSource.setSynchronous(false); } - return true; } - return false; } /** @@ -167,8 +186,8 @@ public synchronized void shutdown() { storeEventSource.removeEventListener(eventListener); storeEventSource.setEventOrdering(false); storeEventSource.setSynchronous(false); - syncListenersList.clear(); - aSyncListenersList.clear(); + syncListenersList.values().forEach(Collection::clear); + asyncListenersList.values().forEach(Collection::clear); unOrderedExectuor.shutdown(); orderedExecutor.shutdown(); } @@ -188,11 +207,13 @@ void onEvent(CacheEvent event) { } else { executor = unOrderedExectuor; } - if (!aSyncListenersList.isEmpty()) { - executor.submit(new EventDispatchTask<>(event, aSyncListenersList)); + List> asyncTargets = asyncListenersList.get(event.getType()); + if (!asyncTargets.isEmpty()) { + executor.submit(new EventDispatchTask<>(event, asyncTargets)); } - if (!syncListenersList.isEmpty()) { - Future future = executor.submit(new EventDispatchTask<>(event, syncListenersList)); + List> syncTargets = syncListenersList.get(event.getType()); + if (!syncTargets.isEmpty()) { + Future future = executor.submit(new EventDispatchTask<>(event, syncTargets)); try { future.get(); } catch (Exception e) { diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java index fbdcbdf88d..42ad6f8bb0 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java @@ -40,12 +40,10 @@ class EventDispatchTask implements Runnable { @Override public void run() { for(EventListenerWrapper listenerWrapper : listenerWrappers) { - if (listenerWrapper.isForEventType(cacheEvent.getType())) { - try { - listenerWrapper.onEvent(cacheEvent); - } catch (Exception e) { - LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e); - } + try { + listenerWrapper.onEvent(cacheEvent); + } catch (Exception e) { + LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e); } } } From 14ec4e19d54bcad215f405abc074e239b2b51e3f Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Tue, 26 Sep 2023 16:22:43 +0200 Subject: [PATCH 2/8] Fixes #3169: Register only those CacheEvents with corresponding listener --- .../client/internal/store/ClusteredStore.java | 6 +++++ .../java/org/ehcache/event/EventType.java | 10 ++++++- .../core/events/NullStoreEventDispatcher.java | 5 ++++ .../spi/store/events/StoreEventListener.java | 14 ++++++++++ .../spi/store/events/StoreEventSource.java | 5 ++++ .../impl/events/CacheEventDispatcherImpl.java | 22 +++++++++++++++- .../events/AbstractStoreEventDispatcher.java | 26 ++++++++++++++++++- .../FudgingInvocationScopedEventSink.java | 5 ++-- .../events/InvocationScopedEventSink.java | 26 +++++++++++-------- .../ThreadLocalStoreEventDispatcher.java | 2 +- .../impl/internal/store/basic/NopStore.java | 4 +++ .../FudgingInvocationScopedEventSinkTest.java | 9 ++++++- .../events/InvocationScopedEventSinkTest.java | 5 +++- .../events/TestStoreEventDispatcher.java | 5 ++++ .../DefaultStoreEventDispatcherTest.java | 1 + .../xa/internal/StoreEventSourceWrapper.java | 5 ++++ 16 files changed, 131 insertions(+), 19 deletions(-) diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java index c3a7ae9967..4a24c27de8 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java @@ -1016,6 +1016,12 @@ public synchronized void removeEventListener(StoreEventListener eventListe } delegate.removeEventListener(eventListener); } + + @Override + public void listenerModified() { + delegate.listenerModified(); + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { delegate.addEventFilter(eventFilter); diff --git a/ehcache-api/src/main/java/org/ehcache/event/EventType.java b/ehcache-api/src/main/java/org/ehcache/event/EventType.java index d8286717b1..4cd5855878 100644 --- a/ehcache-api/src/main/java/org/ehcache/event/EventType.java +++ b/ehcache-api/src/main/java/org/ehcache/event/EventType.java @@ -16,6 +16,9 @@ package org.ehcache.event; +import java.util.EnumSet; +import java.util.Set; + /** * The different event types. */ @@ -44,6 +47,11 @@ public enum EventType { /** * Represents an existing {@link org.ehcache.Cache.Entry cache entry} being updated for a given key */ - UPDATED, + UPDATED; + + private static final Set ALL_EVENT_TYPES = EnumSet.allOf(EventType.class); + public static Set allAsSet() { + return ALL_EVENT_TYPES; + } } diff --git a/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java b/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java index ff26a97fd7..ab83841cb1 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java +++ b/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java @@ -87,6 +87,11 @@ public void removeEventListener(StoreEventListener eventListener) { // Do nothing } + @Override + public void listenerModified() { + // Do nothing + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { // Do nothing diff --git a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java index ee3af6ee0b..ca5c66dd06 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java +++ b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java @@ -16,7 +16,10 @@ package org.ehcache.core.spi.store.events; +import java.util.Set; + import org.ehcache.core.events.StoreEventDispatcher; +import org.ehcache.event.EventType; /** * Interface used to register on a {@link StoreEventSource} to get notified of events happening to mappings the @@ -36,4 +39,15 @@ public interface StoreEventListener { * @param event the actual {@link StoreEvent} */ void onEvent(StoreEvent event); + + /** + * Specify which Events this Listener is handling. + *

+ * Defaults return is all values of {@link EventType} + * + * @return Set of the {@link EventType} this listener handles. + */ + default Set getEventTypes() { + return EventType.allAsSet(); + } } diff --git a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java index d49187d70b..0c4db7001d 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java +++ b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java @@ -58,4 +58,9 @@ public interface StoreEventSource { * @return {@code true} if ordering is on, {@code false} otherwise */ boolean isEventOrdering(); + + /** + * Indicates that a listener was modified + */ + void listenerModified(); } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index a7ef7c32e4..67517d8bd4 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -39,6 +39,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -70,6 +71,7 @@ public class CacheEventDispatcherImpl implements CacheEventDispatcher new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); private final Map>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream() .collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); + private final Set registeredEventTypes = EnumSet.noneOf(EventType.class); private final StoreEventListener eventListener = new StoreListener(); @@ -117,6 +119,8 @@ private synchronized void registerCacheEventListener(EventListenerWrapper storeEventSource.setEventOrdering(true); } + registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list or relevant EntryTypes + switch (wrapper.getFiringMode()) { case ASYNCHRONOUS: wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper)); @@ -133,6 +137,8 @@ private synchronized void registerCacheEventListener(EventListenerWrapper if (firstListener) { storeEventSource.addEventListener(eventListener); + } else { + storeEventSource.listenerModified(); } } @@ -164,6 +170,8 @@ public synchronized void deregisterCacheEventListener(CacheEventListener newRegisteredEventTypes = EnumSet.noneOf(EventType.class); + allListeners().forEach(listener -> newRegisteredEventTypes.addAll(listener.getEventTypes())); + // drop irrelevant EventTypes + registeredEventTypes.retainAll(newRegisteredEventTypes); + } + /** * {@inheritDoc} */ @@ -264,8 +280,12 @@ public void onEvent(StoreEvent event) { throw new AssertionError("Unexpected StoreEvent value: " + event.getType()); } } - } + @Override + public Set getEventTypes() { + return registeredEventTypes; + } + } /** * {@inheritDoc} */ diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java index f2bc22432d..ddbfd01021 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java @@ -20,7 +20,9 @@ import org.ehcache.core.events.StoreEventSink; import org.ehcache.core.spi.store.events.StoreEventFilter; import org.ehcache.core.spi.store.events.StoreEventListener; +import org.ehcache.event.EventType; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; @@ -77,6 +79,7 @@ public void evicted(Object key, Supplier value) { private final Set> filters = new CopyOnWriteArraySet<>(); private final Set> listeners = new CopyOnWriteArraySet<>(); private final BlockingQueue>[] orderedQueues; + private final Set relevantEventTypes = EnumSet.noneOf(EventType.class); private volatile boolean ordered = false; protected AbstractStoreEventDispatcher(int dispatcherConcurrency) { @@ -92,6 +95,16 @@ protected AbstractStoreEventDispatcher(int dispatcherConcurrency) { } } + private void computeRelevantEventTypes() { + // collect all EventTypes the listeners are interested in. + for (StoreEventListener listener : listeners) { + relevantEventTypes.addAll(listener.getEventTypes()); + } + if (relevantEventTypes.isEmpty()) { // mocks are empty -> handle all types + relevantEventTypes.addAll(EnumSet.allOf(EventType.class)); + } + } + protected Set> getListeners() { return listeners; } @@ -104,14 +117,25 @@ protected BlockingQueue>[] getOrderedQueues() { return orderedQueues; } + protected Set getRelevantEventTypes() { + return relevantEventTypes; + } + @Override public void addEventListener(StoreEventListener eventListener) { listeners.add(eventListener); + computeRelevantEventTypes(); // refresh } @Override public void removeEventListener(StoreEventListener eventListener) { listeners.remove(eventListener); + computeRelevantEventTypes(); // refresh + } + + @Override + public void listenerModified() { + computeRelevantEventTypes(); // refresh } @Override @@ -151,6 +175,6 @@ public void reset(StoreEventSink eventSink) { @Override public StoreEventSink eventSink() { - return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners()); + return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes()); } } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java index 5079c5ce22..737c70ffbd 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java @@ -40,8 +40,9 @@ class FudgingInvocationScopedEventSink extends InvocationScopedEventSink> filters, boolean ordered, BlockingQueue>[] orderedQueues, - Set> listeners) { - super(filters, ordered, orderedQueues, listeners); + Set> listeners, + Set relevantEventTypes) { + super(filters, ordered, orderedQueues, listeners, relevantEventTypes); } @Override diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java index 21a961e38a..77dcf55ca1 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java @@ -16,9 +16,11 @@ package org.ehcache.impl.internal.events; -import org.ehcache.event.EventType; -import org.ehcache.core.spi.store.events.StoreEventFilter; -import org.ehcache.core.spi.store.events.StoreEventListener; +import static org.ehcache.impl.internal.events.StoreEvents.createEvent; +import static org.ehcache.impl.internal.events.StoreEvents.evictEvent; +import static org.ehcache.impl.internal.events.StoreEvents.expireEvent; +import static org.ehcache.impl.internal.events.StoreEvents.removeEvent; +import static org.ehcache.impl.internal.events.StoreEvents.updateEvent; import java.util.ArrayDeque; import java.util.Deque; @@ -27,11 +29,9 @@ import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; -import static org.ehcache.impl.internal.events.StoreEvents.createEvent; -import static org.ehcache.impl.internal.events.StoreEvents.evictEvent; -import static org.ehcache.impl.internal.events.StoreEvents.expireEvent; -import static org.ehcache.impl.internal.events.StoreEvents.removeEvent; -import static org.ehcache.impl.internal.events.StoreEvents.updateEvent; +import org.ehcache.core.spi.store.events.StoreEventFilter; +import org.ehcache.core.spi.store.events.StoreEventListener; +import org.ehcache.event.EventType; /** * InvocationScopedEventSink @@ -44,13 +44,16 @@ class InvocationScopedEventSink implements CloseableStoreEventSink { private final Set> listeners; private final Deque> events = new ArrayDeque<>(4); + private final Set relevantEventTypes; + InvocationScopedEventSink(Set> filters, boolean ordered, - BlockingQueue>[] orderedQueues, - Set> listeners) { + BlockingQueue>[] orderedQueues, Set> listeners, + Set relevantEventTypes) { this.filters = filters; this.ordered = ordered; this.orderedQueues = orderedQueues; this.listeners = listeners; + this.relevantEventTypes = relevantEventTypes; } @Override @@ -98,7 +101,8 @@ protected boolean acceptEvent(EventType type, K key, V oldValue, V newValue) { return false; } } - return true; + // at least one listener is interested in this event + return relevantEventTypes.contains(type); } @Override diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java index 63934f319b..a10025a6be 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java @@ -39,7 +39,7 @@ public StoreEventSink eventSink() { } else { StoreEventSink eventSink = tlEventSink.get(); if (eventSink == null) { - eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners()); + eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes()); tlEventSink.set(eventSink); usageDepth.set(0); } else { diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java index 2abd943e44..b5ad67d01b 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java @@ -144,6 +144,10 @@ public void setSynchronous(boolean synchronous) throws IllegalArgumentException public boolean isEventOrdering() { return false; } + + @Override + public void listenerModified() { + } }; } diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java index 42f7a93d78..ebe076289a 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.mockito.InOrder; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -53,7 +54,7 @@ public void setUp() { storeEventListeners.add(listener); @SuppressWarnings({"unchecked", "rawtypes"}) BlockingQueue>[] blockingQueues = new BlockingQueue[] { new ArrayBlockingQueue>(10) }; - eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners); + eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, EnumSet.allOf(EventType.class)); } @Test @@ -63,6 +64,7 @@ public void testEvictedDifferentKeyNoImpact() { eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(2)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(createdMatcher)); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); verifyNoMoreInteractions(listener); @@ -75,6 +77,7 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreate() { eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(3)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -88,6 +91,7 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryToo() { eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(4)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -102,6 +106,7 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreateEvenWithMultiple eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(5)).getEventTypes(); inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -117,6 +122,7 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryTooEvenWithMultipleEvictsIn eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(6)).getEventTypes(); inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -130,6 +136,7 @@ public void testEvictedKeyDoesNotFudgeOlderEvents() { eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(3)).getEventTypes(); Matcher> updatedMatcher = eventType(EventType.UPDATED); inOrder.verify(listener).onEvent(argThat(updatedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java index d2b93ef39c..9f8f7d6159 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java @@ -30,6 +30,7 @@ import org.mockito.junit.MockitoRule; import java.util.Collections; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -39,6 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.hamcrest.MockitoHamcrest.argThat; @@ -66,7 +68,7 @@ public void setUp() { private InvocationScopedEventSink createEventSink(boolean ordered) { @SuppressWarnings("unchecked") BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; - return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners); + return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.allOf(EventType.class)); } @Test @@ -82,6 +84,7 @@ public void testReset() { eventSink.close(); InOrder inOrder = inOrder(listener); + inOrder.verify(listener, times(5)).getEventTypes(); Matcher> createdMatcher = eventType(EventType.CREATED); inOrder.verify(listener).onEvent(argThat(createdMatcher)); Matcher> updatedMatcher = eventType(EventType.UPDATED); diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java index 51bc63a89e..2951a5d556 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java @@ -75,6 +75,11 @@ public void removeEventListener(StoreEventListener eventListener) { listeners.remove(eventListener); } + @Override + public void listenerModified() { + // No-op + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { filters.add(eventFilter); diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java index 30ca7801e7..d5cf7ddaba 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java @@ -107,6 +107,7 @@ public void testEventFiltering() { sink.created("new", "and shiny"); dispatcher.releaseEventSink(sink); + verify(listener).getEventTypes(); Matcher> matcher = eventOfType(EventType.CREATED); verify(listener).onEvent(argThat(matcher)); verifyNoMoreInteractions(listener); diff --git a/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java b/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java index cc8c0ef442..c82d382a4b 100644 --- a/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java +++ b/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java @@ -60,6 +60,11 @@ public void removeEventListener(StoreEventListener eventListener) { } } + @Override + public void listenerModified() { + underlying.listenerModified(); + } + @Override public void addEventFilter(final StoreEventFilter eventFilter) { underlying.addEventFilter((type, key, oldValue, newValue) -> { From 5e4c4f31e5a38b44e84557bdfe0eeaf798984463 Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Tue, 26 Sep 2023 16:23:33 +0200 Subject: [PATCH 3/8] Fixes #3169: Inline unordered/asynch listener execution to avoid submit delay --- .../impl/events/CacheEventDispatcherImpl.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index 67517d8bd4..19fb387be0 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -217,23 +217,30 @@ public synchronized void setListenerSource(Cache source) { } void onEvent(CacheEvent event) { - ExecutorService executor; - if (storeEventSource.isEventOrdering()) { - executor = orderedExecutor; - } else { - executor = unOrderedExectuor; - } List> asyncTargets = asyncListenersList.get(event.getType()); - if (!asyncTargets.isEmpty()) { - executor.submit(new EventDispatchTask<>(event, asyncTargets)); - } List> syncTargets = syncListenersList.get(event.getType()); - if (!syncTargets.isEmpty()) { - Future future = executor.submit(new EventDispatchTask<>(event, syncTargets)); - try { - future.get(); - } catch (Exception e) { - LOGGER.error("Exception received as result from synchronous listeners", e); + if (storeEventSource.isEventOrdering()) { + if (!asyncTargets.isEmpty()) { + orderedExecutor.submit(new EventDispatchTask<>(event, asyncTargets)); + } + if (!syncTargets.isEmpty()) { + Future future = orderedExecutor.submit(new EventDispatchTask<>(event, syncTargets)); + try { + future.get(); + } catch (Exception e) { + LOGGER.error("Exception received as result from synchronous listeners", e); + } + } + } else { + if (!asyncTargets.isEmpty()) { + unOrderedExectuor.submit(new EventDispatchTask<>(event, asyncTargets)); + } + if (!syncTargets.isEmpty()) { + try { + new EventDispatchTask<>(event, syncTargets).run(); + } catch (Exception e) { + LOGGER.error("Exception received as result from synchronous listeners", e); + } } } } From 3699f017c88e5539825f10cd34fc0034e6bf0994 Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Wed, 17 Jul 2024 20:40:49 +0200 Subject: [PATCH 4/8] Fixes #3169: clean-ups & test fixes --- .../impl/events/CacheEventDispatcherImpl.java | 2 +- .../FudgingInvocationScopedEventSinkTest.java | 40 +++++++-------- .../events/InvocationScopedEventSinkTest.java | 50 +++++++++---------- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index 19fb387be0..c1fb1c4678 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -119,7 +119,7 @@ private synchronized void registerCacheEventListener(EventListenerWrapper storeEventSource.setEventOrdering(true); } - registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list or relevant EntryTypes + registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list of relevant EntryTypes switch (wrapper.getFiringMode()) { case ASYNCHRONOUS: diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java index ebe076289a..44d8c767cb 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java @@ -16,13 +16,12 @@ package org.ehcache.impl.internal.events; -import org.ehcache.core.spi.store.events.StoreEvent; -import org.ehcache.event.EventType; -import org.ehcache.core.spi.store.events.StoreEventListener; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; +import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; +import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; import java.util.EnumSet; import java.util.HashSet; @@ -30,12 +29,13 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; -import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import org.ehcache.core.spi.store.events.StoreEvent; +import org.ehcache.core.spi.store.events.StoreEventListener; +import org.ehcache.event.EventType; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; /** * FudgingInvocationScopedEventSinkTest @@ -52,9 +52,11 @@ public void setUp() { Set> storeEventListeners = new HashSet<>(); listener = uncheckedGenericMock(StoreEventListener.class); storeEventListeners.add(listener); - @SuppressWarnings({"unchecked", "rawtypes"}) - BlockingQueue>[] blockingQueues = new BlockingQueue[] { new ArrayBlockingQueue>(10) }; - eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, EnumSet.allOf(EventType.class)); + @SuppressWarnings({ "unchecked", "rawtypes" }) + BlockingQueue>[] blockingQueues = new BlockingQueue[] { + new ArrayBlockingQueue>(10) }; + eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, + EnumSet.allOf(EventType.class)); } @Test @@ -64,7 +66,6 @@ public void testEvictedDifferentKeyNoImpact() { eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(2)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(createdMatcher)); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); verifyNoMoreInteractions(listener); @@ -77,7 +78,6 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreate() { eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(3)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -91,7 +91,6 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryToo() { eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(4)).getEventTypes(); inOrder.verify(listener).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -106,7 +105,6 @@ public void testEvictedSameKeyAfterUpdateReplacesWithEvictCreateEvenWithMultiple eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(5)).getEventTypes(); inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -122,7 +120,6 @@ public void testEvictedSameKeyAfterCreateFudgesExpiryTooEvenWithMultipleEvictsIn eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(6)).getEventTypes(); inOrder.verify(listener, times(3)).onEvent(argThat(evictedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); verifyNoMoreInteractions(listener); @@ -136,7 +133,6 @@ public void testEvictedKeyDoesNotFudgeOlderEvents() { eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(3)).getEventTypes(); Matcher> updatedMatcher = eventType(EventType.UPDATED); inOrder.verify(listener).onEvent(argThat(updatedMatcher)); inOrder.verify(listener).onEvent(argThat(createdMatcher)); diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java index 9f8f7d6159..875cee7e63 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java @@ -16,8 +16,21 @@ package org.ehcache.impl.internal.events; +import static org.assertj.core.api.Assertions.assertThat; +import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + import org.ehcache.core.spi.store.events.StoreEvent; -import org.ehcache.core.spi.store.events.StoreEventFilter; import org.ehcache.core.spi.store.events.StoreEventListener; import org.ehcache.event.EventType; import org.hamcrest.Matcher; @@ -29,21 +42,6 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; - /** * InvocationScopedEventSinkTest */ @@ -67,7 +65,8 @@ public void setUp() { private InvocationScopedEventSink createEventSink(boolean ordered) { @SuppressWarnings("unchecked") - BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; + BlockingQueue>[] queues = (BlockingQueue>[])new BlockingQueue[] { + blockingQueue }; return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.allOf(EventType.class)); } @@ -84,7 +83,6 @@ public void testReset() { eventSink.close(); InOrder inOrder = inOrder(listener); - inOrder.verify(listener, times(5)).getEventTypes(); Matcher> createdMatcher = eventType(EventType.CREATED); inOrder.verify(listener).onEvent(argThat(createdMatcher)); Matcher> updatedMatcher = eventType(EventType.UPDATED); @@ -95,8 +93,8 @@ public void testReset() { } /** - * Make sure an interrupted sink sets the interrupted flag and keep both event queues in the state - * as of before the event that was interrupted. + * Make sure an interrupted sink sets the interrupted flag and keep both event queues in the state as of before the event that was + * interrupted. * * @throws InterruptedException */ @@ -116,7 +114,7 @@ public void testInterruption() throws InterruptedException { }); t.start(); - while(blockingQueue.remainingCapacity() != 0) { + while (blockingQueue.remainingCapacity() != 0) { System.out.println(blockingQueue.remainingCapacity()); } @@ -126,11 +124,11 @@ public void testInterruption() throws InterruptedException { assertThat(wasInterrupted).isTrue(); assertThat(blockingQueue).hasSize(10); IntStream.range(0, 10).forEachOrdered(i -> { - try { - assertThat(blockingQueue.take().getEvent().getKey()).isEqualTo("k" + i); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + assertThat(blockingQueue.take().getEvent().getKey()).isEqualTo("k" + i); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } }); assertThat(eventSink.getEvents()).hasSize(10); assertThat(eventSink.getEvents().getLast().getEvent().getKey()).isEqualTo("k9"); From f709f5586e3cdee74ce517da3d2f644c236d39ac Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Thu, 18 Jul 2024 10:14:26 +0200 Subject: [PATCH 5/8] Fixes #3169: fix compile error after rebase --- .../store/shared/store/StorePartition.java | 114 ++++++++++-------- 1 file changed, 67 insertions(+), 47 deletions(-) diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java index 2c4ee1cdc2..d957339985 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java @@ -16,6 +16,22 @@ package org.ehcache.impl.internal.store.shared.store; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.annotation.Nonnull; + import org.ehcache.Cache; import org.ehcache.config.ResourceType; import org.ehcache.core.CacheConfigurationChangeListener; @@ -33,21 +49,6 @@ import org.ehcache.spi.resilience.StoreAccessException; import org.slf4j.Logger; -import javax.annotation.Nonnull; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public class StorePartition extends AbstractPartition, CompositeValue>> implements Store { private final Logger logger = EhcachePrefixLoggerFactory.getLogger(StorePartition.class); @@ -57,14 +58,14 @@ public class StorePartition extends AbstractPartition valueType; public StorePartition(ResourceType type, int id, Class keyType, Class valueType, - Store, CompositeValue> store) { + Store, CompositeValue> store) { super(type, id, store); this.keyType = keyType; this.valueType = valueType; } protected K checkKey(K keyObject) { - if (keyType.isInstance(Objects.requireNonNull((Object) keyObject))) { + if (keyType.isInstance(Objects.requireNonNull((Object)keyObject))) { return keyObject; } else { throw new ClassCastException("Invalid key type, expected : " + keyType.getName() + " but was : " + keyObject.getClass().getName()); @@ -72,10 +73,11 @@ protected K checkKey(K keyObject) { } protected V checkValue(V valueObject) { - if (valueType.isInstance(Objects.requireNonNull((Object) valueObject))) { + if (valueType.isInstance(Objects.requireNonNull((Object)valueObject))) { return valueObject; } else { - throw new ClassCastException("Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); + throw new ClassCastException( + "Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); } } @@ -149,13 +151,16 @@ public ReplaceStatus replace(K key, V oldValue, V newValue) throws StoreAccessEx @Override public ValueHolder getAndCompute(K key, BiFunction mappingFunction) throws StoreAccessException { checkKey(key); - return decode(shared().getAndCompute(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v != null ? v.getValue() : null)))); + return decode( + shared().getAndCompute(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v != null ? v.getValue() : null)))); } @Override - public ValueHolder computeAndGet(K key, BiFunction mappingFunction, Supplier replaceEqual, Supplier invokeWriter) throws StoreAccessException { + public ValueHolder computeAndGet(K key, BiFunction mappingFunction, Supplier replaceEqual, + Supplier invokeWriter) throws StoreAccessException { checkKey(key); - return decode(shared().computeAndGet(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v == null ? null : v.getValue())), replaceEqual, invokeWriter)); + return decode(shared().computeAndGet(composite(key), + (k, v) -> composite(mappingFunction.apply(k.getValue(), v == null ? null : v.getValue())), replaceEqual, invokeWriter)); } @Override @@ -166,30 +171,31 @@ public ValueHolder computeIfAbsent(K key, Function ma @Override public Map> bulkCompute(Set keys, - Function>, Iterable>> remappingFunction) throws StoreAccessException { + Function>, Iterable>> remappingFunction) + throws StoreAccessException { return bulkCompute(keys, remappingFunction, SUPPLY_TRUE); } @Override public Map> bulkCompute(Set keys, - Function>, Iterable>> remappingFunction, - Supplier replaceEqual) throws StoreAccessException { + Function>, Iterable>> remappingFunction, + Supplier replaceEqual) throws StoreAccessException { Map, ValueHolder>> results; if (remappingFunction instanceof Ehcache.PutAllFunction) { - Ehcache.PutAllFunction putAllFunction = (Ehcache.PutAllFunction) remappingFunction; + Ehcache.PutAllFunction putAllFunction = (Ehcache.PutAllFunction)remappingFunction; Ehcache.PutAllFunction, CompositeValue> compositePutAllFunction = new Ehcache.PutAllFunction, CompositeValue>() { @Override public Map, CompositeValue> getEntriesToRemap() { - return putAllFunction.getEntriesToRemap().entrySet().stream().collect(Collectors.toMap(e -> composite(e.getKey()), e -> composite(e.getValue()))); + return putAllFunction.getEntriesToRemap().entrySet().stream() + .collect(Collectors.toMap(e -> composite(e.getKey()), e -> composite(e.getValue()))); } @Override public boolean newValueAlreadyExpired(CompositeValue key, CompositeValue oldValue, CompositeValue newValue) { - return putAllFunction.newValueAlreadyExpired(key.getValue(), - oldValue == null ? null : oldValue.getValue(), - newValue == null ? null : newValue.getValue()); + return putAllFunction.newValueAlreadyExpired(key.getValue(), oldValue == null ? null : oldValue.getValue(), + newValue == null ? null : newValue.getValue()); } @Override @@ -205,7 +211,7 @@ public AtomicInteger getActualUpdateCount() { results = shared().bulkCompute(compositeSet(keys), compositePutAllFunction); } else if (remappingFunction instanceof Ehcache.RemoveAllFunction) { - Ehcache.RemoveAllFunction removeAllFunction = (Ehcache.RemoveAllFunction) remappingFunction; + Ehcache.RemoveAllFunction removeAllFunction = (Ehcache.RemoveAllFunction)remappingFunction; Ehcache.RemoveAllFunction, CompositeValue> compositeRemappingFunction = removeAllFunction::getActualRemoveCount; results = shared().bulkCompute(compositeSet(keys), compositeRemappingFunction); } else { @@ -219,9 +225,10 @@ public AtomicInteger getActualUpdateCount() { @Override public Map> bulkComputeIfAbsent(Set keys, - Function, Iterable>> mappingFunction) throws StoreAccessException { - Map, ValueHolder>> results = - shared().bulkComputeIfAbsent(compositeSet(keys), new BulkComputeIfAbsentMappingFunction<>(id(), keyType, valueType, mappingFunction)); + Function, Iterable>> mappingFunction) + throws StoreAccessException { + Map, ValueHolder>> results = shared().bulkComputeIfAbsent(compositeSet(keys), + new BulkComputeIfAbsentMappingFunction<>(id(), keyType, valueType, mappingFunction)); Map> decodedResults = new HashMap<>(); results.forEach((k, v) -> decodedResults.put(k.getValue(), decode(v))); return decodedResults; @@ -254,8 +261,8 @@ public void addEventListener(StoreEventListener eventListener) { public void onEvent(StoreEvent, CompositeValue> event) { if (event.getKey().getStoreId() == id()) { eventListener.onEvent(new StoreEventImpl<>(event.getType(), event.getKey().getValue(), - event.getOldValue() == null ? null : event.getOldValue().getValue(), - event.getNewValue() == null ? null : event.getNewValue().getValue())); + event.getOldValue() == null ? null : event.getOldValue().getValue(), + event.getNewValue() == null ? null : event.getNewValue().getValue())); } } @@ -274,7 +281,7 @@ public boolean equals(Object obj) { @SuppressWarnings("unchecked") @Override public void removeEventListener(StoreEventListener eventListener) { - storeEventSource.removeEventListener((StoreEventListener, CompositeValue>) eventListener); + storeEventSource.removeEventListener((StoreEventListener, CompositeValue>)eventListener); } @Override @@ -282,7 +289,7 @@ public void addEventFilter(StoreEventFilter eventFilter) { storeEventSource.addEventFilter((type, key, oldValue, newValue) -> { if (key.getStoreId() == id()) { return eventFilter.acceptEvent(type, key.getValue(), oldValue == null ? null : oldValue.getValue(), - newValue == null ? null : newValue.getValue()); + newValue == null ? null : newValue.getValue()); } else { return true; } @@ -303,6 +310,11 @@ public void setSynchronous(boolean synchronous) throws IllegalArgumentException public boolean isEventOrdering() { return storeEventSource.isEventOrdering(); } + + @Override + public void listenerModified() { + storeEventSource.listenerModified(); + } }; } @@ -312,6 +324,7 @@ public Iterator>> iterator() { Iterator, ValueHolder>>> iterator = shared().iterator(); return new Iterator>>() { private Cache.Entry> prefetched = advance(); + @Override public boolean hasNext() { return prefetched != null; @@ -338,6 +351,7 @@ private Cache.Entry> advance() { public K getKey() { return next.getKey().getValue(); } + @Override public ValueHolder getValue() { return decode(next.getValue()); @@ -376,7 +390,7 @@ public T get() { } } - public static abstract class BaseRemappingFunction { + public static abstract class BaseRemappingFunction { protected final int storeId; protected final Class keyType; protected final Class valueType; @@ -395,21 +409,25 @@ protected void keyCheck(Object keyObject) { protected void valueCheck(Object valueObject) { if (!valueType.isInstance(Objects.requireNonNull(valueObject))) { - throw new ClassCastException("Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); + throw new ClassCastException( + "Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); } } } - public static class BulkComputeMappingFunction extends BaseRemappingFunction implements Function, ? extends CompositeValue>>, Iterable, ? extends CompositeValue>>> { + public static class BulkComputeMappingFunction extends BaseRemappingFunction implements + Function, ? extends CompositeValue>>, Iterable, ? extends CompositeValue>>> { private final Function>, Iterable>> function; - BulkComputeMappingFunction(int storeId, Class keyType, Class valueType, Function>, Iterable>> function) { + BulkComputeMappingFunction(int storeId, Class keyType, Class valueType, + Function>, Iterable>> function) { super(storeId, keyType, valueType); this.function = function; } @Override - public Iterable, ? extends CompositeValue>> apply(Iterable, ? extends CompositeValue>> entries) { + public Iterable, ? extends CompositeValue>> apply( + Iterable, ? extends CompositeValue>> entries) { Map decodedEntries = new HashMap<>(); entries.forEach(entry -> { K key = entry.getKey().getValue(); @@ -433,16 +451,19 @@ public static class BulkComputeMappingFunction extends BaseRemappingFuncti } } - public static class BulkComputeIfAbsentMappingFunction extends BaseRemappingFunction implements Function>, Iterable, ? extends CompositeValue>>> { + public static class BulkComputeIfAbsentMappingFunction extends BaseRemappingFunction implements + Function>, Iterable, ? extends CompositeValue>>> { private final Function, Iterable>> function; - BulkComputeIfAbsentMappingFunction(int storeId, Class keyType, Class valueType, Function, Iterable>> function) { + BulkComputeIfAbsentMappingFunction(int storeId, Class keyType, Class valueType, + Function, Iterable>> function) { super(storeId, keyType, valueType); this.function = function; } @Override - public Iterable, ? extends CompositeValue>> apply(Iterable> compositeValues) { + public Iterable, ? extends CompositeValue>> apply( + Iterable> compositeValues) { List keys = new ArrayList<>(); compositeValues.forEach(k -> { keyCheck(k.getValue()); @@ -464,4 +485,3 @@ public static class BulkComputeIfAbsentMappingFunction extends BaseRemappi } } } - From 7f52e7bac549ec8bab5420eaa4732b7a7e9113fc Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Thu, 25 Jul 2024 13:53:11 +0200 Subject: [PATCH 6/8] Fixes #3169: signal listenerModified on listener removal --- .../java/org/ehcache/impl/events/CacheEventDispatcherImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index c1fb1c4678..8fc7d40c00 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -183,6 +183,7 @@ public synchronized void deregisterCacheEventListener(CacheEventListener Date: Thu, 25 Jul 2024 14:02:27 +0200 Subject: [PATCH 7/8] Fixes #3169: add additional Tests for new EventType filtering --- .../events/CacheEventDispatcherImplTest.java | 46 +++++++++- .../events/InvocationScopedEventSinkTest.java | 18 ++++ .../DefaultStoreEventDispatcherTest.java | 91 +++++++++++++++++++ 3 files changed, 153 insertions(+), 2 deletions(-) diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java index 976ee8f63c..41a0d65cd3 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java @@ -26,10 +26,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; @@ -37,12 +36,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -119,6 +121,37 @@ public void testListenerRegistrationEnablesStoreEvents() { verify(storeEventDispatcher).addEventListener(any(StoreEventListener.class)); } + @Test + public void testMultipleListenerRegistrationNotifiesStoreEvent() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.ASYNCHRONOUS, EnumSet.of(EventType.UPDATED)); + CacheEventListener otherLsnr = mock(CacheEventListener.class); + eventService.registerCacheEventListener(otherLsnr, EventOrdering.ORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.REMOVED)); + verify(storeEventDispatcher, times(1)).listenerModified(); + } + + @Test + public void testStoreListenerRegisteredEventTypesUpdated() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.ASYNCHRONOUS, EnumSet.of(EventType.UPDATED)); + CacheEventListener otherLsnr = mock(CacheEventListener.class); + eventService.registerCacheEventListener(otherLsnr, EventOrdering.ORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.REMOVED)); + verify(storeEventDispatcher, times(1)).listenerModified(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StoreEventListener.class); + verify(storeEventDispatcher).addEventListener(captor.capture()); + StoreEventListener storeListener = captor.getValue(); + + assertTrue(storeListener.getEventTypes().contains(EventType.UPDATED)); + assertTrue(storeListener.getEventTypes().contains(EventType.REMOVED)); + assertEquals(2, storeListener.getEventTypes().size()); + + eventService.deregisterCacheEventListener(listener); + + verify(storeEventDispatcher, times(2)).listenerModified(); + + assertTrue(storeListener.getEventTypes().contains(EventType.REMOVED)); + assertEquals(1, storeListener.getEventTypes().size()); + } + @Test public void testOrderedListenerRegistrationTogglesOrderedOnStoreEvents() { eventService.registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS, EnumSet.allOf(EventType.class)); @@ -166,6 +199,15 @@ public void testDeregisterNotLastListenerDoesNotStopStoreEvents() { verify(storeEventDispatcher, never()).removeEventListener(any(StoreEventListener.class)); } + @Test + public void testDeregisterNotLastListenerTriggersListernerModified() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.EVICTED)); + eventService.registerCacheEventListener(mock(CacheEventListener.class), EventOrdering.UNORDERED, EventFiring.SYNCHRONOUS, + EnumSet.of(EventType.EVICTED)); + eventService.deregisterCacheEventListener(listener); + verify(storeEventDispatcher, times(2)).listenerModified(); + } + @Test public void testShutdownDisableStoreEventsAndShutsDownOrderedExecutor() { eventService.registerCacheEventListener(listener, diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java index 875cee7e63..3efa3f75fd 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java @@ -70,6 +70,13 @@ private InvocationScopedEventSink createEventSink(boolean ordere return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.allOf(EventType.class)); } + private InvocationScopedEventSink createEventSink(boolean ordered, EventType eventTypes) { + @SuppressWarnings("unchecked") + BlockingQueue>[] queues = (BlockingQueue>[])new BlockingQueue[] { + blockingQueue }; + return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.of(eventTypes)); + } + @Test public void testReset() { eventSink = createEventSink(false); @@ -133,4 +140,15 @@ public void testInterruption() throws InterruptedException { assertThat(eventSink.getEvents()).hasSize(10); assertThat(eventSink.getEvents().getLast().getEvent().getKey()).isEqualTo("k9"); } + + @Test + public void testAcceptEventFiltersIrrelevantEventTypes() { + eventSink = createEventSink(false, EventType.REMOVED); + eventSink.created("k", "v"); + + assertThat(eventSink.getEvents()).isEmpty(); + + eventSink.removed("k", () -> "v"); + assertThat(eventSink.getEvents()).hasSize(1); + } } diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java index d5cf7ddaba..e9168e0d49 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java @@ -28,16 +28,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.EnumSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.CountDownLatch; import static org.ehcache.impl.internal.util.Matchers.eventOfType; import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -76,6 +81,92 @@ public void testListenerNotifiedUnordered() { verify(listener).onEvent(any(StoreEvent.class)); } + @Test + @SuppressWarnings("unchecked") + public void testListenerNotifiedForRelevantEventsOnly() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + when(listener.getEventTypes()).thenReturn(EnumSet.of(EventType.REMOVED)); + + dispatcher.addEventListener(listener); + + StoreEventSink sink = dispatcher.eventSink(); + sink.created("test", "test"); + dispatcher.releaseEventSink(sink); + + verify(listener, never()).onEvent(any(StoreEvent.class)); + + sink.removed("test", () -> "test"); + dispatcher.releaseEventSink(sink); + + verify(listener, times(1)).onEvent(any(StoreEvent.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnAddEventListener() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + + verify(listener, times(1)).getEventTypes(); + + StoreEventListener listener2 = uncheckedGenericMock(StoreEventListener.class); + + dispatcher.addEventListener(listener2); + + verify(listener, times(2)).getEventTypes(); + verify(listener2, times(1)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnRemoveEventListener() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + StoreEventListener listener2 = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener2); + verify(listener2, times(1)).getEventTypes(); + + dispatcher.removeEventListener(listener); + + verify(listener2, times(2)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnListenerModifiedSignal() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + verify(listener, times(1)).getEventTypes(); + + dispatcher.listenerModified(); + + verify(listener, times(2)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesContainAllEventsForMocks() { + DefaultStoreEventDispatcherMock dispatcher = new DefaultStoreEventDispatcherMock<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + + assertTrue(dispatcher.spyRelevantEventTypes().containsAll(EventType.allAsSet())); + } + + class DefaultStoreEventDispatcherMock extends DefaultStoreEventDispatcher { + public DefaultStoreEventDispatcherMock(int dispatcherConcurrency) { + super(dispatcherConcurrency); + } + + public Set spyRelevantEventTypes() { + return super.getRelevantEventTypes(); + } + } + @Test @SuppressWarnings("unchecked") public void testListenerNotifiedOrdered() { From 88a167de471186df90a6913cd6ed6c8f561e84fa Mon Sep 17 00:00:00 2001 From: Samuel Bussmann Date: Thu, 25 Jul 2024 14:23:39 +0200 Subject: [PATCH 8/8] Fixes #3169: clean-up formatting --- .../events/InvocationScopedEventSink.java | 21 ++-- .../store/shared/store/StorePartition.java | 109 ++++++++---------- .../FudgingInvocationScopedEventSinkTest.java | 34 +++--- .../events/InvocationScopedEventSinkTest.java | 50 ++++---- 4 files changed, 97 insertions(+), 117 deletions(-) diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java index 77dcf55ca1..bc90444333 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java @@ -16,11 +16,9 @@ package org.ehcache.impl.internal.events; -import static org.ehcache.impl.internal.events.StoreEvents.createEvent; -import static org.ehcache.impl.internal.events.StoreEvents.evictEvent; -import static org.ehcache.impl.internal.events.StoreEvents.expireEvent; -import static org.ehcache.impl.internal.events.StoreEvents.removeEvent; -import static org.ehcache.impl.internal.events.StoreEvents.updateEvent; +import org.ehcache.event.EventType; +import org.ehcache.core.spi.store.events.StoreEventFilter; +import org.ehcache.core.spi.store.events.StoreEventListener; import java.util.ArrayDeque; import java.util.Deque; @@ -29,9 +27,11 @@ import java.util.concurrent.BlockingQueue; import java.util.function.Supplier; -import org.ehcache.core.spi.store.events.StoreEventFilter; -import org.ehcache.core.spi.store.events.StoreEventListener; -import org.ehcache.event.EventType; +import static org.ehcache.impl.internal.events.StoreEvents.createEvent; +import static org.ehcache.impl.internal.events.StoreEvents.evictEvent; +import static org.ehcache.impl.internal.events.StoreEvents.expireEvent; +import static org.ehcache.impl.internal.events.StoreEvents.removeEvent; +import static org.ehcache.impl.internal.events.StoreEvents.updateEvent; /** * InvocationScopedEventSink @@ -43,12 +43,11 @@ class InvocationScopedEventSink implements CloseableStoreEventSink { private final BlockingQueue>[] orderedQueues; private final Set> listeners; private final Deque> events = new ArrayDeque<>(4); - private final Set relevantEventTypes; InvocationScopedEventSink(Set> filters, boolean ordered, - BlockingQueue>[] orderedQueues, Set> listeners, - Set relevantEventTypes) { + BlockingQueue>[] orderedQueues, Set> listeners, + Set relevantEventTypes) { this.filters = filters; this.ordered = ordered; this.orderedQueues = orderedQueues; diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java index d957339985..3ef366f473 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java @@ -16,22 +16,6 @@ package org.ehcache.impl.internal.store.shared.store; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import javax.annotation.Nonnull; - import org.ehcache.Cache; import org.ehcache.config.ResourceType; import org.ehcache.core.CacheConfigurationChangeListener; @@ -49,6 +33,21 @@ import org.ehcache.spi.resilience.StoreAccessException; import org.slf4j.Logger; +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + public class StorePartition extends AbstractPartition, CompositeValue>> implements Store { private final Logger logger = EhcachePrefixLoggerFactory.getLogger(StorePartition.class); @@ -58,14 +57,14 @@ public class StorePartition extends AbstractPartition valueType; public StorePartition(ResourceType type, int id, Class keyType, Class valueType, - Store, CompositeValue> store) { + Store, CompositeValue> store) { super(type, id, store); this.keyType = keyType; this.valueType = valueType; } protected K checkKey(K keyObject) { - if (keyType.isInstance(Objects.requireNonNull((Object)keyObject))) { + if (keyType.isInstance(Objects.requireNonNull((Object) keyObject))) { return keyObject; } else { throw new ClassCastException("Invalid key type, expected : " + keyType.getName() + " but was : " + keyObject.getClass().getName()); @@ -73,11 +72,10 @@ protected K checkKey(K keyObject) { } protected V checkValue(V valueObject) { - if (valueType.isInstance(Objects.requireNonNull((Object)valueObject))) { + if (valueType.isInstance(Objects.requireNonNull((Object) valueObject))) { return valueObject; } else { - throw new ClassCastException( - "Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); + throw new ClassCastException("Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); } } @@ -151,16 +149,13 @@ public ReplaceStatus replace(K key, V oldValue, V newValue) throws StoreAccessEx @Override public ValueHolder getAndCompute(K key, BiFunction mappingFunction) throws StoreAccessException { checkKey(key); - return decode( - shared().getAndCompute(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v != null ? v.getValue() : null)))); + return decode(shared().getAndCompute(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v != null ? v.getValue() : null)))); } @Override - public ValueHolder computeAndGet(K key, BiFunction mappingFunction, Supplier replaceEqual, - Supplier invokeWriter) throws StoreAccessException { + public ValueHolder computeAndGet(K key, BiFunction mappingFunction, Supplier replaceEqual, Supplier invokeWriter) throws StoreAccessException { checkKey(key); - return decode(shared().computeAndGet(composite(key), - (k, v) -> composite(mappingFunction.apply(k.getValue(), v == null ? null : v.getValue())), replaceEqual, invokeWriter)); + return decode(shared().computeAndGet(composite(key), (k, v) -> composite(mappingFunction.apply(k.getValue(), v == null ? null : v.getValue())), replaceEqual, invokeWriter)); } @Override @@ -171,31 +166,30 @@ public ValueHolder computeIfAbsent(K key, Function ma @Override public Map> bulkCompute(Set keys, - Function>, Iterable>> remappingFunction) - throws StoreAccessException { + Function>, Iterable>> remappingFunction) throws StoreAccessException { return bulkCompute(keys, remappingFunction, SUPPLY_TRUE); } @Override public Map> bulkCompute(Set keys, - Function>, Iterable>> remappingFunction, - Supplier replaceEqual) throws StoreAccessException { + Function>, Iterable>> remappingFunction, + Supplier replaceEqual) throws StoreAccessException { Map, ValueHolder>> results; if (remappingFunction instanceof Ehcache.PutAllFunction) { - Ehcache.PutAllFunction putAllFunction = (Ehcache.PutAllFunction)remappingFunction; + Ehcache.PutAllFunction putAllFunction = (Ehcache.PutAllFunction) remappingFunction; Ehcache.PutAllFunction, CompositeValue> compositePutAllFunction = new Ehcache.PutAllFunction, CompositeValue>() { @Override public Map, CompositeValue> getEntriesToRemap() { - return putAllFunction.getEntriesToRemap().entrySet().stream() - .collect(Collectors.toMap(e -> composite(e.getKey()), e -> composite(e.getValue()))); + return putAllFunction.getEntriesToRemap().entrySet().stream().collect(Collectors.toMap(e -> composite(e.getKey()), e -> composite(e.getValue()))); } @Override public boolean newValueAlreadyExpired(CompositeValue key, CompositeValue oldValue, CompositeValue newValue) { - return putAllFunction.newValueAlreadyExpired(key.getValue(), oldValue == null ? null : oldValue.getValue(), - newValue == null ? null : newValue.getValue()); + return putAllFunction.newValueAlreadyExpired(key.getValue(), + oldValue == null ? null : oldValue.getValue(), + newValue == null ? null : newValue.getValue()); } @Override @@ -211,7 +205,7 @@ public AtomicInteger getActualUpdateCount() { results = shared().bulkCompute(compositeSet(keys), compositePutAllFunction); } else if (remappingFunction instanceof Ehcache.RemoveAllFunction) { - Ehcache.RemoveAllFunction removeAllFunction = (Ehcache.RemoveAllFunction)remappingFunction; + Ehcache.RemoveAllFunction removeAllFunction = (Ehcache.RemoveAllFunction) remappingFunction; Ehcache.RemoveAllFunction, CompositeValue> compositeRemappingFunction = removeAllFunction::getActualRemoveCount; results = shared().bulkCompute(compositeSet(keys), compositeRemappingFunction); } else { @@ -225,10 +219,9 @@ public AtomicInteger getActualUpdateCount() { @Override public Map> bulkComputeIfAbsent(Set keys, - Function, Iterable>> mappingFunction) - throws StoreAccessException { - Map, ValueHolder>> results = shared().bulkComputeIfAbsent(compositeSet(keys), - new BulkComputeIfAbsentMappingFunction<>(id(), keyType, valueType, mappingFunction)); + Function, Iterable>> mappingFunction) throws StoreAccessException { + Map, ValueHolder>> results = + shared().bulkComputeIfAbsent(compositeSet(keys), new BulkComputeIfAbsentMappingFunction<>(id(), keyType, valueType, mappingFunction)); Map> decodedResults = new HashMap<>(); results.forEach((k, v) -> decodedResults.put(k.getValue(), decode(v))); return decodedResults; @@ -261,8 +254,8 @@ public void addEventListener(StoreEventListener eventListener) { public void onEvent(StoreEvent, CompositeValue> event) { if (event.getKey().getStoreId() == id()) { eventListener.onEvent(new StoreEventImpl<>(event.getType(), event.getKey().getValue(), - event.getOldValue() == null ? null : event.getOldValue().getValue(), - event.getNewValue() == null ? null : event.getNewValue().getValue())); + event.getOldValue() == null ? null : event.getOldValue().getValue(), + event.getNewValue() == null ? null : event.getNewValue().getValue())); } } @@ -281,7 +274,7 @@ public boolean equals(Object obj) { @SuppressWarnings("unchecked") @Override public void removeEventListener(StoreEventListener eventListener) { - storeEventSource.removeEventListener((StoreEventListener, CompositeValue>)eventListener); + storeEventSource.removeEventListener((StoreEventListener, CompositeValue>) eventListener); } @Override @@ -289,7 +282,7 @@ public void addEventFilter(StoreEventFilter eventFilter) { storeEventSource.addEventFilter((type, key, oldValue, newValue) -> { if (key.getStoreId() == id()) { return eventFilter.acceptEvent(type, key.getValue(), oldValue == null ? null : oldValue.getValue(), - newValue == null ? null : newValue.getValue()); + newValue == null ? null : newValue.getValue()); } else { return true; } @@ -324,7 +317,6 @@ public Iterator>> iterator() { Iterator, ValueHolder>>> iterator = shared().iterator(); return new Iterator>>() { private Cache.Entry> prefetched = advance(); - @Override public boolean hasNext() { return prefetched != null; @@ -351,7 +343,6 @@ private Cache.Entry> advance() { public K getKey() { return next.getKey().getValue(); } - @Override public ValueHolder getValue() { return decode(next.getValue()); @@ -390,7 +381,7 @@ public T get() { } } - public static abstract class BaseRemappingFunction { + public static abstract class BaseRemappingFunction { protected final int storeId; protected final Class keyType; protected final Class valueType; @@ -409,25 +400,21 @@ protected void keyCheck(Object keyObject) { protected void valueCheck(Object valueObject) { if (!valueType.isInstance(Objects.requireNonNull(valueObject))) { - throw new ClassCastException( - "Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); + throw new ClassCastException("Invalid value type, expected : " + valueType.getName() + " but was : " + valueObject.getClass().getName()); } } } - public static class BulkComputeMappingFunction extends BaseRemappingFunction implements - Function, ? extends CompositeValue>>, Iterable, ? extends CompositeValue>>> { + public static class BulkComputeMappingFunction extends BaseRemappingFunction implements Function, ? extends CompositeValue>>, Iterable, ? extends CompositeValue>>> { private final Function>, Iterable>> function; - BulkComputeMappingFunction(int storeId, Class keyType, Class valueType, - Function>, Iterable>> function) { + BulkComputeMappingFunction(int storeId, Class keyType, Class valueType, Function>, Iterable>> function) { super(storeId, keyType, valueType); this.function = function; } @Override - public Iterable, ? extends CompositeValue>> apply( - Iterable, ? extends CompositeValue>> entries) { + public Iterable, ? extends CompositeValue>> apply(Iterable, ? extends CompositeValue>> entries) { Map decodedEntries = new HashMap<>(); entries.forEach(entry -> { K key = entry.getKey().getValue(); @@ -451,19 +438,16 @@ public static class BulkComputeMappingFunction extends BaseRemappingFuncti } } - public static class BulkComputeIfAbsentMappingFunction extends BaseRemappingFunction implements - Function>, Iterable, ? extends CompositeValue>>> { + public static class BulkComputeIfAbsentMappingFunction extends BaseRemappingFunction implements Function>, Iterable, ? extends CompositeValue>>> { private final Function, Iterable>> function; - BulkComputeIfAbsentMappingFunction(int storeId, Class keyType, Class valueType, - Function, Iterable>> function) { + BulkComputeIfAbsentMappingFunction(int storeId, Class keyType, Class valueType, Function, Iterable>> function) { super(storeId, keyType, valueType); this.function = function; } @Override - public Iterable, ? extends CompositeValue>> apply( - Iterable> compositeValues) { + public Iterable, ? extends CompositeValue>> apply(Iterable> compositeValues) { List keys = new ArrayList<>(); compositeValues.forEach(k -> { keyCheck(k.getValue()); @@ -485,3 +469,4 @@ public static class BulkComputeIfAbsentMappingFunction extends BaseRemappi } } } + diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java index 44d8c767cb..fa9df96802 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java @@ -16,12 +16,13 @@ package org.ehcache.impl.internal.events; -import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; -import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import org.ehcache.core.spi.store.events.StoreEvent; +import org.ehcache.event.EventType; +import org.ehcache.core.spi.store.events.StoreEventListener; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; import java.util.EnumSet; import java.util.HashSet; @@ -29,13 +30,12 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import org.ehcache.core.spi.store.events.StoreEvent; -import org.ehcache.core.spi.store.events.StoreEventListener; -import org.ehcache.event.EventType; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; +import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; +import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; /** * FudgingInvocationScopedEventSinkTest @@ -52,11 +52,9 @@ public void setUp() { Set> storeEventListeners = new HashSet<>(); listener = uncheckedGenericMock(StoreEventListener.class); storeEventListeners.add(listener); - @SuppressWarnings({ "unchecked", "rawtypes" }) - BlockingQueue>[] blockingQueues = new BlockingQueue[] { - new ArrayBlockingQueue>(10) }; - eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, - EnumSet.allOf(EventType.class)); + @SuppressWarnings({"unchecked", "rawtypes"}) + BlockingQueue>[] blockingQueues = new BlockingQueue[] { new ArrayBlockingQueue>(10) }; + eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, EnumSet.allOf(EventType.class)); } @Test diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java index 3efa3f75fd..11fda18bfa 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java @@ -16,20 +16,6 @@ package org.ehcache.impl.internal.events; -import static org.assertj.core.api.Assertions.assertThat; -import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; - -import java.util.Collections; -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.IntStream; - import org.ehcache.core.spi.store.events.StoreEvent; import org.ehcache.core.spi.store.events.StoreEventListener; import org.ehcache.event.EventType; @@ -42,6 +28,20 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.ehcache.impl.internal.store.offheap.AbstractOffHeapStoreTest.eventType; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + /** * InvocationScopedEventSinkTest */ @@ -65,15 +65,13 @@ public void setUp() { private InvocationScopedEventSink createEventSink(boolean ordered) { @SuppressWarnings("unchecked") - BlockingQueue>[] queues = (BlockingQueue>[])new BlockingQueue[] { - blockingQueue }; + BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.allOf(EventType.class)); } private InvocationScopedEventSink createEventSink(boolean ordered, EventType eventTypes) { @SuppressWarnings("unchecked") - BlockingQueue>[] queues = (BlockingQueue>[])new BlockingQueue[] { - blockingQueue }; + BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.of(eventTypes)); } @@ -100,8 +98,8 @@ public void testReset() { } /** - * Make sure an interrupted sink sets the interrupted flag and keep both event queues in the state as of before the event that was - * interrupted. + * Make sure an interrupted sink sets the interrupted flag and keep both event queues in the state + * as of before the event that was interrupted. * * @throws InterruptedException */ @@ -121,7 +119,7 @@ public void testInterruption() throws InterruptedException { }); t.start(); - while (blockingQueue.remainingCapacity() != 0) { + while(blockingQueue.remainingCapacity() != 0) { System.out.println(blockingQueue.remainingCapacity()); } @@ -131,11 +129,11 @@ public void testInterruption() throws InterruptedException { assertThat(wasInterrupted).isTrue(); assertThat(blockingQueue).hasSize(10); IntStream.range(0, 10).forEachOrdered(i -> { - try { - assertThat(blockingQueue.take().getEvent().getKey()).isEqualTo("k" + i); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + assertThat(blockingQueue.take().getEvent().getKey()).isEqualTo("k" + i); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } }); assertThat(eventSink.getEvents()).hasSize(10); assertThat(eventSink.getEvents().getLast().getEvent().getKey()).isEqualTo("k9");