Skip to content

Commit 15577ce

Browse files
committed
chore: Add support for FDv1 polling synchronizer
1 parent a89650b commit 15577ce

File tree

10 files changed

+686
-33
lines changed

10 files changed

+686
-33
lines changed

ldclient/config.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ class DataSystemConfig:
179179
data_store: Optional[FeatureStore] = None
180180
"""The (optional) persistent data store instance."""
181181

182-
# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
183-
# TODO(fdv2): Remove this when FDv2 is fully launched
184182
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
185183
"""An optional fallback synchronizer that will read from FDv1"""
186184

ldclient/impl/datasourcev2/polling.py

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import urllib3
1515

1616
from ldclient.config import Config
17+
from ldclient.impl.datasource.feature_requester import LATEST_ALL_URI
1718
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
1819
from ldclient.impl.datasystem.protocolv2 import (
1920
Basis,
@@ -22,6 +23,7 @@
2223
DeleteObject,
2324
EventName,
2425
IntentCode,
26+
Payload,
2527
PutObject,
2628
Selector,
2729
ServerIntent
@@ -43,6 +45,7 @@
4345
DataSourceErrorKind,
4446
DataSourceState
4547
)
48+
from ldclient.versioned_data_kind import FEATURES, SEGMENTS
4649

4750
POLLING_ENDPOINT = "/sdk/poll"
4851

@@ -123,6 +126,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
123126
),
124127
)
125128

129+
fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
130+
if fallback:
131+
yield Update(
132+
state=DataSourceState.OFF,
133+
error=error_info,
134+
revert_to_fdv1=True
135+
)
136+
break
137+
126138
status_code = result.exception.status
127139
if is_http_error_recoverable(status_code):
128140
# TODO(fdv2): Add support for environment ID
@@ -158,6 +170,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
158170
state=DataSourceState.VALID,
159171
change_set=change_set,
160172
environment_id=headers.get("X-LD-EnvID"),
173+
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
161174
)
162175

163176
if self._event.wait(self._poll_interval):
@@ -262,7 +275,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
262275

263276
if response.status >= 400:
264277
return _Fail(
265-
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
278+
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
266279
)
267280

