Skip to content

Commit 338943e

Browse files
committed
fix: add public method to check existing subs
1 parent 955566d commit 338943e

1 file changed

Lines changed: 19 additions & 3 deletions

File tree

bec_lib/bec_lib/redis_connector.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,15 @@ def move_from_start_to_normal(self, topics_and_end_ids: dict[str, str]):
297297
read_id=topics_and_end_ids[topic], subs=self.from_start_subs.pop(topic)
298298
)
299299

300-
def _check_registered(self, topic, new_sub: StreamSubInfo):
301-
if (
300+
def is_already_registered(self, topic: str, new_sub: StreamSubInfo):
301+
return (
302302
(topic in self.from_start_subs and new_sub in self.from_start_subs[topic])
303303
or (topic in self._dr_subs and new_sub in self._dr_subs[topic])
304304
or (topic in self._subs and new_sub in self._subs[topic].subs)
305-
):
305+
)
306+
307+
def _check_registered(self, topic: str, new_sub: StreamSubInfo):
308+
if self.is_already_registered(topic, new_sub):
306309
raise ValueError(f"Received duplicate subscription for {new_sub=}.")
307310

308311
def add_direct_listener(self, topic: str, new_sub: DirectReadStreamSubInfo):
@@ -709,6 +712,19 @@ def _normalize_patterns(self, patterns) -> list[str]:
709712
raise ValueError("register: patterns must be a string or a list of strings")
710713
return patterns
711714

715+
def any_stream_is_registered(
716+
self, topics: EndpointInfo | str | list[EndpointInfo] | list[str], cb: Callable
717+
) -> bool:
718+
"""Check if any stream in `topics` is already registered with this callback.
719+
Does not check if the topic is a stream in Redis, it will just return False."""
720+
with self._stream_subs.lock:
721+
return any(
722+
self._stream_subs.is_already_registered(
723+
topic, StreamSubInfo(louie.saferef.safe_ref(cb), {})
724+
)
725+
for topic in self._convert_endpointinfo(topics)[0]
726+
)
727+
712728
def register(
713729
self,
714730
topics: str | list[str] | EndpointInfo | list[EndpointInfo] | None = None,

0 commit comments

Comments
 (0)