Skip to content

Commit 181ae3d

Browse files
authored
Merge pull request #620 from ably/ECO-5456/vcdiff-support
[ECO-5456] feat: add VCDiff support for delta message decoding
2 parents 91d5759 + fd3b028 commit 181ae3d

File tree

14 files changed

+485
-17
lines changed

14 files changed

+485
-17
lines changed

ably/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from ably.types.capability import Capability
66
from ably.types.channelsubscription import PushChannelSubscription
77
from ably.types.device import DeviceDetails
8-
from ably.types.options import Options
8+
from ably.types.options import Options, VCDiffDecoder
99
from ably.util.crypto import CipherParams
1010
from ably.util.exceptions import AblyException, AblyAuthException, IncompatibleClientIdException
11+
from ably.vcdiff.default_vcdiff_decoder import AblyVCDiffDecoder
1112

1213
import logging
1314

ably/realtime/realtime_channel.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from __future__ import annotations
2+
23
import asyncio
34
import logging
45
from typing import Optional, TYPE_CHECKING, Dict, Any
56
from ably.realtime.connection import ConnectionState
6-
from ably.transport.websockettransport import ProtocolMessageAction
77
from ably.rest.channel import Channel, Channels as RestChannels
8+
from ably.transport.websockettransport import ProtocolMessageAction
89
from ably.types.channelstate import ChannelState, ChannelStateChange
910
from ably.types.flags import Flag, has_flag
1011
from ably.types.message import Message
12+
from ably.types.mixins import DecodingContext
1113
from ably.util.eventemitter import EventEmitter
1214
from ably.util.exceptions import AblyException
1315
from ably.util.helper import Timer, is_callable_or_coroutine
@@ -123,6 +125,11 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: Optional[
123125
self.__channel_options = channel_options or ChannelOptions()
124126
self.__params: Optional[Dict[str, str]] = None
125127

128+
# Delta-specific fields for RTL19/RTL20 compliance
129+
vcdiff_decoder = self.__realtime.options.vcdiff_decoder if self.__realtime.options.vcdiff_decoder else None
130+
self.__decoding_context = DecodingContext(vcdiff_decoder=vcdiff_decoder)
131+
self.__decode_failure_recovery_in_progress = False
132+
126133
# Used to listen to state changes internally, if we use the public event emitter interface then internals
127134
# will be disrupted if the user called .off() to remove all listeners
128135
self.__internal_state_emitter = EventEmitter()
@@ -415,8 +422,16 @@ def _on_message(self, proto_msg: dict) -> None:
415422
else:
416423
self._request_state(ChannelState.ATTACHING)
417424
elif action == ProtocolMessageAction.MESSAGE:
418-
messages = Message.from_encoded_array(proto_msg.get('messages'))
419-
self.__channel_serial = channel_serial
425+
messages = []
426+
try:
427+
messages = Message.from_encoded_array(proto_msg.get('messages'), context=self.__decoding_context)
428+
self.__decoding_context.last_message_id = messages[-1].id
429+
self.__channel_serial = channel_serial
430+
except AblyException as e:
431+
if e.code == 40018: # Delta decode failure - start recovery
432+
self._start_decode_failure_recovery(e)
433+
else:
434+
log.error(f"Message processing error {e}. Skip messages {proto_msg.get('messages')}")
420435
for message in messages:
421436
self.__message_emitter._emit(message.name, message)
422437
elif action == ProtocolMessageAction.ERROR:
@@ -458,6 +473,9 @@ def _notify_state(self, state: ChannelState, reason: Optional[AblyException] = N
458473
if state in (ChannelState.DETACHED, ChannelState.SUSPENDED, ChannelState.FAILED):
459474
self.__channel_serial = None
460475

476+
if state != ChannelState.ATTACHING:
477+
self.__decode_failure_recovery_in_progress = False
478+
461479
state_change = ChannelStateChange(self.__state, state, resumed, reason=reason)
462480

463481
self.__state = state
@@ -554,6 +572,24 @@ def params(self) -> Dict[str, str]:
554572
"""Get channel parameters"""
555573
return self.__params
556574

575+
def _start_decode_failure_recovery(self, error: AblyException) -> None:
576+
"""Start RTL18 decode failure recovery procedure"""
577+
578+
if self.__decode_failure_recovery_in_progress:
579+
log.info('VCDiff recovery process already started, skipping')
580+
return
581+
582+
self.__decode_failure_recovery_in_progress = True
583+
584+
# RTL18a: Log error with code 40018
585+
log.error(f'VCDiff decode failure: {error}')
586+
587+
# RTL18b: Message is already discarded by not processing it
588+
589+
# RTL18c: Send ATTACH with previous channel serial and transition to ATTACHING
590+
self._notify_state(ChannelState.ATTACHING, reason=error)
591+
self._check_pending_state()
592+
557593

558594
class Channels(RestChannels):
559595
"""Creates and destroys RealtimeChannel objects.

ably/types/message.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44

55
from ably.types.typedbuffer import TypedBuffer
6-
from ably.types.mixins import EncodeDataMixin
6+
from ably.types.mixins import EncodeDataMixin, DeltaExtras
77
from ably.util.crypto import CipherData
88
from ably.util.exceptions import AblyException
99

@@ -178,7 +178,7 @@ def as_dict(self, binary=False):
178178
return request_body
179179

180180
@staticmethod
181-
def from_encoded(obj, cipher=None):
181+
def from_encoded(obj, cipher=None, context=None):
182182
id = obj.get('id')
183183
name = obj.get('name')
184184
data = obj.get('data')
@@ -188,7 +188,12 @@ def from_encoded(obj, cipher=None):
188188
encoding = obj.get('encoding', '')
189189
extras = obj.get('extras', None)
190190

191-
decoded_data = Message.decode(data, encoding, cipher)
191+
delta_extra = DeltaExtras(extras)
192+
if delta_extra.from_id and delta_extra.from_id != context.last_message_id:
193+
raise AblyException(f"Delta message decode failure - previous message not available. "
194+
f"Message id = {id}", 400, 40018)
195+
196+
decoded_data = Message.decode(data, encoding, cipher, context)
192197

193198
return Message(
194199
id=id,

ably/types/mixins.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,29 @@
33
import logging
44

55
from ably.util.crypto import CipherData
6+
from ably.util.exceptions import AblyException
67

78

89
log = logging.getLogger(__name__)
910

11+
ENC_VCDIFF = "vcdiff"
12+
13+
14+
class DeltaExtras:
15+
def __init__(self, extras):
16+
self.from_id = None
17+
if extras and 'delta' in extras:
18+
delta_info = extras['delta']
19+
if isinstance(delta_info, dict):
20+
self.from_id = delta_info.get('from')
21+
22+
23+
class DecodingContext:
24+
def __init__(self, base_payload=None, last_message_id=None, vcdiff_decoder=None):
25+
self.base_payload = base_payload
26+
self.last_message_id = last_message_id
27+
self.vcdiff_decoder = vcdiff_decoder
28+
1029

1130
class EncodeDataMixin:
1231

@@ -25,10 +44,12 @@ def encoding(self, encoding):
2544
self._encoding_array = encoding.strip('/').split('/')
2645

2746
@staticmethod
28-
def decode(data, encoding='', cipher=None):
47+
def decode(data, encoding='', cipher=None, context=None):
2948
encoding = encoding.strip('/')
3049
encoding_list = encoding.split('/')
3150

51+
last_payload = data
52+
3253
while encoding_list:
3354
encoding = encoding_list.pop()
3455
if not encoding:
@@ -46,10 +67,43 @@ def decode(data, encoding='', cipher=None):
4667
if isinstance(data, list) or isinstance(data, dict):
4768
continue
4869
data = json.loads(data)
49-
elif encoding == 'base64' and isinstance(data, bytes):
50-
data = bytearray(base64.b64decode(data))
5170
elif encoding == 'base64':
52-
data = bytearray(base64.b64decode(data.encode('utf-8')))
71+
data = bytearray(base64.b64decode(data)) if isinstance(data, bytes) \
72+
else bytearray(base64.b64decode(data.encode('utf-8')))
73+
if not encoding_list:
74+
last_payload = data
75+
elif encoding == ENC_VCDIFF:
76+
if not context or not context.vcdiff_decoder:
77+
log.error('Message cannot be decoded as no VCDiff decoder available')
78+
raise AblyException('VCDiff decoder not available', 40019, 40019)
79+
80+
if not context.base_payload:
81+
log.error('VCDiff decoding requires base payload')
82+
raise AblyException('VCDiff decode failure', 40018, 40018)
83+
84+
try:
85+
# Convert base payload to bytes if it's a string
86+
base_data = context.base_payload
87+
if isinstance(base_data, str):
88+
base_data = base_data.encode('utf-8')
89+
else:
90+
base_data = bytes(base_data)
91+
92+
# Convert delta to bytes if needed
93+
delta_data = data
94+
if isinstance(delta_data, (bytes, bytearray)):
95+
delta_data = bytes(delta_data)
96+
else:
97+
delta_data = str(delta_data).encode('utf-8')
98+
99+
# Decode with VCDiff
100+
data = bytearray(context.vcdiff_decoder.decode(delta_data, base_data))
101+
last_payload = data
102+
103+
except Exception as e:
104+
log.error(f'VCDiff decode failed: {e}')
105+
raise AblyException('VCDiff decode failure', 40018, 40018)
106+
53107
elif encoding.startswith('%s+' % CipherData.ENCODING_ID):
54108
if not cipher:
55109
log.error('Message cannot be decrypted as the channel is '
@@ -67,9 +121,11 @@ def decode(data, encoding='', cipher=None):
67121
encoding_list.append(encoding)
68122
break
69123

124+
if context:
125+
context.base_payload = last_payload
70126
encoding = '/'.join(encoding_list)
71127
return {'encoding': encoding, 'data': data}
72128

73129
@classmethod
74-
def from_encoded_array(cls, objs, cipher=None):
75-
return [cls.from_encoded(obj, cipher=cipher) for obj in objs]
130+
def from_encoded_array(cls, objs, cipher=None, context=None):
131+
return [cls.from_encoded(obj, cipher=cipher, context=context) for obj in objs]

ably/types/options.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,38 @@
11
import random
22
import logging
3+
from abc import ABC, abstractmethod
34

45
from ably.transport.defaults import Defaults
56
from ably.types.authoptions import AuthOptions
67

78
log = logging.getLogger(__name__)
89

910

11+
class VCDiffDecoder(ABC):
12+
"""
13+
The VCDiffDecoder class defines the interface for delta decoding operations.
14+
15+
This class serves as an abstract base class for implementing delta decoding
16+
algorithms, which are used to generate target bytes from compressed delta
17+
bytes and base bytes. Subclasses of this class should implement the decode
18+
method to handle the specifics of delta decoding. The decode method typically
19+
takes a delta bytes and base bytes as input and returns the decoded output.
20+
21+
"""
22+
@abstractmethod
23+
def decode(self, delta: bytes, base: bytes) -> bytes:
24+
pass
25+
26+
1027
class Options(AuthOptions):
1128
def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0,
1229
tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None,
1330
http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None,
1431
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
1532
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,
1633
loop=None, auto_connect=True, suspended_retry_timeout=None, connectivity_check_url=None,
17-
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, **kwargs):
34+
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False,
35+
vcdiff_decoder: VCDiffDecoder = None, **kwargs):
1836

1937
super().__init__(**kwargs)
2038

@@ -77,6 +95,7 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
7795
self.__connectivity_check_url = connectivity_check_url
7896
self.__fallback_realtime_host = None
7997
self.__add_request_ids = add_request_ids
98+
self.__vcdiff_decoder = vcdiff_decoder
8099

81100
self.__rest_hosts = self.__get_rest_hosts()
82101
self.__realtime_hosts = self.__get_realtime_hosts()
@@ -259,6 +278,10 @@ def fallback_realtime_host(self, value):
259278
def add_request_ids(self):
260279
return self.__add_request_ids
261280

281+
@property
282+
def vcdiff_decoder(self):
283+
return self.__vcdiff_decoder
284+
262285
def __get_rest_hosts(self):
263286
"""
264287
Return the list of hosts as they should be tried. First comes the main

ably/types/presence.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def extras(self):
8686
return self.__extras
8787

8888
@staticmethod
89-
def from_encoded(obj, cipher=None):
89+
def from_encoded(obj, cipher=None, context=None):
9090
id = obj.get('id')
9191
action = obj.get('action', PresenceAction.ENTER)
9292
client_id = obj.get('clientId')

ably/vcdiff/__init__.py

Whitespace-only changes.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""
2+
VCDiff Decoder for Ably Python SDK
3+
4+
This module provides a production-ready VCDiff decoder using the vcdiff-decoder library.
5+
It implements the VCDiffDecoder interface.
6+
7+
Usage:
8+
from ably.vcdiff import AblyVCDiffDecoder, AblyRealtime
9+
10+
# Create VCDiff decoder
11+
vcdiff_decoder = AblyVCDiffDecoder()
12+
13+
# Create client with decoder
14+
client = AblyRealtime(key="your-key", vcdiff_decoder=vcdiff_decoder)
15+
16+
# Get channel with delta enabled
17+
channel = client.channels.get("test", ChannelOptions(params={"delta": "vcdiff"}))
18+
"""
19+
20+
import logging
21+
22+
from ably.types.options import VCDiffDecoder
23+
from ably.util.exceptions import AblyException
24+
25+
log = logging.getLogger(__name__)
26+
27+
28+
class AblyVCDiffDecoder(VCDiffDecoder):
29+
"""
30+
Production VCDiff decoder using Ably's vcdiff-decoder library.
31+
32+
Raises:
33+
ImportError: If vcdiff is not installed
34+
AblyException: If VCDiff decoding fails
35+
"""
36+
37+
def __init__(self):
38+
"""Initialize the VCDiff plugin.
39+
40+
Raises:
41+
ImportError: If vcdiff-decoder library is not available
42+
"""
43+
try:
44+
import vcdiff_decoder as vcdiff
45+
self._vcdiff = vcdiff
46+
except ImportError as e:
47+
log.error("vcdiff library not found. Install with: pip install ably[vcdiff]")
48+
raise ImportError(
49+
"VCDiff plugin requires vcdiff library. "
50+
"Install with: pip install ably[vcdiff]"
51+
) from e
52+
53+
def decode(self, delta: bytes, base: bytes) -> bytes:
54+
"""
55+
Decode a VCDiff delta against a base payload.
56+
57+
Args:
58+
delta: The VCDiff-encoded delta data
59+
base: The base payload to apply the delta to
60+
61+
Returns:
62+
bytes: The decoded message payload
63+
64+
Raises:
65+
AblyException: If VCDiff decoding fails (error code 40018)
66+
"""
67+
if not isinstance(delta, bytes):
68+
raise TypeError("Delta must be bytes")
69+
if not isinstance(base, bytes):
70+
raise TypeError("Base must be bytes")
71+
72+
try:
73+
# Use the vcdiff library to decode
74+
result = self._vcdiff.decode(base, delta)
75+
return result
76+
except Exception as e:
77+
log.error(f"VCDiff decode failed: {e}")
78+
raise AblyException(f"VCDiff decode failure: {e}", 40018, 40018) from e
79+
80+
81+
# Export for easy importing
82+
__all__ = ['AblyVCDiffDecoder']

0 commit comments

Comments
 (0)