diff --git a/src/ragas/async_utils.py b/src/ragas/async_utils.py index 38a871760..a9a5b3dc8 100644 --- a/src/ragas/async_utils.py +++ b/src/ragas/async_utils.py @@ -116,7 +116,6 @@ async def process_futures( raise # Re-raise CancelledError to ensure proper cancellation except Exception as e: result = e - yield result @@ -182,6 +181,7 @@ def run_async_tasks( async def _run(): total_tasks = len(tasks) results = [] + first_exception = None pbm = ProgressBarManager(progress_bar_desc, show_progress) if not batch_size: @@ -189,6 +189,14 @@ async def _run(): async for result in process_futures( as_completed(tasks, max_workers, cancel_check=cancel_check) ): + if isinstance(result, Exception): + logger.error( + f"Task failed with {type(result).__name__}: {result}", + exc_info=False, + ) + # Store first exception to raise after all tasks complete + if first_exception is None: + first_exception = result results.append(result) pbar.update(1) else: @@ -203,10 +211,22 @@ async def _run(): async for result in process_futures( as_completed(batch, max_workers, cancel_check=cancel_check) ): + if isinstance(result, Exception): + logger.error( + f"Task failed with {type(result).__name__}: {result}", + exc_info=False, + ) + # Store first exception to raise after all tasks complete + if first_exception is None: + first_exception = result results.append(result) batch_pbar.update(1) overall_pbar.update(len(batch)) + # Raise the first exception encountered to fail fast with clear error message + if first_exception is not None: + raise first_exception + return results return run(_run)