diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index 2098a8e24fea..fab9f5f09912 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -619,9 +619,7 @@ void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) public void readyForEvents(ClientProxyMembershipID proxyId) { CacheClientProxy proxy = getClientProxy(proxyId); if (proxy != null) { - // False signifies that a marker message has not already been processed. - // Generate and send one. - proxy.startOrResumeMessageDispatcher(false); + proxy.startOrResumeMessageDispatcher(true); } } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCrashDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCrashDUnitTest.java index 36f98a43160a..327e55f624e1 100755 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCrashDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientCrashDUnitTest.java @@ -46,7 +46,7 @@ protected void preTearDownDurableClientTestBase() { @Test public void testDurableClient() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM); diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNetDownDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNetDownDUnitTest.java index f6dd5ec2b3e3..584626d75eee 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNetDownDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientNetDownDUnitTest.java @@ -79,7 +79,7 @@ public void verifyListenerUpdatesDisconnected(int numberOfEntries) { @Test public void testDurableClient() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM); diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java index d08000b1119e..6e90193bd7d6 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java @@ -15,8 +15,6 @@ package org.apache.geode.internal.cache.tier.sockets; import static java.lang.Thread.sleep; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.geode.cache.InterestResultPolicy.NONE; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.TYPE_CREATE; @@ -866,19 +864,11 @@ public void testDurableClientReceivedClientSessionInitialValue() { // Start a durable client with the ControlListener durableClientId = getName() + "_client"; - durableClientVM.invoke(() -> createCacheClient( - getClientPool(getServerHostName(), server1Port, server2Port, true), regionName, - getClientDistributedSystemProperties(durableClientId, VERY_LONG_DURABLE_TIMEOUT_SECONDS), - true)); - durableClientVM.invoke(() -> { - await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES) - .pollInterval(100, MILLISECONDS) - .untilAsserted(() -> assertThat(getCache()).isNotNull()); - }); - - // Send clientReady message - sendClientReady(durableClientVM); + startupDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, + getClientPool(getServerHostName(), server1Port, server2Port, + true), + Boolean.TRUE, true); // Use ClientSession on the server to ` in entry key on behalf of durable client boolean server1IsPrimary = false; diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java index bd742fe70888..4e13781a2534 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestBase.java @@ -35,6 +35,7 @@ import static org.hamcrest.Matchers.nullValue; import java.time.Duration; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; @@ -43,6 +44,7 @@ import org.apache.geode.cache.CacheException; import org.apache.geode.cache.InterestResultPolicy; import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.Pool; import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.PoolManager; @@ -115,48 +117,40 @@ public final void preTearDown() { protected void preTearDownDurableClientTestBase() {} - void startupDurableClientAndServer(final int durableClientTimeout) { + void startupDurableClientAndServer(final int durableClientTimeout, + final boolean sendReadyForEvents) { server1Port = server1VM .invoke(() -> createCacheServer(regionName, Boolean.TRUE)); durableClientId = getName() + "_client"; - durableClientVM.invoke(() -> createCacheClient( - getClientPool(NetworkUtils.getServerHostName(), server1Port, true), regionName, - getClientDistributedSystemProperties(durableClientId, durableClientTimeout), - Boolean.TRUE)); - - durableClientVM.invoke(() -> { - await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES) - .pollInterval(100, MILLISECONDS) - .until(CacheServerTestUtil::getCache, notNullValue()); - }); - // Send clientReady message - sendClientReady(durableClientVM); + startupDurableClient(durableClientTimeout, Boolean.TRUE, sendReadyForEvents); verifyDurableClientPresent(durableClientTimeout, durableClientId, server1VM); - } // This exists so child classes can override the behavior and mock out network failures public void restartDurableClient(int durableClientTimeout, Pool clientPool, Boolean addControlListener) { - durableClientVM.invoke(() -> createCacheClient(clientPool, regionName, - getClientDistributedSystemProperties(durableClientId, durableClientTimeout), - addControlListener)); - - durableClientVM.invoke(() -> { - await().atMost(HEAVY_TEST_LOAD_DELAY_SUPPORT_MULTIPLIER, MINUTES) - .pollInterval(100, MILLISECONDS) - .until(CacheServerTestUtil::getCache, notNullValue()); - }); + startupDurableClient(durableClientTimeout, clientPool, addControlListener, true); } // This exists so child classes can override the behavior and mock out network failures public void restartDurableClient(int durableClientTimeout, Boolean addControlListener) { - durableClientVM.invoke(() -> createCacheClient( - getClientPool(NetworkUtils.getServerHostName(), server1Port, true), regionName, - getClientDistributedSystemProperties(durableClientId, durableClientTimeout), + startupDurableClient(durableClientTimeout, addControlListener, true); + } + + public void restartDurableClient(int durableClientTimeout, Boolean addControlListener, + boolean readyForEvents) { + startupDurableClient(durableClientTimeout, addControlListener, readyForEvents); + } + + + void startupDurableClient(int durableClientTimeout, Pool clientPool, + Boolean addControlListener, final boolean sendReadyForEvents) { + this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( + clientPool, + regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), addControlListener)); durableClientVM.invoke(() -> { @@ -165,8 +159,17 @@ public void restartDurableClient(int durableClientTimeout, Boolean addControlLis .until(CacheServerTestUtil::getCache, notNullValue()); }); - // Send clientReady message - sendClientReady(durableClientVM); + if (sendReadyForEvents) { + // Send clientReady message + sendClientReady(durableClientVM); + } + } + + private void startupDurableClient(int durableClientTimeout, Boolean addControlListener, + boolean sendReadyForEvents) { + startupDurableClient(durableClientTimeout, + getClientPool(NetworkUtils.getServerHostName(), server1Port, true), addControlListener, + sendReadyForEvents); } void verifyDurableClientPresent(int durableClientTimeout, String durableClientId, @@ -190,6 +193,19 @@ void waitForDurableClientPresence(String durableClientId, VM serverVM) { }); } + void verifyEntryPresentInCache(Map.Entry expectedEntry) { + this.durableClientVM.invoke(() -> { + ClientCache clientCache = CacheServerTestUtil.getClientCache(); + Region region = clientCache.getRegion(regionName); + + Map entriesMap = new HashMap<>(); + for (Map.Entry entry : region.entrySet()) { + entriesMap.put((String) entry.getKey(), (String) entry.getValue()); + } + await().untilAsserted(() -> assertThat(entriesMap).contains(expectedEntry)); + }); + } + void verifyDurableClientPresence(int durableClientTimeout, String durableClientId, VM serverVM, final int count) { serverVM.invoke(() -> { @@ -521,6 +537,30 @@ public void run2() throws CacheException { } } + + protected void registerInterestAll(VM vm, final String regionName, final boolean durable, + final InterestResultPolicy interestResultPolicy) { + vm.invoke(new CacheSerializableRunnable("Register interest on region : " + regionName) { + @Override + public void run2() throws CacheException { + + Region region = CacheServerTestUtil.getCache().getRegion(regionName); + assertThat(region).isNotNull(); + + // Register interest in all keys + region.registerInterestForAllKeys(interestResultPolicy, durable); + } + }); + + // This seems to be necessary for the queue to start up. Ideally should be replaced with + // Awaitility if possible. + try { + java.lang.Thread.sleep(5000); + } catch (java.lang.InterruptedException ex) { + fail("interrupted"); + } + } + void createCq(VM vm, final String cqName, final String cqQuery, final boolean durable) { vm.invoke("Register cq " + cqName, new CacheSerializableRunnable() { @Override @@ -555,6 +595,18 @@ public void run2() throws CacheException { }); } + void publishEntry(Map.Entry entry) { + this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entry") { + @Override + public void run2() throws CacheException { + Region region = CacheServerTestUtil.getCache().getRegion( + regionName); + assertThat(region).isNotNull(); + region.put(entry.getKey(), entry.getValue()); + } + }); + } + // Publishes portfolios void publishEntries(VM publisherClientVM, final String regionName, final int numEntries) { publisherClientVM.invoke("publish " + numEntries + " entries", new CacheSerializableRunnable() { diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java index d928b22d756b..d3a494236978 100755 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableClientTestCase.java @@ -26,6 +26,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.notNullValue; +import java.util.AbstractMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -62,7 +64,7 @@ public class DurableClientTestCase extends DurableClientTestBase { */ @Test public void testSimpleDurableClient() { - startupDurableClientAndServer(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT); + startupDurableClientAndServer(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, true); // Stop the server server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); @@ -142,6 +144,106 @@ regionName, getClientDistributedSystemProperties(durableClientId, } + /** + * Test bug fix where all events received between registerInterests and readyForEvents are + * discarded by the client. + */ + @Test + public void testEventsReceivedBetweenRegisterInterestsAndReadyForEventsAreStoredInClientCache() { + Map.Entry entry = new AbstractMap.SimpleEntry<>("1", "1"); + + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, false); + + registerInterest(this.durableClientVM, regionName, true, InterestResultPolicy.NONE); + + // Start normal publisher client + this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( + getClientPool(NetworkUtils.getServerHostName(), server1Port, + false), + regionName)); + + publishEntry(entry); + + // Wait until queue count is 1 on server1VM + waitUntilQueueContainsRequiredNumberOfEvents(this.server1VM, 1); + + sendClientReady(durableClientVM); + + // Wait until queue count is 0 on server1VM + waitUntilQueueContainsRequiredNumberOfEvents(this.server1VM, 0); + + // Verify the durable client received the updates + this.checkListenerEvents(1, 1, -1, this.durableClientVM); + + verifyEntryPresentInCache(entry); + + // Stop the durable client + this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + + // Stop the server + this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + } + + @Test + public void testEventsConflictResolution() throws InterruptedException { + Map.Entry entryOld = new AbstractMap.SimpleEntry<>("1", "1"); + Map.Entry entryNew = new AbstractMap.SimpleEntry<>("1", "2"); + + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, false); + + registerInterestAll(this.durableClientVM, regionName, true, InterestResultPolicy.KEYS_VALUES); + + // Start normal publisher client + this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( + getClientPool(NetworkUtils.getServerHostName(), server1Port, + false), + regionName)); + + // Publish some entries + publishEntry(entryOld); + publishEntry(entryNew); + + // Wait until queue count is 0 on server1VM + waitUntilQueueContainsRequiredNumberOfEvents(this.server1VM, 2); + + // Stop the durable client + this.disconnectDurableClient(true); + + // Re-start the durable client + this.restartDurableClient(VERY_LONG_DURABLE_TIMEOUT_SECONDS, Boolean.TRUE, false); + + // Verify durable client on server + verifyDurableClientPresent(VERY_LONG_DURABLE_TIMEOUT_SECONDS, durableClientId, server1VM); + + // Start normal publisher client + this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient( + getClientPool(NetworkUtils.getServerHostName(), server1Port, + false), + regionName)); + + registerInterestAll(this.durableClientVM, regionName, true, InterestResultPolicy.KEYS_VALUES); + + // register interest will fetch current state of servers cache + verifyEntryPresentInCache(entryNew); + + sendClientReady(durableClientVM); + + // Wait until queue count is 1 on server1VM + waitUntilQueueContainsRequiredNumberOfEvents(this.server1VM, 0); + + // Verify the durable client received the updates + this.checkListenerEvents(1, 1, -1, this.durableClientVM); + + // verify that old events do not overwrite the new ones + verifyEntryPresentInCache(entryNew); + + // Stop the durable client + this.durableClientVM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + + // Stop the server + this.server1VM.invoke((SerializableRunnableIF) CacheServerTestUtil::closeCache); + } + /** * Test that starting, stopping then restarting a durable client is correctly processed by the * server. @@ -149,7 +251,7 @@ regionName, getClientDistributedSystemProperties(durableClientId, @Test public void testStartStopStartDurableClient() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); // Stop the durable client disconnectDurableClient(true); @@ -175,7 +277,7 @@ public void testStartStopStartDurableClient() { */ @Test public void test39630() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); // Stop the durable client disconnectDurableClient(true); @@ -209,7 +311,7 @@ public void test39630() { public void testStartStopTimeoutDurableClient() { final int durableClientTimeout = 5; - startupDurableClientAndServer(durableClientTimeout); + startupDurableClientAndServer(durableClientTimeout, true); // Stop the durable client disconnectDurableClient(true); @@ -239,7 +341,7 @@ public void run2() throws CacheException { */ @Test public void testDurableClientPrimaryUpdate() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE); @@ -299,7 +401,7 @@ public void testDurableClientPrimaryUpdate() { */ @Test public void testStartStopStartDurableClientUpdate() { - startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS); + startupDurableClientAndServer(VERY_LONG_DURABLE_TIMEOUT_SECONDS, true); // Have the durable client register interest in all keys registerInterest(durableClientVM, regionName, true, InterestResultPolicy.NONE); diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java index 57cc3c42c4f6..30435c396bd2 100755 --- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java +++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerTestUtil.java @@ -326,7 +326,7 @@ public static void createCacheServer(String regionName, Boolean notifyBySubscrip new CacheServerTestUtil().createCache(props); AttributesFactory factory = new AttributesFactory<>(); factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setEnableBridgeConflation(true); + factory.setEnableBridgeConflation(false); factory.setDataPolicy(DataPolicy.REPLICATE); RegionAttributes attrs = factory.create(); cache.createRegion(regionName, attrs);