fix: move Redis subscriber connect to background task for faster startup #1538
cristiam86 wants to merge 2 commits into main from
Conversation
The Redis subscriber's connect() + subscribe() was blocking the Uvicorn lifespan for ~2.5 minutes in production. Since Uvicorn doesn't serve HTTP requests until the lifespan yields, health probes had no server to talk to during this entire window. The root cause: the background health checker (started before Redis connect) runs blocking sync I/O that starves the async Redis connect, turning a sub-second operation into a multi-minute ordeal. Fix: run Redis subscriber connect in a background task so Uvicorn can start serving health probes and RPC requests immediately. Event handlers are registered before connect (stored locally) so no events are missed once the connection completes.
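The shape of the fix can be sketched with a toy lifespan (the `_connect_redis_subscriber` name follows the PR; everything else here is an illustrative stand-in, not the actual app code):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

async def _connect_redis_subscriber():
    # Stand-in for the slow connect() + start(); handlers are already registered.
    await asyncio.sleep(0.05)
    events.append("redis connected")

@asynccontextmanager
async def lifespan(app):
    # Register event handlers first so nothing is missed once connected,
    # then defer the slow connect to a background task and yield immediately.
    events.append("handlers registered")
    task = asyncio.create_task(_connect_redis_subscriber())
    events.append("lifespan yielded")  # Uvicorn begins serving at this point
    try:
        yield
    finally:
        task.cancel()

async def main():
    async with lifespan(None):
        # Health probes can already be answered while Redis is still connecting.
        assert events == ["handlers registered", "lifespan yielded"]
        await asyncio.sleep(0.1)  # let the background connect finish
    return events

result = asyncio.run(main())
print(result)
```

Because the handlers are registered before the task is scheduled, the lifespan yields without waiting on Redis, and the connect completes in the background.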
📝 Walkthrough
The change defers the Redis subscriber connection to a background task, with handler registration occurring before connecting and a single retry on failure; it also adds an Alembic migration creating two indexes on the transactions table.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as App Startup
    participant Handler as Handler Registration
    participant BGTask as Background Task
    participant Redis as Redis Subscriber
    participant Retry as Retry Logic
    App->>Handler: Register validator-change handlers
    Handler-->>App: Handlers registered (not yet connected)
    App->>BGTask: Schedule _connect_redis_subscriber
    App-->>App: Log "Redis subscriber connecting in background"
    BGTask->>Redis: Attempt initial connect/start
    alt Connection succeeds
        Redis-->>BGTask: Connected & listening
        BGTask-->>BGTask: Log success
    else Connection fails
        Redis-->>BGTask: Connection error
        BGTask->>Retry: Wait 5 seconds
        Retry->>Redis: Attempt second connect/start
        alt Retry succeeds
            Redis-->>BGTask: Connected & listening
            BGTask-->>BGTask: Log success
        else Retry fails
            Redis-->>BGTask: Connection error
            BGTask-->>BGTask: Log final error
        end
    end
```
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/protocol_rpc/app_lifespan.py`:
- Around line 395-398: The function handle_validator_change uses an unnecessary
f-string in its logger call and lacks a type hint for the event_data parameter;
remove the leading "f" from the logger.info call so the message is a plain
string and add an appropriate type annotation for event_data (e.g., event_data:
dict or event_data: Any) on the handle_validator_change signature to improve
clarity and typing while leaving the await validators_manager.restart() call
unchanged.
- Around line 414-438: The retry path in _connect_redis_subscriber can leak a
partially-initialized RedisEventSubscriber because
RedisEventSubscriber.connect() assigns self.redis_client before later steps can
fail; before calling connect()/start() in the retry block, ensure any
previously-created client is cleaned up (e.g., if redis_subscriber.redis_client
is not None call the subscriber's close/disconnect/cleanup method or set it to
None after proper shutdown) or change the retry to call redis_subscriber.start()
only (which will internally call connect() only when redis_client is None) so
you don't re-create a second client; update the retry block around
redis_subscriber.connect()/redis_subscriber.start() accordingly to invoke the
proper cleanup or start-only path.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bc11910a-94d0-4da9-a950-53f939b258f7
📒 Files selected for processing (1)
backend/protocol_rpc/app_lifespan.py
```python
async def handle_validator_change(event_data):
    """Reload validators when they change."""
    logger.info(f"RPC worker reloading validators due to change event")
    await validators_manager.restart()
```
Remove extraneous f-prefix from string literal.
The f-string on line 397 has no placeholders. Additionally, consider adding a type hint for the event_data parameter.
Proposed fix

```diff
-    async def handle_validator_change(event_data):
+    async def handle_validator_change(event_data: dict):
         """Reload validators when they change."""
-        logger.info(f"RPC worker reloading validators due to change event")
+        logger.info("RPC worker reloading validators due to change event")
         await validators_manager.restart()
```

🧰 Tools
🪛 Ruff (0.15.6)
[error] 397-397: f-string without any placeholders
Remove extraneous f prefix
(F541)
```python
async def _connect_redis_subscriber():
    """Connect Redis subscriber in background so Uvicorn starts serving immediately."""
    try:
        await redis_subscriber.connect()
        await redis_subscriber.start()
        logger.info(
            f"[STARTUP] Redis subscriber connected at {redis_url} for worker event broadcasting"
        )
    except Exception as e:
        logger.error(
            f"Failed to connect Redis subscriber at {redis_url}: {e}. "
            f"Worker events will not be received. Retrying in 5s..."
        )
        await asyncio.sleep(5)
        try:
            await redis_subscriber.connect()
            await redis_subscriber.start()
            logger.info(
                f"[STARTUP] Redis subscriber connected at {redis_url} (retry succeeded)"
            )
        except Exception as retry_err:
            logger.error(
                f"Redis subscriber retry failed: {retry_err}. "
                f"Worker events will NOT be forwarded to WebSocket clients."
            )
```
Potential connection leak if retry is needed after partial connect failure.
If connect() partially succeeds (e.g., redis_client is assigned but ping() or subscribe() fails), the retry block calls connect() again without closing the existing redis_client. Looking at RedisEventSubscriber.connect(), self.redis_client is assigned before the connection test, so a failure after assignment would leak the first connection object.
Consider adding explicit cleanup before retry, or make the retry only call start() since it internally calls connect() if redis_client is None.
Proposed fix: cleanup before retry

```diff
     except Exception as e:
         logger.error(
             f"Failed to connect Redis subscriber at {redis_url}: {e}. "
             f"Worker events will not be received. Retrying in 5s..."
         )
+        # Cleanup any partial connection before retry
+        try:
+            await redis_subscriber.stop()
+        except Exception:
+            pass
         await asyncio.sleep(5)
         try:
             await redis_subscriber.connect()
             await redis_subscriber.start()
```

🧰 Tools
🪛 Ruff (0.15.6)
[warning] 422-422: Do not catch blind exception: Exception
(BLE001)
[warning] 434-434: Do not catch blind exception: Exception
(BLE001)
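The leak the reviewer describes can be reproduced with a toy subscriber (all names here are illustrative stand-ins, not the real `RedisEventSubscriber`): `connect()` assigns the client before the ping, so a failed ping strands a client unless the retry cleans up first.

```python
import asyncio

open_clients = []  # tracks clients that were created but never closed

class FlakySubscriber:
    """Toy subscriber whose connect() assigns the client before the ping can fail."""
    def __init__(self):
        self.redis_client = None
        self.fail_ping = True

    async def connect(self):
        self.redis_client = object()          # assigned first, like the real connect()
        open_clients.append(self.redis_client)
        if self.fail_ping:
            self.fail_ping = False            # next attempt succeeds
            raise ConnectionError("ping failed")

    async def stop(self):
        if self.redis_client is not None:
            open_clients.remove(self.redis_client)
            self.redis_client = None

async def connect_with_cleanup(sub):
    try:
        await sub.connect()
    except ConnectionError:
        await sub.stop()                      # drop the half-initialized client
        await sub.connect()                   # retry with a clean slate

sub = FlakySubscriber()
asyncio.run(connect_with_cleanup(sub))
print(len(open_clients))
```

Without the `stop()` call before the retry, `open_clients` would hold two entries: the stranded first client and the retry's client.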
The transactions table had no index on status, causing every health check query to do a full sequential scan. In production this resulted in 61M seq scans and 5.1T tuple reads, driving the DB to 100% CPU. Adds:
- idx_transactions_status (status)
- idx_transactions_status_to_address (status, to_address)
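The effect of indexing the status column can be illustrated with a quick query-plan check (SQLite standing in for Postgres here, purely as an illustration; the production change is an Alembic migration):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE transactions (id INTEGER PRIMARY KEY, status TEXT, to_address TEXT)"
)

def plan(sql):
    # EXPLAIN QUERY PLAN rows carry the human-readable detail in column 3
    return " | ".join(row[3] for row in con.execute("EXPLAIN QUERY PLAN " + sql))

query = "SELECT id FROM transactions WHERE status = 'PENDING'"
before = plan(query)   # full table scan
con.execute("CREATE INDEX idx_transactions_status ON transactions (status)")
after = plan(query)    # index search
print(before)
print(after)
```

Before the index the plan is a table scan; afterwards the same query resolves through `idx_transactions_status`, which is the behavior change the migration aims for on the Postgres side.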
🧹 Nitpick comments (2)
backend/database_handler/migration/versions/e4f8a2b7c913_add_transactions_status_indexes.py (2)
36-38: Add `if_exists=True` for downgrade resilience. The downgrade could fail if indexes don't exist (e.g., partial upgrade or manual removal). For consistency with the idempotent upgrade, consider adding `if_exists=True`.

♻️ Proposed fix
```diff
 def downgrade() -> None:
-    op.drop_index("idx_transactions_status_to_address", table_name="transactions")
-    op.drop_index("idx_transactions_status", table_name="transactions")
+    op.drop_index("idx_transactions_status_to_address", table_name="transactions", if_exists=True)
+    op.drop_index("idx_transactions_status", table_name="transactions", if_exists=True)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/database_handler/migration/versions/e4f8a2b7c913_add_transactions_status_indexes.py` around lines 36 - 38, In the downgrade function, make dropping indexes resilient by passing if_exists=True to the op.drop_index calls; update the two calls to op.drop_index that target "idx_transactions_status_to_address" and "idx_transactions_status" (with table_name="transactions") so they include if_exists=True to avoid failing when the indexes are already absent.
21-33: Consider concurrent index creation for large tables. Creating indexes without `CONCURRENTLY` can lock writes on the table during migration. If `transactions` is large or frequently written in production, use `postgresql_concurrently=True` with `autocommit_block()` to avoid blocking writes:

```python
from alembic import op


def upgrade() -> None:
    with op.get_context().autocommit_block():
        op.create_index(
            "idx_transactions_status",
            "transactions",
            ["status"],
            if_not_exists=True,
            postgresql_concurrently=True,
        )
    with op.get_context().autocommit_block():
        op.create_index(
            "idx_transactions_status_to_address",
            "transactions",
            ["status", "to_address"],
            if_not_exists=True,
            postgresql_concurrently=True,
        )


def downgrade() -> None:
    with op.get_context().autocommit_block():
        op.drop_index(
            "idx_transactions_status_to_address",
            table_name="transactions",
            postgresql_concurrently=True,
        )
    with op.get_context().autocommit_block():
        op.drop_index(
            "idx_transactions_status",
            table_name="transactions",
            postgresql_concurrently=True,
        )
```

This is optional depending on your deployment strategy and table size, but recommended for production tables with heavy write traffic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/database_handler/migration/versions/e4f8a2b7c913_add_transactions_status_indexes.py` around lines 21 - 33, The upgrade currently creates indexes on transactions without using CONCURRENTLY which can block writes; modify upgrade() to create each index inside op.get_context().autocommit_block() and pass postgresql_concurrently=True to op.create_index (for "idx_transactions_status" and "idx_transactions_status_to_address"), and mirror this in downgrade() by dropping indexes with op.drop_index inside autocommit_block() using postgresql_concurrently=True and table_name="transactions" to safely remove them without locking writes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e79ea66-975a-4169-95b1-b0e0a84524cd
📒 Files selected for processing (1)
backend/database_handler/migration/versions/e4f8a2b7c913_add_transactions_status_indexes.py
Summary
- `connect()` + `subscribe()` was blocking the Uvicorn lifespan for ~2.5 minutes in production (rally-studio-prd), preventing health probes from being served

Evidence from rally-studio-prd logs
Test plan
Summary by CodeRabbit
Refactor
Chores