Skip to content

Commit 3c4b92d

Browse files
authored
feat: streaming connector (#368)
1 parent 69a94f8 commit 3c4b92d

File tree

9 files changed

+549
-20
lines changed

9 files changed

+549
-20
lines changed

UnleashClient/__init__.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
BootstrapConnector,
2121
OfflineConnector,
2222
PollingConnector,
23+
StreamingConnector,
2324
)
2425
from UnleashClient.constants import (
2526
DISABLED_VARIATION,
@@ -43,6 +44,11 @@
4344
from .cache import BaseCache, FileCache
4445
from .utils import LOGGER, InstanceAllowType, InstanceCounter
4546

47+
try:
48+
from typing import Literal, TypedDict
49+
except ImportError:
50+
from typing_extensions import Literal, TypedDict # type: ignore
51+
4652
INSTANCES = InstanceCounter()
4753
_BASE_CONTEXT_FIELDS = [
4854
"userId",
@@ -55,6 +61,10 @@
5561
]
5662

5763

64+
class ExperimentalMode(TypedDict, total=False):
65+
type: Literal["streaming", "polling"]
66+
67+
5868
def build_ready_callback(
5969
event_callback: Optional[Callable[[BaseEvent], None]] = None,
6070
) -> Optional[Callable]:
@@ -113,6 +123,7 @@ class UnleashClient:
113123
:param scheduler_executor: Name of APSCheduler executor to use if using a custom scheduler.
114124
:param multiple_instance_mode: Determines how multiple instances being instantiated is handled by the SDK, when set to InstanceAllowType.BLOCK, the client constructor will fail when more than one instance is detected, when set to InstanceAllowType.WARN, multiple instances will be allowed but log a warning, when set to InstanceAllowType.SILENTLY_ALLOW, no warning or failure will be raised when instantiating multiple instances of the client. Defaults to InstanceAllowType.WARN
115125
:param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications!
126+
:param experimental_mode: Optional dict to configure mode. Use {"type": "streaming"} to enable streaming or {"type": "polling"} (default).
116127
"""
117128

118129
def __init__(
@@ -140,6 +151,7 @@ def __init__(
140151
scheduler_executor: Optional[str] = None,
141152
multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN,
142153
event_callback: Optional[Callable[[BaseEvent], None]] = None,
154+
experimental_mode: Optional[ExperimentalMode] = None,
143155
) -> None:
144156
custom_headers = custom_headers or {}
145157
custom_options = custom_options or {}
@@ -173,6 +185,7 @@ def __init__(
173185
self.unleash_verbose_log_level = verbose_log_level
174186
self.unleash_event_callback = event_callback
175187
self._ready_callback = build_ready_callback(event_callback)
188+
self.connector_mode: ExperimentalMode = experimental_mode or {"type": "polling"}
176189

177190
self._do_instance_check(multiple_instance_mode)
178191

@@ -279,6 +292,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
279292
**self.unleash_custom_headers,
280293
"unleash-connection-id": self.connection_id,
281294
"unleash-appname": self.unleash_app_name,
295+
"unleash-instanceid": self.unleash_instance_id,
282296
"unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}",
283297
}
284298

@@ -295,8 +309,19 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
295309
self.strategy_mapping,
296310
self.unleash_request_timeout,
297311
)
312+
mode = self.connector_mode.get("type", "polling")
298313

299-
if fetch_toggles:
314+
if mode == "streaming" and fetch_toggles:
315+
self.connector = StreamingConnector(
316+
engine=self.engine,
317+
cache=self.cache,
318+
url=self.unleash_url,
319+
headers=base_headers,
320+
request_timeout=self.unleash_request_timeout,
321+
ready_callback=self._ready_callback,
322+
custom_options=self.unleash_custom_options,
323+
)
324+
elif fetch_toggles:
300325
start_scheduler = True
301326
self.connector = PollingConnector(
302327
engine=self.engine,
@@ -422,10 +447,16 @@ def destroy(self) -> None:
422447
)
423448

424449
try:
425-
self.unleash_scheduler.shutdown()
450+
if hasattr(self, "unleash_scheduler") and self.unleash_scheduler:
451+
self.unleash_scheduler.remove_all_jobs()
452+
self.unleash_scheduler.shutdown(wait=True)
453+
except Exception as exc:
454+
LOGGER.warning("Exception during scheduler teardown: %s", exc)
455+
456+
try:
457+
self.cache.destroy()
426458
except Exception as exc:
427-
LOGGER.warning("Exception during scheduler shutdown: %s", exc)
428-
self.cache.destroy()
459+
LOGGER.warning("Exception during cache teardown: %s", exc)
429460

430461
@staticmethod
431462
def _get_fallback_value(

UnleashClient/connectors/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
from .bootstrap_connector import BootstrapConnector
33
from .offline_connector import OfflineConnector
44
from .polling_connector import PollingConnector
5+
from .streaming_connector import StreamingConnector
56

67
__all__ = [
78
"BaseConnector",
89
"BootstrapConnector",
910
"OfflineConnector",
1011
"PollingConnector",
12+
"StreamingConnector",
1113
]
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import threading
2+
from typing import Callable, Optional
3+
4+
from ld_eventsource import SSEClient
5+
from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy
6+
from yggdrasil_engine.engine import UnleashEngine
7+
8+
from UnleashClient.cache import BaseCache
9+
from UnleashClient.connectors.base_connector import BaseConnector
10+
from UnleashClient.constants import APPLICATION_HEADERS, FEATURES_URL, STREAMING_URL
11+
from UnleashClient.utils import LOGGER
12+
13+
14+
class StreamingConnector(BaseConnector):
15+
def __init__(
16+
self,
17+
engine: UnleashEngine,
18+
cache: BaseCache,
19+
url: str,
20+
headers: dict,
21+
request_timeout: int,
22+
ready_callback: Optional[Callable] = None,
23+
backoff_initial: float = 2.0,
24+
backoff_max: float = 30.0,
25+
backoff_multiplier: float = 2.0,
26+
backoff_jitter: Optional[float] = 0.5,
27+
custom_options: Optional[dict] = None,
28+
) -> None:
29+
super().__init__(engine=engine, cache=cache, ready_callback=ready_callback)
30+
self._base_url = url.rstrip("/") + STREAMING_URL
31+
self._headers = {
32+
**headers,
33+
**APPLICATION_HEADERS,
34+
"Accept": "text/event-stream",
35+
}
36+
self._timeout = request_timeout
37+
self._backoff_initial = backoff_initial
38+
self._backoff_max = backoff_max
39+
self._backoff_multiplier = backoff_multiplier
40+
self._backoff_jitter = backoff_jitter
41+
self._stop = threading.Event()
42+
self._thread: Optional[threading.Thread] = None
43+
self._client: Optional[SSEClient] = None
44+
base_options = custom_options or {}
45+
if self._timeout is not None and "timeout" not in base_options:
46+
base_options = {"timeout": self._timeout, **base_options}
47+
self._custom_options = base_options
48+
49+
def start(self):
50+
if self._thread and self._thread.is_alive():
51+
return
52+
self._stop.clear()
53+
self._thread = threading.Thread(
54+
target=self._run, name="UnleashStreaming", daemon=True
55+
)
56+
self._thread.start()
57+
58+
def stop(self):
59+
self._stop.set()
60+
try:
61+
if self._client:
62+
self._client.close()
63+
except Exception:
64+
pass
65+
if self._thread:
66+
self._thread.join(timeout=5)
67+
68+
def _run(self):
69+
try:
70+
LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url)
71+
72+
connect_strategy = ConnectStrategy.http(
73+
self._base_url,
74+
headers=self._headers,
75+
urllib3_request_options=self._custom_options,
76+
)
77+
78+
retry_strategy = RetryDelayStrategy.default(
79+
max_delay=self._backoff_max,
80+
backoff_multiplier=self._backoff_multiplier,
81+
jitter_multiplier=self._backoff_jitter,
82+
)
83+
84+
self._client = SSEClient(
85+
connect=connect_strategy,
86+
initial_retry_delay=self._backoff_initial,
87+
retry_delay_strategy=retry_strategy,
88+
retry_delay_reset_threshold=60.0,
89+
error_strategy=ErrorStrategy.always_continue(),
90+
logger=LOGGER,
91+
)
92+
93+
# Initial hydration happens in the stream.
94+
for event in self._client.events:
95+
if self._stop.is_set():
96+
break
97+
if not event.event:
98+
continue
99+
100+
if event.event in ("unleash-connected", "unleash-updated"):
101+
try:
102+
self.engine.take_state(event.data)
103+
self.cache.set(FEATURES_URL, self.engine.get_state())
104+
105+
if event.event == "unleash-connected" and self.ready_callback:
106+
try:
107+
self.ready_callback()
108+
except Exception:
109+
LOGGER.debug("Ready callback failed", exc_info=True)
110+
except Exception:
111+
LOGGER.error("Error applying streaming state", exc_info=True)
112+
self.load_features()
113+
else:
114+
LOGGER.debug("Ignoring SSE event type: %s", event.event)
115+
116+
LOGGER.debug("SSE stream ended")
117+
except Exception as exc:
118+
LOGGER.warning("Streaming connection failed: %s", exc)
119+
self.load_features()
120+
finally:
121+
try:
122+
if self._client is not None:
123+
self._client.close()
124+
except Exception:
125+
pass

UnleashClient/constants.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
REQUEST_TIMEOUT = 30
77
REQUEST_RETRIES = 3
88
METRIC_LAST_SENT_TIME = "mlst"
9-
CLIENT_SPEC_VERSION = "5.1.9"
9+
CLIENT_SPEC_VERSION = "5.2.2"
1010

1111
# =Unleash=
1212
APPLICATION_HEADERS = {
@@ -19,6 +19,7 @@
1919
REGISTER_URL = "/client/register"
2020
FEATURES_URL = "/client/features"
2121
METRICS_URL = "/client/metrics"
22+
STREAMING_URL = "/client/streaming"
2223

2324
# Cache keys
2425
FAILED_STRATEGIES = "failed_strategies"

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ dependencies=[
3131
"importlib_metadata",
3232
"python-dateutil",
3333
"semver < 4.0.0",
34-
"yggdrasil-engine",
34+
"yggdrasil-engine >= 1.0.0",
35+
"launchdarkly-eventsource",
3536
]
3637

3738
[project.urls]
3839
Homepage = "https://github.com/Unleash/unleash-python-sdk"
3940
Documentation = "https://docs.getunleash.io/unleash-python-sdk"
40-
Changelog = "https://github.com/Unleash/unleash-python-sdk/blob/main/CHANGELOG.md"
41+
Changelog = "https://github.com/unleash/unleash-python-sdk/blob/main/CHANGELOG.md"
4142
Repository = "https://github.com/Unleash/unleash-python-sdk"
4243
Issues = "https://github.com/Unleash/unleash-python-sdk/issues"
4344

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ mmhash3
66
python-dateutil
77
requests
88
semver
9-
yggdrasil-engine
9+
yggdrasil-engine>=1.0.0
10+
launchdarkly-eventsource
1011

1112
# Development packages
1213
# - Testing

0 commit comments

Comments
 (0)