Skip to content

Tasks retry after failures (API Rate Limit / Model Throttling) #3233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ class Task(BaseModel):
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(default=0, description="Current number of retries")
max_retries_after_failure: int = Field(
default=5, description="Maximum number of retries after a Task failure"
)
number_of_retries_remaining_after_failure: int = Field(
default=5, description="Number of retries remaining after a Task failure"
)
max_delay_after_failure: int = Field(
default=5, description="Maximum delay after a Task failure in seconds"
)
start_time: Optional[datetime.datetime] = Field(
default=None, description="Start time of the task execution"
)
Expand Down Expand Up @@ -501,7 +510,41 @@ def _execute_core(
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e # Re-raise the exception after emitting the event
if self.number_of_retries_remaining_after_failure > 0:
# Retrying Task execution after failure using non-blocking timer
self.number_of_retries_remaining_after_failure -= 1

# Create a Future to hold the result of the delayed execution
result_future = Future()

def delayed_retry():
try:
# Execute the retry and set the result in the future
retry_result = self._execute_core(agent, context, tools)
result_future.set_result(retry_result)
except Exception as retry_error:
result_future.set_exception(retry_error)

# Schedule the retry without blocking
timer = threading.Timer(self.max_delay_after_failure, delayed_retry)
timer.daemon = True # Allow the timer to be terminated when the main thread exits
timer.start()

# Wait for the result - this will block the current thread but allows other threads to run
return result_future.result()
else:
# Instead of raising an exception which could block the thread and cause the crew to hang,
# return a TaskOutput that indicates failure but allows the process to continue
error_output = TaskOutput(
name=self.name,
description=self.description,
expected_output=self.expected_output,
raw=f"Task failed after {self.max_retries_after_failure} retries. Error: {str(e)}",
agent=self.agent.role if self.agent else None,
output_format=self._get_output_format(),
)
self.output = error_output
return error_output

def _process_guardrail(self, task_output: TaskOutput) -> GuardrailResult:
assert self._guardrail is not None
Expand Down Expand Up @@ -788,4 +831,4 @@ def fingerprint(self) -> Fingerprint:
Returns:
Fingerprint: The fingerprint of the task
"""
return self.security_config.fingerprint
return self.security_config.fingerprint