1717import static org .junit .jupiter .api .Assertions .assertTrue ;
1818import static org .mockito .Mockito .*;
1919
20- import io .lettuce .core .protocol .AsyncCommand ;
21- import io .lettuce .core .pubsub .api .async .RedisPubSubAsyncCommands ;
22- import io .lettuce .core .pubsub .PubSubOutput ;
23- import io .lettuce .core .pubsub .RedisPubSubAdapter ;
24-
2520import java .lang .reflect .Field ;
26- import java .nio .ByteBuffer ;
2721import java .time .Duration ;
2822import java .util .Arrays ;
2923import java .util .HashSet ;
@@ -132,74 +126,27 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() {
132126
133127 @ Test
134128 void autoResubscribeListenerIsRegistered () {
135- // Verify that the connection has the markIntentionalUnsubscribe method
136- // This confirms the auto-resubscribe functionality is available
137129 connection .markIntentionalUnsubscribe ("test-channel" );
138- // If no exception is thrown, the method exists and works
139130 assertTrue (true );
140131 }
141132
142133 @ Test
143134 void intentionalUnsubscribeBypassesAutoResubscribe () throws Exception {
144- // Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe
145-
146- // Create a mock async commands to verify ssubscribe is NOT called
147- RedisPubSubAsyncCommands <String , String > mockAsync = mock (RedisPubSubAsyncCommands .class );
148- StatefulRedisPubSubConnectionImpl <String , String > spyConnection = spy (connection );
149- when (spyConnection .async ()).thenReturn (mockAsync );
150-
151- // Mark the channel as intentionally unsubscribed
152- spyConnection .markIntentionalUnsubscribe ("test-channel" );
153-
154- // Use reflection to access the private endpoint and trigger sunsubscribed event
155- PubSubEndpoint <String , String > endpoint = getEndpointViaReflection (spyConnection );
156- PubSubOutput <String , String > sunsubscribeMessage = createSunsubscribeMessage ("test-channel" , codec );
157- endpoint .notifyMessage (sunsubscribeMessage );
135+ connection .markIntentionalUnsubscribe ("test-channel" );
158136
159- // Wait a moment for any async processing
160- Thread .sleep (50 );
137+ RedisPubSubListener <String , String > autoResubscribeListener = getAutoResubscribeListener (connection );
161138
162- // Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe)
163- verify (mockAsync , never ()).ssubscribe ( "test-channel" );
139+ autoResubscribeListener . sunsubscribed ( "test-channel" , 0 );
140+ verify (mockedWriter , never ()).write ( any ( io . lettuce . core . protocol . RedisCommand . class ) );
164141 }
165142
166143 @ Test
167144 void unintentionalUnsubscribeTriggersAutoResubscribe () throws Exception {
168- // Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe
169-
170- // Create a fresh connection with a mock async
171- PubSubEndpoint <String , String > mockEndpoint = mock (PubSubEndpoint .class );
172- StatefulRedisPubSubConnectionImpl <String , String > testConnection = new StatefulRedisPubSubConnectionImpl <>(mockEndpoint ,
173- mockedWriter , codec , timeout );
174-
175- // Create a mock async commands to verify ssubscribe IS called
176- RedisPubSubAsyncCommands <String , String > mockAsync = mock (RedisPubSubAsyncCommands .class );
177- @ SuppressWarnings ("unchecked" )
178- RedisFuture <Void > mockFuture = mock (RedisFuture .class );
179- when (mockAsync .ssubscribe ("test-channel" )).thenReturn (mockFuture );
180-
181- StatefulRedisPubSubConnectionImpl <String , String > spyConnection = spy (testConnection );
182- when (spyConnection .async ()).thenReturn (mockAsync );
145+ RedisPubSubListener <String , String > autoResubscribeListener = getAutoResubscribeListener (connection );
183146
184- // Get the auto-resubscribe listener directly and trigger it
185- RedisPubSubListener <String , String > autoResubscribeListener = getAutoResubscribeListener (spyConnection );
186-
187- // Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement
188147 autoResubscribeListener .sunsubscribed ("test-channel" , 0 );
189148
190- // Wait a moment for async processing
191- Thread .sleep (50 );
192-
193- // Verify that ssubscribe WAS called (auto-resubscribe triggered)
194- verify (mockAsync , times (1 )).ssubscribe ("test-channel" );
195- }
196-
197- @ SuppressWarnings ("unchecked" )
198- private PubSubEndpoint <String , String > getEndpointViaReflection (
199- StatefulRedisPubSubConnectionImpl <String , String > connection ) throws Exception {
200- Field endpointField = StatefulRedisPubSubConnectionImpl .class .getDeclaredField ("endpoint" );
201- endpointField .setAccessible (true );
202- return (PubSubEndpoint <String , String >) endpointField .get (connection );
149+ verify (mockedWriter , times (1 )).write (any (io .lettuce .core .protocol .RedisCommand .class ));
203150 }
204151
205152 @ SuppressWarnings ("unchecked" )
@@ -209,13 +156,4 @@ private RedisPubSubListener<String, String> getAutoResubscribeListener(
209156 listenerField .setAccessible (true );
210157 return (RedisPubSubListener <String , String >) listenerField .get (connection );
211158 }
212-
213- private PubSubOutput <String , String > createSunsubscribeMessage (String channel , RedisCodec <String , String > codec ) {
214- PubSubOutput <String , String > output = new PubSubOutput <>(codec );
215- output .set (ByteBuffer .wrap ("sunsubscribe" .getBytes ()));
216- output .set (ByteBuffer .wrap (channel .getBytes ()));
217- output .set (0L ); // count
218- return output ;
219- }
220-
221159}
0 commit comments