2 changes: 2 additions & 0 deletions wavefront/server/apps/floware/floware/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from common_module.log.logger import logger
from common_module.prometheus.prometheus_middleware import PrometheusMiddleware
from common_module.response_formatter import ResponseFormatter
from db_repo_module.cache.azure_redis_auth import patch_redis_for_azure
from db_repo_module.database.connection import DatabaseClient
from db_repo_module.db_repo_container import DatabaseModuleContainer
from fastapi import HTTPException
Expand Down Expand Up @@ -224,6 +225,7 @@

@asynccontextmanager
async def lifespan(app: FastAPI):
    patch_redis_for_azure()
    # Startup code (runs before the application starts)
    logger.info('Starting application...')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
from celery import Celery
from celery.signals import worker_process_init

from celery_worker.env import CELERY_BROKER_URL, CELERY_RESULT_BACKEND


@worker_process_init.connect
def setup_azure_redis_auth(**kwargs):
    from db_repo_module.cache.azure_redis_auth import patch_redis_for_azure

    patch_redis_for_azure()
Comment on lines +7 to +11


🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

Celery 5.4 worker_process_init signal documentation child process main worker broker connection timing

💡 Result:

In Celery 5.4, the worker_process_init signal is dispatched within each individual pool child process immediately after it is initialized by the prefork pool, but before it begins processing tasks [1][2]. Key technical details regarding your query:

1. Signal Timing: The worker_process_init signal is triggered by the process_initializer function [2]. This function is responsible for setting up the environment for the child process (e.g., resetting signals, initializing logging, and configuring task execution) [2]. Because it is called as part of the process initialization sequence, it runs in the child process context, not the main worker process context [1][2].

2. Broker Connection Timing: The main Celery worker process is responsible for managing the connection to the message broker [3]. Broker connections are established by the Consumer bootstep in the main process [3][4]. Child processes in the prefork pool do not independently establish their own primary connections to the broker to fetch tasks; instead, they receive tasks from the main process via an internal pipe/queue mechanism [2]. Therefore, worker_process_init completes its execution before the child processes are ready to receive and execute tasks, and it operates independently of the main process's broker connection lifecycle [2].

3. Development Constraint: Handlers attached to the worker_process_init signal must not block for more than 4 seconds [1][5]. If a handler takes longer, the worker assumes the child process failed to start and will kill it [1][5].

In summary, worker_process_init is a child-process-specific hook that runs during the boot phase of pool workers, long before they start consuming tasks, and it is entirely separate from the main process's broker connection logic [1][2].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '== celery_app.py ==\n'
sed -n '1,120p' wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py

printf '\n== azure_redis_auth references ==\n'
rg -n "patch_redis_for_azure|azure_redis_auth|worker_process_init|Celery\\(" wavefront/server/background_jobs -S

Repository: rootflo/wavefront

Length of output: 1875


🏁 Script executed:

#!/bin/bash
set -euo pipefail

python3 - <<'PY'
from pathlib import Path
p = Path('wavefront/server/background_jobs')
for path in p.rglob('azure_redis_auth.py'):
    print(path)
PY

rg -n "def patch_redis_for_azure|redis" wavefront/server/background_jobs -S

Repository: rootflo/wavefront

Length of output: 1155


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '== worker_setup.py ==\n'
sed -n '1,160p' wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py

printf '\n== env.py ==\n'
sed -n '1,160p' wavefront/server/background_jobs/celery_worker/celery_worker/env.py

printf '\n== app import usages ==\n'
rg -n "from celery_worker\.celery_app import app|import celery_worker\.celery_app|celery_app import app" wavefront/server/background_jobs/celery_worker -S

Repository: rootflo/wavefront

Length of output: 9143


Patch Azure Redis auth at module import. worker_process_init only runs in pool children, so the main worker process can open the Redis broker/backend connection before this monkey-patch applies. Call patch_redis_for_azure() at import time and keep the signal handler for child processes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py`
around lines 7 - 11, Call patch_redis_for_azure() at module import in
celery_app.py so the Redis monkey-patch is applied before the main worker
process opens broker/backend connections, and keep setup_azure_redis_auth
connected to worker_process_init for pool children. Use the existing
setup_azure_redis_auth hook and patch_redis_for_azure symbol to add the
import-time call without removing the signal-based initialization.
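
To illustrate why the ordering in this finding matters, here is a minimal sketch using only the standard library. `FakePool` is a hypothetical stand-in for `redis.ConnectionPool`, and `patch_pool` only mimics the idempotent shape of `patch_redis_for_azure()`; neither is the project's code, and the string `'token-provider'` is a placeholder for a real credential provider object.

```python
class FakePool:
    """Hypothetical stand-in for redis.ConnectionPool (not the real class)."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


_patched = False


def patch_pool() -> None:
    """Idempotently wrap FakePool.__init__, mirroring the guard used in the PR."""
    global _patched
    if _patched:
        return

    original_init = FakePool.__init__

    def patched_init(self, **kwargs):
        # Drop static credentials and inject a provider instead.
        kwargs.pop('password', None)
        kwargs.pop('username', None)
        kwargs['credential_provider'] = 'token-provider'  # placeholder value
        original_init(self, **kwargs)

    FakePool.__init__ = patched_init
    _patched = True


# A pool created before the patch (comparable to a connection opened by the
# main worker process) never receives the credential provider.
early_pool = FakePool(password='static-secret')

patch_pool()  # stands in for the import-time call
patch_pool()  # stands in for the later signal handler; a no-op due to the guard

late_pool = FakePool(password='static-secret')

print('credential_provider' in early_pool.kwargs)
print(late_pool.kwargs)
```

Because the guard makes the second call a no-op, keeping both the import-time call and the signal handler does not wrap the constructor twice.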



app = Celery('async_executor')
app.conf.update(
    broker_url=CELERY_BROKER_URL,
Expand All @@ -20,4 +29,11 @@
    task_reject_on_worker_lost=True,  # Re-queue on worker crash
    worker_prefetch_multiplier=1,  # Fair task distribution
    task_track_started=True,
    task_default_queue='{celery}',
    worker_enable_remote_control=False,
    broker_transport_options={
        'unacked_key': '{celery}.unacked',
        'unacked_index_key': '{celery}.unacked_index',
        'unacked_mutex_key': '{celery}.unacked_mutex',
    },
)
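
The `{celery}` prefix on the queue name and the unacked keys looks like a Redis Cluster hash tag, although the diff does not say so. Assuming that is the intent, the sketch below applies the key-slot rule from the Redis Cluster specification: when a key contains a non-empty `{...}` section, only that section is hashed (CRC16, XMODEM variant, modulo 16384). `crc16_xmodem` and `key_slot` are illustrative helpers written for this example, not functions from Celery or redis-py.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16 with polynomial 0x1021 and initial value 0 (XMODEM variant)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Return the cluster slot (0..16383) for a key, honouring hash tags."""
    raw = key.encode()
    start = raw.find(b'{')
    if start != -1:
        end = raw.find(b'}', start + 1)
        # Only a non-empty tag between the first '{' and the next '}' counts.
        if end != -1 and end != start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


keys = [
    '{celery}',
    '{celery}.unacked',
    '{celery}.unacked_index',
    '{celery}.unacked_mutex',
]
slots = {key: key_slot(key) for key in keys}
print(slots)
print(len(set(slots.values())) == 1)
```

Under that assumption, all four names map to one slot, which is what multi-key operations on a clustered Redis require.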
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
create_api_services_container,
)
from agents_module.agents_container import AgentsContainer
from llm_inference_config_module.container import LlmInferenceConfigContainer
from agents_module.services.agent_inference_service import AgentInferenceService
from agents_module.services.workflow_inference_service import WorkflowInferenceService
from common_module.common_container import CommonContainer
Expand Down Expand Up @@ -124,6 +125,11 @@ def get_services() -> WorkerServices:
message_processor_bucket_name=bucket_name,
)

llm_inference_config_container = LlmInferenceConfigContainer(
db_client=db_repo_container.db_client,
cache_manager=db_repo_container.cache_manager,
)

agents_container = AgentsContainer(
db_client=db_repo_container.db_client,
cloud_storage_manager=common_container.cloud_storage_manager,
Expand All @@ -139,6 +145,7 @@ def get_services() -> WorkerServices:
api_services_manager=api_services_container.api_service_manager,
async_agentic_execution_repository=db_repo_container.async_agentic_execution_repository,
executions_bucket=AGENTIC_EXECUTIONS_BUCKET,
llm_inference_config_service=llm_inference_config_container.llm_inference_config_service,
)

# Inject config values from env vars so services like AgentCrudService
Expand All @@ -162,6 +169,7 @@ def get_services() -> WorkerServices:
agent_repository=db_repo_container.agent_repository,
workflow_repository=db_repo_container.workflow_repository,
async_agentic_execution_service=agents_container.async_agentic_execution_service,
cache_manager=db_repo_container.cache_manager,
)
triggers_container.config.from_dict(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [
"api-services-module",
"common-module",
"triggers-module",
"llm-inference-config-module",
"celery[redis]>=5.4.0,<6.0.0",
"python-dotenv>=1.1.0,<2.0.0",
]
Expand All @@ -28,7 +29,8 @@ flo-utils = { workspace = true }
tools-module = { workspace = true }
api-services-module = { workspace = true }
common-module = { workspace = true }
triggers-module = { workspace = true }
triggers-module = { workspace = true }
llm-inference-config-module = { workspace = true }

[tool.uv]
package = true
Expand Down
2 changes: 1 addition & 1 deletion wavefront/server/docker/celery_worker.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ USER celery

WORKDIR /app/background_jobs/celery_worker

CMD ["uv", "run", "celery", "-A", "celery_worker.celery_app", "worker", "--loglevel=info", "--pool=solo"]
CMD ["uv", "run", "celery", "-A", "celery_worker.celery_app", "worker", "--loglevel=info", "--pool=solo", "--without-mingle", "--without-gossip"]
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ async def perform_inference_v2(
output_json_enabled: bool = True,
access_token: Optional[str] = None,
app_key: Optional[str] = None,
llm_config: Optional[LlmInferenceConfig] = None,
) -> tuple[List[BaseMessage], float, str]:
"""
Complete inference workflow (v2): fetch agent from DB + cloud storage, run inference
Expand Down Expand Up @@ -415,8 +416,9 @@ async def perform_inference_v2(
f'Retrieved agent - namespace: {namespace}, name: {name}, agent_id: {agent_id}'
)

# Resolve rootflo model_id references from the YAML, if any
llm_config = await self._resolve_rootflo_llm_config(yaml_content)
# Use caller-supplied config or fall back to resolving from the YAML
if llm_config is None:
llm_config = await self._resolve_rootflo_llm_config(yaml_content)

# Create agent from YAML with optional LLM override and tools
agent = await self.create_agent_from_yaml(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ def get_celery_client() -> Celery:
    broker_url = os.getenv('CELERY_BROKER_URL')
    if not broker_url:
        raise RuntimeError('Missing required env var: CELERY_BROKER_URL')
    return Celery('async_executor', broker=broker_url)
    app = Celery('async_executor', broker=broker_url)
    app.conf.task_default_queue = '{celery}'
    return app
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import threading

import redis

_patched = False


def patch_redis_for_azure() -> None:
    global _patched
    if _patched or os.getenv('CLOUD_PROVIDER', '').lower() != 'azure':
        return

    from redis_entraid.cred_provider import create_from_default_azure_credential
    from redis.credentials import CredentialProvider

    _inner = create_from_default_azure_credential(('https://redis.azure.com/.default',))

    class _TimedProvider(CredentialProvider):
        def get_credentials(self):
            result = []
            exc = []

            def _fetch():
                try:
                    result.append(_inner.get_credentials())
                except Exception as e:
                    exc.append(e)

            t = threading.Thread(target=_fetch, daemon=True)
            t.start()
            t.join(timeout=10)
            if t.is_alive():
                raise TimeoutError('Azure Redis token fetch timed out after 10s')
            if exc:
                raise exc[0]
            return result[0]

    provider = _TimedProvider()
    original_init = redis.ConnectionPool.__init__

    def patched_init(self, *args, **kw):
        kw.pop('password', None)
        kw.pop('username', None)
        kw['credential_provider'] = provider
        original_init(self, *args, **kw)

    redis.ConnectionPool.__init__ = patched_init
    _patched = True
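
The `_TimedProvider` wrapper above bounds a blocking call by running it in a daemon thread and joining with a timeout. A generic sketch of the same pattern, using only the standard library, might look like this; `call_with_timeout`, `fast_token`, and `slow_token` are hypothetical names for illustration and are not part of the PR.

```python
import threading
import time


def call_with_timeout(func, timeout: float):
    """Run func() in a daemon thread; raise TimeoutError if it outlives timeout."""
    result = []
    errors = []

    def _run():
        try:
            result.append(func())
        except Exception as exc:  # re-raised in the calling thread below
            errors.append(exc)

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
    worker.join(timeout=timeout)
    if worker.is_alive():
        # The thread is abandoned, not cancelled; it keeps running in the background.
        raise TimeoutError(f'call timed out after {timeout}s')
    if errors:
        raise errors[0]
    return result[0]


def fast_token():
    # Placeholder for a credential fetch that returns promptly.
    return ('default', 'token-abc')


def slow_token():
    # Placeholder for a credential fetch that hangs longer than the timeout.
    time.sleep(1.0)
    return ('default', 'too-late')


print(call_with_timeout(fast_token, timeout=0.5))

try:
    call_with_timeout(slow_token, timeout=0.05)
except TimeoutError as exc:
    print(f'timed out: {exc}')
```

One trade-off of this pattern is that a timed-out call is not interrupted, so each hung fetch leaves a background thread alive until the underlying call returns.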
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import time
from typing import Any, List, Optional, Union

from azure.core.exceptions import ClientAuthenticationError
from azure.identity import DefaultAzureCredential
from common_module.common_cache import CommonCache
from common_module.log.logger import logger
from redis import Connection
Expand All @@ -13,34 +11,12 @@
from redis import RedisError
from redis import SSLConnection
from redis import TimeoutError
from redis.credentials import CredentialProvider
from tenacity import retry
from tenacity import retry_if_exception_type
from tenacity import stop_after_attempt
from tenacity import wait_exponential


class AzureManagedRedisProvider(CredentialProvider):
    """
    Adapter to bridge Azure Identity with Redis CredentialProvider.
    Azure Managed Redis requires 'default' as the username and the
    Entra ID access token as the password.
    """

    def __init__(self):
        self.credential = DefaultAzureCredential()
        self.scope = 'https://redis.azure.com/.default'
        self.username = os.getenv('REDIS_USERNAME', 'default')

    def get_credentials(self):
        try:
            token = self.credential.get_token(self.scope)
            return (self.username, token.token)
        except ClientAuthenticationError as e:
            logger.error(f'Azure authentication failed: {e}')
            raise


class CacheManager(CommonCache):
def __init__(
self,
Expand Down Expand Up @@ -88,7 +64,6 @@ def _create_connection_pool(
port = int(os.getenv('REDIS_PORT', 6379))
protocol = os.getenv('REDIS_PROTOCOL', 'redis')
password = os.getenv('REDIS_PASSWORD')
cloud_provider = os.getenv('CLOUD_PROVIDER', '').lower()

connection_class = Connection
if protocol == 'rediss' or port == 10000:
Expand All @@ -110,12 +85,7 @@ def _create_connection_pool(
'decode_responses': True,
}

if cloud_provider == 'azure' and not password:
logger.info(
'Configuring Azure Entra ID (Workload Identity) authentication'
)
pool_kwargs['credential_provider'] = AzureManagedRedisProvider()
elif password:
if password:
pool_kwargs['password'] = password

return ConnectionPool(**pool_kwargs)
Expand Down
1 change: 1 addition & 0 deletions wavefront/server/modules/db_repo_module/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies = [
"sqlalchemy>=2.0.36,<3.0.0",
"alembic>=1.14.1,<2.0.0",
"redis>=5.2.1,<6.0.0",
"redis-entraid>=1.2.1,<2.0.0",
"azure-identity>=1.17.0,<2.0.0",
"pgvector>=0.4.1",
"tenacity>=8.1.0,<9.0.0",
Expand Down
26 changes: 23 additions & 3 deletions wavefront/server/uv.lock
