Skip to content

Commit

Permalink
dev(narugo): strictly limit the pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
narugo1992 committed Sep 23, 2024
1 parent 2c8555c commit a212547
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions cheesechaser/pipe/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,20 @@ def next(self, block: bool = True, timeout: Optional[float] = None) -> PipeItem:
self.is_start.set()
return self.queue.get(block=block, timeout=timeout)

def _count_update(self, n: int = 1):
def _count_update(self, n: int = 1) -> bool:
"""
Update current count. If the count reaches the limit, set the status to ``stopped``.
:param n: Count for Adding. Default is 1.
:return: Reached the limit or not
:rtype: bool
"""
self._current_count += n
if self.max_count is not None and self._current_count >= self.max_count:
self.is_stopped.set()
return True
else:
return False

def __iter__(self) -> Iterator[PipeItem]:
"""
Expand All @@ -127,14 +132,17 @@ def __iter__(self) -> Iterator[PipeItem]:
:rtype: Iterator[PipeItem]
"""
pg = tqdm(desc='Piped Items', total=self.max_count)
self._count_update(0)
if self._count_update(0):
return

Check warning on line 136 in cheesechaser/pipe/base.py

View check run for this annotation

Codecov / codecov/patch

cheesechaser/pipe/base.py#L136

Added line #L136 was not covered by tests

while not (self.is_stopped.is_set() and self.queue.empty()):
try:
data = self.next(block=True, timeout=1.0)
if isinstance(data, PipeItem):
pg.update()
yield data
self._count_update()
if self._count_update():
break
except Empty:
pass

Expand Down

0 comments on commit a212547

Please sign in to comment.