Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
111e15a
[ECO-5076] Implemented code for getRoot method
sacOO7 Jul 15, 2025
40c7936
[ECO-5076] Moved ObjectsState to separate file
sacOO7 Jul 15, 2025
21703e4
[ECO-5457] feat: Add Live Objects state change handling
sacOO7 Jul 16, 2025
33ff9e2
[ECO-5457] Created separate interface for ObjectsStateChange to avoid…
sacOO7 Jul 16, 2025
67c2b62
[ECO-5457] Refactored ObjectsStateSubscription to ObjectsSubscription
sacOO7 Jul 17, 2025
38a3fee
[ECO-5457] Moved callbackScope at global level to be shared amongst a…
sacOO7 Jul 18, 2025
f4cde35
[ECO-5457] Refactored LiveMap to accept DefaultLiveObjects in constru…
sacOO7 Jul 18, 2025
621878c
[ECO-5457] Refactored LiveCounter, implemented value method
sacOO7 Jul 18, 2025
922309a
[ECO-5457] Refactored LiveMapEntry and LiveObject#isTombstoned to be …
sacOO7 Jul 18, 2025
c29f9db
[ECO-5457] Added integration tests for objects accessors methods
sacOO7 Jul 20, 2025
6667b4d
[ECO-5457] Added integration tests LiveMap accessors, created fixture…
sacOO7 Jul 21, 2025
fd1d2cf
[ECO-5457] Added integration tests LiveCounter accessors
sacOO7 Jul 21, 2025
6d7f0fe
Merge branch 'feature/object-sync' into feature/objects-getroot
sacOO7 Jul 23, 2025
2e5a811
[ECO-5076] Removed unnecessary use of emitterScope since events are e…
sacOO7 Jul 23, 2025
6d20ffe
[ECO-5076] Updated ObjectsState enum to use PascalCase instead of all…
sacOO7 Jul 24, 2025
f7197a9
[ECO-5076] Updated impl. to dispose objects using ablyexception
sacOO7 Jul 24, 2025
26f5ac1
Merge branch 'feature/object-sync' into feature/objects-getroot
sacOO7 Jul 29, 2025
8b5ea81
[ECO-5457] Replaced GlobalCallbackScope with ObjectsCallbackScope with
sacOO7 Jul 29, 2025
1d83242
[ECO-5457] Renamed callbackScope to asyncScope for better context aro…
sacOO7 Jul 29, 2025
e931f6c
Merge branch 'feature/object-sync' into feature/objects-getroot
sacOO7 Aug 1, 2025
3f5d115
[ECO-5076] Fixed integration test fixtures in accordance with ObjectV…
sacOO7 Aug 1, 2025
2ddd598
[ECO-5506] Declared separate interface ObjectsCallback for async ops
sacOO7 Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/Adapter.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelMode;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -34,4 +37,32 @@ public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener liste
public int maxMessageSizeLimit() {
return ably.connection.connectionManager.maxMessageSize;
}

@Override
public ChannelMode[] getChannelModes(@NotNull String channelName) {
if (ably.channels.containsKey(channelName)) {
// RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set
ChannelMode[] modes = ably.channels.get(channelName).getModes();
if (modes != null) {
return modes;
}
// RTO2b - otherwise as a best effort use user provided channel options
ChannelOptions options = ably.channels.get(channelName).getOptions();
if (options != null && options.hasModes()) {
return options.modes;
}
return null;
}
Log.e(TAG, "getChannelMode(): channel not found: " + channelName);
return null;
}

@Override
public ChannelState getChannelState(@NotNull String channelName) {
if (ably.channels.containsKey(channelName)) {
return ably.channels.get(channelName).state;
}
Log.e(TAG, "getChannelState(): channel not found: " + channelName);
return null;
}
}
5 changes: 2 additions & 3 deletions lib/src/main/java/io/ably/lib/objects/LiveCounter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ably.lib.objects;

