replace custom Azure Redis auth -> redis-entraid#308
Conversation
The previous AzureManagedRedisProvider fetched Entra ID tokens reactively (only on new connections), meaning long-lived connection pools would silently use expired tokens. It also had no path for Celery workers since Kombu doesn't expose redis-py's CredentialProvider hook via broker URL. Replaced with redis-entraid which proactively refreshes tokens in a background thread. A single patch_redis_for_azure() utility monkey-patches redis.ConnectionPool.__init__ globally at process startup, injecting the provider before any connection is made — covering CacheManager, Kombu's broker pool, and the Celery task producer in the FastAPI app with one call each.
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis PR adds a Redis/Azure monkey-patch module, wires it into server and worker startup, removes the previous Azure Redis credential path from ChangesAzure Redis Authentication Migration
Estimated code review effort: 4 (Complex) | ~45 minutes Possibly related PRs
Suggested reviewers: 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py`:
- Around line 7-11: Call patch_redis_for_azure() at module import in
celery_app.py so the Redis monkey-patch is applied before the main worker
process opens broker/backend connections, and keep setup_azure_redis_auth
connected to worker_process_init for pool children. Use the existing
setup_azure_redis_auth hook and patch_redis_for_azure symbol to add the
import-time call without removing the signal-based initialization.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 25371270-5b01-47e3-8877-70d98e8e327d
⛔ Files ignored due to path filters (1)
wavefront/server/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
wavefront/server/apps/floware/floware/server.pywavefront/server/background_jobs/celery_worker/celery_worker/celery_app.pywavefront/server/modules/db_repo_module/db_repo_module/cache/azure_redis_auth.pywavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.pywavefront/server/modules/db_repo_module/pyproject.toml
| @worker_process_init.connect | ||
| def setup_azure_redis_auth(**kwargs): | ||
| from db_repo_module.cache.azure_redis_auth import patch_redis_for_azure | ||
|
|
||
| patch_redis_for_azure() |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🌐 Web query:
Celery 5.4 worker_process_init signal documentation child process main worker broker connection timing
💡 Result:
In Celery 5.4, the worker_process_init signal is dispatched within each individual pool child process immediately after it is initialized by the prefork pool, but before it begins processing tasks [1][2]. Key technical details regarding your query: 1. Signal Timing: The worker_process_init signal is triggered by the process_initializer function [2]. This function is responsible for setting up the environment for the child process (e.g., resetting signals, initializing logging, and configuring task execution) [2]. Because it is called as part of the process initialization sequence, it runs in the child process context, not the main worker process context [1][2]. 2. Broker Connection Timing: The main Celery worker process is responsible for managing the connection to the message broker [3]. Broker connections are established by the Consumer bootstep in the main process [3][4]. Child processes in the prefork pool do not independently establish their own primary connections to the broker to fetch tasks; instead, they receive tasks from the main process via an internal pipe/queue mechanism [2]. Therefore, worker_process_init completes its execution before the child processes are ready to receive and execute tasks, and it operates independently of the main process's broker connection lifecycle [2]. 3. Development Constraint: Handlers attached to the worker_process_init signal must not block for more than 4 seconds [1][5]. If a handler takes longer, the worker assumes the child process failed to start and will kill it [1][5]. In summary, worker_process_init is a child-process-specific hook that runs during the boot phase of pool workers, long before they start consuming tasks, and it is entirely separate from the main process's broker connection logic [1][2].
Citations:
- 1: https://docs.celeryq.dev/en/v5.4.0/userguide/signals.html
- 2: https://docs.celeryq.dev/en/v5.4.0/%5Fmodules/celery/concurrency/prefork.html
- 3: https://docs.celeryq.dev/en/v5.4.0/_modules/celery/worker/consumer/consumer.html
- 4: https://docs.celeryq.dev/en/v5.4.0/%5Fmodules/celery/worker/worker.html
- 5: https://docs.celeryq.dev/en/main/userguide/signals.html
🏁 Script executed:
#!/bin/bash
set -euo pipefail
printf '== celery_app.py ==\n'
sed -n '1,120p' wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py
printf '\n== azure_redis_auth references ==\n'
rg -n "patch_redis_for_azure|azure_redis_auth|worker_process_init|Celery\\(" wavefront/server/background_jobs -SRepository: rootflo/wavefront
Length of output: 1875
🏁 Script executed:
#!/bin/bash
set -euo pipefail
python3 - <<'PY'
from pathlib import Path
p = Path('wavefront/server/background_jobs')
for path in p.rglob('azure_redis_auth.py'):
print(path)
PY
rg -n "def patch_redis_for_azure|redis" wavefront/server/background_jobs -SRepository: rootflo/wavefront
Length of output: 1155
🏁 Script executed:
#!/bin/bash
set -euo pipefail
printf '== worker_setup.py ==\n'
sed -n '1,160p' wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py
printf '\n== env.py ==\n'
sed -n '1,160p' wavefront/server/background_jobs/celery_worker/celery_worker/env.py
printf '\n== app import usages ==\n'
rg -n "from celery_worker\.celery_app import app|import celery_worker\.celery_app|celery_app import app" wavefront/server/background_jobs/celery_worker -SRepository: rootflo/wavefront
Length of output: 9143
Patch Azure Redis auth at module import. worker_process_init only runs in pool children, so the main worker process can open the Redis broker/backend connection before this monkey-patch applies. Call patch_redis_for_azure() at import time and keep the signal handler for child processes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py`
around lines 7 - 11, Call patch_redis_for_azure() at module import in
celery_app.py so the Redis monkey-patch is applied before the main worker
process opens broker/backend connections, and keep setup_azure_redis_auth
connected to worker_process_init for pool children. Use the existing
setup_azure_redis_auth hook and patch_redis_for_azure symbol to add the
import-time call without removing the signal-based initialization.
Azure Cache for Redis in cluster mode rejects multi-key pipelines that span hash slots. Celery's mingle and gossip startup steps issue cross-slot MULTI/EXEC transactions (e.g. reply.celery.pidbox vs reply.celery.pidbox3), causing workers to crash on boot. --without-mingle --without-gossip skips these handshakes; task execution is unaffected.
Wraps redis-entraid's get_credentials() in a daemon thread with a 10-second timeout. Without this, the proactive background token fetch could block indefinitely on startup if Azure workload identity is slow to respond, causing the server to hang silently for the full socket_timeout (60s).
Azure Managed Redis rejects MULTI pipelines across keys in different
hash slots. Celery's priority-queue suffixes (celery, celery3, etc.)
land in different slots. Setting task_default_queue='{celery}' forces
all derived keys to hash on 'celery', placing them in the same slot.
Azure Managed Redis rejects pipelines across different hash slots. Celery's pidbox priority variants hash to different slots, causing CROSSSLOT in MULTI. Disabling remote control removes pidbox entirely.
task_acks_late=True causes Kombu to pipeline unacked, unacked_index,
and the task queue in a single MULTI block. On Azure Managed Redis,
these keys hash to different slots causing CROSSSLOT. Setting them
with the {celery} hash tag via broker_transport_options forces all
three to the same slot.
The previous AzureManagedRedisProvider fetched Entra ID tokens reactively (only on new connections), meaning long-lived connection pools would silently use expired tokens. It also had no path for Celery workers since Kombu doesn't expose redis-py's CredentialProvider hook via broker URL.
Replaced with redis-entraid which proactively refreshes tokens in a background thread. A single patch_redis_for_azure() utility monkey-patches redis.ConnectionPool.init globally at process startup, injecting the provider before any connection is made — covering CacheManager, Kombu's broker pool, and the Celery task producer in the FastAPI app with one call each.
Summary by CodeRabbit
Bug Fixes
Chores / Configuration
'{celery}'and disabled Celery remote control.redis-entraiddependency.