From a2125473ab2a9bd59441635e1619929dfa215dce Mon Sep 17 00:00:00 2001 From: narugo1992 Date: Mon, 23 Sep 2024 13:56:38 +0800 Subject: [PATCH] dev(narugo): strictly limit the pipe --- cheesechaser/pipe/base.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cheesechaser/pipe/base.py b/cheesechaser/pipe/base.py index ac701c33a..eb815f906 100644 --- a/cheesechaser/pipe/base.py +++ b/cheesechaser/pipe/base.py @@ -109,15 +109,20 @@ def next(self, block: bool = True, timeout: Optional[float] = None) -> PipeItem: self.is_start.set() return self.queue.get(block=block, timeout=timeout) - def _count_update(self, n: int = 1): + def _count_update(self, n: int = 1) -> bool: """ Update current count. If the count reaches the limit, set the status to ``stopped``. :param n: Count for Adding. Default is 1. + :return: Reached the limit or not + :rtype: bool """ self._current_count += n if self.max_count is not None and self._current_count >= self.max_count: self.is_stopped.set() + return True + else: + return False def __iter__(self) -> Iterator[PipeItem]: """ @@ -127,14 +132,17 @@ def __iter__(self) -> Iterator[PipeItem]: :rtype: Iterator[PipeItem] """ pg = tqdm(desc='Piped Items', total=self.max_count) - self._count_update(0) + if self._count_update(0): + return + while not (self.is_stopped.is_set() and self.queue.empty()): try: data = self.next(block=True, timeout=1.0) if isinstance(data, PipeItem): pg.update() yield data - self._count_update() + if self._count_update(): + break except Empty: pass