Skip to content

DOG-6551 : Add throttling helper class#505

Closed
Hmnt39 wants to merge 6 commits intomasterfrom
DOG-6551-throttling-support
Closed

DOG-6551 : Add throttling helper class#505
Hmnt39 wants to merge 6 commits intomasterfrom
DOG-6551-throttling-support

Conversation

@Hmnt39
Copy link
Contributor

@Hmnt39 Hmnt39 commented Jan 30, 2026

This PR add TaskThrottle class to limit concurrent task execution using semaphores. Provides both decorator and context manager (with throttle.lease()) interfaces for throttling.

@Hmnt39 Hmnt39 requested a review from a team as a code owner January 30, 2026 08:54
@gemini-code-assist
Copy link

Summary of Changes

Hello @Hmnt39, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new utility class, TaskThrottle, aimed at providing a robust and flexible mechanism for controlling concurrent task execution within the application. By leveraging semaphores, it ensures that resource utilization is managed effectively, preventing system overload while allowing for efficient parallel processing up to a defined limit. The class is designed for ease of integration, supporting both decorator and context manager patterns.

Highlights

  • New TaskThrottle Class: Introduced a new TaskThrottle class designed to limit the number of concurrent task executions using threading.Semaphore.
  • Flexible Usage Patterns: The TaskThrottle class supports two primary interfaces for throttling: a decorator (@throttle.limit) for functions and a context manager (with throttle.lease()) for code blocks.
  • Robust Concurrency Control: The implementation ensures that the specified maximum number of concurrent tasks is never exceeded, raising a ValueError if initialized with an invalid concurrency limit.
  • Comprehensive Unit Tests: Added a dedicated test file (test_throttle.py) with tests covering initialization, verification of concurrency limits, serial execution, and high concurrency scenarios using ThreadPoolExecutor.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a TaskThrottle class, providing a robust and flexible way to limit concurrent task execution using semaphores. The implementation is clean, offering both decorator and context manager interfaces, which enhances usability. The accompanying test suite is comprehensive, covering initialization, concurrency limits, serial execution, and high concurrency scenarios, ensuring the reliability of the new utility. The code adheres to best practices for threading and resource management, such as using functools.wraps for decorators and try...finally for context managers to guarantee semaphore release. Overall, this is a well-designed and well-tested addition to the extractorutils library.

@codecov
Copy link

codecov bot commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 82.75862% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.18%. Comparing base (dd94c20) to head (9b14edb).

Files with missing lines Patch % Lines
cognite/extractorutils/unstable/core/throttle.py 82.14% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #505      +/-   ##
==========================================
+ Coverage   81.13%   81.18%   +0.05%     
==========================================
  Files          43       44       +1     
  Lines        4208     4236      +28     
==========================================
+ Hits         3414     3439      +25     
- Misses        794      797       +3     
Files with missing lines Coverage Δ
cognite/extractorutils/__init__.py 100.00% <100.00%> (ø)
cognite/extractorutils/unstable/core/throttle.py 82.14% <82.14%> (ø)

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor

@nithinb nithinb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes look good, change the version as mentioned in the comment because patch would imply you are fixing something but the reality is that you are adding a new feature

@Hmnt39 Hmnt39 requested a review from nithinb February 2, 2026 08:46
nithinb
nithinb previously approved these changes Feb 2, 2026
@Hmnt39 Hmnt39 added the waiting-for-risk-review Waiting for a member of the risk review team to take an action label Feb 2, 2026
@haakonvt haakonvt self-assigned this Feb 3, 2026
@haakonvt haakonvt added the risk-review-ongoing Risk review is in progress label Feb 3, 2026
Copy link
Contributor

@haakonvt haakonvt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Risk review: just a few questions on whether the implemented functionality is really needed.

Comment on lines +58 to +67
@contextmanager
def lease(self) -> Generator[None, None, None]:
"""
Context manager that acquires/releases a throttle slot.
"""
self._semaphore.acquire()
try:
yield
finally:
self._semaphore.release()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By using the @contextmanager decorator, you can get the effect of a context manager without writing a full class. You, however, have written a full class, then I see no point in not implementing enter and exit dunder methods:

def __enter__(self):
    self._semaphore.acquire()

def __exit__(self, exc_type, exc_val, exc_tb):
    self._semaphore.release()

However, taking a step back, this is exactly the interface the semaphore already provides you, leading me to question why you need this in the first place?

throttle = TaskThrottle(5)
with throttle.lease():
    ...

throttle = Semaphore(5)
with throttle:
    ...

Comment on lines +46 to +55
def limit(self, func: Callable[P, T]) -> Callable[P, T]:
"""
Decorator to throttle a task function.
"""

@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
with self.lease():
return func(*args, **kwargs)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of the previous comment, I would suggest you only implement this limit functionality:

def limit_concurrency(max_concurrent):
    semaphore = Semaphore(max_concurrent)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with semaphore:
                return func(*args, **kwargs)
        return wrapper

    return decorator

Which would allow both single-use:

@limit_concurrency(3)
def foo(data):
    ...

...and shared-pool limit:

throttle = limit_concurrency(5)

@throttle
def foo(user_id):
    ...

@throttle
def bar(entry):
    ...


### Added
* In the `unstable` package: Add TaskThrottle helper class for limiting concurrent task execution with decorator and context manager support
* Add support for environment variable interpolation in keyvault config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where was this added?

* Add support for environment variable interpolation in keyvault config

@haakonvt haakonvt added waiting-for-team Waiting for the submitter or reviewer of the PR to take an action and removed waiting-for-risk-review Waiting for a member of the risk review team to take an action labels Feb 3, 2026
@Hmnt39
Copy link
Contributor Author

Hmnt39 commented Feb 4, 2026

Make sense, this is not required, I can directly use semaphore as context manager rather creating a wrapper on it. Closing this PR

@Hmnt39 Hmnt39 closed this Feb 4, 2026
@haakonvt haakonvt deleted the DOG-6551-throttling-support branch February 4, 2026 13:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

risk-review-ongoing Risk review is in progress waiting-for-team Waiting for the submitter or reviewer of the PR to take an action

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants