diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index de6afbe3d..804fa59c8 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -1,8 +1,11 @@ package io.ably.lib.objects; import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; +import io.ably.lib.types.ChannelMode; +import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.util.Log; import org.jetbrains.annotations.NotNull; @@ -34,4 +37,32 @@ public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener liste public int maxMessageSizeLimit() { return ably.connection.connectionManager.maxMessageSize; } + + @Override + public ChannelMode[] getChannelModes(@NotNull String channelName) { + if (ably.channels.containsKey(channelName)) { + // RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set + ChannelMode[] modes = ably.channels.get(channelName).getModes(); + if (modes != null) { + return modes; + } + // RTO2b - otherwise as a best effort use user provided channel options + ChannelOptions options = ably.channels.get(channelName).getOptions(); + if (options != null && options.hasModes()) { + return options.modes; + } + return null; + } + Log.e(TAG, "getChannelMode(): channel not found: " + channelName); + return null; + } + + @Override + public ChannelState getChannelState(@NotNull String channelName) { + if (ably.channels.containsKey(channelName)) { + return ably.channels.get(channelName).state; + } + Log.e(TAG, "getChannelState(): channel not found: " + channelName); + return null; + } } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java b/lib/src/main/java/io/ably/lib/objects/LiveCounter.java index 2339fcb4f..81ef13f37 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveCounter.java @@ -1,6 +1,5 @@ package io.ably.lib.objects; -import io.ably.lib.types.Callback; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; @@ -33,7 +32,7 @@ public interface LiveCounter { * @param callback the callback to be invoked upon completion of the operation. */ @NonBlocking - void incrementAsync(@NotNull Callback callback); + void incrementAsync(@NotNull ObjectsCallback callback); /** * Decrements the value of the counter by 1. @@ -49,7 +48,7 @@ public interface LiveCounter { * @param callback the callback to be invoked upon completion of the operation. */ @NonBlocking - void decrementAsync(@NotNull Callback callback); + void decrementAsync(@NotNull ObjectsCallback callback); /** * Retrieves the current value of the counter. diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/LiveMap.java index 7a964dc90..ae1299dd4 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveMap.java @@ -1,6 +1,5 @@ package io.ably.lib.objects; -import io.ably.lib.types.Callback; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.Contract; @@ -24,6 +23,7 @@ public interface LiveMap { * If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned. * If the value is not an objectId, then that value is returned. + * Spec: RTLM5, RTLM5a * * @param keyName the key whose associated value is to be returned. * @return the value associated with the specified key, or null if the key does not exist. @@ -33,6 +33,7 @@ public interface LiveMap { /** * Retrieves all entries (key-value pairs) in the map. + * Spec: RTLM11, RTLM11a * * @return an iterable collection of all entries in the map. */ @@ -42,6 +43,7 @@ public interface LiveMap { /** * Retrieves all keys in the map. + * Spec: RTLM12, RTLM12a * * @return an iterable collection of all keys in the map. */ @@ -51,6 +53,7 @@ public interface LiveMap { /** * Retrieves all values in the map. + * Spec: RTLM13, RTLM13a * * @return an iterable collection of all values in the map. */ @@ -85,6 +88,7 @@ public interface LiveMap { /** * Retrieves the number of entries in the map. + * Spec: RTLM10, RTLM10a * * @return the size of the map. */ @@ -104,7 +108,7 @@ public interface LiveMap { * @param callback the callback to handle the result or any errors. */ @NonBlocking - void setAsync(@NotNull String keyName, @NotNull Object value, @NotNull Callback callback); + void setAsync(@NotNull String keyName, @NotNull Object value, @NotNull ObjectsCallback callback); /** * Asynchronously removes the specified key and its associated value from the map. @@ -117,5 +121,5 @@ public interface LiveMap { * @param callback the callback to handle the result or any errors. */ @NonBlocking - void removeAsync(@NotNull String keyName, @NotNull Callback callback); + void removeAsync(@NotNull String keyName, @NotNull ObjectsCallback callback); } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java index adf05df6e..ac5b2c919 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java @@ -1,6 +1,6 @@ package io.ably.lib.objects; -import io.ably.lib.types.Callback; +import io.ably.lib.objects.state.ObjectsStateChange; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; @@ -16,7 +16,7 @@ *

Implementations of this interface must be thread-safe as they may be accessed * from multiple threads concurrently. */ -public interface LiveObjects { +public interface LiveObjects extends ObjectsStateChange { /** * Retrieves the root LiveMap object. @@ -95,7 +95,7 @@ public interface LiveObjects { * @param callback the callback to handle the result or error. */ @NonBlocking - void getRootAsync(@NotNull Callback<@NotNull LiveMap> callback); + void getRootAsync(@NotNull ObjectsCallback<@NotNull LiveMap> callback); /** * Asynchronously creates a new LiveMap based on an existing LiveMap. @@ -108,7 +108,7 @@ public interface LiveObjects { * @param callback the callback to handle the result or error. */ @NonBlocking - void createMapAsync(@NotNull LiveMap liveMap, @NotNull Callback<@NotNull LiveMap> callback); + void createMapAsync(@NotNull LiveMap liveMap, @NotNull ObjectsCallback<@NotNull LiveMap> callback); /** * Asynchronously creates a new LiveMap based on a LiveCounter. @@ -121,7 +121,7 @@ public interface LiveObjects { * @param callback the callback to handle the result or error. */ @NonBlocking - void createMapAsync(@NotNull LiveCounter liveCounter, @NotNull Callback<@NotNull LiveMap> callback); + void createMapAsync(@NotNull LiveCounter liveCounter, @NotNull ObjectsCallback<@NotNull LiveMap> callback); /** * Asynchronously creates a new LiveMap based on a standard Java Map. @@ -134,7 +134,7 @@ public interface LiveObjects { * @param callback the callback to handle the result or error. */ @NonBlocking - void createMapAsync(@NotNull Map map, @NotNull Callback<@NotNull LiveMap> callback); + void createMapAsync(@NotNull Map map, @NotNull ObjectsCallback<@NotNull LiveMap> callback); /** * Asynchronously creates a new LiveCounter with an initial value. @@ -147,5 +147,5 @@ public interface LiveObjects { * @param callback the callback to handle the result or error. */ @NonBlocking - void createCounterAsync(@NotNull Long initialValue, @NotNull Callback<@NotNull LiveCounter> callback); + void createCounterAsync(@NotNull Long initialValue, @NotNull ObjectsCallback<@NotNull LiveCounter> callback); } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index e6b1f2204..690bc7495 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -1,9 +1,12 @@ package io.ably.lib.objects; +import io.ably.lib.realtime.ChannelState; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; +import io.ably.lib.types.ChannelMode; import io.ably.lib.types.ProtocolMessage; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; public interface LiveObjectsAdapter { /** @@ -31,5 +34,24 @@ public interface LiveObjectsAdapter { * @return the maximum message size limit in bytes. */ int maxMessageSizeLimit(); + + /** + * Retrieves the channel modes for a specific channel. + * This method returns the modes that are set for the specified channel. + * + * @param channelName the name of the channel for which to retrieve the modes + * @return the array of channel modes for the specified channel, or null if the channel is not found + * Spec: RTO2a, RTO2b + */ + @Nullable ChannelMode[] getChannelModes(@NotNull String channelName); + + /** + * Retrieves the current state of a specific channel. + * This method returns the state of the specified channel, which indicates its connection status. + * + * @param channelName the name of the channel for which to retrieve the state + * @return the current state of the specified channel, or null if the channel is not found + */ + @Nullable ChannelState getChannelState(@NotNull String channelName); } diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java index 81156d654..392b9f1df 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java @@ -46,6 +46,7 @@ public interface LiveObjectsPlugin { * Disposes of the LiveObjects instance associated with the specified channel name. * This method removes the LiveObjects instance for the given channel, releasing any * resources associated with it. + * This is invoked when ablyRealtimeClient.channels.release(channelName) is called * * @param channelName the name of the channel whose LiveObjects instance is to be removed. */ @@ -53,6 +54,7 @@ public interface LiveObjectsPlugin { /** * Disposes of the plugin instance and all underlying resources. + * This is invoked when ablyRealtimeClient.close() is called */ void dispose(); } diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsCallback.java b/lib/src/main/java/io/ably/lib/objects/ObjectsCallback.java new file mode 100644 index 000000000..f6614918f --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsCallback.java @@ -0,0 +1,31 @@ +package io.ably.lib.objects; + +import io.ably.lib.types.AblyException; + +/** + * Callback interface for handling results of asynchronous LiveObjects operations. + * Used for operations like creating LiveMaps/LiveCounters, modifying entries, and retrieving objects. + * Callbacks are executed on background threads managed by the LiveObjects system. + * + * @param the type of the result returned by the asynchronous operation + */ +public interface ObjectsCallback { + + /** + * Called when the asynchronous operation completes successfully. + * For modification operations (set, remove, increment), result is typically Void. + * For creation/retrieval operations, result contains the created/retrieved object. + * + * @param result the result of the operation, may be null for modification operations + */ + void onSuccess(T result); + + /** + * Called when the asynchronous operation fails. + * The exception contains detailed error information including error codes and messages. + * Common errors include network issues, authentication failures, and validation errors. + * + * @param exception the exception that occurred during the operation + */ + void onError(AblyException exception); +} diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java new file mode 100644 index 000000000..d6d007ecd --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java @@ -0,0 +1,22 @@ +package io.ably.lib.objects; + +/** + * Represents a objects subscription that can be unsubscribed from. + * This interface provides a way to clean up and remove subscriptions when they are no longer needed. + * Example usage: + *

+ * {@code
+ * ObjectsSubscription s = objects.subscribe(ObjectsStateEvent.SYNCING, new ObjectsStateListener() {});
+ * // Later when done with the subscription
+ * s.unsubscribe();
+ * }
+ * 
+ */ +public interface ObjectsSubscription { + /** + * This method should be called when the subscription is no longer needed, + * it will make sure no further events will be sent to the subscriber and + * that references to the subscriber are cleaned up. + */ + void unsubscribe(); +} diff --git a/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateChange.java b/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateChange.java new file mode 100644 index 000000000..7b3a7e1e3 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateChange.java @@ -0,0 +1,56 @@ +package io.ably.lib.objects.state; + +import io.ably.lib.objects.ObjectsSubscription; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + +public interface ObjectsStateChange { + /** + * Subscribes to a specific Live Objects synchronization state event. + * + *

This method registers the provided listener to be notified when the specified + * synchronization state event occurs. The returned subscription can be used to + * unsubscribe later when the notifications are no longer needed. + * + * @param event the synchronization state event to subscribe to (SYNCING or SYNCED) + * @param listener the listener that will be called when the event occurs + * @return a subscription object that can be used to unsubscribe from the event + */ + @NonBlocking + ObjectsSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateChange.Listener listener); + + /** + * Unsubscribes the specified listener from all synchronization state events. + * + *

After calling this method, the provided listener will no longer receive + * any synchronization state event notifications. + * + * @param listener the listener to unregister from all events + */ + @NonBlocking + void off(@NotNull ObjectsStateChange.Listener listener); + + /** + * Unsubscribes all listeners from all synchronization state events. + * + *

After calling this method, no listeners will receive any synchronization + * state event notifications until new listeners are registered. + */ + @NonBlocking + void offAll(); + + /** + * Interface for receiving notifications about Live Objects synchronization state changes. + *

+ * Implement this interface and register it with an ObjectsStateEmitter to be notified + * when synchronization state transitions occur. + */ + interface Listener { + /** + * Called when the synchronization state changes. + * + * @param objectsStateEvent The new state event (SYNCING or SYNCED) + */ + void onStateChanged(ObjectsStateEvent objectsStateEvent); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateEvent.java b/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateEvent.java new file mode 100644 index 000000000..4fa01a173 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/state/ObjectsStateEvent.java @@ -0,0 +1,19 @@ +package io.ably.lib.objects.state; + +/** + * Represents the synchronization state of Ably Live Objects. + *

+ * This enum is used to notify listeners about state changes in the synchronization process. + * Clients can register an {@link ObjectsStateChange.Listener} to receive these events. + */ +public enum ObjectsStateEvent { + /** + * Indicates that synchronization between local and remote objects is in progress. + */ + SYNCING, + + /** + * Indicates that synchronization has completed successfully and objects are in sync. + */ + SYNCED +} diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 3f7062d36..4dd78c8eb 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -1289,6 +1289,10 @@ public ChannelMode[] getModes() { return modes.toArray(new ChannelMode[modes.size()]); } + public ChannelOptions getOptions() { + return options; + } + /************************************ * internal general * @throws AblyException @@ -1329,6 +1333,9 @@ else if(stateChange.current.equals(failureState)) { state = ChannelState.initialized; this.decodingContext = new DecodingContext(); this.liveObjectsPlugin = liveObjectsPlugin; + if (liveObjectsPlugin != null) { + liveObjectsPlugin.getInstance(name); // Make objects instance ready to process sync messages + } this.annotations = new RealtimeAnnotations( this, new RestAnnotations(name, ably.http, ably.options, options) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index 45177fa94..d449404ce 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -1,21 +1,15 @@ package io.ably.lib.objects +import io.ably.lib.objects.state.ObjectsStateChange +import io.ably.lib.objects.state.ObjectsStateEvent import io.ably.lib.realtime.ChannelState -import io.ably.lib.types.Callback +import io.ably.lib.types.AblyException import io.ably.lib.types.ProtocolMessage import io.ably.lib.util.Log import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.flow.MutableSharedFlow - -/** - * @spec RTO2 - enum representing objects state - */ -internal enum class ObjectsState { - INITIALIZED, - SYNCING, - SYNCED -} +import java.util.concurrent.CancellationException /** * Default implementation of LiveObjects interface. @@ -28,7 +22,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val */ internal val objectsPool = ObjectsPool(this) - internal var state = ObjectsState.INITIALIZED + internal var state = ObjectsState.Initialized /** * @spec RTO4 - Used for handling object messages and object sync messages @@ -43,20 +37,21 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val /** * Event bus for handling incoming object messages sequentially. + * Processes messages inside [incomingObjectsHandler] job created using [sequentialScope]. */ private val objectsEventBus = MutableSharedFlow(extraBufferCapacity = UNLIMITED) private val incomingObjectsHandler: Job + /** + * Provides a channel-specific scope for safely executing asynchronous operations with callbacks. + */ + private val asyncScope = ObjectsAsyncScope(channelName) + init { incomingObjectsHandler = initializeHandlerForIncomingObjectMessages() } - /** - * @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting - */ - override fun getRoot(): LiveMap { - TODO("Not yet implemented") - } + override fun getRoot(): LiveMap = runBlocking { getRootAsync() } override fun createMap(liveMap: LiveMap): LiveMap { TODO("Not yet implemented") @@ -70,23 +65,23 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val TODO("Not yet implemented") } - override fun getRootAsync(callback: Callback) { - TODO("Not yet implemented") + override fun getRootAsync(callback: ObjectsCallback) { + asyncScope.launchWithCallback(callback) { getRootAsync() } } - override fun createMapAsync(liveMap: LiveMap, callback: Callback) { + override fun createMapAsync(liveMap: LiveMap, callback: ObjectsCallback) { TODO("Not yet implemented") } - override fun createMapAsync(liveCounter: LiveCounter, callback: Callback) { + override fun createMapAsync(liveCounter: LiveCounter, callback: ObjectsCallback) { TODO("Not yet implemented") } - override fun createMapAsync(map: MutableMap, callback: Callback) { + override fun createMapAsync(map: MutableMap, callback: ObjectsCallback) { TODO("Not yet implemented") } - override fun createCounterAsync(initialValue: Long, callback: Callback) { + override fun createCounterAsync(initialValue: Long, callback: ObjectsCallback) { TODO("Not yet implemented") } @@ -94,6 +89,19 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val TODO("Not yet implemented") } + override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsSubscription = + objectsManager.on(event, listener) + + override fun off(listener: ObjectsStateChange.Listener) = objectsManager.off(listener) + + override fun offAll() = objectsManager.offAll() + + private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) { + adapter.throwIfInvalidAccessApiConfiguration(channelName) + objectsManager.ensureSynced(state) + objectsPool.get(ROOT_OBJECT_ID) as LiveMap + } + /** * Handles a ProtocolMessage containing proto action as `object` or `object_sync`. * @spec RTL1 - Processes incoming object messages and object sync messages @@ -153,7 +161,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects") // RTO4a - val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED + val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.Initialized if (hasObjects || fromInitializedState) { // should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value. // this guarantees we emit both "syncing" -> "synced" events in that order. @@ -186,29 +194,14 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val } } - /** - * Changes the state and emits events. - * - * @spec RTO2 - Emits state change events for syncing and synced states - */ - internal fun stateChange(newState: ObjectsState, deferEvent: Boolean) { - if (state == newState) { - return - } - - state = newState - Log.v(tag, "Objects state changed to: $newState") - - // TODO: Emit state change events - } - // Dispose of any resources associated with this LiveObjects instance - fun dispose(reason: String) { - val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason") - incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled + fun dispose(cause: AblyException) { + val disposeReason = CancellationException().apply { initCause(cause) } + incomingObjectsHandler.cancel(disposeReason) // objectsEventBus automatically garbage collected when collector is cancelled objectsPool.dispose() objectsManager.dispose() - // Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines - sequentialScope.coroutineContext.cancelChildren(cancellationError) + // Don't cancel sequentialScope (needed in getRoot method), just cancel ongoing coroutines + sequentialScope.coroutineContext.cancelChildren(disposeReason) + asyncScope.cancel(disposeReason) // cancel all ongoing callbacks } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt index f3f2e71a4..66cab1d30 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt @@ -22,13 +22,13 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : } override fun dispose(channelName: String) { - liveObjects[channelName]?.dispose("Channel has ben released using channels.release()") + liveObjects[channelName]?.dispose(clientError("Channel has been released using channels.release()")) liveObjects.remove(channelName) } override fun dispose() { liveObjects.values.forEach { - it.dispose("AblyClient has been closed using client.close()") + it.dispose(clientError("AblyClient has been closed using client.close()")) } liveObjects.clear() } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt index 35b6c3ad2..5608491a3 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt @@ -8,6 +8,9 @@ internal enum class ErrorCode(public val code: Int) { // LiveMap specific error codes MapKeyShouldBeString(40_003), MapValueDataTypeUnsupported(40_013), + // Channel mode and state validation error codes + ChannelModeRequired(40_024), + ChannelStateError(90_001), } internal enum class HttpStatusCode(public val code: Int) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 5f17027b4..8dbd86bad 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -1,6 +1,8 @@ package io.ably.lib.objects +import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage import kotlinx.coroutines.suspendCancellableCoroutine @@ -39,6 +41,27 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe setChannelSerial(channelName, channelSerial) } +internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) { + throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe) + throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed)) +} + +// Spec: RTO2 +internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) { + val channelModes = getChannelModes(channelName) + if (channelModes == null || !channelModes.contains(channelMode)) { + // Spec: RTO2a2, RTO2b2 + throw ablyException("\"${channelMode.name}\" channel mode must be set for this operation", ErrorCode.ChannelModeRequired) + } +} + +internal fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array) { + val currentState = getChannelState(channelName) + if (currentState == null || channelStates.contains(currentState)) { + throw ablyException("Channel is in invalid state: $currentState", ErrorCode.ChannelStateError) + } +} + internal class Binary(val data: ByteArray) { override fun equals(other: Any?): Boolean { if (this === other) return true diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index fe201e081..206ebc71c 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -9,7 +9,7 @@ import io.ably.lib.util.Log * @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences * @spec RTO6 - Creates zero-value objects when needed */ -internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) { +internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): ObjectsStateCoordinator() { private val tag = "ObjectsManager" /** * @spec RTO5 - Sync objects data pool for collecting sync messages @@ -27,7 +27,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) { * @spec RTO8 - Buffers messages if not synced, applies immediately if synced */ internal fun handleObjectMessages(objectMessages: List) { - if (liveObjects.state != ObjectsState.SYNCED) { + if (liveObjects.state != ObjectsState.Synced) { // RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence. // Some of the incoming object messages may have already been applied to the objects described in // the sync sequence, but others may not; therefore we must buffer these messages so that we can apply @@ -77,7 +77,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) { bufferedObjectOperations.clear() // RTO5a2b syncObjectsDataPool.clear() // RTO5a2a currentSyncId = syncId - liveObjects.stateChange(ObjectsState.SYNCING, false) + stateChange(ObjectsState.Syncing, false) } /** @@ -95,7 +95,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) { bufferedObjectOperations.clear() // RTO5c5 syncObjectsDataPool.clear() // RTO5c4 currentSyncId = null // RTO5c3 - liveObjects.stateChange(ObjectsState.SYNCED, deferStateEvent) + stateChange(ObjectsState.Synced, deferStateEvent) } /** @@ -220,8 +220,25 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects) { } } + /** + * Changes the state and emits events. + * + * @spec RTO2 - Emits state change events for syncing and synced states + */ + private fun stateChange(newState: ObjectsState, deferEvent: Boolean) { + if (liveObjects.state == newState) { + return + } + Log.v(tag, "Objects state changed to: $newState from ${liveObjects.state}") + liveObjects.state = newState + + // deferEvent not needed since objectsStateChanged processes events in a sequential coroutine scope + objectsStateChanged(newState) + } + internal fun dispose() { syncObjectsDataPool.clear() bufferedObjectOperations.clear() + disposeObjectsStateListeners() } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsState.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsState.kt new file mode 100644 index 000000000..8ba280e3d --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsState.kt @@ -0,0 +1,107 @@ +package io.ably.lib.objects + +import io.ably.lib.objects.state.ObjectsStateChange +import io.ably.lib.objects.state.ObjectsStateEvent +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log +import kotlinx.coroutines.* + +/** + * @spec RTO2 - enum representing objects state + */ +internal enum class ObjectsState { + Initialized, + Syncing, + Synced +} + +/** + * Maps internal ObjectsState values to their corresponding public ObjectsStateEvent values. + * Used to determine which events should be emitted when state changes occur. + * INITIALIZED maps to null (no event), while SYNCING and SYNCED map to their respective events. + */ +private val objectsStateToEventMap = mapOf( + ObjectsState.Initialized to null, + ObjectsState.Syncing to ObjectsStateEvent.SYNCING, + ObjectsState.Synced to ObjectsStateEvent.SYNCED +) + +/** + * An interface for managing and communicating changes in the synchronization state of live objects. + * + * Implementations should ensure thread-safe event emission and proper synchronization + * between state change notifications. + */ +internal interface HandlesObjectsStateChange { + /** + * Handles changes in the state of live objects by notifying all registered listeners. + * Implementations should ensure thread-safe event emission to both internal and public listeners. + * Makes sure every event is processed in the order they were received. + * @param newState The new state of the objects, SYNCING or SYNCED. + */ + fun objectsStateChanged(newState: ObjectsState) + + /** + * Suspends the current coroutine until objects are synchronized. + * Returns immediately if state is already SYNCED, otherwise waits for the SYNCED event. + * + * @param currentState The current state of objects to determine if waiting is necessary + */ + suspend fun ensureSynced(currentState: ObjectsState) + + /** + * Disposes all registered state change listeners and cancels any pending operations. + * Should be called when the associated LiveObjects instance is no longer needed. + */ + fun disposeObjectsStateListeners() +} + + +internal abstract class ObjectsStateCoordinator : ObjectsStateChange, HandlesObjectsStateChange { + private val tag = "ObjectsStateCoordinator" + private val internalObjectStateEmitter = ObjectsStateEmitter() + // related to RTC10, should have a separate EventEmitter for users of the library + private val externalObjectStateEmitter = ObjectsStateEmitter() + + override fun on(event: ObjectsStateEvent, listener: ObjectsStateChange.Listener): ObjectsSubscription { + externalObjectStateEmitter.on(event, listener) + return ObjectsSubscription { + externalObjectStateEmitter.off(event, listener) + } + } + + override fun off(listener: ObjectsStateChange.Listener) = externalObjectStateEmitter.off(listener) + + override fun offAll() = externalObjectStateEmitter.off() + + override fun objectsStateChanged(newState: ObjectsState) { + objectsStateToEventMap[newState]?.let { objectsStateEvent -> + internalObjectStateEmitter.emit(objectsStateEvent) + externalObjectStateEmitter.emit(objectsStateEvent) + } + } + + override suspend fun ensureSynced(currentState: ObjectsState) { + if (currentState != ObjectsState.Synced) { + val deferred = CompletableDeferred() + internalObjectStateEmitter.once(ObjectsStateEvent.SYNCED) { + Log.v(tag, "Objects state changed to SYNCED, resuming ensureSynced") + deferred.complete(Unit) + } + deferred.await() + } + } + + override fun disposeObjectsStateListeners() = offAll() +} + +private class ObjectsStateEmitter : EventEmitter() { + private val tag = "ObjectsStateEmitter" + override fun apply(listener: ObjectsStateChange.Listener?, event: ObjectsStateEvent?, vararg args: Any?) { + try { + listener?.onStateChanged(event!!) + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt index 35bd4cefa..2fde867b9 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt @@ -2,6 +2,9 @@ package io.ably.lib.objects import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo +import io.ably.lib.util.Log +import kotlinx.coroutines.* +import java.util.concurrent.CancellationException internal fun ablyException( errorMessage: String, @@ -44,3 +47,57 @@ internal fun objectError(errorMessage: String, cause: Throwable? = null): AblyEx */ internal val String.byteSize: Int get() = this.toByteArray(Charsets.UTF_8).size + +/** + * A channel-specific coroutine scope for executing callbacks asynchronously in the LiveObjects system. + * Provides safe execution of suspend functions with results delivered via callbacks. + * Supports proper error handling and cancellation during LiveObjects disposal. + */ +internal class ObjectsAsyncScope(channelName: String) { + private val tag = "ObjectsCallbackScope-$channelName" + + private val scope = + CoroutineScope(Dispatchers.Default + CoroutineName(tag) + SupervisorJob()) + + internal fun launchWithCallback(callback: ObjectsCallback, block: suspend () -> T) { + scope.launch { + try { + val result = block() + try { callback.onSuccess(result) } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing callback's onSuccess handler", t) + } // catch and don't rethrow error from callback + } catch (throwable: Throwable) { + when (throwable) { + is AblyException -> { callback.onError(throwable) } + else -> { + val ex = ablyException("Error executing operation", ErrorCode.BadRequest, cause = throwable) + callback.onError(ex) + } + } + } + } + } + + internal fun launchWithVoidCallback(callback: ObjectsCallback, block: suspend () -> Unit) { + scope.launch { + try { + block() + try { callback.onSuccess(null) } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing callback's onSuccess handler", t) + } // catch and don't rethrow error from callback + } catch (throwable: Throwable) { + when (throwable) { + is AblyException -> { callback.onError(throwable) } + else -> { + val ex = ablyException("Error executing operation", ErrorCode.BadRequest, cause = throwable) + callback.onError(ex) + } + } + } + } + } + + internal fun cancel(cause: CancellationException) { + scope.coroutineContext.cancelChildren(cause) + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt index 70778cfbe..523d37dc8 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt @@ -1,6 +1,5 @@ package io.ably.lib.objects.type -import io.ably.lib.objects.* import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index 80f6151a2..5f0ee538e 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -5,7 +5,6 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject import io.ably.lib.objects.type.ObjectType -import io.ably.lib.types.Callback import java.util.concurrent.atomic.AtomicReference /** @@ -38,7 +37,7 @@ internal class DefaultLiveCounter private constructor( TODO("Not yet implemented") } - override fun incrementAsync(callback: Callback) { + override fun incrementAsync(callback: ObjectsCallback) { TODO("Not yet implemented") } @@ -46,12 +45,13 @@ internal class DefaultLiveCounter private constructor( TODO("Not yet implemented") } - override fun decrementAsync(callback: Callback) { + override fun decrementAsync(callback: ObjectsCallback) { TODO("Not yet implemented") } override fun value(): Double { - TODO("Not yet implemented") + adapter.throwIfInvalidAccessApiConfiguration(channelName) + return data.get() } override fun validate(state: ObjectState) = liveCounterManager.validate(state) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 45ccbac9f..b17368de9 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -7,8 +7,9 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject import io.ably.lib.objects.type.ObjectType -import io.ably.lib.types.Callback import java.util.concurrent.ConcurrentHashMap +import java.util.AbstractMap + /** * Implementation of LiveObject for LiveMap. @@ -38,38 +39,65 @@ internal class DefaultLiveMap private constructor( internal val objectsPool: ObjectsPool get() = liveObjects.objectsPool override fun get(keyName: String): Any? { - TODO("Not yet implemented") + adapter.throwIfInvalidAccessApiConfiguration(channelName) // RTLM5b, RTLM5c + if (isTombstoned) { + return null + } + data[keyName]?.let { liveMapEntry -> + return liveMapEntry.getResolvedValue(objectsPool) + } + return null // RTLM5d1 } - override fun entries(): MutableIterable> { - TODO("Not yet implemented") + override fun entries(): Iterable> { + adapter.throwIfInvalidAccessApiConfiguration(channelName) // RTLM11b, RTLM11c + + return sequence> { + for ((key, entry) in data.entries) { + val value = entry.getResolvedValue(objectsPool) // RTLM11d, RTLM11d2 + value?.let { + yield(AbstractMap.SimpleImmutableEntry(key, it)) + } + } + }.asIterable() } - override fun keys(): MutableIterable { - TODO("Not yet implemented") + override fun keys(): Iterable { + val iterableEntries = entries() + return sequence { + for (entry in iterableEntries) { + yield(entry.key) // RTLM12b + } + }.asIterable() } - override fun values(): MutableIterable { - TODO("Not yet implemented") + override fun values(): Iterable { + val iterableEntries = entries() + return sequence { + for (entry in iterableEntries) { + yield(entry.value) // RTLM13b + } + }.asIterable() } - override fun set(keyName: String, value: Any) { - TODO("Not yet implemented") + override fun size(): Long { + adapter.throwIfInvalidAccessApiConfiguration(channelName) + return data.values.count { !it.isEntryOrRefTombstoned(objectsPool) }.toLong() // RTLM10d } - override fun remove(keyName: String) { + override fun set(keyName: String, value: Any) { TODO("Not yet implemented") } - override fun size(): Long { + override fun remove(keyName: String) { TODO("Not yet implemented") } - override fun setAsync(keyName: String, value: Any, callback: Callback) { + override fun setAsync(keyName: String, value: Any, callback: ObjectsCallback) { TODO("Not yet implemented") } - override fun removeAsync(keyName: String, callback: Callback) { + override fun removeAsync(keyName: String, callback: ObjectsCallback) { TODO("Not yet implemented") } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt new file mode 100644 index 000000000..a55106451 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt @@ -0,0 +1,205 @@ +package io.ably.lib.objects.integration + +import io.ably.lib.objects.LiveCounter +import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.assertWaiter +import io.ably.lib.objects.integration.helpers.fixtures.createUserMapWithCountersObject +import io.ably.lib.objects.integration.setup.IntegrationTest +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +class DefaultLiveCounterTest: IntegrationTest() { + /** + * Tests the synchronization process when a user map object with counters is initialized before channel attach. + * This includes checking the initial values of all counter objects and nested maps in the + * comprehensive user engagement counter structure. + */ + @Test + fun testLiveCounterSync() = runTest { + val channelName = generateChannelName() + val userMapObjectId = restObjects.createUserMapWithCountersObject(channelName) + restObjects.setMapRef(channelName, "root", "user", userMapObjectId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Get the user map object from the root map + val userMap = rootMap.get("user") as LiveMap + assertNotNull(userMap, "User map should be synchronized") + assertEquals(7L, userMap.size(), "User map should contain 7 top-level entries") + + // Assert direct counter objects at the top level of the user map + // Test profileViews counter - should have initial value of 127 + val profileViewsCounter = userMap.get("profileViews") as LiveCounter + assertNotNull(profileViewsCounter, "Profile views counter should exist") + assertEquals(127.0, profileViewsCounter.value(), "Profile views counter should have initial value of 127") + + // Test postLikes counter - should have initial value of 45 + val postLikesCounter = userMap.get("postLikes") as LiveCounter + assertNotNull(postLikesCounter, "Post likes counter should exist") + assertEquals(45.0, postLikesCounter.value(), "Post likes counter should have initial value of 45") + + // Test commentCount counter - should have initial value of 23 + val commentCountCounter = userMap.get("commentCount") as LiveCounter + assertNotNull(commentCountCounter, "Comment count counter should exist") + assertEquals(23.0, commentCountCounter.value(), "Comment count counter should have initial value of 23") + + // Test followingCount counter - should have initial value of 89 + val followingCountCounter = userMap.get("followingCount") as LiveCounter + assertNotNull(followingCountCounter, "Following count counter should exist") + assertEquals(89.0, followingCountCounter.value(), "Following count counter should have initial value of 89") + + // Test followersCount counter - should have initial value of 156 + val followersCountCounter = userMap.get("followersCount") as LiveCounter + assertNotNull(followersCountCounter, "Followers count counter should exist") + assertEquals(156.0, followersCountCounter.value(), "Followers count counter should have initial value of 156") + + // Test loginStreak counter - should have initial value of 7 + val loginStreakCounter = userMap.get("loginStreak") as LiveCounter + assertNotNull(loginStreakCounter, "Login streak counter should exist") + assertEquals(7.0, loginStreakCounter.value(), "Login streak counter should have initial value of 7") + + // Assert the nested engagement metrics map + val engagementMetrics = userMap.get("engagementMetrics") as LiveMap + assertNotNull(engagementMetrics, "Engagement metrics map should exist") + assertEquals(4L, engagementMetrics.size(), "Engagement metrics map should contain 4 counter entries") + + // Assert counter objects within the engagement metrics map + // Test totalShares counter - should have initial value of 34 + val totalSharesCounter = engagementMetrics.get("totalShares") as LiveCounter + assertNotNull(totalSharesCounter, "Total shares counter should exist") + assertEquals(34.0, totalSharesCounter.value(), "Total shares counter should have initial value of 34") + + // Test totalBookmarks counter - should have initial value of 67 + val totalBookmarksCounter = engagementMetrics.get("totalBookmarks") as LiveCounter + assertNotNull(totalBookmarksCounter, "Total bookmarks counter should exist") + assertEquals(67.0, totalBookmarksCounter.value(), "Total bookmarks counter should have initial value of 67") + + // Test totalReactions counter - should have initial value of 189 + val totalReactionsCounter = engagementMetrics.get("totalReactions") as LiveCounter + assertNotNull(totalReactionsCounter, "Total reactions counter should exist") + assertEquals(189.0, totalReactionsCounter.value(), "Total reactions counter should have initial value of 189") + + // Test dailyActiveStreak counter - should have initial value of 12 + val dailyActiveStreakCounter = engagementMetrics.get("dailyActiveStreak") as LiveCounter + assertNotNull(dailyActiveStreakCounter, "Daily active streak counter should exist") + assertEquals(12.0, dailyActiveStreakCounter.value(), "Daily active streak counter should have initial value of 12") + + // Verify that all expected counter keys exist at the top level + val topLevelKeys = userMap.keys().toSet() + val expectedTopLevelKeys = setOf( + "profileViews", "postLikes", "commentCount", "followingCount", + "followersCount", "loginStreak", "engagementMetrics" + ) + assertEquals(expectedTopLevelKeys, topLevelKeys, "Top-level keys should match expected counter keys") + + // Verify that all expected counter keys exist in the engagement metrics map + val engagementKeys = engagementMetrics.keys().toSet() + val expectedEngagementKeys = setOf( + "totalShares", "totalBookmarks", "totalReactions", "dailyActiveStreak" + ) + assertEquals(expectedEngagementKeys, engagementKeys, "Engagement metrics keys should match expected counter keys") + + // Verify total counter values match expectations (useful for integration testing) + val totalUserCounterValues = listOf(127.0, 45.0, 23.0, 89.0, 156.0, 7.0).sum() + val totalEngagementCounterValues = listOf(34.0, 67.0, 189.0, 12.0).sum() + assertEquals(447.0, totalUserCounterValues, "Sum of user counter values should be 447") + assertEquals(302.0, totalEngagementCounterValues, "Sum of engagement counter values should be 302") + } + + /** + * Tests sequential counter operations including creation with initial value, incrementing by various amounts, + * decrementing by various amounts, and validates the resulting counter value after each operation. + */ + @Test + fun testLiveCounterOperations() = runTest { + val channelName = generateChannelName() + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Step 1: Create a new counter with initial value of 10 + val testCounterObjectId = restObjects.createCounter(channelName, initialValue = 10.0) + restObjects.setMapRef(channelName, "root", "testCounter", testCounterObjectId) + + // Wait for updated testCounter to be available in the root map + assertWaiter { rootMap.get("testCounter") != null } + + // Assert initial state after creation + val testCounter = rootMap.get("testCounter") as LiveCounter + assertNotNull(testCounter, "Test counter should be created and accessible") + assertEquals(10.0, testCounter.value(), "Counter should have initial value of 10") + + // Step 2: Increment counter by 5 (10 + 5 = 15) + restObjects.incrementCounter(channelName, testCounterObjectId, 5.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 15.0 } + + // Assert after first increment + assertEquals(15.0, testCounter.value(), "Counter should be incremented to 15") + + // Step 3: Increment counter by 3 (15 + 3 = 18) + restObjects.incrementCounter(channelName, testCounterObjectId, 3.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 18.0 } + + // Assert after second increment + assertEquals(18.0, testCounter.value(), "Counter should be incremented to 18") + + // Step 4: Increment counter by a larger amount: 12 (18 + 12 = 30) + restObjects.incrementCounter(channelName, testCounterObjectId, 12.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 30.0 } + + // Assert after third increment + assertEquals(30.0, testCounter.value(), "Counter should be incremented to 30") + + // Step 5: Decrement counter by 7 (30 - 7 = 23) + restObjects.decrementCounter(channelName, testCounterObjectId, 7.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 23.0 } + + // Assert after first decrement + assertEquals(23.0, testCounter.value(), "Counter should be decremented to 23") + + // Step 6: Decrement counter by 4 (23 - 4 = 19) + restObjects.decrementCounter(channelName, testCounterObjectId, 4.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 19.0 } + + // Assert after second decrement + assertEquals(19.0, testCounter.value(), "Counter should be decremented to 19") + + // Step 7: Increment counter by 1 (19 + 1 = 20) + restObjects.incrementCounter(channelName, testCounterObjectId, 1.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 20.0 } + + // Assert after final increment + assertEquals(20.0, testCounter.value(), "Counter should be incremented to 20") + + // Step 8: Decrement counter by a larger amount: 15 (20 - 15 = 5) + restObjects.decrementCounter(channelName, testCounterObjectId, 15.0) + // Wait for the counter to be updated + assertWaiter { testCounter.value() == 5.0 } + + // Assert after large decrement + assertEquals(5.0, testCounter.value(), "Counter should be decremented to 5") + + // Final verification - test final increment to ensure counter still works + restObjects.incrementCounter(channelName, testCounterObjectId, 25.0) + assertWaiter { testCounter.value() == 30.0 } + + // Assert final state + assertEquals(30.0, testCounter.value(), "Counter should have final value of 30") + + // Verify the counter object is still accessible and functioning + assertNotNull(testCounter, "Counter should still be accessible at the end") + + // Verify we can still access it from the root map + val finalCounterCheck = rootMap.get("testCounter") as LiveCounter + assertNotNull(finalCounterCheck, "Counter should still be accessible from root map") + assertEquals(30.0, finalCounterCheck.value(), "Final counter value should be 30 when accessed from root map") + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt new file mode 100644 index 000000000..68e94c891 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt @@ -0,0 +1,213 @@ +package io.ably.lib.objects.integration + +import io.ably.lib.objects.* +import io.ably.lib.objects.ObjectData +import io.ably.lib.objects.ObjectValue +import io.ably.lib.objects.integration.helpers.fixtures.createUserMapObject +import io.ably.lib.objects.integration.setup.IntegrationTest +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull + +class DefaultLiveMapTest: IntegrationTest() { + /** + * Tests the synchronization process when a user map object is initialized before channel attach. + * This includes checking the initial values of all nested maps, counters, and primitive data types + * in the comprehensive user map object structure. + */ + @Test + fun testLiveMapSync() = runTest { + val channelName = generateChannelName() + val userMapObjectId = restObjects.createUserMapObject(channelName) + restObjects.setMapRef(channelName, "root", "user", userMapObjectId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Get the user map object from the root map + val userMap = rootMap.get("user") as LiveMap + assertNotNull(userMap, "User map should be synchronized") + assertEquals(5L, userMap.size(), "User map should contain 5 top-level entries") + + // Assert Counter Objects + // Test loginCounter - should have initial value of 5 + val loginCounter = userMap.get("loginCounter") as LiveCounter + assertNotNull(loginCounter, "Login counter should exist") + assertEquals(5.0, loginCounter.value(), "Login counter should have initial value of 5") + + // Test sessionCounter - should have initial value of 0 + val sessionCounter = userMap.get("sessionCounter") as LiveCounter + assertNotNull(sessionCounter, "Session counter should exist") + assertEquals(0.0, sessionCounter.value(), "Session counter should have initial value of 0") + + // Assert User Profile Map + val userProfile = userMap.get("userProfile") as LiveMap + assertNotNull(userProfile, "User profile map should exist") + assertEquals(6L, userProfile.size(), "User profile should contain 6 entries") + + // Assert user profile primitive values + assertEquals("user123", userProfile.get("userId"), "User ID should match expected value") + assertEquals("John Doe", userProfile.get("name"), "User name should match expected value") + assertEquals("john@example.com", userProfile.get("email"), "User email should match expected value") + assertEquals(true, userProfile.get("isActive"), "User should be active") + + // Assert Preferences Map (nested within user profile) + val preferences = userProfile.get("preferences") as LiveMap + assertNotNull(preferences, "Preferences map should exist") + assertEquals(4L, preferences.size(), "Preferences should contain 4 entries") + assertEquals("dark", preferences.get("theme"), "Theme preference should be dark") + assertEquals(true, preferences.get("notifications"), "Notifications should be enabled") + assertEquals("en", preferences.get("language"), "Language should be English") + assertEquals(3.0, preferences.get("maxRetries"), "Max retries should be 3") + + // Assert Metrics Map (nested within user profile) + val metrics = userProfile.get("metrics") as LiveMap + assertNotNull(metrics, "Metrics map should exist") + assertEquals(4L, metrics.size(), "Metrics should contain 4 entries") + assertEquals("2024-01-01T08:30:00Z", metrics.get("lastLoginTime"), "Last login time should match") + assertEquals(42.0, metrics.get("profileViews"), "Profile views should be 42") + + // Test counter references within metrics map + val totalLoginsCounter = metrics.get("totalLogins") as LiveCounter + assertNotNull(totalLoginsCounter, "Total logins counter should exist") + assertEquals(5.0, totalLoginsCounter.value(), "Total logins should reference login counter with value 5") + + val activeSessionsCounter = metrics.get("activeSessions") as LiveCounter + assertNotNull(activeSessionsCounter, "Active sessions counter should exist") + assertEquals(0.0, activeSessionsCounter.value(), "Active sessions should reference session counter with value 0") + + // Assert direct references to maps from top-level user map + val preferencesMapRef = userMap.get("preferencesMap") as LiveMap + assertNotNull(preferencesMapRef, "Preferences map reference should exist") + assertEquals(4L, preferencesMapRef.size(), "Referenced preferences map should have 4 entries") + assertEquals("dark", preferencesMapRef.get("theme"), "Referenced preferences should match nested preferences") + + val metricsMapRef = userMap.get("metricsMap") as LiveMap + assertNotNull(metricsMapRef, "Metrics map reference should exist") + assertEquals(4L, metricsMapRef.size(), "Referenced metrics map should have 4 entries") + assertEquals("2024-01-01T08:30:00Z", metricsMapRef.get("lastLoginTime"), "Referenced metrics should match nested metrics") + + // Verify that references point to the same objects + assertEquals(preferences.get("theme"), preferencesMapRef.get("theme"), "Preference references should point to same data") + assertEquals(metrics.get("profileViews"), metricsMapRef.get("profileViews"), "Metrics references should point to same data") + } + + /** + * Tests sequential map operations including creation with initial data, updating existing fields, + * adding new fields, and removing fields. Validates the resulting data after each operation. + */ + @Test + fun testLiveMapOperations() = runTest { + val channelName = generateChannelName() + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Step 1: Create a new map with initial data + val testMapObjectId = restObjects.createMap( + channelName, + data = mapOf( + "name" to ObjectData(value = ObjectValue.String("Alice")), + "age" to ObjectData(value = ObjectValue.Number(30)), + "isActive" to ObjectData(value = ObjectValue.Boolean(true)) + ) + ) + restObjects.setMapRef(channelName, "root", "testMap", testMapObjectId) + + // wait for updated testMap to be available in the root map + assertWaiter { rootMap.get("testMap") != null } + + // Assert initial state after creation + val testMap = rootMap.get("testMap") as LiveMap + assertNotNull(testMap, "Test map should be created and accessible") + assertEquals(3L, testMap.size(), "Test map should have 3 initial entries") + assertEquals("Alice", testMap.get("name"), "Initial name should be Alice") + assertEquals(30.0, testMap.get("age"), "Initial age should be 30") + assertEquals(true, testMap.get("isActive"), "Initial active status should be true") + + // Step 2: Update an existing field (name from "Alice" to "Bob") + restObjects.setMapValue(channelName, testMapObjectId, "name", ObjectValue.String("Bob")) + // Wait for the map to be updated + assertWaiter { testMap.get("name") == "Bob" } + + // Assert after updating existing field + assertEquals(3L, testMap.size(), "Map size should remain the same after update") + assertEquals("Bob", testMap.get("name"), "Name should be updated to Bob") + assertEquals(30.0, testMap.get("age"), "Age should remain unchanged") + assertEquals(true, testMap.get("isActive"), "Active status should remain unchanged") + + // Step 3: Add a new field (email) + restObjects.setMapValue(channelName, testMapObjectId, "email", ObjectValue.String("bob@example.com")) + // Wait for the map to be updated + assertWaiter { testMap.get("email") == "bob@example.com" } + + // Assert after adding new field + assertEquals(4L, testMap.size(), "Map size should increase after adding new field") + assertEquals("Bob", testMap.get("name"), "Name should remain Bob") + assertEquals(30.0, testMap.get("age"), "Age should remain unchanged") + assertEquals(true, testMap.get("isActive"), "Active status should remain unchanged") + assertEquals("bob@example.com", testMap.get("email"), "Email should be added successfully") + + // Step 4: Add another new field with different data type (score as number) + restObjects.setMapValue(channelName, testMapObjectId, "score", ObjectValue.Number(85)) + // Wait for the map to be updated + assertWaiter { testMap.get("score") == 85.0 } + + // Assert after adding second new field + assertEquals(5L, testMap.size(), "Map size should increase to 5 after adding score") + assertEquals("Bob", testMap.get("name"), "Name should remain Bob") + assertEquals(30.0, testMap.get("age"), "Age should remain unchanged") + assertEquals(true, testMap.get("isActive"), "Active status should remain unchanged") + assertEquals("bob@example.com", testMap.get("email"), "Email should remain unchanged") + assertEquals(85.0, testMap.get("score"), "Score should be added as numeric value") + + // Step 5: Update the boolean field + restObjects.setMapValue(channelName, testMapObjectId, "isActive", ObjectValue.Boolean(false)) + // Wait for the map to be updated + assertWaiter { testMap.get("isActive") == false } + + // Assert after updating boolean field + assertEquals(5L, testMap.size(), "Map size should remain 5 after boolean update") + assertEquals("Bob", testMap.get("name"), "Name should remain Bob") + assertEquals(30.0, testMap.get("age"), "Age should remain unchanged") + assertEquals(false, testMap.get("isActive"), "Active status should be updated to false") + assertEquals("bob@example.com", testMap.get("email"), "Email should remain unchanged") + assertEquals(85.0, testMap.get("score"), "Score should remain unchanged") + + // Step 6: Remove a field (age) + restObjects.removeMapValue(channelName, testMapObjectId, "age") + // Wait for the map to be updated + assertWaiter { testMap.get("age") == null } + + // Assert after removing field + assertEquals(4L, testMap.size(), "Map size should decrease to 4 after removing age") + assertEquals("Bob", testMap.get("name"), "Name should remain Bob") + assertNull(testMap.get("age"), "Age should be removed and return null") + assertEquals(false, testMap.get("isActive"), "Active status should remain false") + assertEquals("bob@example.com", testMap.get("email"), "Email should remain unchanged") + assertEquals(85.0, testMap.get("score"), "Score should remain unchanged") + + // Step 7: Remove another field (score) + restObjects.removeMapValue(channelName, testMapObjectId, "score") + // Wait for the map to be updated + assertWaiter { testMap.get("score") == null } + + // Assert final state after second removal + assertEquals(3L, testMap.size(), "Map size should decrease to 3 after removing score") + assertEquals("Bob", testMap.get("name"), "Name should remain Bob") + assertEquals(false, testMap.get("isActive"), "Active status should remain false") + assertEquals("bob@example.com", testMap.get("email"), "Email should remain unchanged") + assertNull(testMap.get("score"), "Score should be removed and return null") + assertNull(testMap.get("age"), "Age should remain null") + + // Final verification - ensure all expected keys exist and unwanted keys don't + assertEquals(3, testMap.size(), "Final map should have exactly 3 entries") + + val finalKeys = testMap.keys().toSet() + assertEquals(setOf("name", "isActive", "email"), finalKeys, "Final keys should match expected set") + + val finalValues = testMap.values().toSet() + assertEquals(setOf("Bob", false, "bob@example.com"), finalValues, "Final values should match expected set") + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt new file mode 100644 index 000000000..8e5396a1a --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -0,0 +1,187 @@ +package io.ably.lib.objects.integration + +import com.google.gson.JsonArray +import com.google.gson.JsonObject +import io.ably.lib.objects.* +import io.ably.lib.objects.Binary +import io.ably.lib.objects.integration.helpers.State +import io.ably.lib.objects.integration.helpers.fixtures.initializeRootMap +import io.ably.lib.objects.integration.setup.IntegrationTest +import io.ably.lib.objects.size +import io.ably.lib.objects.state.ObjectsStateEvent +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.text.toByteArray + +class DefaultLiveObjectsTest : IntegrationTest() { + + @Test + fun testChannelObjects() = runTest { + val channelName = generateChannelName() + val channel = getRealtimeChannel(channelName) + val objects = channel.objects + assertNotNull(objects) + } + + @Test + fun testObjectsSyncEvents() = runTest { + val channelName = generateChannelName() + // Initialize the root map on the channel with initial data + restObjects.initializeRootMap(channelName) + + val channel = getRealtimeChannel(channelName, autoAttach = false) + val objects = channel.objects + assertNotNull(objects) + + assertEquals(ObjectsState.Initialized, objects.State, "Initial state should be INITIALIZED") + + val syncStates = mutableListOf() + objects.on(ObjectsStateEvent.SYNCING) { + syncStates.add(it) + } + objects.on(ObjectsStateEvent.SYNCED) { + syncStates.add(it) + } + + channel.attach() + + assertWaiter { syncStates.size == 2 } // Wait for both SYNCING and SYNCED events + + assertEquals(ObjectsStateEvent.SYNCING, syncStates[0], "First event should be SYNCING") + assertEquals(ObjectsStateEvent.SYNCED, syncStates[1], "Second event should be SYNCED") + + val rootMap = objects.root + assertEquals(6, rootMap.size(), "Root map should have 6 entries after sync") + } + + /** + * This will test objects sync process when the root map is initialized before channel attach. + * This includes checking the initial values of counters, maps, and other data types. + */ + @Test + fun testObjectsSync() = runTest { + val channelName = generateChannelName() + // Initialize the root map on the channel with initial data + restObjects.initializeRootMap(channelName) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + assertNotNull(rootMap) + + // Assert Counter Objects + // Test emptyCounter - should have initial value of 0 + val emptyCounter = rootMap.get("emptyCounter") as LiveCounter + assertNotNull(emptyCounter) + assertEquals(0.0, emptyCounter.value()) + + // Test initialValueCounter - should have initial value of 10 + val initialValueCounter = rootMap.get("initialValueCounter") as LiveCounter + assertNotNull(initialValueCounter) + assertEquals(10.0, initialValueCounter.value()) + + // Test referencedCounter - should have initial value of 20 + val referencedCounter = rootMap.get("referencedCounter") as LiveCounter + assertNotNull(referencedCounter) + assertEquals(20.0, referencedCounter.value()) + + // Assert Map Objects + // Test emptyMap - should be an empty map + val emptyMap = rootMap.get("emptyMap") as LiveMap + assertNotNull(emptyMap) + assertEquals(0L, emptyMap.size()) + + // Test referencedMap - should contain one key "counterKey" pointing to referencedCounter + val referencedMap = rootMap.get("referencedMap") as LiveMap + assertNotNull(referencedMap) + assertEquals(1L, referencedMap.size()) + val referencedMapCounter = referencedMap.get("counterKey") as LiveCounter + assertNotNull(referencedMapCounter) + assertEquals(20.0, referencedMapCounter.value()) // Should point to the same counter with value 20 + + // Test valuesMap - should contain all primitive data types and one map reference + val valuesMap = rootMap.get("valuesMap") as LiveMap + assertNotNull(valuesMap) + assertEquals(13L, valuesMap.size()) // Should have 13 entries + + // Assert string values + assertEquals("stringValue", valuesMap.get("string")) + assertEquals("", valuesMap.get("emptyString")) + + // Assert binary values + val bytesValue = valuesMap.get("bytes") as Binary + assertNotNull(bytesValue) + val expectedBinary = Binary("eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9".toByteArray()) + assertEquals(expectedBinary, bytesValue) // Should contain encoded JSON data + + val emptyBytesValue = valuesMap.get("emptyBytes") as Binary + assertNotNull(emptyBytesValue) + assertEquals(0, emptyBytesValue.size()) // Should be empty byte array + + // Assert numeric values + assertEquals(99999999.0, valuesMap.get("maxSafeNumber")) + assertEquals(-99999999.0, valuesMap.get("negativeMaxSafeNumber")) + assertEquals(1.0, valuesMap.get("number")) + assertEquals(0.0, valuesMap.get("zero")) + + // Assert boolean values + assertEquals(true, valuesMap.get("true")) + assertEquals(false, valuesMap.get("false")) + + // Assert JSON object value - should contain {"foo": "bar"} + val jsonObjectValue = valuesMap.get("object") as JsonObject + assertNotNull(jsonObjectValue) + assertEquals("bar", jsonObjectValue.get("foo").asString) + + // Assert JSON array value - should contain ["foo", "bar", "baz"] + val jsonArrayValue = valuesMap.get("array") as JsonArray + assertNotNull(jsonArrayValue) + assertEquals(3, jsonArrayValue.size()) + assertEquals("foo", jsonArrayValue[0].asString) + assertEquals("bar", jsonArrayValue[1].asString) + assertEquals("baz", jsonArrayValue[2].asString) + + // Assert map reference - should point to the same referencedMap + val mapRefValue = valuesMap.get("mapRef") as LiveMap + assertNotNull(mapRefValue) + assertEquals(1L, mapRefValue.size()) + val mapRefCounter = mapRefValue.get("counterKey") as LiveCounter + assertNotNull(mapRefCounter) + assertEquals(20.0, mapRefCounter.value()) // Should point to the same counter with value 20 + } + + /** + * Spec: RTLO4e - Tests the removal of objects from the root map. + * Server runs periodic garbage collection (GC) to remove orphaned objects and will send + * OBJECT_DELETE events for objects that are no longer referenced. + * `OBJECT_DELETE` event is not covered in the test and we only check if map entries are removed + */ + @Test + fun testObjectRemovalFromRoot() = runTest { + val channelName = generateChannelName() + // Initialize the root map on the channel with initial data + restObjects.initializeRootMap(channelName) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + assertEquals(6L, rootMap.size()) // Should have 6 entries initially + + // Remove the "referencedCounter" from the root map + assertNotNull(rootMap.get("referencedCounter")) // Access to ensure it exists before removal + + restObjects.removeMapValue(channelName, "root", "referencedCounter") + + assertWaiter { rootMap.size() == 5L } // Wait for the removal to complete + assertNull(rootMap.get("referencedCounter")) // Should be null after removal + + // Remove the "referencedMap" from the root map + assertNotNull(rootMap.get("referencedMap")) // Access to ensure it exists before removal + + restObjects.removeMapValue(channelName, "root", "referencedMap") + + assertWaiter { rootMap.size() == 4L } // Wait for the removal to complete + assertNull(rootMap.get("referencedMap")) // Should be null after removal + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt deleted file mode 100644 index 7e672e178..000000000 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/LiveObjectTest.kt +++ /dev/null @@ -1,17 +0,0 @@ -package io.ably.lib.objects.integration - -import io.ably.lib.objects.integration.setup.IntegrationTest -import kotlinx.coroutines.test.runTest -import org.junit.Test -import kotlin.test.assertNotNull - -class LiveObjectTest : IntegrationTest() { - - @Test - fun testChannelObjectGetterTest() = runTest { - val channelName = generateChannelName() - val channel = getRealtimeChannel(channelName) - val objects = channel.objects - assertNotNull(objects) - } -} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/PayloadBuilder.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/PayloadBuilder.kt new file mode 100644 index 000000000..2a8b466ee --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/PayloadBuilder.kt @@ -0,0 +1,135 @@ +package io.ably.lib.objects.integration.helpers + +import com.google.gson.JsonObject +import io.ably.lib.objects.ObjectData +import io.ably.lib.objects.ObjectOperationAction +import io.ably.lib.objects.serialization.gson +import java.util.* + +internal object PayloadBuilder { + /** + * Action strings for REST API operations. + * Maps ObjectOperationAction enum values to their string representations. + */ + private val ACTION_STRINGS = mapOf( + ObjectOperationAction.MapCreate to "MAP_CREATE", + ObjectOperationAction.MapSet to "MAP_SET", + ObjectOperationAction.MapRemove to "MAP_REMOVE", + ObjectOperationAction.CounterCreate to "COUNTER_CREATE", + ObjectOperationAction.CounterInc to "COUNTER_INC", + ) + + /** + * Generates a random nonce string for object creation operations. + */ + private fun nonce(): String = UUID.randomUUID().toString().replace("-", "") + + /** + * Creates a MAP_CREATE operation payload for REST API. + * + * @param objectId Optional specific object ID + * @param data Optional initial data for the map + * @param nonce Optional nonce for deterministic object ID generation + */ + internal fun mapCreateRestOp( + objectId: String? = null, + data: Map? = null, + nonce: String? = null, + ): JsonObject { + val opBody = JsonObject().apply { + addProperty("operation", ACTION_STRINGS[ObjectOperationAction.MapCreate]) + } + + if (data != null) { + opBody.add("data", gson.toJsonTree(data)) + } + + if (objectId != null) { + opBody.addProperty("objectId", objectId) + opBody.addProperty("nonce", nonce ?: nonce()) + } + + return opBody + } + + + /** + * Creates a MAP_SET operation payload for REST API. + */ + internal fun mapSetRestOp(objectId: String, key: String, value: ObjectData): JsonObject { + val opBody = JsonObject().apply { + addProperty("operation", ACTION_STRINGS[ObjectOperationAction.MapSet]) + addProperty("objectId", objectId) + } + + val dataObj = JsonObject().apply { + addProperty("key", key) + add("value", gson.toJsonTree(value)) + } + opBody.add("data", dataObj) + + return opBody + } + + /** + * Creates a MAP_REMOVE operation payload for REST API. + */ + internal fun mapRemoveRestOp(objectId: String, key: String): JsonObject { + val opBody = JsonObject().apply { + addProperty("operation", ACTION_STRINGS[ObjectOperationAction.MapRemove]) + addProperty("objectId", objectId) + } + + val dataObj = JsonObject().apply { + addProperty("key", key) + } + opBody.add("data", dataObj) + + return opBody + } + + /** + * Creates a COUNTER_CREATE operation payload for REST API. + * + * @param objectId Optional specific object ID + * @param nonce Optional nonce for deterministic object ID generation + * @param number Optional initial counter value + */ + internal fun counterCreateRestOp( + objectId: String? = null, + number: Double? = null, + nonce: String? = null, + ): JsonObject { + val opBody = JsonObject().apply { + addProperty("operation", ACTION_STRINGS[ObjectOperationAction.CounterCreate]) + } + + if (number != null) { + val dataObj = JsonObject().apply { + addProperty("number", number) + } + opBody.add("data", dataObj) + } + + if (objectId != null) { + opBody.addProperty("objectId", objectId) + opBody.addProperty("nonce", nonce ?: nonce()) + } + + return opBody + } + + /** + * Creates a COUNTER_INC operation payload for REST API. + */ + internal fun counterIncRestOp(objectId: String, number: Double): JsonObject { + val opBody = JsonObject().apply { + addProperty("operation", ACTION_STRINGS[ObjectOperationAction.CounterInc]) + addProperty("objectId", objectId) + add("data", JsonObject().apply { + addProperty("number", number) + }) + } + return opBody + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt new file mode 100644 index 000000000..82e1dbef0 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt @@ -0,0 +1,119 @@ +package io.ably.lib.objects.integration.helpers + +import com.google.gson.JsonObject +import io.ably.lib.objects.ObjectData +import io.ably.lib.objects.ObjectValue +import io.ably.lib.rest.AblyRest +import io.ably.lib.http.HttpUtils +import io.ably.lib.types.ClientOptions + +/** + * Helper class to create pre-determined objects and modify them on channels using rest api. + */ +internal class RestObjects(options: ClientOptions) { + + private val ablyRest: AblyRest = AblyRest(options) + + /** + * Creates a new map object on the channel with optional initial data. + * @return The object ID of the created map + */ + internal fun createMap(channelName: String, data: Map? = null): String { + val mapCreateOp = PayloadBuilder.mapCreateRestOp(data = data) + return operationRequest(channelName, mapCreateOp).objectId ?: + throw Exception("Failed to create map: no objectId returned") + } + + /** + * Sets a value (primitives, JsonObject, JsonArray, etc.) at the specified key in an existing map. + */ + internal fun setMapValue(channelName: String, mapObjectId: String, key: String, value: ObjectValue) { + val data = ObjectData(value = value) + val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, data) + operationRequest(channelName, mapCreateOp) + } + + /** + * Sets an object reference at the specified key in an existing map. + */ + internal fun setMapRef(channelName: String, mapObjectId: String, key: String, refMapObjectId: String) { + val data = ObjectData(objectId = refMapObjectId) + val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, data) + operationRequest(channelName, mapCreateOp) + } + + /** + * Removes a key-value pair from an existing map. + */ + internal fun removeMapValue(channelName: String, mapObjectId: String, key: String) { + val mapRemoveOp = PayloadBuilder.mapRemoveRestOp(mapObjectId, key) + operationRequest(channelName, mapRemoveOp) + } + + /** + * Creates a new counter object with an optional initial value (defaults to 0). + * @return The object ID of the created counter + */ + internal fun createCounter(channelName: String, initialValue: Double? = null): String { + val counterCreateOp = PayloadBuilder.counterCreateRestOp(number = initialValue) + return operationRequest(channelName, counterCreateOp).objectId + ?: throw Exception("Failed to create counter: no objectId returned") + } + + /** + * Increments an existing counter by the specified amount. + */ + internal fun incrementCounter(channelName: String, counterObjectId: String, incrementBy: Double) { + val counterIncrementOp = PayloadBuilder.counterIncRestOp(counterObjectId, incrementBy) + operationRequest(channelName, counterIncrementOp) + } + + /** + * Decrements an existing counter by the specified amount. + */ + internal fun decrementCounter(channelName: String, counterObjectId: String, decrementBy: Double) { + val counterDecrementOp = PayloadBuilder.counterIncRestOp(counterObjectId, -decrementBy) + operationRequest(channelName, counterDecrementOp) + } + + /** + * Core method that executes object operations by sending POST requests to Ably's Objects REST API. + * All public methods delegate to this for actual API communication. + */ + private fun operationRequest(channelName: String, opBody: JsonObject): OperationResult { + try { + val path = "/channels/$channelName/objects" + val requestBody = HttpUtils.requestBodyFromGson(opBody, ablyRest.options.useBinaryProtocol) + + val response = ablyRest.request("POST", path, null, requestBody, null) + + if (!response.success) { + throw Exception("REST operation failed: HTTP ${response.statusCode} - ${response.errorMessage}") + } + + val responseItems = response.items() + if (responseItems.isEmpty()) { + return OperationResult(null, null, success = true) + } + + // Process first response item + responseItems[0].asJsonObject.let { firstItem -> + val objectIds = firstItem.get("objectIds")?.let { element -> + if (element.isJsonArray) element.asJsonArray.map { it.asString } else null + } + return OperationResult(objectIds?.firstOrNull(), objectIds, success = true) + } + } catch (e: Exception) { + throw Exception("Failed to execute operation request: ${e.message}", e) + } + } + + /** + * Result class for operation requests containing the response data and extracted object ID. + */ + private data class OperationResult( + val objectId: String?, + val objectIds: List? = null, // Seems only used for batch operations + val success: Boolean = true + ) +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt new file mode 100644 index 000000000..e402347e7 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt @@ -0,0 +1,14 @@ +package io.ably.lib.objects.integration.helpers + +import io.ably.lib.objects.DefaultLiveObjects +import io.ably.lib.objects.LiveCounter +import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.LiveObjects +import io.ably.lib.objects.type.livecounter.DefaultLiveCounter +import io.ably.lib.objects.type.livemap.DefaultLiveMap + +internal val LiveMap.ObjectId get() = (this as DefaultLiveMap).objectId + +internal val LiveCounter.ObjectId get() = (this as DefaultLiveCounter).objectId + +internal val LiveObjects.State get() = (this as DefaultLiveObjects).state diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt new file mode 100644 index 000000000..ec9e7aa61 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt @@ -0,0 +1,65 @@ +package io.ably.lib.objects.integration.helpers.fixtures + +import io.ably.lib.objects.integration.helpers.RestObjects + +/** + * Creates a comprehensive test fixture object tree focused on user-context counters. + * + * This method establishes a hierarchical structure of live counter objects for testing + * counter operations in a realistic user engagement context, creating various types of + * counters and establishing references between them through nested maps. + * + * **Object Tree Structure:** + * ``` + * userMap (Map) + * ├── profileViews → Counter(value=127) + * ├── postLikes → Counter(value=45) + * ├── commentCount → Counter(value=23) + * ├── followingCount → Counter(value=89) + * ├── followersCount → Counter(value=156) + * ├── loginStreak → Counter(value=7) + * └── engagementMetrics → Map{ + * ├── "totalShares" → Counter(value=34) + * ├── "totalBookmarks" → Counter(value=67) + * ├── "totalReactions" → Counter(value=189) + * └── "dailyActiveStreak" → Counter(value=12) + * } + * ``` + * + * @param channelName The channel where the counter object tree will be created + * @return The object ID of the root test map containing all counter references + */ +internal fun RestObjects.createUserMapWithCountersObject(channelName: String): String { + // Create the main test map first + val testMapObjectId = createMap(channelName) + + // Create various user-context relevant counters + val profileViewsCounterObjectId = createCounter(channelName, 127.0) + val postLikesCounterObjectId = createCounter(channelName, 45.0) + val commentCountCounterObjectId = createCounter(channelName, 23.0) + val followingCountCounterObjectId = createCounter(channelName, 89.0) + val followersCountCounterObjectId = createCounter(channelName, 156.0) + val loginStreakCounterObjectId = createCounter(channelName, 7.0) + + // Create engagement metrics nested map with counters + val engagementMetricsMapObjectId = createMap( + channelName, + data = mapOf( + "totalShares" to DataFixtures.mapRef(createCounter(channelName, 34.0)), + "totalBookmarks" to DataFixtures.mapRef(createCounter(channelName, 67.0)), + "totalReactions" to DataFixtures.mapRef(createCounter(channelName, 189.0)), + "dailyActiveStreak" to DataFixtures.mapRef(createCounter(channelName, 12.0)) + ) + ) + + // Set up the main test map structure with references to all created counters + setMapRef(channelName, testMapObjectId, "profileViews", profileViewsCounterObjectId) + setMapRef(channelName, testMapObjectId, "postLikes", postLikesCounterObjectId) + setMapRef(channelName, testMapObjectId, "commentCount", commentCountCounterObjectId) + setMapRef(channelName, testMapObjectId, "followingCount", followingCountCounterObjectId) + setMapRef(channelName, testMapObjectId, "followersCount", followersCountCounterObjectId) + setMapRef(channelName, testMapObjectId, "loginStreak", loginStreakCounterObjectId) + setMapRef(channelName, testMapObjectId, "engagementMetrics", engagementMetricsMapObjectId) + + return testMapObjectId +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/DataFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/DataFixtures.kt new file mode 100644 index 000000000..18928cd19 --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/DataFixtures.kt @@ -0,0 +1,84 @@ +package io.ably.lib.objects.integration.helpers.fixtures + +import com.google.gson.JsonArray +import com.google.gson.JsonObject +import io.ably.lib.objects.Binary +import io.ably.lib.objects.ObjectData +import io.ably.lib.objects.ObjectValue + +internal object DataFixtures { + + /** Test fixture for string value ("stringValue") data type */ + internal val stringData = ObjectData(value = ObjectValue.String("stringValue")) + + /** Test fixture for empty string data type */ + internal val emptyStringData = ObjectData(value = ObjectValue.String("")) + + /** Test fixture for binary data containing encoded JSON */ + internal val bytesData = ObjectData( + value = ObjectValue.Binary(Binary("eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9".toByteArray()))) + + /** Test fixture for empty binary data (zero-length byte array) */ + internal val emptyBytesData = ObjectData(value = ObjectValue.Binary(Binary(ByteArray(0)))) + + /** Test fixture for maximum safe number value */ + internal val maxSafeNumberData = ObjectData(value = ObjectValue.Number(99999999.0)) + + /** Test fixture for minimum safe number value */ + internal val negativeMaxSafeNumberData = ObjectData(value = ObjectValue.Number(-99999999.0)) + + /** Test fixture for positive number value (1) */ + internal val numberData = ObjectData(value = ObjectValue.Number(1.0)) + + /** Test fixture for zero number value */ + internal val zeroData = ObjectData(value = ObjectValue.Number(0.0)) + + /** Test fixture for boolean true value */ + internal val trueData = ObjectData(value = ObjectValue.Boolean(true)) + + /** Test fixture for boolean false value */ + internal val falseData = ObjectData(value = ObjectValue.Boolean(false)) + + /** Test fixture for JSON object value with single property */ + internal val objectData = ObjectData(value = ObjectValue.JsonObject(JsonObject().apply { addProperty("foo", "bar")})) + + /** Test fixture for JSON array value with three string elements */ + internal val arrayData = ObjectData( + value = ObjectValue.JsonArray(JsonArray().apply { + add("foo") + add("bar") + add("baz") + }) + ) + + /** + * Creates an ObjectData instance that references another map object. + * @param referencedMapObjectId The object ID of the referenced map + */ + internal fun mapRef(referencedMapObjectId: String) = ObjectData(objectId = referencedMapObjectId) + + /** + * Creates a test fixture map containing all supported data types and values. + * @param referencedMapObjectId The object ID to be used for the map reference entry + */ + internal fun mapWithAllValues(referencedMapObjectId: String? = null): Map { + val baseMap = mapOf( + "string" to stringData, + "emptyString" to emptyStringData, + "bytes" to bytesData, + "emptyBytes" to emptyBytesData, + "maxSafeNumber" to maxSafeNumberData, + "negativeMaxSafeNumber" to negativeMaxSafeNumberData, + "number" to numberData, + "zero" to zeroData, + "true" to trueData, + "false" to falseData, + "object" to objectData, + "array" to arrayData + ) + referencedMapObjectId?.let { + return baseMap + ("mapRef" to mapRef(it)) + } + return baseMap + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt new file mode 100644 index 000000000..f99dd7d9c --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt @@ -0,0 +1,157 @@ +package io.ably.lib.objects.integration.helpers.fixtures + +import io.ably.lib.objects.ObjectData +import io.ably.lib.objects.ObjectValue +import io.ably.lib.objects.integration.helpers.RestObjects + +/** + * Initializes a comprehensive test fixture object tree on the specified channel. + * + * This method creates a predetermined object hierarchy rooted at a "root" map object, + * establishing references between different types of live objects to enable comprehensive testing. + * + * **Object Tree Structure:** + * ``` + * root (Map) + * ├── emptyCounter → Counter(value=0) + * ├── initialValueCounter → Counter(value=10) + * ├── referencedCounter → Counter(value=20) + * ├── emptyMap → Map{} + * ├── referencedMap → Map{ + * │ └── "counterKey" → referencedCounter + * │ } + * └── valuesMap → Map{ + * ├── "string" → "stringValue" + * ├── "emptyString" → "" + * ├── "bytes" → + * ├── "emptyBytes" → + * ├── "maxSafeInteger" → Long.MAX_VALUE + * ├── "negativeMaxSafeInteger" → Long.MIN_VALUE + * ├── "number" → 1 + * ├── "zero" → 0 + * ├── "true" → true + * ├── "false" → false + * ├── "object" → {"foo": "bar"} + * ├── "array" → ["foo", "bar", "baz"] + * └── "mapRef" → referencedMap + * } + * ``` + * + * @param channelName The channel where the object tree will be created + */ +internal fun RestObjects.initializeRootMap(channelName: String) { + // Create counters + val emptyCounterObjectId = createCounter(channelName) + setMapRef(channelName, "root", "emptyCounter", emptyCounterObjectId) + + val initialValueCounterObjectId = createCounter(channelName, 10.0) + setMapRef(channelName, "root", "initialValueCounter", initialValueCounterObjectId) + + val referencedCounterObjectId = createCounter(channelName, 20.0) + setMapRef(channelName, "root", "referencedCounter", referencedCounterObjectId) + + // Create maps + val emptyMapObjectId = createMap(channelName) + setMapRef(channelName, "root", "emptyMap", emptyMapObjectId) + + val referencedMapObjectId = createMap( + channelName, + data = mapOf("counterKey" to DataFixtures.mapRef(referencedCounterObjectId)) + ) + setMapRef(channelName, "root", "referencedMap", referencedMapObjectId) + + val valuesMapObjectId = createMap( + channelName, + data = DataFixtures.mapWithAllValues(referencedMapObjectId) + ) + setMapRef(channelName, "root", "valuesMap", valuesMapObjectId) +} + + +/** + * Creates a comprehensive test fixture object tree on the specified channel using + * + * This method establishes a hierarchical structure of live objects for testing map operations, + * creating various types of objects and establishing references between them. + * + * **Object Tree Structure:** + * ``` + * testMap (Map) + * ├── userProfile → Map{ + * │ ├── "userId" → "user123" + * │ ├── "name" → "John Doe" + * │ ├── "email" → "john@example.com" + * │ ├── "isActive" → true + * │ ├── "metrics" → metricsMap + * │ └── "preferences" → preferencesMap + * │ } + * ├── loginCounter → Counter(value=5) + * ├── sessionCounter → Counter(value=0) + * ├── preferencesMap → Map{ + * │ ├── "theme" → "dark" + * │ ├── "notifications" → true + * │ ├── "language" → "en" + * │ └── "maxRetries" → 3 + * │ } + * └── metricsMap → Map{ + * ├── "totalLogins" → loginCounter + * ├── "activeSessions" → sessionCounter + * ├── "lastLoginTime" → "2024-01-01T08:30:00Z" + * └── "profileViews" → 42 + * } + * ``` + * + * @param channelName The channel where the test object tree will be created + */ +internal fun RestObjects.createUserMapObject(channelName: String): String { + // Create the main test map first + val testMapObjectId = createMap(channelName) + + // Create counter objects for testing numeric operations + val loginCounterObjectId = createCounter(channelName, 5.0) + val sessionCounterObjectId = createCounter(channelName, 0.0) + + // Create a preferences map with various data types + val preferencesMapObjectId = createMap( + channelName, + data = mapOf( + "theme" to ObjectData(value = ObjectValue.String("dark")), + "notifications" to ObjectData(value = ObjectValue.Boolean(true)), + "language" to ObjectData(value = ObjectValue.String("en")), + "maxRetries" to ObjectData(value = ObjectValue.Number(3)) + ) + ) + + // Create a metrics map that tracks single user activity + val metricsMapObjectId = createMap( + channelName, + data = mapOf( + "totalLogins" to DataFixtures.mapRef(loginCounterObjectId), + "activeSessions" to DataFixtures.mapRef(sessionCounterObjectId), + "lastLoginTime" to ObjectData(value = ObjectValue.String("2024-01-01T08:30:00Z")), + "profileViews" to ObjectData(value = ObjectValue.Number(42)) + ) + ) + + // Create a user profile map with mixed data types and references + val userProfileMapObjectId = createMap( + channelName, + data = mapOf( + "userId" to ObjectData(value = ObjectValue.String("user123")), + "name" to ObjectData(value = ObjectValue.String("John Doe")), + "email" to ObjectData(value = ObjectValue.String("john@example.com")), + "isActive" to ObjectData(value = ObjectValue.Boolean(true)), + "metrics" to DataFixtures.mapRef(metricsMapObjectId), + "preferences" to DataFixtures.mapRef(preferencesMapObjectId) + ) + ) + + // Set up the main test map structure with references to all created objects + setMapRef(channelName, testMapObjectId, "userProfile", userProfileMapObjectId) + setMapRef(channelName, testMapObjectId, "loginCounter", loginCounterObjectId) + setMapRef(channelName, testMapObjectId, "sessionCounter", sessionCounterObjectId) + setMapRef(channelName, testMapObjectId, "preferencesMap", preferencesMapObjectId) + setMapRef(channelName, testMapObjectId, "metricsMap", metricsMapObjectId) + + return testMapObjectId +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt index ea323124b..b79f24a04 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -1,5 +1,6 @@ package io.ably.lib.objects.integration.setup +import io.ably.lib.objects.integration.helpers.RestObjects import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.types.ChannelMode @@ -35,7 +36,7 @@ abstract class IntegrationTest { * @return The attached realtime channel. * @throws Exception If the channel fails to attach or the client fails to connect. */ - internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1"): Channel { + internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1", autoAttach: Boolean = true): Channel { val client = realtimeClients.getOrPut(clientId) { sandbox.createRealtimeClient { this.clientId = clientId @@ -46,8 +47,10 @@ abstract class IntegrationTest { modes = arrayOf(ChannelMode.object_publish, ChannelMode.object_subscribe) } return client.channels.get(channelName, channelOpts).apply { - attach() - ensureAttached() + if (autoAttach) { + attach() + ensureAttached() + } } } @@ -73,6 +76,7 @@ abstract class IntegrationTest { companion object { private lateinit var sandbox: Sandbox + internal lateinit var restObjects: RestObjects @JvmStatic @Parameterized.Parameters(name = "{0}") @@ -86,6 +90,7 @@ abstract class IntegrationTest { fun setUpBeforeClass() { runBlocking { sandbox = Sandbox.createInstance() + restObjects = sandbox.createRestObjects() } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt index 7d2b05586..f38009450 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/Sandbox.kt @@ -3,20 +3,17 @@ package io.ably.lib.objects.integration.setup import com.google.gson.JsonElement import com.google.gson.JsonParser import io.ably.lib.objects.ablyException +import io.ably.lib.objects.integration.helpers.RestObjects import io.ably.lib.realtime.* import io.ably.lib.types.ClientOptions -import io.ktor.client.HttpClient -import io.ktor.client.engine.cio.CIO -import io.ktor.client.network.sockets.ConnectTimeoutException -import io.ktor.client.network.sockets.SocketTimeoutException -import io.ktor.client.plugins.HttpRequestRetry -import io.ktor.client.plugins.HttpRequestTimeoutException +import io.ktor.client.* +import io.ktor.client.engine.cio.* +import io.ktor.client.network.sockets.* +import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.HttpResponse import io.ktor.client.statement.bodyAsText -import io.ktor.http.ContentType -import io.ktor.http.contentType -import io.ktor.http.isSuccess +import io.ktor.http.* import kotlinx.coroutines.CompletableDeferred private val client = HttpClient(CIO) { @@ -69,6 +66,15 @@ internal fun Sandbox.createRealtimeClient(options: ClientOptions.() -> Unit): Ab return AblyRealtime(clientOptions) } +internal fun Sandbox.createRestObjects(): RestObjects { + val options = ClientOptions().apply { + key = apiKey + environment = "sandbox" + useBinaryProtocol = false + } + return RestObjects(options) +} + internal suspend fun AblyRealtime.ensureConnected() { if (this.connection.state == ConnectionState.connected) { return diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt new file mode 100644 index 000000000..cbf4dbaee --- /dev/null +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/UtilsTest.kt @@ -0,0 +1,283 @@ +package io.ably.lib.objects.unit + +import io.ably.lib.objects.* +import io.ably.lib.objects.assertWaiter +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import kotlinx.coroutines.* +import kotlinx.coroutines.test.* +import org.junit.Test +import org.junit.Assert.* +import java.util.concurrent.CancellationException + +class UtilsTest { + + @Test + fun testStringByteSize() { + // Test ASCII strings + assertEquals(5, "Hello".byteSize) + assertEquals(0, "".byteSize) + assertEquals(1, "A".byteSize) + + // Test non-ASCII strings + assertEquals(3, "你".byteSize) // Chinese character + assertEquals(4, "😊".byteSize) // Emoji + assertEquals(6, "你好".byteSize) // Two Chinese characters + } + + @Test + fun testErrorCreationFunctions() { + // Test clientError + val clientEx = clientError("Bad request") + assertEquals("Bad request", clientEx.errorInfo.message) + assertEquals(ErrorCode.BadRequest.code, clientEx.errorInfo.code) + assertEquals(HttpStatusCode.BadRequest.code, clientEx.errorInfo.statusCode) + + // Test serverError + val serverEx = serverError("Internal error") + assertEquals("Internal error", serverEx.errorInfo.message) + assertEquals(ErrorCode.InternalError.code, serverEx.errorInfo.code) + assertEquals(HttpStatusCode.InternalServerError.code, serverEx.errorInfo.statusCode) + + // Test objectError + val objectEx = objectError("Invalid object") + assertEquals("Invalid object", objectEx.errorInfo.message) + assertEquals(ErrorCode.InvalidObject.code, objectEx.errorInfo.code) + assertEquals(HttpStatusCode.InternalServerError.code, objectEx.errorInfo.statusCode) + + // Test objectError with cause + val cause = RuntimeException("Original error") + val objectExWithCause = objectError("Invalid object", cause) + assertEquals("Invalid object", objectExWithCause.errorInfo.message) + assertEquals(cause, objectExWithCause.cause) + } + + @Test + fun testAblyExceptionCreation() { + // Test with error message and codes + val ex = ablyException("Test error", ErrorCode.BadRequest, HttpStatusCode.BadRequest) + assertEquals("Test error", ex.errorInfo.message) + assertEquals(ErrorCode.BadRequest.code, ex.errorInfo.code) + assertEquals(HttpStatusCode.BadRequest.code, ex.errorInfo.statusCode) + + // Test with ErrorInfo + val errorInfo = ErrorInfo("Custom error", 400, 40000) + val ex2 = ablyException(errorInfo) + assertEquals("Custom error", ex2.errorInfo.message) + assertEquals(400, ex2.errorInfo.statusCode) + assertEquals(40000, ex2.errorInfo.code) + + // Test with cause + val cause = RuntimeException("Cause") + val ex3 = ablyException(errorInfo, cause) + assertEquals(cause, ex3.cause) + } + + @Test + fun testObjectsAsyncScopeLaunchWithCallback() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var callbackExecuted = false + var resultReceived: String? = null + + val callback = object : ObjectsCallback { + override fun onSuccess(result: String) { + callbackExecuted = true + resultReceived = result + } + + override fun onError(exception: AblyException) { + fail("Should not call onError for successful execution") + } + } + + asyncScope.launchWithCallback(callback) { + delay(10) // Simulate async work + "test result" + } + + // Wait for callback to be executed + assertWaiter { callbackExecuted } + + assertTrue("Callback should be executed", callbackExecuted) + assertEquals("test result", resultReceived) + } + + @Test + fun testObjectsAsyncScopeLaunchWithCallbackError() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var errorReceived: AblyException? = null + + val callback = object : ObjectsCallback { + override fun onSuccess(result: String) { + fail("Should not call onSuccess for error case") + } + + override fun onError(exception: AblyException) { + errorReceived = exception + } + } + + asyncScope.launchWithCallback(callback) { + delay(10) + throw AblyException.fromErrorInfo(ErrorInfo("Test error", 400, 40000)) + } + + // Wait for error to be received + assertWaiter { errorReceived != null } + + assertNotNull("Error should be received", errorReceived) + assertEquals("Test error", errorReceived?.errorInfo?.message) + assertEquals(400, errorReceived?.errorInfo?.statusCode) + } + + @Test + fun testObjectsAsyncScopeLaunchWithVoidCallback() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var callbackExecuted = false + + val callback = object : ObjectsCallback { + override fun onSuccess(result: Void?) { + callbackExecuted = true + } + + override fun onError(exception: AblyException) { + fail("Should not call onError for successful execution") + } + } + + asyncScope.launchWithVoidCallback(callback) { + delay(10) // Simulate async work + } + + // Wait for callback to be executed + assertWaiter { callbackExecuted } + + assertTrue("Callback should be executed", callbackExecuted) + } + + @Test + fun testObjectsAsyncScopeLaunchWithVoidCallbackError() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var errorReceived: AblyException? = null + + val callback = object : ObjectsCallback { + override fun onSuccess(result: Void?) { + fail("Should not call onSuccess for error case") + } + + override fun onError(exception: AblyException) { + errorReceived = exception + } + } + + asyncScope.launchWithVoidCallback(callback) { + delay(10) + throw AblyException.fromErrorInfo(ErrorInfo("Test error", 500, 50000)) + } + + // Wait for error to be received + assertWaiter { errorReceived != null } + + assertNotNull("Error should be received", errorReceived) + assertEquals("Test error", errorReceived?.errorInfo?.message) + assertEquals(500, errorReceived?.errorInfo?.statusCode) + } + + @Test + fun testObjectsAsyncScopeCallbackExceptionHandling() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var callback1Called = false + var callback2Called = false + + val callback1 = object : ObjectsCallback { + override fun onSuccess(result: String) { + callback1Called = true + throw RuntimeException("Callback exception") + } + + override fun onError(exception: AblyException) { + fail("Should not call onError when onSuccess throws") + } + } + + asyncScope.launchWithCallback(callback1) { "test result" } + // Wait for callback to be called + assertWaiter { callback1Called } + + val callback2 = object : ObjectsCallback { + override fun onSuccess(result: String) { + callback2Called = true + } + + override fun onError(exception: AblyException) { + fail("Should not call onError when onSuccess throws") + } + } + + asyncScope.launchWithCallback(callback2) { "test result" } + // Callback 2 should be called even if callback 1 throws an exception + assertWaiter { callback2Called } + } + + @Test + fun testObjectsAsyncScopeCancel() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var errorReceived = false + + val callback = object : ObjectsCallback { + override fun onSuccess(result: String) { + fail("Should not call onSuccess") + } + + override fun onError(exception: AblyException) { + errorReceived = true + } + } + + asyncScope.launchWithCallback(callback) { + delay(1000) // Long delay + "test result" + } + + // Cancel immediately + asyncScope.cancel(CancellationException("Test cancellation")) + + // Wait a bit to ensure cancellation takes effect + assertWaiter { errorReceived } + } + + @Test + fun testObjectsAsyncScopeNonAblyException() = runTest { + val asyncScope = ObjectsAsyncScope("test-channel") + var errorReceived = false + var error: AblyException? = null + + val callback = object : ObjectsCallback { + override fun onSuccess(result: String) { + fail("Should not call onSuccess for error case") + } + + override fun onError(exception: AblyException) { + errorReceived = true + error = exception + } + } + + asyncScope.launchWithCallback(callback) { + delay(10) + throw RuntimeException("Non-Ably exception") + } + + // Wait for error to be received + assertWaiter { errorReceived } + + // Non-Ably exceptions should be wrapped in AblyException + assertNotNull("Non-Ably exceptions should be wrapped in AblyException", error) + assertEquals("Error executing operation", error?.errorInfo?.message) + assertEquals(ErrorCode.BadRequest.code, error?.errorInfo?.code) + assertEquals(HttpStatusCode.BadRequest.code, error?.errorInfo?.statusCode) + + assertTrue(error?.cause is RuntimeException) + assertEquals("Non-Ably exception", error?.cause?.message) + } +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt index 381ab9b47..ea57da9a6 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultLiveObjectsTest.kt @@ -34,7 +34,7 @@ class DefaultLiveObjectsTest { // RTO4a - If the HAS_OBJECTS flag is 1, the server will shortly perform an OBJECT_SYNC sequence defaultLiveObjects.handleStateChange(ChannelState.attached, true) - assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCING } + assertWaiter { defaultLiveObjects.state == ObjectsState.Syncing } // It is expected that the client will start a new sync sequence verify(exactly = 1) { @@ -59,7 +59,7 @@ class DefaultLiveObjectsTest { defaultLiveObjects.handleStateChange(ChannelState.attached, false) // Verify expected outcomes - assertWaiter { defaultLiveObjects.state == ObjectsState.SYNCED } // RTO4b4 + assertWaiter { defaultLiveObjects.state == ObjectsState.Synced } // RTO4b4 verify(exactly = 1) { defaultLiveObjects.objectsPool.resetToInitialPool(true) @@ -80,7 +80,7 @@ class DefaultLiveObjectsTest { val defaultLiveObjects = getDefaultLiveObjectsWithMockedDeps() // Ensure we're in INITIALIZED state - defaultLiveObjects.state = ObjectsState.INITIALIZED + defaultLiveObjects.state = ObjectsState.Initialized // RTO4a - Should start sync even with HAS_OBJECTS flag false when in INITIALIZED state defaultLiveObjects.handleStateChange(ChannelState.attached, false) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt index 2d777f3ff..aaa3f5c29 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt @@ -17,7 +17,7 @@ class ObjectsManagerTest { @Test fun `(RTO5) ObjectsManager should handle object sync messages`() { val defaultLiveObjects = getDefaultLiveObjectsWithMockedDeps() - assertEquals(ObjectsState.INITIALIZED, defaultLiveObjects.state, "Initial state should be INITIALIZED") + assertEquals(ObjectsState.Initialized, defaultLiveObjects.state, "Initial state should be INITIALIZED") val objectsManager = defaultLiveObjects.ObjectsManager @@ -72,7 +72,7 @@ class ObjectsManagerTest { assertEquals("counter:testObject@2", newlyCreatedObjects[0].objectId) assertEquals("map:testObject@3", newlyCreatedObjects[1].objectId) - assertEquals(ObjectsState.SYNCED, defaultLiveObjects.state, "State should be SYNCED after sync sequence") + assertEquals(ObjectsState.Synced, defaultLiveObjects.state, "State should be SYNCED after sync sequence") // After sync `counter:testObject@4` will be removed from pool assertNull(objectsPool.get("counter:testObject@4")) assertEquals(4, objectsPool.size(), "Objects pool should contain 4 objects after sync including root") @@ -97,7 +97,7 @@ class ObjectsManagerTest { @Test fun `(RTO8) ObjectsManager should apply object operation when state is synced`() { val defaultLiveObjects = getDefaultLiveObjectsWithMockedDeps() - defaultLiveObjects.state = ObjectsState.SYNCED // Ensure we're in SYNCED state + defaultLiveObjects.state = ObjectsState.Synced // Ensure we're in SYNCED state val objectsManager = defaultLiveObjects.ObjectsManager @@ -165,7 +165,7 @@ class ObjectsManagerTest { @Test fun `(RTO7) ObjectsManager should buffer operations when not in sync, apply them after synced`() { val defaultLiveObjects = getDefaultLiveObjectsWithMockedDeps() - assertEquals(ObjectsState.INITIALIZED, defaultLiveObjects.state, "Initial state should be INITIALIZED") + assertEquals(ObjectsState.Initialized, defaultLiveObjects.state, "Initial state should be INITIALIZED") val objectsManager = defaultLiveObjects.ObjectsManager assertEquals(0, objectsManager.BufferedObjectOperations.size, "RTO7a1 - Initial buffer should be empty") @@ -176,7 +176,7 @@ class ObjectsManagerTest { mockZeroValuedObjects() // Set state to SYNCING - defaultLiveObjects.state = ObjectsState.SYNCING + defaultLiveObjects.state = ObjectsState.Syncing val objectMessage = ObjectMessage( id = "testId",