268281
headers = response.headers
@@ -375,3 +388,117 @@ def build(self) -> PollingDataSource:
375388
return PollingDataSource(
376389
poll_interval=self._config.poll_interval, requester=requester
377390
)
391+
392+
393+
# pylint: disable=too-few-public-methods
394+
class Urllib3FDv1PollingRequester:
395+
"""
396+
Urllib3PollingRequesterFDv1 is a Requester that uses urllib3 to make HTTP
397+
requests.
398+
"""
399+
400+
def __init__(self, config: Config):
401+
self._etag = None
402+
self._http = _http_factory(config).create_pool_manager(1, config.base_uri)
403+
self._config = config
404+
self._poll_uri = config.base_uri + LATEST_ALL_URI
405+
406+
def fetch(self, selector: Optional[Selector]) -> PollingResult:
407+
"""
408+
Fetches the data for the given selector.
409+
Returns a Result containing a tuple of ChangeSet and any request headers,
410+
or an error if the data could not be retrieved.
411+
"""
412+
query_params = {}
413+
if self._config.payload_filter_key is not None:
414+
query_params["filter"] = self._config.payload_filter_key
415+
416+
uri = self._poll_uri
417+
if len(query_params) > 0:
418+
filter_query = parse.urlencode(query_params)
419+
uri += f"?{filter_query}"
420+
421+
hdrs = _headers(self._config)
422+
hdrs["Accept-Encoding"] = "gzip"
423+
424+
if self._etag is not None:
425+
hdrs["If-None-Match"] = self._etag
426+
427+
response = self._http.request(
428+
"GET",
429+
uri,
430+
headers=hdrs,
431+
timeout=urllib3.Timeout(
432+
connect=self._config.http.connect_timeout,
433+
read=self._config.http.read_timeout,
434+
),
435+
retries=1,
436+
)
437+
438+
if response.status >= 400:
439+
return _Fail(
440+
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
441+
)
442+
443+
headers = response.headers
444+
445+
if response.status == 304:
446+
return _Success(value=(ChangeSetBuilder.no_changes(), headers))
447+
448+
data = json.loads(response.data.decode("UTF-8"))
449+
etag = headers.get("ETag")
450+
451+
if etag is not None:
452+
self._etag = etag
453+
454+
log.debug(
455+
"%s response status:[%d] ETag:[%s]",
456+
uri,
457+
response.status,
458+
etag,
459+
)
460+
461+
changeset_result = fdv1_polling_payload_to_changeset(data)
462+
if isinstance(changeset_result, _Success):
463+
return _Success(value=(changeset_result.value, headers))
464+
465+
return _Fail(
466+
error=changeset_result.error,
467+
exception=changeset_result.exception,
468+
)
469+
470+
471+
# pylint: disable=too-many-branches,too-many-return-statements
472+
def fdv1_polling_payload_to_changeset(data: dict) -> _Result[ChangeSet, str]:
473+
"""
474+
Converts a fdv1 polling payload into a ChangeSet.
475+
"""
476+
builder = ChangeSetBuilder()
477+
builder.start(IntentCode.TRANSFER_FULL)
478+
selector = Selector.no_selector()
479+
480+
# FDv1 uses "flags" instead of "features", so we need to map accordingly
481+
kind_mappings = [
482+
(FEATURES, "flags"),
483+
(SEGMENTS, "segments")
484+
]
485+
486+
for kind, fdv1_key in kind_mappings:
487+
kind_data = data.get(fdv1_key)
488+
if kind_data is None:
489+
continue
490+
if not isinstance(kind_data, dict):
491+
return _Fail(error=f"Invalid format: {fdv1_key} is not a dictionary")
492+
493+
for key in kind_data:
494+
flag_or_segment = kind_data.get(key)
495+
if flag_or_segment is None or not isinstance(flag_or_segment, dict):
496+
return _Fail(error=f"Invalid format: {key} is not a dictionary")
497+
498+
version = flag_or_segment.get('version')
499+
if version is None:
500+
return _Fail(error=f"Invalid format: {key} does not have a version set")
501+
502+
builder.add_put(kind, key, version, flag_or_segment)
503+
504+
return _Success(builder.finish(selector))

ldclient/impl/datasourcev2/streaming.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44
"""
55

66
import json
7-
from abc import abstractmethod
87
from time import time
9-
from typing import Callable, Generator, Iterable, Optional, Protocol, Tuple
8+
from typing import Callable, Generator, Optional, Tuple
109
from urllib import parse
1110

1211
from ld_eventsource import SSEClient
13-
from ld_eventsource.actions import Action, Event, Fault
12+
from ld_eventsource.actions import Event, Fault, Start
1413
from ld_eventsource.config import (
1514
ConnectStrategy,
1615
ErrorStrategy,
@@ -151,6 +150,15 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
151150
break
152151
continue
153152

153+
if isinstance(action, Start) and action.headers is not None:
154+
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
155+
if fallback:
156+
yield Update(
157+
state=DataSourceState.OFF,
158+
revert_to_fdv1=True
159+
)
160+
break
161+
154162
if not isinstance(action, Event):
155163
continue
156164

@@ -188,11 +196,6 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
188196
# if update is not None:
189197
# self._record_stream_init(False)
190198

191-
# if self._data_source_update_sink is not None:
192-
# self._data_source_update_sink.update_status(
193-
# DataSourceState.VALID, None
194-
# )
195-
196199
self._sse.close()
197200

198201
def stop(self):
@@ -288,6 +291,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
288291
289292
If an update is provided, it should be forward upstream, regardless of
290293
whether or not we are going to retry this failure.
294+
295+
The return should be thought of (update, should_continue)
291296
"""
292297
if not self._running:
293298
return (None, False) # don't retry if we've been deliberately stopped
@@ -315,12 +320,18 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
315320
str(error),
316321
)
317322

