diff --git a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java index c3a7ae9967..4a24c27de8 100644 --- a/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java +++ b/clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java @@ -1016,6 +1016,12 @@ public synchronized void removeEventListener(StoreEventListener eventListe } delegate.removeEventListener(eventListener); } + + @Override + public void listenerModified() { + delegate.listenerModified(); + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { delegate.addEventFilter(eventFilter); diff --git a/ehcache-api/src/main/java/org/ehcache/event/EventType.java b/ehcache-api/src/main/java/org/ehcache/event/EventType.java index d8286717b1..4cd5855878 100644 --- a/ehcache-api/src/main/java/org/ehcache/event/EventType.java +++ b/ehcache-api/src/main/java/org/ehcache/event/EventType.java @@ -16,6 +16,9 @@ package org.ehcache.event; +import java.util.EnumSet; +import java.util.Set; + /** * The different event types. */ @@ -44,6 +47,11 @@ public enum EventType { /** * Represents an existing {@link org.ehcache.Cache.Entry cache entry} being updated for a given key */ - UPDATED, + UPDATED; + + private static final Set ALL_EVENT_TYPES = EnumSet.allOf(EventType.class); + public static Set allAsSet() { + return ALL_EVENT_TYPES; + } } diff --git a/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java b/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java index 2076b68d81..ff71ab6526 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java +++ b/ehcache-core/src/main/java/org/ehcache/core/events/EventListenerWrapper.java @@ -23,6 +23,7 @@ import org.ehcache.event.EventType; import java.util.EnumSet; +import java.util.Set; /** * Internal wrapper for {@link CacheEventListener} and their configuration. @@ -86,8 +87,8 @@ public void onEvent(CacheEvent event) { return listener; } - public boolean isForEventType(EventType type) { - return forEvents.contains(type); + public Set getEventTypes() { + return forEvents; } public boolean isOrdered() { diff --git a/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java b/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java index ff26a97fd7..ab83841cb1 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java +++ b/ehcache-core/src/main/java/org/ehcache/core/events/NullStoreEventDispatcher.java @@ -87,6 +87,11 @@ public void removeEventListener(StoreEventListener eventListener) { // Do nothing } + @Override + public void listenerModified() { + // Do nothing + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { // Do nothing diff --git a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java index ee3af6ee0b..ca5c66dd06 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java +++ b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventListener.java @@ -16,7 +16,10 @@ package org.ehcache.core.spi.store.events; +import java.util.Set; + import org.ehcache.core.events.StoreEventDispatcher; +import org.ehcache.event.EventType; /** * Interface used to register on a {@link StoreEventSource} to get notified of events happening to mappings the @@ -36,4 +39,15 @@ public interface StoreEventListener { * @param event the actual {@link StoreEvent} */ void onEvent(StoreEvent event); + + /** + * Specify which Events this Listener is handling. + *

+ * Defaults return is all values of {@link EventType} + * + * @return Set of the {@link EventType} this listener handles. + */ + default Set getEventTypes() { + return EventType.allAsSet(); + } } diff --git a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java index d49187d70b..0c4db7001d 100644 --- a/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java +++ b/ehcache-core/src/main/java/org/ehcache/core/spi/store/events/StoreEventSource.java @@ -58,4 +58,9 @@ public interface StoreEventSource { * @return {@code true} if ordering is on, {@code false} otherwise */ boolean isEventOrdering(); + + /** + * Indicates that a listener was modified + */ + void listenerModified(); } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java index 283f074797..8fc7d40c00 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/CacheEventDispatcherImpl.java @@ -34,11 +34,22 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumMap; import java.util.EnumSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Stream; + +import static java.util.Collections.unmodifiableMap; +import static java.util.EnumSet.allOf; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Stream.concat; /** * Per-cache component that manages cache event listener registrations, and provides event delivery based on desired @@ -55,10 +66,13 @@ public class CacheEventDispatcherImpl implements CacheEventDispatcher> syncListenersList = new CopyOnWriteArrayList<>(); - private final List> aSyncListenersList = new CopyOnWriteArrayList<>(); + + private final Map>> syncListenersList = unmodifiableMap(allOf(EventType.class).stream() + .collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); + private final Map>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream() + .collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class)))); + private final Set registeredEventTypes = EnumSet.noneOf(EventType.class); + private final StoreEventListener eventListener = new StoreListener(); private volatile Cache listenerSource; @@ -94,69 +108,91 @@ public void registerCacheEventListener(CacheEventListener * @param wrapper the listener wrapper to register */ private synchronized void registerCacheEventListener(EventListenerWrapper wrapper) { - if(aSyncListenersList.contains(wrapper) || syncListenersList.contains(wrapper)) { + + if(allListeners().anyMatch(wrapper::equals)) { throw new IllegalStateException("Cache Event Listener already registered: " + wrapper.getListener()); } - if (wrapper.isOrdered() && orderedListenerCount++ == 0) { + boolean firstListener = !allListeners().findAny().isPresent(); + + if (wrapper.isOrdered() && (firstListener || allListeners().noneMatch(EventListenerWrapper::isOrdered))) { storeEventSource.setEventOrdering(true); } + registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list of relevant EntryTypes + switch (wrapper.getFiringMode()) { case ASYNCHRONOUS: - aSyncListenersList.add(wrapper); + wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper)); break; case SYNCHRONOUS: - if (syncListenersList.isEmpty()) { + if (!syncListeners().findAny().isPresent()) { storeEventSource.setSynchronous(true); } - syncListenersList.add(wrapper); + wrapper.getEventTypes().forEach(type -> syncListenersList.get(type).add(wrapper)); break; default: throw new AssertionError("Unhandled EventFiring value: " + wrapper.getFiringMode()); } - if (listenersCount++ == 0) { + if (firstListener) { storeEventSource.addEventListener(eventListener); + } else { + storeEventSource.listenerModified(); } } + private Stream> allListeners() { + return concat(asyncListeners(), syncListeners()); + } + + private Stream> syncListeners() { + return syncListenersList.values().stream().flatMap(Collection::stream); + } + + private Stream> asyncListeners() { + return asyncListenersList.values().stream().flatMap(Collection::stream); + } + /** * {@inheritDoc} */ @Override - public void deregisterCacheEventListener(CacheEventListener listener) { + public synchronized void deregisterCacheEventListener(CacheEventListener listener) { EventListenerWrapper wrapper = new EventListenerWrapper<>(listener); - if (!removeWrapperFromList(wrapper, aSyncListenersList)) { - if (!removeWrapperFromList(wrapper, syncListenersList)) { - throw new IllegalStateException("Unknown cache event listener: " + listener); - } + boolean removed = Stream.of(asyncListenersList, syncListenersList) + .flatMap(list -> list.values().stream()) + .map(list -> list.remove(wrapper)) + .reduce((a, b) -> a || b).orElse(false); + + if (!removed) { + throw new IllegalStateException("Unknown cache event listener: " + listener); } - } - /** - * Synchronized to make sure listener removal is atomic - * - * @param wrapper the listener wrapper to unregister - * @param listenersList the listener list to remove from - */ - private synchronized boolean removeWrapperFromList(EventListenerWrapper wrapper, List> listenersList) { - int index = listenersList.indexOf(wrapper); - if (index != -1) { - EventListenerWrapper containedWrapper = listenersList.remove(index); - if(containedWrapper.isOrdered() && --orderedListenerCount == 0) { + refreshRegisteredEventTypes(); + + if (!allListeners().findAny().isPresent()) { + storeEventSource.setSynchronous(false); + storeEventSource.setEventOrdering(false); + storeEventSource.removeEventListener(eventListener); + } else { + if (allListeners().noneMatch(EventListenerWrapper::isOrdered)) { storeEventSource.setEventOrdering(false); } - if (--listenersCount == 0) { - storeEventSource.removeEventListener(eventListener); - } - if (syncListenersList.isEmpty()) { + if (!syncListeners().findAny().isPresent()) { storeEventSource.setSynchronous(false); } - return true; + storeEventSource.listenerModified(); } - return false; + } + + private void refreshRegisteredEventTypes() { + // collect all registered EventTypes + EnumSet newRegisteredEventTypes = EnumSet.noneOf(EventType.class); + allListeners().forEach(listener -> newRegisteredEventTypes.addAll(listener.getEventTypes())); + // drop irrelevant EventTypes + registeredEventTypes.retainAll(newRegisteredEventTypes); } /** @@ -167,8 +203,8 @@ public synchronized void shutdown() { storeEventSource.removeEventListener(eventListener); storeEventSource.setEventOrdering(false); storeEventSource.setSynchronous(false); - syncListenersList.clear(); - aSyncListenersList.clear(); + syncListenersList.values().forEach(Collection::clear); + asyncListenersList.values().forEach(Collection::clear); unOrderedExectuor.shutdown(); orderedExecutor.shutdown(); } @@ -182,21 +218,30 @@ public synchronized void setListenerSource(Cache source) { } void onEvent(CacheEvent event) { - ExecutorService executor; + List> asyncTargets = asyncListenersList.get(event.getType()); + List> syncTargets = syncListenersList.get(event.getType()); if (storeEventSource.isEventOrdering()) { - executor = orderedExecutor; + if (!asyncTargets.isEmpty()) { + orderedExecutor.submit(new EventDispatchTask<>(event, asyncTargets)); + } + if (!syncTargets.isEmpty()) { + Future future = orderedExecutor.submit(new EventDispatchTask<>(event, syncTargets)); + try { + future.get(); + } catch (Exception e) { + LOGGER.error("Exception received as result from synchronous listeners", e); + } + } } else { - executor = unOrderedExectuor; - } - if (!aSyncListenersList.isEmpty()) { - executor.submit(new EventDispatchTask<>(event, aSyncListenersList)); - } - if (!syncListenersList.isEmpty()) { - Future future = executor.submit(new EventDispatchTask<>(event, syncListenersList)); - try { - future.get(); - } catch (Exception e) { - LOGGER.error("Exception received as result from synchronous listeners", e); + if (!asyncTargets.isEmpty()) { + unOrderedExectuor.submit(new EventDispatchTask<>(event, asyncTargets)); + } + if (!syncTargets.isEmpty()) { + try { + new EventDispatchTask<>(event, syncTargets).run(); + } catch (Exception e) { + LOGGER.error("Exception received as result from synchronous listeners", e); + } } } } @@ -243,8 +288,12 @@ public void onEvent(StoreEvent event) { throw new AssertionError("Unexpected StoreEvent value: " + event.getType()); } } - } + @Override + public Set getEventTypes() { + return registeredEventTypes; + } + } /** * {@inheritDoc} */ diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java b/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java index fbdcbdf88d..42ad6f8bb0 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/events/EventDispatchTask.java @@ -40,12 +40,10 @@ class EventDispatchTask implements Runnable { @Override public void run() { for(EventListenerWrapper listenerWrapper : listenerWrappers) { - if (listenerWrapper.isForEventType(cacheEvent.getType())) { - try { - listenerWrapper.onEvent(cacheEvent); - } catch (Exception e) { - LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e); - } + try { + listenerWrapper.onEvent(cacheEvent); + } catch (Exception e) { + LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e); } } } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java index f2bc22432d..ddbfd01021 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/AbstractStoreEventDispatcher.java @@ -20,7 +20,9 @@ import org.ehcache.core.events.StoreEventSink; import org.ehcache.core.spi.store.events.StoreEventFilter; import org.ehcache.core.spi.store.events.StoreEventListener; +import org.ehcache.event.EventType; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; @@ -77,6 +79,7 @@ public void evicted(Object key, Supplier value) { private final Set> filters = new CopyOnWriteArraySet<>(); private final Set> listeners = new CopyOnWriteArraySet<>(); private final BlockingQueue>[] orderedQueues; + private final Set relevantEventTypes = EnumSet.noneOf(EventType.class); private volatile boolean ordered = false; protected AbstractStoreEventDispatcher(int dispatcherConcurrency) { @@ -92,6 +95,16 @@ protected AbstractStoreEventDispatcher(int dispatcherConcurrency) { } } + private void computeRelevantEventTypes() { + // collect all EventTypes the listeners are interested in. + for (StoreEventListener listener : listeners) { + relevantEventTypes.addAll(listener.getEventTypes()); + } + if (relevantEventTypes.isEmpty()) { // mocks are empty -> handle all types + relevantEventTypes.addAll(EnumSet.allOf(EventType.class)); + } + } + protected Set> getListeners() { return listeners; } @@ -104,14 +117,25 @@ protected BlockingQueue>[] getOrderedQueues() { return orderedQueues; } + protected Set getRelevantEventTypes() { + return relevantEventTypes; + } + @Override public void addEventListener(StoreEventListener eventListener) { listeners.add(eventListener); + computeRelevantEventTypes(); // refresh } @Override public void removeEventListener(StoreEventListener eventListener) { listeners.remove(eventListener); + computeRelevantEventTypes(); // refresh + } + + @Override + public void listenerModified() { + computeRelevantEventTypes(); // refresh } @Override @@ -151,6 +175,6 @@ public void reset(StoreEventSink eventSink) { @Override public StoreEventSink eventSink() { - return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners()); + return new InvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes()); } } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java index 5079c5ce22..737c70ffbd 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSink.java @@ -40,8 +40,9 @@ class FudgingInvocationScopedEventSink extends InvocationScopedEventSink> filters, boolean ordered, BlockingQueue>[] orderedQueues, - Set> listeners) { - super(filters, ordered, orderedQueues, listeners); + Set> listeners, + Set relevantEventTypes) { + super(filters, ordered, orderedQueues, listeners, relevantEventTypes); } @Override diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java index 21a961e38a..bc90444333 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/InvocationScopedEventSink.java @@ -43,14 +43,16 @@ class InvocationScopedEventSink implements CloseableStoreEventSink { private final BlockingQueue>[] orderedQueues; private final Set> listeners; private final Deque> events = new ArrayDeque<>(4); + private final Set relevantEventTypes; InvocationScopedEventSink(Set> filters, boolean ordered, - BlockingQueue>[] orderedQueues, - Set> listeners) { + BlockingQueue>[] orderedQueues, Set> listeners, + Set relevantEventTypes) { this.filters = filters; this.ordered = ordered; this.orderedQueues = orderedQueues; this.listeners = listeners; + this.relevantEventTypes = relevantEventTypes; } @Override @@ -98,7 +100,8 @@ protected boolean acceptEvent(EventType type, K key, V oldValue, V newValue) { return false; } } - return true; + // at least one listener is interested in this event + return relevantEventTypes.contains(type); } @Override diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java index 63934f319b..a10025a6be 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/events/ThreadLocalStoreEventDispatcher.java @@ -39,7 +39,7 @@ public StoreEventSink eventSink() { } else { StoreEventSink eventSink = tlEventSink.get(); if (eventSink == null) { - eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners()); + eventSink = new FudgingInvocationScopedEventSink<>(getFilters(), isEventOrdering(), getOrderedQueues(), getListeners(), getRelevantEventTypes()); tlEventSink.set(eventSink); usageDepth.set(0); } else { diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java index 2abd943e44..b5ad67d01b 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java @@ -144,6 +144,10 @@ public void setSynchronous(boolean synchronous) throws IllegalArgumentException public boolean isEventOrdering() { return false; } + + @Override + public void listenerModified() { + } }; } diff --git a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java index 2c4ee1cdc2..3ef366f473 100644 --- a/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java +++ b/ehcache-impl/src/main/java/org/ehcache/impl/internal/store/shared/store/StorePartition.java @@ -303,6 +303,11 @@ public void setSynchronous(boolean synchronous) throws IllegalArgumentException public boolean isEventOrdering() { return storeEventSource.isEventOrdering(); } + + @Override + public void listenerModified() { + storeEventSource.listenerModified(); + } }; } diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java index 976ee8f63c..41a0d65cd3 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/events/CacheEventDispatcherImplTest.java @@ -26,10 +26,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; @@ -37,12 +36,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -119,6 +121,37 @@ public void testListenerRegistrationEnablesStoreEvents() { verify(storeEventDispatcher).addEventListener(any(StoreEventListener.class)); } + @Test + public void testMultipleListenerRegistrationNotifiesStoreEvent() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.ASYNCHRONOUS, EnumSet.of(EventType.UPDATED)); + CacheEventListener otherLsnr = mock(CacheEventListener.class); + eventService.registerCacheEventListener(otherLsnr, EventOrdering.ORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.REMOVED)); + verify(storeEventDispatcher, times(1)).listenerModified(); + } + + @Test + public void testStoreListenerRegisteredEventTypesUpdated() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.ASYNCHRONOUS, EnumSet.of(EventType.UPDATED)); + CacheEventListener otherLsnr = mock(CacheEventListener.class); + eventService.registerCacheEventListener(otherLsnr, EventOrdering.ORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.REMOVED)); + verify(storeEventDispatcher, times(1)).listenerModified(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StoreEventListener.class); + verify(storeEventDispatcher).addEventListener(captor.capture()); + StoreEventListener storeListener = captor.getValue(); + + assertTrue(storeListener.getEventTypes().contains(EventType.UPDATED)); + assertTrue(storeListener.getEventTypes().contains(EventType.REMOVED)); + assertEquals(2, storeListener.getEventTypes().size()); + + eventService.deregisterCacheEventListener(listener); + + verify(storeEventDispatcher, times(2)).listenerModified(); + + assertTrue(storeListener.getEventTypes().contains(EventType.REMOVED)); + assertEquals(1, storeListener.getEventTypes().size()); + } + @Test public void testOrderedListenerRegistrationTogglesOrderedOnStoreEvents() { eventService.registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS, EnumSet.allOf(EventType.class)); @@ -166,6 +199,15 @@ public void testDeregisterNotLastListenerDoesNotStopStoreEvents() { verify(storeEventDispatcher, never()).removeEventListener(any(StoreEventListener.class)); } + @Test + public void testDeregisterNotLastListenerTriggersListernerModified() { + eventService.registerCacheEventListener(listener, EventOrdering.UNORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.EVICTED)); + eventService.registerCacheEventListener(mock(CacheEventListener.class), EventOrdering.UNORDERED, EventFiring.SYNCHRONOUS, + EnumSet.of(EventType.EVICTED)); + eventService.deregisterCacheEventListener(listener); + verify(storeEventDispatcher, times(2)).listenerModified(); + } + @Test public void testShutdownDisableStoreEventsAndShutsDownOrderedExecutor() { eventService.registerCacheEventListener(listener, diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java index 42f7a93d78..fa9df96802 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/FudgingInvocationScopedEventSinkTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.mockito.InOrder; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -53,7 +54,7 @@ public void setUp() { storeEventListeners.add(listener); @SuppressWarnings({"unchecked", "rawtypes"}) BlockingQueue>[] blockingQueues = new BlockingQueue[] { new ArrayBlockingQueue>(10) }; - eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners); + eventSink = new FudgingInvocationScopedEventSink<>(new HashSet<>(), false, blockingQueues, storeEventListeners, EnumSet.allOf(EventType.class)); } @Test diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java index d2b93ef39c..11fda18bfa 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/InvocationScopedEventSinkTest.java @@ -17,7 +17,6 @@ package org.ehcache.impl.internal.events; import org.ehcache.core.spi.store.events.StoreEvent; -import org.ehcache.core.spi.store.events.StoreEventFilter; import org.ehcache.core.spi.store.events.StoreEventListener; import org.ehcache.event.EventType; import org.hamcrest.Matcher; @@ -30,6 +29,7 @@ import org.mockito.junit.MockitoRule; import java.util.Collections; +import java.util.EnumSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -66,7 +66,13 @@ public void setUp() { private InvocationScopedEventSink createEventSink(boolean ordered) { @SuppressWarnings("unchecked") BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; - return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners); + return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.allOf(EventType.class)); + } + + private InvocationScopedEventSink createEventSink(boolean ordered, EventType eventTypes) { + @SuppressWarnings("unchecked") + BlockingQueue>[] queues = (BlockingQueue>[]) new BlockingQueue[] { blockingQueue }; + return new InvocationScopedEventSink<>(Collections.emptySet(), ordered, queues, storeEventListeners, EnumSet.of(eventTypes)); } @Test @@ -132,4 +138,15 @@ public void testInterruption() throws InterruptedException { assertThat(eventSink.getEvents()).hasSize(10); assertThat(eventSink.getEvents().getLast().getEvent().getKey()).isEqualTo("k9"); } + + @Test + public void testAcceptEventFiltersIrrelevantEventTypes() { + eventSink = createEventSink(false, EventType.REMOVED); + eventSink.created("k", "v"); + + assertThat(eventSink.getEvents()).isEmpty(); + + eventSink.removed("k", () -> "v"); + assertThat(eventSink.getEvents()).hasSize(1); + } } diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java index 51bc63a89e..2951a5d556 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/internal/events/TestStoreEventDispatcher.java @@ -75,6 +75,11 @@ public void removeEventListener(StoreEventListener eventListener) { listeners.remove(eventListener); } + @Override + public void listenerModified() { + // No-op + } + @Override public void addEventFilter(StoreEventFilter eventFilter) { filters.add(eventFilter); diff --git a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java index 30ca7801e7..e9168e0d49 100644 --- a/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java +++ b/ehcache-impl/src/test/java/org/ehcache/impl/store/DefaultStoreEventDispatcherTest.java @@ -28,16 +28,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.EnumSet; import java.util.Random; +import java.util.Set; import java.util.concurrent.CountDownLatch; import static org.ehcache.impl.internal.util.Matchers.eventOfType; import static org.ehcache.test.MockitoUtil.uncheckedGenericMock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -76,6 +81,92 @@ public void testListenerNotifiedUnordered() { verify(listener).onEvent(any(StoreEvent.class)); } + @Test + @SuppressWarnings("unchecked") + public void testListenerNotifiedForRelevantEventsOnly() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + when(listener.getEventTypes()).thenReturn(EnumSet.of(EventType.REMOVED)); + + dispatcher.addEventListener(listener); + + StoreEventSink sink = dispatcher.eventSink(); + sink.created("test", "test"); + dispatcher.releaseEventSink(sink); + + verify(listener, never()).onEvent(any(StoreEvent.class)); + + sink.removed("test", () -> "test"); + dispatcher.releaseEventSink(sink); + + verify(listener, times(1)).onEvent(any(StoreEvent.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnAddEventListener() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + + verify(listener, times(1)).getEventTypes(); + + StoreEventListener listener2 = uncheckedGenericMock(StoreEventListener.class); + + dispatcher.addEventListener(listener2); + + verify(listener, times(2)).getEventTypes(); + verify(listener2, times(1)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnRemoveEventListener() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + StoreEventListener listener2 = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener2); + verify(listener2, times(1)).getEventTypes(); + + dispatcher.removeEventListener(listener); + + verify(listener2, times(2)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesUpdatedOnListenerModifiedSignal() { + DefaultStoreEventDispatcher dispatcher = new DefaultStoreEventDispatcher<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + verify(listener, times(1)).getEventTypes(); + + dispatcher.listenerModified(); + + verify(listener, times(2)).getEventTypes(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRelevantEventTypesContainAllEventsForMocks() { + DefaultStoreEventDispatcherMock dispatcher = new DefaultStoreEventDispatcherMock<>(1); + StoreEventListener listener = uncheckedGenericMock(StoreEventListener.class); + dispatcher.addEventListener(listener); + + assertTrue(dispatcher.spyRelevantEventTypes().containsAll(EventType.allAsSet())); + } + + class DefaultStoreEventDispatcherMock extends DefaultStoreEventDispatcher { + public DefaultStoreEventDispatcherMock(int dispatcherConcurrency) { + super(dispatcherConcurrency); + } + + public Set spyRelevantEventTypes() { + return super.getRelevantEventTypes(); + } + } + @Test @SuppressWarnings("unchecked") public void testListenerNotifiedOrdered() { @@ -107,6 +198,7 @@ public void testEventFiltering() { sink.created("new", "and shiny"); dispatcher.releaseEventSink(sink); + verify(listener).getEventTypes(); Matcher> matcher = eventOfType(EventType.CREATED); verify(listener).onEvent(argThat(matcher)); verifyNoMoreInteractions(listener); diff --git a/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java b/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java index cc8c0ef442..c82d382a4b 100644 --- a/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java +++ b/ehcache-transactions/src/common/java/org/ehcache/transactions/xa/internal/StoreEventSourceWrapper.java @@ -60,6 +60,11 @@ public void removeEventListener(StoreEventListener eventListener) { } } + @Override + public void listenerModified() { + underlying.listenerModified(); + } + @Override public void addEventFilter(final StoreEventFilter eventFilter) { underlying.addEventFilter((type, key, oldValue, newValue) -> {