import io.ably.lib.types.Callback;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -33,7 +32,7 @@ public interface LiveCounter {
* @param callback the callback to be invoked upon completion of the operation.
*/
@NonBlocking
void incrementAsync(@NotNull Callback<Void> callback);
void incrementAsync(@NotNull ObjectsCallback<Void> callback);

/**
* Decrements the value of the counter by 1.
Expand All @@ -49,7 +48,7 @@ public interface LiveCounter {
* @param callback the callback to be invoked upon completion of the operation.
*/
@NonBlocking
void decrementAsync(@NotNull Callback<Void> callback);
void decrementAsync(@NotNull ObjectsCallback<Void> callback);

/**
* Retrieves the current value of the counter.
Expand Down
10 changes: 7 additions & 3 deletions lib/src/main/java/io/ably/lib/objects/LiveMap.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ably.lib.objects;

import io.ably.lib.types.Callback;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.Contract;
Expand All @@ -24,6 +23,7 @@ public interface LiveMap {
* If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned.
* If the value is not an objectId, then that value is returned.
* Spec: RTLM5, RTLM5a
*
* @param keyName the key whose associated value is to be returned.
* @return the value associated with the specified key, or null if the key does not exist.
Expand All @@ -33,6 +33,7 @@ public interface LiveMap {

/**
* Retrieves all entries (key-value pairs) in the map.
* Spec: RTLM11, RTLM11a
*
* @return an iterable collection of all entries in the map.
*/
Expand All @@ -42,6 +43,7 @@ public interface LiveMap {

/**
* Retrieves all keys in the map.
* Spec: RTLM12, RTLM12a
*
* @return an iterable collection of all keys in the map.
*/
Expand All @@ -51,6 +53,7 @@ public interface LiveMap {

/**
* Retrieves all values in the map.
* Spec: RTLM13, RTLM13a
*
* @return an iterable collection of all values in the map.
*/
Expand Down Expand Up @@ -85,6 +88,7 @@ public interface LiveMap {

/**
* Retrieves the number of entries in the map.
* Spec: RTLM10, RTLM10a
*
* @return the size of the map.
*/
Expand All @@ -104,7 +108,7 @@ public interface LiveMap {
* @param callback the callback to handle the result or any errors.
*/
@NonBlocking
void setAsync(@NotNull String keyName, @NotNull Object value, @NotNull Callback<Void> callback);
void setAsync(@NotNull String keyName, @NotNull Object value, @NotNull ObjectsCallback<Void> callback);

/**
* Asynchronously removes the specified key and its associated value from the map.
Expand All @@ -117,5 +121,5 @@ public interface LiveMap {
* @param callback the callback to handle the result or any errors.
*/
@NonBlocking
void removeAsync(@NotNull String keyName, @NotNull Callback<Void> callback);
void removeAsync(@NotNull String keyName, @NotNull ObjectsCallback<Void> callback);
}
14 changes: 7 additions & 7 deletions lib/src/main/java/io/ably/lib/objects/LiveObjects.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.ably.lib.objects;

import io.ably.lib.types.Callback;
import io.ably.lib.objects.state.ObjectsStateChange;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
Expand All @@ -16,7 +16,7 @@
* <p>Implementations of this interface must be thread-safe as they may be accessed
* from multiple threads concurrently.
*/
public interface LiveObjects {
public interface LiveObjects extends ObjectsStateChange {

/**
* Retrieves the root LiveMap object.
Expand Down Expand Up @@ -95,7 +95,7 @@ public interface LiveObjects {
* @param callback the callback to handle the result or error.
*/
@NonBlocking
void getRootAsync(@NotNull Callback<@NotNull LiveMap> callback);
void getRootAsync(@NotNull ObjectsCallback<@NotNull LiveMap> callback);

/**
* Asynchronously creates a new LiveMap based on an existing LiveMap.
Expand All @@ -108,7 +108,7 @@ public interface LiveObjects {
* @param callback the callback to handle the result or error.
*/
@NonBlocking
void createMapAsync(@NotNull LiveMap liveMap, @NotNull Callback<@NotNull LiveMap> callback);
void createMapAsync(@NotNull LiveMap liveMap, @NotNull ObjectsCallback<@NotNull LiveMap> callback);

/**
* Asynchronously creates a new LiveMap based on a LiveCounter.
Expand All @@ -121,7 +121,7 @@ public interface LiveObjects {
* @param callback the callback to handle the result or error.
*/
@NonBlocking
void createMapAsync(@NotNull LiveCounter liveCounter, @NotNull Callback<@NotNull LiveMap> callback);
void createMapAsync(@NotNull LiveCounter liveCounter, @NotNull ObjectsCallback<@NotNull LiveMap> callback);

/**
* Asynchronously creates a new LiveMap based on a standard Java Map.
Expand All @@ -134,7 +134,7 @@ public interface LiveObjects {
* @param callback the callback to handle the result or error.
*/
@NonBlocking
void createMapAsync(@NotNull Map<String, Object> map, @NotNull Callback<@NotNull LiveMap> callback);
void createMapAsync(@NotNull Map<String, Object> map, @NotNull ObjectsCallback<@NotNull LiveMap> callback);

/**
* Asynchronously creates a new LiveCounter with an initial value.
Expand All @@ -147,5 +147,5 @@ public interface LiveObjects {
* @param callback the callback to handle the result or error.
*/
@NonBlocking
void createCounterAsync(@NotNull Long initialValue, @NotNull Callback<@NotNull LiveCounter> callback);
void createCounterAsync(@NotNull Long initialValue, @NotNull ObjectsCallback<@NotNull LiveCounter> callback);
}
22 changes: 22 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelMode;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public interface LiveObjectsAdapter {
/**
Expand Down Expand Up @@ -31,5 +34,24 @@ public interface LiveObjectsAdapter {
* @return the maximum message size limit in bytes.
*/
int maxMessageSizeLimit();

/**
* Retrieves the channel modes for a specific channel.
* This method returns the modes that are set for the specified channel.
*
* @param channelName the name of the channel for which to retrieve the modes
* @return the array of channel modes for the specified channel, or null if the channel is not found
* Spec: RTO2a, RTO2b
*/
@Nullable ChannelMode[] getChannelModes(@NotNull String channelName);

/**
* Retrieves the current state of a specific channel.
* This method returns the state of the specified channel, which indicates its connection status.
*
* @param channelName the name of the channel for which to retrieve the state
* @return the current state of the specified channel, or null if the channel is not found
*/
@Nullable ChannelState getChannelState(@NotNull String channelName);
}

2 changes: 2 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ public interface LiveObjectsPlugin {
* Disposes of the LiveObjects instance associated with the specified channel name.
* This method removes the LiveObjects instance for the given channel, releasing any
* resources associated with it.
* This is invoked when ablyRealtimeClient.channels.release(channelName) is called
*
* @param channelName the name of the channel whose LiveObjects instance is to be removed.
*/
void dispose(@NotNull String channelName);

/**
* Disposes of the plugin instance and all underlying resources.
* This is invoked when ablyRealtimeClient.close() is called
*/
void dispose();
}
31 changes: 31 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/ObjectsCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.ably.lib.objects;

import io.ably.lib.types.AblyException;

/**
* Callback interface for handling results of asynchronous LiveObjects operations.
* Used for operations like creating LiveMaps/LiveCounters, modifying entries, and retrieving objects.
* Callbacks are executed on background threads managed by the LiveObjects system.
*
* @param <T> the type of the result returned by the asynchronous operation
*/
public interface ObjectsCallback<T> {

/**
* Called when the asynchronous operation completes successfully.
* For modification operations (set, remove, increment), result is typically Void.
* For creation/retrieval operations, result contains the created/retrieved object.
*
* @param result the result of the operation, may be null for modification operations
*/
void onSuccess(T result);

/**
* Called when the asynchronous operation fails.
* The exception contains detailed error information including error codes and messages.
* Common errors include network issues, authentication failures, and validation errors.
*
* @param exception the exception that occurred during the operation
*/
void onError(AblyException exception);
}
22 changes: 22 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.ably.lib.objects;

/**
* Represents a objects subscription that can be unsubscribed from.
* This interface provides a way to clean up and remove subscriptions when they are no longer needed.
* Example usage:
* <pre>
* {@code
* ObjectsSubscription s = objects.subscribe(ObjectsStateEvent.SYNCING, new ObjectsStateListener() {});
* // Later when done with the subscription
* s.unsubscribe();
* }
* </pre>
*/
public interface ObjectsSubscription {
/**
* This method should be called when the subscription is no longer needed,
* it will make sure no further events will be sent to the subscriber and
* that references to the subscriber are cleaned up.
*/
void unsubscribe();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.ably.lib.objects.state;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

public interface ObjectsStateChange {
/**
* Subscribes to a specific Live Objects synchronization state event.
*
* <p>This method registers the provided listener to be notified when the specified
* synchronization state event occurs. The returned subscription can be used to
* unsubscribe later when the notifications are no longer needed.
*
* @param event the synchronization state event to subscribe to (SYNCING or SYNCED)
* @param listener the listener that will be called when the event occurs
* @return a subscription object that can be used to unsubscribe from the event
*/
@NonBlocking
ObjectsSubscription on(@NotNull ObjectsStateEvent event, @NotNull ObjectsStateChange.Listener listener);

/**
* Unsubscribes the specified listener from all synchronization state events.
*
* <p>After calling this method, the provided listener will no longer receive
* any synchronization state event notifications.
*
* @param listener the listener to unregister from all events
*/
@NonBlocking
void off(@NotNull ObjectsStateChange.Listener listener);

/**
* Unsubscribes all listeners from all synchronization state events.
*
* <p>After calling this method, no listeners will receive any synchronization
* state event notifications until new listeners are registered.
*/
@NonBlocking
void offAll();

/**
* Interface for receiving notifications about Live Objects synchronization state changes.
* <p>
* Implement this interface and register it with an ObjectsStateEmitter to be notified
* when synchronization state transitions occur.
*/
interface Listener {
/**
* Called when the synchronization state changes.
*
* @param objectsStateEvent The new state event (SYNCING or SYNCED)
*/
void onStateChanged(ObjectsStateEvent objectsStateEvent);
}
}
19 changes: 19 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/state/ObjectsStateEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.ably.lib.objects.state;

/**
* Represents the synchronization state of Ably Live Objects.
* <p>
* This enum is used to notify listeners about state changes in the synchronization process.
* Clients can register an {@link ObjectsStateChange.Listener} to receive these events.
*/
public enum ObjectsStateEvent {
/**
* Indicates that synchronization between local and remote objects is in progress.
*/
SYNCING,

/**
* Indicates that synchronization has completed successfully and objects are in sync.
*/
SYNCED
}
7 changes: 7 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,10 @@ public ChannelMode[] getModes() {
return modes.toArray(new ChannelMode[modes.size()]);
}

public ChannelOptions getOptions() {
return options;
}

/************************************
* internal general
* @throws AblyException
Expand Down Expand Up @@ -1329,6 +1333,9 @@ else if(stateChange.current.equals(failureState)) {
state = ChannelState.initialized;
this.decodingContext = new DecodingContext();
this.liveObjectsPlugin = liveObjectsPlugin;
if (liveObjectsPlugin != null) {
liveObjectsPlugin.getInstance(name); // Make objects instance ready to process sync messages
}
this.annotations = new RealtimeAnnotations(
this,
new RestAnnotations(name, ably.http, ably.options, options)
Expand Down
Loading
Loading