323+
if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
324+
update = Update(
325+
state=DataSourceState.OFF,
326+
error=error_info,
327+
revert_to_fdv1=True
328+
)
329+
return (update, False)
330+
318331
http_error_message_result = http_error_message(
319332
error.status, "stream connection"
320333
)
321-
322334
is_recoverable = is_http_error_recoverable(error.status)
323-
324335
update = Update(
325336
state=(
326337
DataSourceState.INTERRUPTED

ldclient/impl/datasystem/config.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ldclient.impl.datasourcev2.polling import (
1010
PollingDataSource,
1111
PollingDataSourceBuilder,
12+
Urllib3FDv1PollingRequester,
1213
Urllib3PollingRequester
1314
)
1415
from ldclient.impl.datasourcev2.streaming import (
@@ -55,6 +56,17 @@ def synchronizers(
5556
self._secondary_synchronizer = secondary
5657
return self
5758

59+
def fdv1_compatible_synchronizer(
60+
self,
61+
fallback: Builder[Synchronizer]
62+
) -> "ConfigBuilder":
63+
"""
64+
Configures the SDK with a fallback synchronizer that is compatible with
65+
the Flag Delivery v1 API.
66+
"""
67+
self._fdv1_fallback_synchronizer = fallback
68+
return self
69+
5870
def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder":
5971
"""
6072
Sets the data store configuration for the data system.
@@ -91,6 +103,17 @@ def builder(config: LDConfig) -> PollingDataSource:
91103
return builder
92104

93105

106+
def fdv1_fallback_ds_builder() -> Builder[PollingDataSource]:
107+
def builder(config: LDConfig) -> PollingDataSource:
108+
requester = Urllib3FDv1PollingRequester(config)
109+
polling_ds = PollingDataSourceBuilder(config)
110+
polling_ds.requester(requester)
111+
112+
return polling_ds.build()
113+
114+
return builder
115+
116+
94117
def streaming_ds_builder() -> Builder[StreamingDataSource]:
95118
def builder(config: LDConfig) -> StreamingDataSource:
96119
return StreamingDataSourceBuilder(config).build()
@@ -114,10 +137,12 @@ def default() -> ConfigBuilder:
114137

115138
polling_builder = polling_ds_builder()
116139
streaming_builder = streaming_ds_builder()
140+
fallback = fdv1_fallback_ds_builder()
117141

118142
builder = ConfigBuilder()
119143
builder.initializers([polling_builder])
120144
builder.synchronizers(streaming_builder, polling_builder)
145+
builder.fdv1_compatible_synchronizer(fallback)
121146

122147
return builder
123148

@@ -130,9 +155,11 @@ def streaming() -> ConfigBuilder:
130155
"""
131156

132157
streaming_builder = streaming_ds_builder()
158+
fallback = fdv1_fallback_ds_builder()
133159

134160
builder = ConfigBuilder()
135161
builder.synchronizers(streaming_builder)
162+
builder.fdv1_compatible_synchronizer(fallback)
136163

137164
return builder
138165

@@ -145,9 +172,11 @@ def polling() -> ConfigBuilder:
145172
"""
146173

147174
polling_builder: Builder[Synchronizer] = polling_ds_builder()
175+
fallback = fdv1_fallback_ds_builder()
148176

149177
builder = ConfigBuilder()
150178
builder.synchronizers(polling_builder)
179+
builder.fdv1_compatible_synchronizer(fallback)
151180

152181
return builder
153182

ldclient/impl/datasystem/fdv2.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,13 @@ def _consume_synchronizer_results(
443443
# Update status
444444
self._data_source_status_provider.update_status(update.state, update.error)
445445

446+
# Check if we should revert to FDv1 immediately
447+
if update.revert_to_fdv1:
448+
return True, True
449+
446450
# Check for OFF state indicating permanent failure
447451
if update.state == DataSourceState.OFF:
448-
return True, update.revert_to_fdv1
452+
return True, False
449453

450454
# Check condition periodically
451455
current_status = self._data_source_status_provider.status

0 commit comments

Comments
 (0)