From 632d197a9dee8577f3ac9316d86d0782c7697429 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:10:01 +0200 Subject: [PATCH 01/10] refactor: integrate retry decorator settings into DependencyContainer and LangchainSummarizer --- .../src/admin_api_lib/dependency_container.py | 6 +- .../impl/settings/summarizer_settings.py | 62 ++++++++++- .../impl/summarizer/langchain_summarizer.py | 105 ++++++++++++++---- 3 files changed, 145 insertions(+), 28 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py index feee4cd8..35eb52ff 100644 --- a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py +++ b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py @@ -64,6 +64,7 @@ from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings from rag_core_lib.impl.tracers.langfuse_traced_chain import LangfuseTracedGraph from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore @@ -86,6 +87,7 @@ class DependencyContainer(DeclarativeContainer): key_value_store_settings = KeyValueSettings() summarizer_settings = SummarizerSettings() source_uploader_settings = SourceUploaderSettings() + retry_decorator_settings = RetryDecoratorSettings() key_value_store = Singleton(FileStatusKeyValueStore, key_value_store_settings) file_service = Singleton(S3Service, s3_settings=s3_settings) @@ -136,7 +138,9 @@ class DependencyContainer(DeclarativeContainer): LangchainSummarizer, langfuse_manager=langfuse_manager, chunker=summary_text_splitter, - semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrreny), + semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency), + summarizer_settings=summarizer_settings, + retry_decorator_settings=retry_decorator_settings ) summary_enhancer = List( diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py index 3617adb8..b3138ce0 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py @@ -1,6 +1,7 @@ """Contains settings for summarizer.""" -from pydantic import Field +from typing import Optional +from pydantic import Field, PositiveInt from pydantic_settings import BaseSettings @@ -12,8 +13,22 @@ class SummarizerSettings(BaseSettings): ---------- maximum_input_size : int The maximum size of the input that the summarizer can handle. Default is 8000. - maximum_concurrreny : int + maximum_concurrency : int The maximum number of concurrent summarization processes. Default is 10. + max_retries: Optional[PositiveInt] + Total retries, not counting the initial attempt. + retry_base_delay: Optional[float] + Base delay in seconds for the first retry. + retry_max_delay: Optional[float] + Maximum delay cap in seconds for any single wait. + backoff_factor: Optional[float] + Exponential backoff factor (>= 1). + attempt_cap: Optional[int] + Cap for exponent growth (backoff_factor ** attempt_cap). + jitter_min: Optional[float] + Minimum jitter in seconds. + jitter_max: Optional[float] + Maximum jitter in seconds. """ class Config: @@ -23,4 +38,45 @@ class Config: case_sensitive = False maximum_input_size: int = Field(default=8000) - maximum_concurrreny: int = Field(default=10) + maximum_concurrency: int = Field(default=10) + max_retries: Optional[PositiveInt] = Field( + default=None, + title="Max Retries", + description="Total retries, not counting the initial attempt.", + ) + retry_base_delay: Optional[float] = Field( + default=None, + ge=0, + title="Retry Base Delay", + description="Base delay in seconds for the first retry.", + ) + retry_max_delay: Optional[float] = Field( + default=None, + gt=0, + title="Retry Max Delay", + description="Maximum delay cap in seconds for any single wait.", + ) + backoff_factor: Optional[float] = Field( + default=None, + ge=1.0, + title="Backoff Factor", + description="Exponential backoff factor (>= 1).", + ) + attempt_cap: Optional[int] = Field( + default=None, + ge=0, + title="Attempt Cap", + description="Cap for exponent growth (backoff_factor ** attempt_cap).", + ) + jitter_min: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Min (s)", + description="Minimum jitter in seconds.", + ) + jitter_max: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Max (s)", + description="Maximum jitter in seconds.", + ) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 1d5b5d09..80971743 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -1,20 +1,24 @@ """Module for the LangchainSummarizer class.""" +import asyncio import logging -import traceback from typing import Optional from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.documents import Document from langchain_core.runnables import Runnable, RunnableConfig, ensure_config +from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError +from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.summarizer.summarizer import ( Summarizer, SummarizerInput, SummarizerOutput, ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff logger = logging.getLogger(__name__) @@ -32,10 +36,15 @@ def __init__( langfuse_manager: LangfuseManager, chunker: RecursiveCharacterTextSplitter, semaphore: AsyncThreadsafeSemaphore, + summarizer_settings: SummarizerSettings, + retry_decorator_settings: RetryDecoratorSettings, ): self._chunker = chunker self._langfuse_manager = langfuse_manager self._semaphore = semaphore + self._retry_decorator_settings = self._create_retry_decorator_settings( + summarizer_settings, retry_decorator_settings + ) async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput: """ @@ -65,40 +74,88 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] """ assert query, "Query is empty: %s" % query # noqa S101 config = ensure_config(config) - tries_remaining = config.get("configurable", {}).get("tries_remaining", 3) - logger.debug("Tries remaining %d" % tries_remaining) - if tries_remaining < 0: - raise Exception("Summary creation failed.") document = Document(page_content=query) langchain_documents = self._chunker.split_documents([document]) + logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - outputs = [] - for langchain_document in langchain_documents: - async with self._semaphore: - try: - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) - except Exception as e: - logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc()) - config["tries_remaining"] = tries_remaining - 1 - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk + tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] + outputs = await asyncio.gather(*tasks) if len(outputs) == 1: return outputs[0] - summary = " ".join(outputs) + + # Optional single reduce pass (no recursion) + merged = " ".join(outputs) logger.debug( - "Reduced number of chars from %d to %d" - % (len("".join([x.page_content for x in langchain_documents])), len(summary)) + "Reduced number of chars from %d to %d", + len("".join([x.page_content for x in langchain_documents])), + len(merged), + ) + return await self._summarize_chunk(merged, config) + + def _create_retry_decorator_settings( + self, summarizer_settings: SummarizerSettings, retry_decorator_settings: RetryDecoratorSettings + ): + return RetryDecoratorSettings( + max_retries=( + summarizer_settings.max_retries + if summarizer_settings.max_retries is not None + else retry_decorator_settings.max_retries + ), + retry_base_delay=( + summarizer_settings.retry_base_delay + if summarizer_settings.retry_base_delay is not None + else retry_decorator_settings.retry_base_delay + ), + retry_max_delay=( + summarizer_settings.retry_max_delay + if summarizer_settings.retry_max_delay is not None + else retry_decorator_settings.retry_max_delay + ), + backoff_factor=( + summarizer_settings.backoff_factor + if summarizer_settings.backoff_factor is not None + else retry_decorator_settings.backoff_factor + ), + attempt_cap=( + summarizer_settings.attempt_cap + if summarizer_settings.attempt_cap is not None + else retry_decorator_settings.attempt_cap + ), + jitter_min=( + summarizer_settings.jitter_min + if summarizer_settings.jitter_min is not None + else retry_decorator_settings.jitter_min + ), + jitter_max=( + summarizer_settings.jitter_max + if summarizer_settings.jitter_max is not None + else retry_decorator_settings.jitter_max + ), ) - return await self.ainvoke(summary, config) def _create_chain(self) -> Runnable: return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( self.__class__.__name__ ) + + def _retry_with_backoff_wrapper(self): + # Prefer summarizer-specific overrides; fall back to global retry settings + return retry_with_backoff( + settings=self._retry_decorator_settings, + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) + + async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + @self._retry_with_backoff_wrapper() + async def _call(text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + response = await self._create_chain().ainvoke({"text": text}, config) + return response.content if hasattr(response, "content") else str(response) + + # Hold the semaphore for the entire retry lifecycle + async with self._semaphore: + return await _call(text, config) From 75d898c4b83848f2e1cb3a786de2cff8fe812ed0 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:29:39 +0200 Subject: [PATCH 02/10] docs: update README to include summarizer retry behavior and configuration details --- libs/README.md | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/libs/README.md b/libs/README.md index e06214c1..d77b74a5 100644 --- a/libs/README.md +++ b/libs/README.md @@ -12,6 +12,7 @@ It consists of the following python packages: - [2.1 Requirements](#21-requirements) - [2.2 Endpoints](#22-endpoints) - [2.3 Replaceable parts](#23-replaceable-parts) + - [2.4 Summarizer retry behavior](#24-summarizer-retry-behavior) - [`3. Extractor API lib`](#3-extractor-api-lib) - [3.1 Requirements](#31-requirements) - [3.2 Endpoints](#32-endpoints) @@ -71,6 +72,7 @@ By default `OpenAI` is used by the evaluation. If you want to use the same LLM-c Endpoint to remove documents from the vector database. #### `/information_pieces/upload` + Endpoint to upload documents into the vector database. These documents need to have been parsed. For simplicity, a LangChain Documents like format is used. Uploaded documents are required to contain the following metadata: @@ -94,7 +96,7 @@ Uploaded documents are required to contain the following metadata: | chat_graph | [`rag_core_api.graph.graph_base.GraphBase`](./rag-core-api/src/rag_core_api/graph/graph_base.py) | [`rag_core_api.impl.graph.chat_graph.DefaultChatGraph`](./rag-core-api/src/rag_core_api/impl/graph/chat_graph.py) | Langgraph graph that contains the entire logic for question answering. | | traced_chat_graph | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) | Wraps around the *chat_graph* and add langfuse tracing. | | evaluator | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | The evaulator used in the evaluate endpoint. | -| chat_endpoint | [ `rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | +| chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. | ## 2. Admin API Lib @@ -115,7 +117,7 @@ The following endpoints are provided by the *admin-api-lib*: All required python libraries can be found in the [pyproject.toml](./admin-api-lib/pyproject.toml) file. In addition to python libraries, the following system packages are required: -``` +```shell build-essential make ``` @@ -157,10 +159,10 @@ The extracted information will be summarized using LLM. The summary, as well as | key_value_store | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | Is used for storing the available sources and their current state. | | chunker | [`admin_api_lib.chunker.chunker.Chunker`](./admin-api-lib/src/admin_api_lib/chunker/chunker.py) | [`admin_api_lib.impl.chunker.text_chunker.TextChunker`](./admin-api-lib/src/admin_api_lib/impl/chunker/text_chunker.py) | Used for splitting the documents in managable chunks. | | document_extractor | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | Needs to be replaced if adjustments to the `extractor-api` is made. | -| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#rag-core-api) are made. | +| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#1-rag-core-api) are made. | | summarizer_prompt | `str` | [`admin_api_lib.prompt_templates.summarize_prompt.SUMMARIZE_PROMPT`](./admin-api-lib/src/admin_api_lib/prompt_templates/summarize_prompt.py) | The prompt used of the summarization. | | langfuse_manager | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | Retrieves additional settings, as well as the prompt from langfuse if available. | -| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. | +| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. Uses the shared retry decorator with optional per-summarizer overrides (see 2.4). | | untraced_information_enhancer |[`admin_api_lib.information_enhancer.information_enhancer.InformationEnhancer`](./admin-api-lib/src/admin_api_lib/information_enhancer/information_enhancer.py) | [`admin_api_lib.impl.information_enhancer.general_enhancer.GeneralEnhancer`](./admin-api-lib/src/admin_api_lib/impl/information_enhancer/general_enhancer.py) | Uses the *summarizer* to enhance the extracted documents. | | information_enhancer | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) |Wraps around the *untraced_information_enhancer* and adds langfuse tracing. | | document_deleter |[`admin_api_lib.api_endpoints.document_deleter.DocumentDeleter`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_deleter.py) | [`admin_api_lib.impl.api_endpoints.default_document_deleter.DefaultDocumentDeleter`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_deleter.py) | Handles deletion of sources. | @@ -169,6 +171,33 @@ The extracted information will be summarized using LLM. The summary, as well as | document_reference_retriever | [`admin_api_lib.api_endpoints.document_reference_retriever.DocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_reference_retriever.py) | [`admin_api_lib.impl.api_endpoints.default_document_reference_retriever.DefaultDocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_reference_retriever.py) | Handles return of files from connected storage. | | file_uploader | [`admin_api_lib.api_endpoints.file_uploader.FileUploader`](./admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py) | [`admin_api_lib.impl.api_endpoints.default_file_uploader.DefaultFileUploader`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py) | Handles upload and extraction of files. | +### 2.4 Summarizer retry behavior + +The default summarizer implementation (`LangchainSummarizer`) now uses the shared retry decorator with exponential backoff from the core library. + +- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) +- Per-summarizer overrides: [`SummarizerSettings`](./admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py) + +How it resolves settings + +- Each field in `SummarizerSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. +- When a field is not provided (None), the summarizer falls back to the value from `RetryDecoratorSettings`. +- Zero values (e.g., 0 or 0.0 where allowed) are honored and do not trigger fallback. +- The effective retry configuration is computed once per summarizer instance at initialization. + +Configuring via environment variables + +- Summarizer-specific (prefix `SUMMARIZER_`): + - `SUMMARIZER_MAX_RETRIES` + - `SUMMARIZER_RETRY_BASE_DELAY` + - `SUMMARIZER_RETRY_MAX_DELAY` + - `SUMMARIZER_BACKOFF_FACTOR` + - `SUMMARIZER_ATTEMPT_CAP` + - `SUMMARIZER_JITTER_MIN` + - `SUMMARIZER_JITTER_MAX` +- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. + ## 3. Extractor API Lib The Extractor Library contains components that provide document parsing capabilities for various file formats and web sources. It supports extracting content from PDF, DOCX, XML files, as well as web pages via sitemaps and Confluence pages. It also includes a default `dependency_container`, that is pre-configured and is a good starting point for most use-cases. This API should not be exposed by ingress and only used for internally. @@ -197,6 +226,7 @@ tesseract-ocr-eng ### 3.2 Endpoints #### `/extract_from_file` + This endpoint will extract the information from PDF,PTTX,WORD,XML files. It will load the files from the connected storage. The following types of information will be extracted: @@ -215,6 +245,7 @@ The following types of information can be extracted: - `IMAGE`: image found in the document For sitemap sources, additional parameters can be provided, e.g.: + - `web_path`: The URL of the XML sitemap to crawl - `filter_urls`: JSON array of URL patterns to filter pages (optional) - `header_template`: JSON object for custom HTTP headers (optional) From 03804ddca2d1490bfb2f225bda80d5003aecc1ef Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:33:01 +0200 Subject: [PATCH 03/10] feat: add summarizer retry configuration parameters to adminBackend --- infrastructure/rag/values.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index fe9b81f9..507e06d2 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -314,6 +314,13 @@ adminBackend: summarizer: SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000" SUMMARIZER_MAXIMUM_CONCURRENCY: "10" + SUMMARIZER_MAX_RETRIES: "5" + SUMMARIZER_RETRY_BASE_DELAY: "0.5" + SUMMARIZER_RETRY_MAX_DELAY: "600" + SUMMARIZER_BACKOFF_FACTOR: "2" + SUMMARIZER_ATTEMPT_CAP: "6" + SUMMARIZER_JITTER_MIN: "0.05" + SUMMARIZER_JITTER_MAX: "0.25" ragapi: RAG_API_HOST: "http://backend:8080" chunker: From 4049b6c275f70c9e25adc810dd64925cc11f4cc3 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:34:08 +0200 Subject: [PATCH 04/10] fix: add missing comma in DependencyContainer initialization for retry decorator settings --- libs/admin-api-lib/src/admin_api_lib/dependency_container.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py index 35eb52ff..a074dbdb 100644 --- a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py +++ b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py @@ -140,7 +140,7 @@ class DependencyContainer(DeclarativeContainer): chunker=summary_text_splitter, semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency), summarizer_settings=summarizer_settings, - retry_decorator_settings=retry_decorator_settings + retry_decorator_settings=retry_decorator_settings, ) summary_enhancer = List( From b681466f857831c835fc1895bf354b9095f76367 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:43:46 +0200 Subject: [PATCH 05/10] refactor: remove optional single reduce pass comment in LangchainSummarizer --- .../src/admin_api_lib/impl/summarizer/langchain_summarizer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 80971743..e71a2296 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -86,7 +86,6 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] if len(outputs) == 1: return outputs[0] - # Optional single reduce pass (no recursion) merged = " ".join(outputs) logger.debug( "Reduced number of chars from %d to %d", @@ -142,7 +141,6 @@ def _create_chain(self) -> Runnable: ) def _retry_with_backoff_wrapper(self): - # Prefer summarizer-specific overrides; fall back to global retry settings return retry_with_backoff( settings=self._retry_decorator_settings, exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), From e4ffae299d5b51d9356bb113f51d82b5718f0b00 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 11:57:11 +0200 Subject: [PATCH 06/10] docs: update README to include Helm chart configuration for summarizer environment variables --- libs/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/README.md b/libs/README.md index d77b74a5..9c0ec4ae 100644 --- a/libs/README.md +++ b/libs/README.md @@ -197,6 +197,7 @@ Configuring via environment variables - `SUMMARIZER_JITTER_MIN` - `SUMMARIZER_JITTER_MAX` - Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. +- Helm chart: set the same keys under `adminBackend.envs.summarizer` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml). ## 3. Extractor API Lib From 9ec0044deeeb25700413cf290e129a316e356e81 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 13:04:29 +0200 Subject: [PATCH 07/10] feat: add retry configuration parameters for StackitEmbedder and update README with embedder retry behavior --- infrastructure/rag/values.yaml | 9 ++ libs/README.md | 29 +++++++ .../src/rag_core_api/dependency_container.py | 4 +- .../impl/embeddings/stackit_embedder.py | 82 +++++++++++++++++-- .../settings/stackit_embedder_settings.py | 58 ++++++++++++- 5 files changed, 172 insertions(+), 10 deletions(-) diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index 507e06d2..31903790 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -192,6 +192,14 @@ backend: stackitEmbedder: STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct" STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. + STACKIT_EMBEDDER_MAX_RETRIES: "5" + STACKIT_EMBEDDER_RETRY_BASE_DELAY: "0.5" + STACKIT_EMBEDDER_RETRY_MAX_DELAY: "600" + STACKIT_EMBEDDER_BACKOFF_FACTOR: "2" + STACKIT_EMBEDDER_ATTEMPT_CAP: "6" + STACKIT_EMBEDDER_JITTER_MIN: "0.05" + STACKIT_EMBEDDER_JITTER_MAX: "0.25" ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" @@ -314,6 +322,7 @@ adminBackend: summarizer: SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000" SUMMARIZER_MAXIMUM_CONCURRENCY: "10" + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. SUMMARIZER_MAX_RETRIES: "5" SUMMARIZER_RETRY_BASE_DELAY: "0.5" SUMMARIZER_RETRY_MAX_DELAY: "600" diff --git a/libs/README.md b/libs/README.md index 9c0ec4ae..ca1feb4a 100644 --- a/libs/README.md +++ b/libs/README.md @@ -8,6 +8,7 @@ It consists of the following python packages: - [1.1 Requirements](#11-requirements) - [1.2 Endpoints](#12-endpoints) - [1.3 Replaceable parts](#13-replaceable-parts) + - [1.4 Embedder retry behavior](#14-embedder-retry-behavior) - [`2. Admin API lib`](#2-admin-api-lib) - [2.1 Requirements](#21-requirements) - [2.2 Endpoints](#22-endpoints) @@ -99,6 +100,34 @@ Uploaded documents are required to contain the following metadata: | chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. | +### 1.4 Embedder retry behavior + +The default STACKIT embedder implementation (`StackitEmbedder`) uses the shared retry decorator with exponential backoff from the core library. + +- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) +- Per-embedder overrides: [`StackitEmbedderSettings`](./rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py) + +How it resolves settings + +- Each retry-related field in `StackitEmbedderSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. +- When a field is not provided (None), the embedder falls back to the value from `RetryDecoratorSettings`. +- Zero values (e.g., 0 or 0.0 where allowed) are honored and do not trigger fallback. +- The effective retry configuration is computed once per embedder instance at initialization. + +Configuring via environment variables + +- Embedder-specific (prefix `STACKIT_EMBEDDER_`): + - `STACKIT_EMBEDDER_MAX_RETRIES` + - `STACKIT_EMBEDDER_RETRY_BASE_DELAY` + - `STACKIT_EMBEDDER_RETRY_MAX_DELAY` + - `STACKIT_EMBEDDER_BACKOFF_FACTOR` + - `STACKIT_EMBEDDER_ATTEMPT_CAP` + - `STACKIT_EMBEDDER_JITTER_MIN` + - `STACKIT_EMBEDDER_JITTER_MAX` +- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. +- Helm chart: set the same keys under `backend.envs.stackitEmbedder` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml). + ## 2. Admin API Lib The Admin API Library contains all required components for file management capabilities for RAG systems, handling all document lifecycle operations. It also includes a default `dependency_container`, that is pre-configured and should fit most use-cases. diff --git a/libs/rag-core-api/src/rag_core_api/dependency_container.py b/libs/rag-core-api/src/rag_core_api/dependency_container.py index 22fa36b9..60afd52d 100644 --- a/libs/rag-core-api/src/rag_core_api/dependency_container.py +++ b/libs/rag-core-api/src/rag_core_api/dependency_container.py @@ -63,6 +63,7 @@ from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings from rag_core_lib.impl.tracers.langfuse_traced_chain import LangfuseTracedGraph from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore @@ -89,6 +90,7 @@ class DependencyContainer(DeclarativeContainer): stackit_embedder_settings = StackitEmbedderSettings() chat_history_settings = ChatHistorySettings() sparse_embedder_settings = SparseEmbedderSettings() + retry_decorator_settings = RetryDecoratorSettings() chat_history_config.from_dict(chat_history_settings.model_dump()) class_selector_config.from_dict(rag_class_type_settings.model_dump() | embedder_class_type_settings.model_dump()) @@ -98,7 +100,7 @@ class DependencyContainer(DeclarativeContainer): ollama=Singleton( LangchainCommunityEmbedder, embedder=Singleton(OllamaEmbeddings, **ollama_embedder_settings.model_dump()) ), - stackit=Singleton(StackitEmbedder, stackit_embedder_settings), + stackit=Singleton(StackitEmbedder, stackit_embedder_settings, retry_decorator_settings), ) sparse_embedder = Singleton(FastEmbedSparse, **sparse_embedder_settings.model_dump()) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 65d67a1f..63fcad32 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -1,16 +1,23 @@ """Module that contains the StackitEmbedder class.""" from langchain_core.embeddings import Embeddings -from openai import OpenAI +from openai import OpenAI, APIConnectionError, APIError, APITimeoutError, RateLimitError from rag_core_api.embeddings.embedder import Embedder from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings +import logging +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff + +logger = logging.getLogger(__name__) class StackitEmbedder(Embedder, Embeddings): """A class that represents any Langchain provided Embedder.""" - def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): + def __init__( + self, stackit_embedder_settings: StackitEmbedderSettings, retry_decorator_settings: RetryDecoratorSettings + ): """ Initialize the StackitEmbedder with the given settings. @@ -18,12 +25,17 @@ def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): ---------- stackit_embedder_settings : StackitEmbedderSettings The settings for configuring the StackitEmbedder, including the API key and base URL. + retry_decorator_settings : RetryDecoratorSettings + Default retry settings used as fallback when StackitEmbedderSettings leaves fields unset. """ self._client = OpenAI( api_key=stackit_embedder_settings.api_key, base_url=stackit_embedder_settings.base_url, ) self._settings = stackit_embedder_settings + self._retry_decorator_settings = self._create_retry_decorator_settings( + stackit_embedder_settings, retry_decorator_settings + ) def get_embedder(self) -> "StackitEmbedder": """Return the embedder instance. @@ -48,12 +60,16 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: list[list[float]] A list where each element is a list of floats representing the embedded vector of a document. """ - responses = self._client.embeddings.create( - input=texts, - model=self._settings.model, - ) - return [data.embedding for data in responses.data] + @self._retry_with_backoff_wrapper() + def _call(texts: list[str]) -> list[list[float]]: + responses = self._client.embeddings.create( + input=texts, + model=self._settings.model, + ) + return [data.embedding for data in responses.data] + + return _call(texts) def embed_query(self, text: str) -> list[float]: """ @@ -69,4 +85,54 @@ def embed_query(self, text: str) -> list[float]: list[float] The embedded representation of the query text. """ - return self.embed_documents([text])[0] + embeddings_list = self.embed_documents([text]) + if embeddings_list: + embeddings = embeddings_list[0] + return embeddings if embeddings else [] + logger.warning("No embeddings found for query: %s", text) + return embeddings_list + + def _create_retry_decorator_settings( + self, + stackit_settings: StackitEmbedderSettings, + retry_defaults: RetryDecoratorSettings, + ) -> RetryDecoratorSettings: + # Prefer values from StackitEmbedderSettings when provided; + # otherwise fall back to RetryDecoratorSettings defaults + return RetryDecoratorSettings( + max_retries=( + stackit_settings.max_retries if stackit_settings.max_retries is not None else retry_defaults.max_retries + ), + retry_base_delay=( + stackit_settings.retry_base_delay + if stackit_settings.retry_base_delay is not None + else retry_defaults.retry_base_delay + ), + retry_max_delay=( + stackit_settings.retry_max_delay + if stackit_settings.retry_max_delay is not None + else retry_defaults.retry_max_delay + ), + backoff_factor=( + stackit_settings.backoff_factor + if stackit_settings.backoff_factor is not None + else retry_defaults.backoff_factor + ), + attempt_cap=( + stackit_settings.attempt_cap if stackit_settings.attempt_cap is not None else retry_defaults.attempt_cap + ), + jitter_min=( + stackit_settings.jitter_min if stackit_settings.jitter_min is not None else retry_defaults.jitter_min + ), + jitter_max=( + stackit_settings.jitter_max if stackit_settings.jitter_max is not None else retry_defaults.jitter_max + ), + ) + + def _retry_with_backoff_wrapper(self): + return retry_with_backoff( + settings=self._retry_decorator_settings, + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index e451f06c..ddf09b70 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -1,6 +1,7 @@ """Module contains settings regarding the stackit embedder.""" -from pydantic import Field +from typing import Optional +from pydantic import Field, PositiveInt from pydantic_settings import BaseSettings @@ -17,6 +18,20 @@ class StackitEmbedderSettings(BaseSettings): (default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1"). api_key : str The API key for authentication. + max_retries: Optional[PositiveInt] + Total retries, not counting the initial attempt. + retry_base_delay: Optional[float] + Base delay in seconds for the first retry. + retry_max_delay: Optional[float] + Maximum delay cap in seconds for any single wait. + backoff_factor: Optional[float] + Exponential backoff factor (>= 1). + attempt_cap: Optional[int] + Cap for exponent growth (backoff_factor ** attempt_cap). + jitter_min: Optional[float] + Minimum jitter in seconds. + jitter_max: Optional[float] + Maximum jitter in seconds. """ class Config: @@ -28,3 +43,44 @@ class Config: model: str = Field(default="intfloat/e5-mistral-7b-instruct") base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1") api_key: str = Field(default="") + max_retries: Optional[PositiveInt] = Field( + default=None, + title="Max Retries", + description="Total retries, not counting the initial attempt.", + ) + retry_base_delay: Optional[float] = Field( + default=None, + ge=0, + title="Retry Base Delay", + description="Base delay in seconds for the first retry.", + ) + retry_max_delay: Optional[float] = Field( + default=None, + gt=0, + title="Retry Max Delay", + description="Maximum delay cap in seconds for any single wait.", + ) + backoff_factor: Optional[float] = Field( + default=None, + ge=1.0, + title="Backoff Factor", + description="Exponential backoff factor (>= 1).", + ) + attempt_cap: Optional[int] = Field( + default=None, + ge=0, + title="Attempt Cap", + description="Cap for exponent growth (backoff_factor ** attempt_cap).", + ) + jitter_min: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Min (s)", + description="Minimum jitter in seconds.", + ) + jitter_max: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Max (s)", + description="Maximum jitter in seconds.", + ) From 4c5b318e673e446b577877adbef758620311bad4 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 13:10:59 +0200 Subject: [PATCH 08/10] docs: update README to clarify usage of retry decorator in LangchainSummarizer --- libs/README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libs/README.md b/libs/README.md index 9c0ec4ae..f2b82e13 100644 --- a/libs/README.md +++ b/libs/README.md @@ -173,7 +173,7 @@ The extracted information will be summarized using LLM. The summary, as well as ### 2.4 Summarizer retry behavior -The default summarizer implementation (`LangchainSummarizer`) now uses the shared retry decorator with exponential backoff from the core library. +The default summarizer implementation (`LangchainSummarizer`) now uses the shared retry decorator with exponential backoff from the `rag-core-lib`. - Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` - Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) @@ -183,8 +183,6 @@ How it resolves settings - Each field in `SummarizerSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. - When a field is not provided (None), the summarizer falls back to the value from `RetryDecoratorSettings`. -- Zero values (e.g., 0 or 0.0 where allowed) are honored and do not trigger fallback. -- The effective retry configuration is computed once per summarizer instance at initialization. Configuring via environment variables From 8e49f94c8785e7a191542763669cbd6714f4c81d Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 3 Sep 2025 13:11:52 +0200 Subject: [PATCH 09/10] docs: update README to clarify embedder retry behavior source --- libs/README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libs/README.md b/libs/README.md index e1087fd7..960e7cfb 100644 --- a/libs/README.md +++ b/libs/README.md @@ -102,7 +102,7 @@ Uploaded documents are required to contain the following metadata: ### 1.4 Embedder retry behavior -The default STACKIT embedder implementation (`StackitEmbedder`) uses the shared retry decorator with exponential backoff from the core library. +The default STACKIT embedder implementation (`StackitEmbedder`) uses the shared retry decorator with exponential backoff from the `rag-core-lib`. - Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` - Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) @@ -112,8 +112,6 @@ How it resolves settings - Each retry-related field in `StackitEmbedderSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. - When a field is not provided (None), the embedder falls back to the value from `RetryDecoratorSettings`. -- Zero values (e.g., 0 or 0.0 where allowed) are honored and do not trigger fallback. -- The effective retry configuration is computed once per embedder instance at initialization. Configuring via environment variables From 7398cc4dbcd69f8be34de833e23e511de64c628d Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 10:06:10 +0200 Subject: [PATCH 10/10] refactor: streamline retry decorator settings creation in LangchainSummarizer and StackitEmbedder --- .../impl/summarizer/langchain_summarizer.py | 28 +------------ .../impl/embeddings/stackit_embedder.py | 41 +------------------ .../impl/utils/retry_decorator.py | 38 +++++++++++++++++ 3 files changed, 42 insertions(+), 65 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index bb6a99b3..b6b6ab03 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -21,8 +21,7 @@ from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore -from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff -from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff +from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff logger = logging.getLogger(__name__) @@ -46,10 +45,7 @@ def __init__( self._chunker = chunker self._langfuse_manager = langfuse_manager self._semaphore = semaphore - self._retry_decorator_settings = self._create_retry_decorator_settings( - summarizer_settings, retry_decorator_settings - ) - self._retry_decorator_settings = self._create_retry_decorator_settings( + self._retry_decorator_settings = create_retry_decorator_settings( summarizer_settings, retry_decorator_settings ) @@ -108,26 +104,6 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] ) return await self._summarize_chunk(merged, config) - def _create_retry_decorator_settings( - self, summarizer_settings: SummarizerSettings, retry_decorator_settings: RetryDecoratorSettings - ): - fields = [ - "max_retries", - "retry_base_delay", - "retry_max_delay", - "backoff_factor", - "attempt_cap", - "jitter_min", - "jitter_max", - ] - settings_kwargs = { - field: getattr(summarizer_settings, field) - if getattr(summarizer_settings, field) is not None - else getattr(retry_decorator_settings, field) - for field in fields - } - return RetryDecoratorSettings(**settings_kwargs) - def _create_chain(self) -> Runnable: return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( self.__class__.__name__ diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 63fcad32..ec804d2d 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -7,7 +7,7 @@ from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings import logging from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings -from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff +from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ def __init__( base_url=stackit_embedder_settings.base_url, ) self._settings = stackit_embedder_settings - self._retry_decorator_settings = self._create_retry_decorator_settings( + self._retry_decorator_settings = create_retry_decorator_settings( stackit_embedder_settings, retry_decorator_settings ) @@ -92,43 +92,6 @@ def embed_query(self, text: str) -> list[float]: logger.warning("No embeddings found for query: %s", text) return embeddings_list - def _create_retry_decorator_settings( - self, - stackit_settings: StackitEmbedderSettings, - retry_defaults: RetryDecoratorSettings, - ) -> RetryDecoratorSettings: - # Prefer values from StackitEmbedderSettings when provided; - # otherwise fall back to RetryDecoratorSettings defaults - return RetryDecoratorSettings( - max_retries=( - stackit_settings.max_retries if stackit_settings.max_retries is not None else retry_defaults.max_retries - ), - retry_base_delay=( - stackit_settings.retry_base_delay - if stackit_settings.retry_base_delay is not None - else retry_defaults.retry_base_delay - ), - retry_max_delay=( - stackit_settings.retry_max_delay - if stackit_settings.retry_max_delay is not None - else retry_defaults.retry_max_delay - ), - backoff_factor=( - stackit_settings.backoff_factor - if stackit_settings.backoff_factor is not None - else retry_defaults.backoff_factor - ), - attempt_cap=( - stackit_settings.attempt_cap if stackit_settings.attempt_cap is not None else retry_defaults.attempt_cap - ), - jitter_min=( - stackit_settings.jitter_min if stackit_settings.jitter_min is not None else retry_defaults.jitter_min - ), - jitter_max=( - stackit_settings.jitter_max if stackit_settings.jitter_max is not None else retry_defaults.jitter_max - ), - ) - def _retry_with_backoff_wrapper(self): return retry_with_backoff( settings=self._retry_decorator_settings, diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index 8c4e94b7..2ded2898 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -8,6 +8,8 @@ from functools import wraps from typing import Callable, Optional, ParamSpec, TypeVar +from pydantic_settings import BaseSettings + from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.utils import ( headers_from_exception, @@ -166,3 +168,39 @@ def retry_with_backoff( logger=logger, ) return engine.decorate + +def create_retry_decorator_settings( + ai_settings: BaseSettings, retry_decorator_settings: RetryDecoratorSettings +) -> RetryDecoratorSettings: + """Create retry decorator settings based on AI and default settings. + + If the corresponding field in ai_settings is not set, the value from retry_decorator_settings will be used. + + Parameters + ---------- + ai_settings : BaseSettings + Those are the AI settings, e.g. Embeddings, Summarizers etc. + retry_decorator_settings : RetryDecoratorSettings + Those are the default retry settings. + + Returns + ------- + RetryDecoratorSettings + The combined retry settings. + """ + fields = [ + "max_retries", + "retry_base_delay", + "retry_max_delay", + "backoff_factor", + "attempt_cap", + "jitter_min", + "jitter_max", + ] + settings_kwargs = { + field: getattr(ai_settings, field) + if getattr(ai_settings, field) is not None + else getattr(retry_decorator_settings, field) + for field in fields + } + return RetryDecoratorSettings(**settings_kwargs)