Skip to content

Add rate limiting for task updates #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/function_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
import datetime

from taskbadger import track

Expand All @@ -7,7 +7,7 @@
def my_task(arg1, arg2, kwarg1=None, kwarg2="demo"):
print("Hello from my_task")
print(f"arg1={arg1}, arg2={arg2}, kwarg1={kwarg1}, kwarg2={kwarg2}")
return ["Hello from my_task", datetime.utcnow()]
return ["Hello from my_task", datetime.datetime.now(datetime.timezone.utc)]


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/test_basics.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import datetime
import datetime

import taskbadger as badger
from taskbadger import StatusEnum


def test_basics():
data = {"now": datetime.utcnow().isoformat()}
data = {"now": datetime.datetime.now(datetime.timezone.utc).isoformat()}
task = badger.create_task("test basics", data=data)
task.success(100)
assert task.status == StatusEnum.SUCCESS
Expand Down
8 changes: 4 additions & 4 deletions taskbadger/process.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
import subprocess
import threading
import time
from datetime import datetime


class ProcessRunner:
Expand All @@ -13,7 +13,7 @@ def __init__(self, process_args, env, capture_output: bool, update_frequency: in
self.returncode = None

def run(self):
last_update = datetime.utcnow()
last_update = datetime.datetime.now(datetime.timezone.utc)

kwargs = {}
if self.capture_output:
Expand All @@ -28,7 +28,7 @@ def run(self):
while process.poll() is None:
time.sleep(0.1)
if _should_update(last_update, self.update_frequency):
last_update = datetime.utcnow()
last_update = datetime.datetime.now(datetime.timezone.utc)
if self.capture_output:
yield {"stdout": stdout.read(), "stderr": stderr.read()}
else:
Expand Down Expand Up @@ -75,4 +75,4 @@ def __bool__(self):


def _should_update(last_update: datetime, update_frequency_seconds):
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds
return (datetime.datetime.now(datetime.timezone.utc) - last_update).total_seconds() >= update_frequency_seconds
58 changes: 49 additions & 9 deletions taskbadger/sdk.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
import os
from typing import Any
Expand Down Expand Up @@ -326,18 +327,33 @@ def update_status(self, status: StatusEnum):
"""Update the task status"""
self.update(status=status)

def increment_progress(self, amount: int):
"""Increment the task progress.
def increment_progress(self, amount: int, min_value_interval: int = None, min_time_interval: int = None):
"""Increment the task progress by adding the specified amount to the current value.
If the task value is not set it will be set to `amount`.

Arguments:
amount: The amount to increment the task value by.
min_value_interval: The minimum change in value required to trigger an update.
min_time_interval: The minimum interval between updates in seconds.
"""
value = self._task.value
value_norm = value if value is not UNSET and value is not None else 0
new_amount = value_norm + amount
self.update(value=new_amount)
self.update_progress(new_amount, min_value_interval, min_time_interval)

def update_progress(self, value: int, min_value_interval: int = None, min_time_interval: int = None):
"""Update task progress.

def update_progress(self, value: int):
"""Update task progress."""
self.update(value=value)
Arguments:
value: The new value to set.
min_value_interval: The minimum change in value required to trigger an update.
min_time_interval: The minimum interval between updates in seconds.
"""
skip_check = not (min_value_interval or min_time_interval)
time_check = min_time_interval and self._check_update_time_interval(min_time_interval)
value_check = min_value_interval and self._check_update_value_interval(value, min_value_interval)
if skip_check or time_check or value_check:
self.update(value=value)

def set_value_max(self, value_max: int):
"""Set the `value_max`."""
Expand Down Expand Up @@ -392,10 +408,16 @@ def tag(self, tags: dict[str, str]):
"""Add tags to the task."""
self.update(tags=tags)

def ping(self):
def ping(self, min_time_interval=None):
"""Update the task without changing any values. This can be used in conjunction
with 'stale_timeout' to indicate that the task is still running."""
self.update()
with 'stale_timeout' to indicate that the task is still running.

Arguments:
min_time_interval: The minimum interval between pings in seconds. If set this will only
update the task if the last update was more than `min_time_interval` seconds ago.
"""
if self._check_update_time_interval(min_time_interval):
self.update()

@property
def tags(self):
Expand All @@ -410,6 +432,24 @@ def safe_update(self, **kwargs):
except Exception as e:
log.warning("Error updating task '%s': %s", self._task.id, e)

def _check_update_time_interval(self, min_time_interval: int = None):
if min_time_interval and self._task.updated:
# tzinfo should always be set but for the sake of safety we check
if self._task.updated.tzinfo is None:
tz = None
else:
# Use timezone.utc for Python <3.11 compatibility
tz = datetime.timezone.utc
now = datetime.datetime.now(tz)
time_since = now - self._task.updated
return time_since.total_seconds() >= min_time_interval
return True

def _check_update_value_interval(self, new_value, min_value_interval: int = None):
if min_value_interval and self._task.value:
return new_value - self._task.value >= min_value_interval
return True


def _none_to_unset(value):
return UNSET if value is None else value
Expand Down
80 changes: 80 additions & 0 deletions tests/test_sdk.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from http import HTTPStatus
from unittest import mock

Expand Down Expand Up @@ -153,6 +154,85 @@ def test_update_data(settings, patched_update):
_verify_update(settings, patched_update, data={"a": 1})


def test_ping(settings, patched_update):
task = Task(task_for_test())

updated_at = task.updated
patched_update.return_value = Response(HTTPStatus.OK, b"", {}, task_for_test())
task.ping(min_time_interval=1)
assert len(patched_update.call_args_list) == 0

task.ping()
_verify_update(settings, patched_update)
assert task.updated > updated_at

task.ping(min_time_interval=1)
assert len(patched_update.call_args_list) == 1

task._task.updated = task._task.updated - datetime.timedelta(seconds=1)
task.ping(min_time_interval=1)
assert len(patched_update.call_args_list) == 2


def test_update_progress_min_time_interval(settings, patched_update):
task = Task(task_for_test(value=1))

updated_at = task.updated
patched_update.return_value = Response(HTTPStatus.OK, b"", {}, task_for_test())
task.update_progress(2, min_time_interval=1)
assert len(patched_update.call_args_list) == 0

task.update_progress(2)
_verify_update(settings, patched_update, value=2)
assert task.updated > updated_at

task.update_progress(3, min_time_interval=1)
assert len(patched_update.call_args_list) == 1

task._task.updated = task._task.updated - datetime.timedelta(seconds=1)
task.update_progress(3, min_time_interval=1)
assert len(patched_update.call_args_list) == 2


def test_update_progress_min_value_interval(settings, patched_update):
task = Task(task_for_test(value=1))

patched_update.return_value = Response(HTTPStatus.OK, b"", {}, task_for_test(value=4))
task.update_progress(4, min_value_interval=5)
assert len(patched_update.call_args_list) == 0

task.update_progress(4)
_verify_update(settings, patched_update, value=4)

task.update_progress(8, min_value_interval=5)
assert len(patched_update.call_args_list) == 1

task.update_progress(9, min_value_interval=5)
assert len(patched_update.call_args_list) == 2


def test_update_progress_min_interval_both(settings, patched_update):
task = Task(task_for_test(value=1))

patched_update.return_value = Response(HTTPStatus.OK, b"", {}, task_for_test(value=4))
# neither checks pass
task.update_progress(4, min_time_interval=1, min_value_interval=5)
assert len(patched_update.call_args_list) == 0

# value check passes
task.update_progress(6, min_time_interval=1, min_value_interval=5)
_verify_update(settings, patched_update, value=6)

# neither checks pass
task.update_progress(8, min_time_interval=1, min_value_interval=5)
assert len(patched_update.call_args_list) == 1

# time check passes
task._task.updated = task._task.updated - datetime.timedelta(seconds=1)
task.update_progress(6, min_time_interval=1, min_value_interval=5)
assert len(patched_update.call_args_list) == 2


def test_increment_progress(settings, patched_update):
api_task = task_for_test()
task = Task(api_task)
Expand Down
6 changes: 3 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
import datetime
from uuid import uuid4

from taskbadger.internal.models import Task as TaskInternal
Expand All @@ -12,8 +12,8 @@ def task_for_test(**kwargs):
kwargs["url"] = None
kwargs["public_url"] = None
kwargs["value_percent"] = None
kwargs["created"] = datetime.utcnow()
kwargs["updated"] = datetime.utcnow()
kwargs["created"] = datetime.datetime.now(datetime.timezone.utc)
kwargs["updated"] = datetime.datetime.now(datetime.timezone.utc)
return TaskInternal(
task_id,
"org",
Expand Down