4 changes: 2 additions & 2 deletions docs/my-website/docs/proxy/config_settings.md
@@ -229,7 +229,7 @@ router_settings:
| max_response_size_mb | int | The maximum size for responses in MB. LLM Responses above this size will not be sent. |
| proxy_budget_rescheduler_min_time | int | The minimum time (in seconds) to wait before checking db for budget resets. **Default is 597 seconds** |
| proxy_budget_rescheduler_max_time | int | The maximum time (in seconds) to wait before checking db for budget resets. **Default is 605 seconds** |
| proxy_batch_write_at | int | Time (in seconds) to wait before batch writing spend logs to the db. **Default is 10 seconds** |
| proxy_batch_write_at | int | Time (in seconds) to wait before batch writing spend logs to the db. **Default is 30 seconds** |
| proxy_batch_polling_interval | int | Time (in seconds) to wait before polling a batch, to check if it's completed. **Default is 6000 seconds (100 minutes)** |
| alerting_args | dict | Args for Slack Alerting [Doc on Slack Alerting](./alerting.md) |
| custom_key_generate | str | Custom function for key generation [Doc on custom key generation](./virtual_keys.md#custom--key-generate) |
@@ -704,7 +704,7 @@ router_settings:
| PROMPTLAYER_API_KEY | API key for PromptLayer integration
| PROXY_ADMIN_ID | Admin identifier for proxy server
| PROXY_BASE_URL | Base URL for proxy service
| PROXY_BATCH_WRITE_AT | Time in seconds to wait before batch writing spend logs to the database. Default is 10
| PROXY_BATCH_WRITE_AT | Time in seconds to wait before batch writing spend logs to the database. Default is 30
| PROXY_BATCH_POLLING_INTERVAL | Time in seconds to wait before polling a batch, to check if it's completed. Default is 6000s (100 minutes)
| PROXY_BUDGET_RESCHEDULER_MAX_TIME | Maximum time in seconds to wait before checking database for budget resets. Default is 605
| PROXY_BUDGET_RESCHEDULER_MIN_TIME | Minimum time in seconds to wait before checking database for budget resets. Default is 597
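For context on the `proxy_batch_write_at` rows above: the proxy buffers spend logs in memory and flushes them to the database once per interval, so raising the default from 10s to 30s trades flush frequency for slightly more data held between writes. A minimal sketch of that batching pattern (illustrative only; `spend_log_buffer` and the print are hypothetical stand-ins, the real writer is the `update_spend` job in the proxy_server.py diff below):

```python
import asyncio
import os

# Mirrors the new documented default; override with the env var if needed.
PROXY_BATCH_WRITE_AT = int(os.getenv("PROXY_BATCH_WRITE_AT", 30))

spend_log_buffer: list[dict] = []  # hypothetical in-memory buffer

async def batch_write_spend_logs() -> None:
    """Flush buffered spend logs on a fixed interval instead of per request."""
    while True:
        await asyncio.sleep(PROXY_BATCH_WRITE_AT)
        if spend_log_buffer:
            batch = spend_log_buffer.copy()
            spend_log_buffer.clear()
            print(f"writing {len(batch)} spend logs in one batch")  # stand-in for a bulk DB insert
```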
3 changes: 3 additions & 0 deletions enterprise/litellm_enterprise/integrations/prometheus.py
@@ -2187,6 +2187,9 @@ def initialize_budget_metrics_cron_job(scheduler: AsyncIOScheduler):
prometheus_logger.initialize_remaining_budget_metrics,
"interval",
minutes=PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES,
# REMOVED jitter parameter - major cause of memory leak
id="prometheus_budget_metrics_job",
replace_existing=True,
)

@staticmethod
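This prometheus.py change pins a stable job `id` and sets `replace_existing=True`, so re-running the initializer replaces the budget-metrics job instead of registering a duplicate. A standalone sketch of that APScheduler behavior (the job body and the 5-minute interval are placeholders, not the enterprise code):

```python
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def refresh_budget_metrics() -> None:
    print("refreshing remaining-budget metrics")  # placeholder job body

async def main() -> None:
    scheduler = AsyncIOScheduler()
    scheduler.start()

    # Register the "same" job twice; the second call replaces the first
    # because the id matches and replace_existing=True.
    for _ in range(2):
        scheduler.add_job(
            refresh_budget_metrics,
            "interval",
            minutes=5,
            id="prometheus_budget_metrics_job",
            replace_existing=True,
        )

    print(len(scheduler.get_jobs()))  # -> 1, not 2
    scheduler.shutdown(wait=False)

asyncio.run(main())
```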
12 changes: 11 additions & 1 deletion litellm/constants.py
@@ -1023,7 +1023,17 @@
PROXY_BUDGET_RESCHEDULER_MAX_TIME = int(
os.getenv("PROXY_BUDGET_RESCHEDULER_MAX_TIME", 605)
)
PROXY_BATCH_WRITE_AT = int(os.getenv("PROXY_BATCH_WRITE_AT", 10)) # in seconds
# MEMORY LEAK FIX: Increased from 10s to 30s minimum to prevent memory issues with APScheduler
# Very frequent intervals (<30s) can cause memory leaks in APScheduler's internal functions
PROXY_BATCH_WRITE_AT = int(os.getenv("PROXY_BATCH_WRITE_AT", 30)) # in seconds, increased from 10

# APScheduler Configuration - MEMORY LEAK FIX
# These settings prevent memory leaks in APScheduler's normalize() and _apply_jitter() functions
APSCHEDULER_COALESCE = True # collapse many missed runs into one
APSCHEDULER_MISFIRE_GRACE_TIME = 3600 # ignore runs older than 1 hour (was 120)
APSCHEDULER_MAX_INSTANCES = 1 # prevent concurrent job instances
APSCHEDULER_REPLACE_EXISTING = True # always replace existing jobs

DEFAULT_HEALTH_CHECK_INTERVAL = int(
os.getenv("DEFAULT_HEALTH_CHECK_INTERVAL", 300)
) # 5 minutes
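Note that these constants read their environment overrides at import time, so `PROXY_BATCH_WRITE_AT` has to be set before the process imports `litellm.constants`. A quick sketch of the override path after this PR (assumes the `litellm` package is installed; `60` is an arbitrary example value):

```python
import os

# Must be set before litellm.constants is imported anywhere in the process,
# e.g. via the shell environment or a wrapper entrypoint.
os.environ.setdefault("PROXY_BATCH_WRITE_AT", "60")

from litellm.constants import APSCHEDULER_MISFIRE_GRACE_TIME, PROXY_BATCH_WRITE_AT

print(PROXY_BATCH_WRITE_AT)            # 60 from the env override, instead of the new default 30
print(APSCHEDULER_MISFIRE_GRACE_TIME)  # 3600, a plain module-level constant
```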
106 changes: 92 additions & 14 deletions litellm/proxy/proxy_server.py
@@ -131,6 +131,10 @@ def generate_feedback_box():
from litellm.caching.caching import DualCache, RedisCache
from litellm.caching.redis_cluster_cache import RedisClusterCache
from litellm.constants import (
APSCHEDULER_COALESCE,
APSCHEDULER_MAX_INSTANCES,
APSCHEDULER_MISFIRE_GRACE_TIME,
APSCHEDULER_REPLACE_EXISTING,
DAYS_IN_A_MONTH,
DEFAULT_HEALTH_CHECK_INTERVAL,
DEFAULT_MODEL_CREATED_AT_TIME,
@@ -3697,13 +3701,43 @@ async def initialize_scheduled_background_jobs(
):
"""Initializes scheduled background jobs"""
global store_model_in_db
scheduler = AsyncIOScheduler()
interval = random.randint(
proxy_budget_rescheduler_min_time, proxy_budget_rescheduler_max_time
) # random interval, so multiple workers avoid resetting budget at the same time
batch_writing_interval = random.randint(
proxy_batch_write_at - 3, proxy_batch_write_at + 3
) # random interval, so multiple workers avoid batch writing at the same time

# MEMORY LEAK FIX: Configure scheduler with optimized settings
# Memray analysis showed APScheduler's normalize() and _apply_jitter() causing
# massive memory allocations (35GB with 483M allocations)
# Key fixes:
# 1. Remove/minimize jitter to avoid normalize() memory explosion
# 2. Use larger misfire_grace_time to prevent backlog calculations
# 3. Set replace_existing=True to avoid duplicate jobs
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.asyncio import AsyncIOExecutor

scheduler = AsyncIOScheduler(
job_defaults={
"coalesce": APSCHEDULER_COALESCE,
"misfire_grace_time": APSCHEDULER_MISFIRE_GRACE_TIME,
"max_instances": APSCHEDULER_MAX_INSTANCES,
"replace_existing": APSCHEDULER_REPLACE_EXISTING,
},
# Limit job store size to prevent memory growth
jobstores={
'default': MemoryJobStore() # explicitly use memory job store
},
# Use simple executor to minimize overhead
executors={
'default': AsyncIOExecutor(),
},
# Disable timezone awareness to reduce computation
timezone=None
)

# Use fixed intervals with small random offset instead of jitter
# This avoids the expensive jitter calculations in APScheduler
budget_interval = proxy_budget_rescheduler_min_time + random.randint(0,
min(30, proxy_budget_rescheduler_max_time - proxy_budget_rescheduler_min_time))

# Ensure minimum interval of 30 seconds for batch writing to prevent memory issues
batch_writing_interval = max(30, proxy_batch_write_at) + random.randint(0, 5)

### RESET BUDGET ###
if general_settings.get("disable_reset_budget", False) is False:
@@ -3715,15 +3749,23 @@ scheduler.add_job(
scheduler.add_job(
budget_reset_job.reset_budget,
"interval",
seconds=interval,
seconds=budget_interval,
# REMOVED jitter parameter - major cause of memory leak
id="reset_budget_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)

### UPDATE SPEND ###
scheduler.add_job(
update_spend,
"interval",
seconds=batch_writing_interval,
# REMOVED jitter parameter - major cause of memory leak
args=[prisma_client, db_writer_client, proxy_logging_obj],
id="update_spend_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)

### ADD NEW MODELS ###
@@ -3732,11 +3774,17 @@
)

if store_model_in_db is True:
# MEMORY LEAK FIX: Increase interval from 10s to 30s minimum
# Frequent polling was causing excessive memory allocations
scheduler.add_job(
proxy_config.add_deployment,
"interval",
seconds=10,
seconds=30, # increased from 10s to reduce memory pressure
# REMOVED jitter parameter - major cause of memory leak
args=[prisma_client, proxy_logging_obj],
id="add_deployment_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)

# this will load all existing models on proxy startup
@@ -3748,8 +3796,12 @@
scheduler.add_job(
proxy_config.get_credentials,
"interval",
seconds=10,
seconds=30, # increased from 10s to reduce memory pressure
# REMOVED jitter parameter - major cause of memory leak
args=[prisma_client],
id="get_credentials_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)
await proxy_config.get_credentials(prisma_client=prisma_client)
if (
@@ -3775,15 +3827,22 @@
proxy_logging_obj.slack_alerting_instance.send_weekly_spend_report,
"interval",
days=days,
# REMOVED jitter parameter - major cause of memory leak
# Use random start time instead for distribution
next_run_time=datetime.now()
+ timedelta(seconds=10), # Start 10 seconds from now
+ timedelta(seconds=10 + random.randint(0, 300)), # Random 0-5 min offset
args=[spend_report_frequency],
id="weekly_spend_report_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)

scheduler.add_job(
proxy_logging_obj.slack_alerting_instance.send_monthly_spend_report,
"cron",
day=1,
id="monthly_spend_report_job",
replace_existing=True,
)

# Beta Feature - only used when prometheus api is in .env
@@ -3796,6 +3855,8 @@
hour=PROMETHEUS_FALLBACK_STATS_SEND_TIME_HOURS,
minute=0,
timezone=ZoneInfo("America/Los_Angeles"), # Pacific Time
id="prometheus_fallback_stats_job",
replace_existing=True,
)
await proxy_logging_obj.slack_alerting_instance.send_fallback_stats_from_prometheus()

@@ -3813,8 +3874,12 @@
scheduler.add_job(
spend_log_cleanup.cleanup_old_spend_logs,
"interval",
seconds=interval_seconds,
seconds=interval_seconds + random.randint(0, 60), # Add small random offset
# REMOVED jitter parameter - major cause of memory leak
args=[prisma_client],
id="spend_log_cleanup_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)
except ValueError:
verbose_proxy_logger.error(
@@ -3835,7 +3900,11 @@
scheduler.add_job(
check_batch_cost_job.check_batch_cost,
"interval",
seconds=proxy_batch_polling_interval, # these can run infrequently, as batch jobs take time to complete
seconds=proxy_batch_polling_interval + random.randint(0, 30), # Add small random offset
# REMOVED jitter parameter - major cause of memory leak
id="check_batch_cost_job",
replace_existing=True,
misfire_grace_time=APSCHEDULER_MISFIRE_GRACE_TIME,
)

except Exception:
@@ -3844,7 +3913,16 @@
)
pass

scheduler.start()
# MEMORY LEAK FIX: Start scheduler with paused=False to avoid backlog processing
# Do NOT reset job times to "now" as this can trigger the memory leak
# The misfire_grace_time and coalesce settings will handle any missed runs properly

# Start the scheduler immediately without processing backlogs
scheduler.start(paused=False)
verbose_proxy_logger.info(
f"APScheduler started with memory leak prevention settings: "
f"removed jitter, increased intervals, misfire_grace_time={APSCHEDULER_MISFIRE_GRACE_TIME}"
)

@classmethod
async def _initialize_spend_tracking_background_jobs(
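Pulling the scheduler changes together, here is a condensed, standalone sketch of the configuration pattern this PR applies: `job_defaults` for coalesce/misfire/max_instances, an explicit `MemoryJobStore` and `AsyncIOExecutor`, and a one-time per-worker random offset in place of APScheduler's per-run `jitter`. The job body and the numbers are illustrative rather than the proxy's actual code, and `replace_existing` is passed per `add_job` call, which is where APScheduler accepts it:

```python
import asyncio
import random

from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

MISFIRE_GRACE_TIME = 3600  # mirrors APSCHEDULER_MISFIRE_GRACE_TIME above

async def update_spend() -> None:
    print("flushing spend logs")  # placeholder for the real batch writer

async def main() -> None:
    scheduler = AsyncIOScheduler(
        job_defaults={
            "coalesce": True,                          # collapse missed runs into one
            "misfire_grace_time": MISFIRE_GRACE_TIME,  # skip, rather than replay, an hour-old backlog
            "max_instances": 1,                        # never overlap runs of the same job
        },
        jobstores={"default": MemoryJobStore()},
        executors={"default": AsyncIOExecutor()},
    )

    # Fixed interval plus a one-time random offset per worker, rather than
    # per-run jitter, keeps workers staggered.
    interval = 30 + random.randint(0, 5)
    scheduler.add_job(
        update_spend,
        "interval",
        seconds=interval,
        id="update_spend_job",
        replace_existing=True,
    )

    scheduler.start()
    await asyncio.sleep(90)  # let the job fire a few times
    scheduler.shutdown(wait=False)

asyncio.run(main())
```

Because the offset is computed once at startup, workers stay staggered without exercising the per-run jitter path that the PR's memray traces implicate.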