From 399d0bd0cfe4fe45bdbf099095c6752a292fe520 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Fri, 29 May 2026 23:24:30 +0530 Subject: [PATCH 01/10] updated the route namestruct from model to namespace/modelroutename Signed-off-by: nXtCyberNet --- .../filters/ratelimit/global_test.go | 54 ++++++++++++ .../filters/ratelimit/ratelimit.go | 20 ++--- .../filters/ratelimit/ratelimit_test.go | 88 ++++++++++--------- 3 files changed, 111 insertions(+), 51 deletions(-) diff --git a/pkg/kthena-router/filters/ratelimit/global_test.go b/pkg/kthena-router/filters/ratelimit/global_test.go index d6ccae972..9f079ad7a 100644 --- a/pkg/kthena-router/filters/ratelimit/global_test.go +++ b/pkg/kthena-router/filters/ratelimit/global_test.go @@ -214,6 +214,60 @@ func TestTokenRateLimiter_RedisConnectionFailure(t *testing.T) { assert.Contains(t, err.Error(), "failed to connect to redis") } +func TestTokenRateLimiter_Isolation(t *testing.T) { + mr, redisConfig := setupMiniRedis(t) + defer mr.Close() + + rl := NewTokenRateLimiter() + keyOne := "namespace-a/modelroute-1" + keyTwo := "namespace-a/modelroute-2" + prompt := "hello world" // Should be ~3 tokens + tokensOne := uint32(3) + tokensTwo := uint32(10) + unit := networkingv1alpha1.Second + + configOne := &networkingv1alpha1.RateLimit{ + InputTokensPerUnit: &tokensOne, + Unit: unit, + Global: &networkingv1alpha1.GlobalRateLimit{ + Redis: redisConfig, + }, + } + configTwo := &networkingv1alpha1.RateLimit{ + InputTokensPerUnit: &tokensTwo, + Unit: unit, + Global: &networkingv1alpha1.GlobalRateLimit{ + Redis: redisConfig, + }, + } + + require.NoError(t, rl.AddOrUpdateLimiter(keyOne, configOne)) + require.NoError(t, rl.AddOrUpdateLimiter(keyTwo, configTwo)) + + // Each limiter should write to a different Redis key even though the namespace is the same. + redisKeyOne := "kthena:ratelimit:namespace-a/modelroute-1:input" + redisKeyTwo := "kthena:ratelimit:namespace-a/modelroute-2:input" + + err := rl.RateLimit(keyOne, prompt) + require.NoError(t, err) + assert.True(t, mr.Exists(redisKeyOne), "Redis key should exist for the first route") + + err = rl.RateLimit(keyTwo, prompt) + require.NoError(t, err) + assert.True(t, mr.Exists(redisKeyTwo), "Redis key should exist for the second route") + + // Exhaust the first route only. + err = rl.RateLimit(keyOne, prompt) + assert.Error(t, err) + assert.IsType(t, &InputRateLimitExceededError{}, err) + + // The second route should still have capacity because it uses an isolated key. + err = rl.RateLimit(keyTwo, prompt) + require.NoError(t, err) + err = rl.RateLimit(keyTwo, prompt) + require.NoError(t, err) +} + // newTestGlobalRateLimiter creates a GlobalRateLimiter directly with a miniredis-backed client. func newTestGlobalRateLimiter(t *testing.T, mr *miniredis.Miniredis, modelName, tokenType string, limit uint32, unit networkingv1alpha1.RateLimitUnit) *GlobalRateLimiter { client := redis.NewClient(&redis.Options{ diff --git a/pkg/kthena-router/filters/ratelimit/ratelimit.go b/pkg/kthena-router/filters/ratelimit/ratelimit.go index 1d40d1e7a..512032628 100644 --- a/pkg/kthena-router/filters/ratelimit/ratelimit.go +++ b/pkg/kthena-router/filters/ratelimit/ratelimit.go @@ -137,7 +137,7 @@ func (r *TokenRateLimiter) RecordOutputTokens(model string, tokenCount int) { } // AddOrUpdateLimiter adds or updates rate limiter for a model -func (r *TokenRateLimiter) AddOrUpdateLimiter(model string, ratelimit *networkingv1alpha1.RateLimit) error { +func (r *TokenRateLimiter) AddOrUpdateLimiter(limiterKey string, ratelimit *networkingv1alpha1.RateLimit) error { r.mutex.Lock() defer r.mutex.Unlock() @@ -161,10 +161,10 @@ func (r *TokenRateLimiter) AddOrUpdateLimiter(model string, ratelimit *networkin // Create global rate limiters if ratelimit.InputTokensPerUnit != nil { - r.inputLimiter[model] = NewGlobalRateLimiter( + r.inputLimiter[limiterKey] = NewGlobalRateLimiter( r.redisClient, "kthena:ratelimit", - model, + limiterKey, "input", *ratelimit.InputTokensPerUnit, ratelimit.Unit, @@ -172,10 +172,10 @@ func (r *TokenRateLimiter) AddOrUpdateLimiter(model string, ratelimit *networkin } if ratelimit.OutputTokensPerUnit != nil { - r.outputLimiter[model] = NewGlobalRateLimiter( + r.outputLimiter[limiterKey] = NewGlobalRateLimiter( r.redisClient, "kthena:ratelimit", - model, + limiterKey, "output", *ratelimit.OutputTokensPerUnit, ratelimit.Unit, @@ -186,14 +186,14 @@ func (r *TokenRateLimiter) AddOrUpdateLimiter(model string, ratelimit *networkin duration := getTimeUnitDuration(ratelimit.Unit) if ratelimit.InputTokensPerUnit != nil { - r.inputLimiter[model] = NewLocalLimiter( + r.inputLimiter[limiterKey] = NewLocalLimiter( rate.Limit(float64(*ratelimit.InputTokensPerUnit)/duration.Seconds()), int(*ratelimit.InputTokensPerUnit), ) } if ratelimit.OutputTokensPerUnit != nil { - r.outputLimiter[model] = NewLocalLimiter( + r.outputLimiter[limiterKey] = NewLocalLimiter( rate.Limit(float64(*ratelimit.OutputTokensPerUnit)/duration.Seconds()), int(*ratelimit.OutputTokensPerUnit), ) @@ -204,12 +204,12 @@ func (r *TokenRateLimiter) AddOrUpdateLimiter(model string, ratelimit *networkin } // DeleteLimiter deletes rate limiter for a model -func (r *TokenRateLimiter) DeleteLimiter(model string) { +func (r *TokenRateLimiter) DeleteLimiter(limiterKey string) { r.mutex.Lock() defer r.mutex.Unlock() - delete(r.inputLimiter, model) - delete(r.outputLimiter, model) + delete(r.inputLimiter, limiterKey) + delete(r.outputLimiter, limiterKey) } func getTimeUnitDuration(unit networkingv1alpha1.RateLimitUnit) time.Duration { diff --git a/pkg/kthena-router/filters/ratelimit/ratelimit_test.go b/pkg/kthena-router/filters/ratelimit/ratelimit_test.go index cdaad0365..d9a3759ea 100644 --- a/pkg/kthena-router/filters/ratelimit/ratelimit_test.go +++ b/pkg/kthena-router/filters/ratelimit/ratelimit_test.go @@ -17,6 +17,7 @@ limitations under the License. package ratelimit import ( + "fmt" "testing" "time" @@ -25,26 +26,26 @@ import ( func TestTokenRateLimiter_Basic(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") prompt := "hello world" // 3 tokens tokens := uint32(10) unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ InputTokensPerUnit: &tokens, Unit: unit, }) // Should allow up to 10 tokens immediately for i := 0; i < 3; i++ { - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err != nil { t.Fatalf("unexpected error on allowed request: %v, %d", err, i) } } // 4th request should be rate limited - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err == nil { t.Fatalf("expected rate limit error, got nil") } @@ -56,7 +57,7 @@ func TestTokenRateLimiter_Basic(t *testing.T) { func TestTokenRateLimiter_NoLimiter(t *testing.T) { rl := NewTokenRateLimiter() // No limiter added, should always allow - err := rl.RateLimit("unknown-model", "test") + err := rl.RateLimit(testLimiterKey("unknown", "route"), "test") if err != nil { t.Fatalf("expected nil error for unknown model, got %v", err) } @@ -64,25 +65,25 @@ func TestTokenRateLimiter_NoLimiter(t *testing.T) { func TestTokenRateLimiter_ResetAfterTime(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") prompt := "hello world" tokens := uint32(10) unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ InputTokensPerUnit: &tokens, Unit: unit, }) // Use up tokens for i := 0; i < 3; i++ { - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err != nil { t.Fatalf("unexpected error: %v", err) } } // Should be rate limited now - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err == nil { t.Fatalf("expected rate limit error, got nil") } @@ -92,7 +93,7 @@ func TestTokenRateLimiter_ResetAfterTime(t *testing.T) { // Wait for refill time.Sleep(1100 * time.Millisecond) - err = rl.RateLimit(model, prompt) + err = rl.RateLimit(limiterKey, prompt) if err != nil { t.Fatalf("expected nil after refill, got %v", err) } @@ -100,48 +101,48 @@ func TestTokenRateLimiter_ResetAfterTime(t *testing.T) { func TestTokenRateLimiter_OutputTokenRecording(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") tokens := uint32(10) unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ OutputTokensPerUnit: &tokens, Unit: unit, }) // Record output tokens - this should not block/error - rl.RecordOutputTokens(model, 5) - rl.RecordOutputTokens(model, 3) - rl.RecordOutputTokens(model, 2) // Total: 10 tokens consumed + rl.RecordOutputTokens(limiterKey, 5) + rl.RecordOutputTokens(limiterKey, 3) + rl.RecordOutputTokens(limiterKey, 2) // Total: 10 tokens consumed // Recording more tokens should still work (just consumes from the bucket) - rl.RecordOutputTokens(model, 1) + rl.RecordOutputTokens(limiterKey, 1) } func TestTokenRateLimiter_CombinedInputOutput(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") prompt := "hello world hello world" // Should be ~6 tokens inputTokens := uint32(8) // Allow only one request (6 tokens < 8, but two requests = 12 > 8) outputTokens := uint32(10) // Allow output recording unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ InputTokensPerUnit: &inputTokens, OutputTokensPerUnit: &outputTokens, Unit: unit, }) // First request should be allowed - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err != nil { t.Fatalf("unexpected error on first request: %v", err) } // Record output tokens used - rl.RecordOutputTokens(model, 2) + rl.RecordOutputTokens(limiterKey, 2) // Second request should be rate limited due to input token exhaustion - err = rl.RateLimit(model, prompt) + err = rl.RateLimit(limiterKey, prompt) if err == nil { t.Fatalf("expected rate limit error after exhausting input tokens") } @@ -153,72 +154,72 @@ func TestTokenRateLimiter_CombinedInputOutput(t *testing.T) { func TestTokenRateLimiter_OutputNoLimiter(t *testing.T) { rl := NewTokenRateLimiter() // No limiter added, should not error when recording output tokens - rl.RecordOutputTokens("unknown-model", 100) + rl.RecordOutputTokens(testLimiterKey("unknown", "route"), 100) // RecordOutputTokens doesn't return error, just silently does nothing } func TestTokenRateLimiter_DeleteLimiter(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") inputTokens := uint32(3) // Very restrictive outputTokens := uint32(5) unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ InputTokensPerUnit: &inputTokens, OutputTokensPerUnit: &outputTokens, Unit: unit, }) // Verify limiter exists and restricts - err := rl.RateLimit(model, "hello world") // ~3 tokens + err := rl.RateLimit(limiterKey, "hello world") // ~3 tokens if err != nil { t.Fatalf("first request should be allowed: %v", err) } - err = rl.RateLimit(model, "hello world") // Should be rate limited + err = rl.RateLimit(limiterKey, "hello world") // Should be rate limited if err == nil { t.Fatalf("expected rate limit error") } // Delete limiters - rl.DeleteLimiter(model) + rl.DeleteLimiter(limiterKey) // Should now be unrestricted for i := 0; i < 10; i++ { - err = rl.RateLimit(model, "hello world") + err = rl.RateLimit(limiterKey, "hello world") if err != nil { t.Fatalf("expected nil after deletion, got %v", err) } } // Recording output tokens should work without error - rl.RecordOutputTokens(model, 100) + rl.RecordOutputTokens(limiterKey, 100) } func TestTokenRateLimiter_OutputRateLimit(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" + limiterKey := testLimiterKey("default", "test-route") prompt := "hello world" outputTokens := uint32(5) // Very low limit unit := networkingv1alpha1.Second - rl.AddOrUpdateLimiter(model, &networkingv1alpha1.RateLimit{ + rl.AddOrUpdateLimiter(limiterKey, &networkingv1alpha1.RateLimit{ OutputTokensPerUnit: &outputTokens, Unit: unit, }) // First request should be allowed (has 5 tokens available) - err := rl.RateLimit(model, prompt) + err := rl.RateLimit(limiterKey, prompt) if err != nil { t.Fatalf("first request should be allowed: %v", err) } // Consume most tokens - rl.RecordOutputTokens(model, 5) + rl.RecordOutputTokens(limiterKey, 5) // Next request should be blocked due to insufficient output tokens - err = rl.RateLimit(model, prompt) + err = rl.RateLimit(limiterKey, prompt) if err == nil { t.Fatalf("expected output rate limit error") } @@ -229,19 +230,19 @@ func TestTokenRateLimiter_OutputRateLimit(t *testing.T) { func TestTokenRateLimiter_InputAndOutputErrors(t *testing.T) { rl := NewTokenRateLimiter() - model := "test-model" longPrompt := "hello world hello world hello world" // Should be ~9 tokens inputTokens := uint32(5) // Very low input limit outputTokens := uint32(10) // Higher output limit unit := networkingv1alpha1.Second // Test input rate limit error - rl.AddOrUpdateLimiter(model+"-input", &networkingv1alpha1.RateLimit{ + inputLimiterKey := testLimiterKey("default", "test-route-input") + rl.AddOrUpdateLimiter(inputLimiterKey, &networkingv1alpha1.RateLimit{ InputTokensPerUnit: &inputTokens, Unit: unit, }) - err := rl.RateLimit(model+"-input", longPrompt) + err := rl.RateLimit(inputLimiterKey, longPrompt) if err == nil { t.Fatalf("expected input rate limit error") } @@ -250,25 +251,26 @@ func TestTokenRateLimiter_InputAndOutputErrors(t *testing.T) { } // Test output rate limit error - rl.AddOrUpdateLimiter(model+"-output", &networkingv1alpha1.RateLimit{ + outputLimiterKey := testLimiterKey("default", "test-route-output") + rl.AddOrUpdateLimiter(outputLimiterKey, &networkingv1alpha1.RateLimit{ OutputTokensPerUnit: &outputTokens, Unit: unit, }) // First make a successful request to establish the limiter - err = rl.RateLimit(model+"-output", "short") + err = rl.RateLimit(outputLimiterKey, "short") if err != nil { t.Fatalf("first request should succeed: %v", err) } // Consume all available output tokens - rl.RecordOutputTokens(model+"-output", 10) // Consume all 10 tokens + rl.RecordOutputTokens(outputLimiterKey, 10) // Consume all 10 tokens // Wait a bit for the tokens to be recorded time.Sleep(10 * time.Millisecond) // Next request should be blocked due to insufficient output tokens (< 1 token available) - err = rl.RateLimit(model+"-output", "short") // Short prompt to avoid input limit + err = rl.RateLimit(outputLimiterKey, "short") // Short prompt to avoid input limit if err == nil { t.Fatalf("expected output rate limit error") } @@ -276,3 +278,7 @@ func TestTokenRateLimiter_InputAndOutputErrors(t *testing.T) { t.Fatalf("expected OutputRateLimitExceededError, got %T: %v", err, err) } } + +func testLimiterKey(namespace, modelRouteName string) string { + return fmt.Sprintf("%s/%s", namespace, modelRouteName) +} From 199be106ba96cd7fdcb7d6d8deb7a7af633e0d47 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Wed, 3 Jun 2026 14:54:34 +0530 Subject: [PATCH 02/10] changed the key in main flow Signed-off-by: nXtCyberNet --- pkg/kthena-router/datastore/store.go | 2 +- .../filters/ratelimit/ratelimit.go | 10 ++--- pkg/kthena-router/router/router.go | 42 +++++++++++++------ 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/pkg/kthena-router/datastore/store.go b/pkg/kthena-router/datastore/store.go index ab9e1211f..c85240aac 100644 --- a/pkg/kthena-router/datastore/store.go +++ b/pkg/kthena-router/datastore/store.go @@ -1099,7 +1099,7 @@ func (s *store) DeleteModelRoute(namespacedName string) error { s.triggerCallbacks("ModelRoute", EventData{ EventType: EventDelete, ModelName: modelName, - ModelRoute: nil, + ModelRoute: deletedRoute, }) return nil } diff --git a/pkg/kthena-router/filters/ratelimit/ratelimit.go b/pkg/kthena-router/filters/ratelimit/ratelimit.go index 512032628..20f44e3da 100644 --- a/pkg/kthena-router/filters/ratelimit/ratelimit.go +++ b/pkg/kthena-router/filters/ratelimit/ratelimit.go @@ -98,7 +98,7 @@ func NewTokenRateLimiter() *TokenRateLimiter { } // RateLimit checks if the request is within rate limits for both input and output tokens -func (r *TokenRateLimiter) RateLimit(model, prompt string) error { +func (r *TokenRateLimiter) RateLimit(limiterKey, prompt string) error { // Estimate input tokens tokens, err := r.tokenizer.CalculateTokenNum(prompt) if err != nil { @@ -107,8 +107,8 @@ func (r *TokenRateLimiter) RateLimit(model, prompt string) error { } r.mutex.RLock() - inputLimiter, hasInputLimit := r.inputLimiter[model] - outputLimiter, hasOutputLimit := r.outputLimiter[model] + inputLimiter, hasInputLimit := r.inputLimiter[limiterKey] + outputLimiter, hasOutputLimit := r.outputLimiter[limiterKey] r.mutex.RUnlock() // Check input token rate limit @@ -126,9 +126,9 @@ func (r *TokenRateLimiter) RateLimit(model, prompt string) error { } // RecordOutputTokens records the actual output tokens consumed after response generation -func (r *TokenRateLimiter) RecordOutputTokens(model string, tokenCount int) { +func (r *TokenRateLimiter) RecordOutputTokens(limiterKey string, tokenCount int) { r.mutex.RLock() - outputLimiter, exists := r.outputLimiter[model] + outputLimiter, exists := r.outputLimiter[limiterKey] r.mutex.RUnlock() if exists { diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index d0f2edce8..628ff79ae 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -106,16 +106,20 @@ func NewRouter(store datastore.Store, routerConfigPath string) *Router { if data.ModelRoute == nil || data.ModelRoute.Spec.RateLimit == nil { return } - klog.Infof("add or update rate limit for model %s", data.ModelName) + // Use namespace/routename as the rate limit key + routeKey := fmt.Sprintf("%s/%s", data.ModelRoute.Namespace, data.ModelRoute.Name) + klog.Infof("add or update rate limit for route %s", routeKey) - // Configure the unified rate limiter for this model - if err := loadRateLimiter.AddOrUpdateLimiter(data.ModelName, data.ModelRoute.Spec.RateLimit); err != nil { - klog.Errorf("failed to configure rate limiter for model %s: %v", data.ModelName, err) + // Configure the unified rate limiter for this route + if err := loadRateLimiter.AddOrUpdateLimiter(routeKey, data.ModelRoute.Spec.RateLimit); err != nil { + klog.Errorf("failed to configure rate limiter for route %s: %v", routeKey, err) } case datastore.EventDelete: - klog.Infof("delete rate limit for model %s", data.ModelName) - loadRateLimiter.DeleteLimiter(data.ModelName) + // Use namespace/routename as the rate limit key + routeKey := fmt.Sprintf("%s/%s", data.ModelRoute.Namespace, data.ModelRoute.Name) + klog.Infof("delete rate limit for route %s", routeKey) + loadRateLimiter.DeleteLimiter(routeKey) } }) @@ -274,8 +278,18 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Record input tokens immediately metricsRecorder.RecordInputTokens(inputTokens) - // Apply rate limiting using the unified rate limiter - if err := r.loadRateLimiter.RateLimit(modelName, promptStr); err != nil { + // Get gateway key and match ModelRoute to extract route key for rate limiting + gatewayKey := c.GetString(GatewayKey) + + // Match ModelRoute to get the route key. Require ModelRoute match only. + _, _, modelRoute, _ := r.store.MatchModelServer(modelName, c.Request, gatewayKey) + + rateLimitKey := fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) + // store rateLimitKey in context for later output-token recording + c.Set("rateLimitKey", rateLimitKey) + + // Apply rate limiting using the route key + if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { var errorMsg string var errorType string var tokenType string @@ -307,7 +321,7 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { c.Request.Header.Set("x-request-id", requestID) } - // Store metrics recorder in context for use in other functions + // Store metrics recorder in context for use in other functions c.Set("metricsRecorder", metricsRecorder) // step 3.1: load balancing @@ -806,9 +820,11 @@ func (r *Router) proxyModelEndpoint( if resp.Usage.TotalTokens <= 0 { return } - // Record output tokens for rate limiting + // Record output tokens for rate limiting using the route key if r.loadRateLimiter != nil { - r.loadRateLimiter.RecordOutputTokens(modelName, resp.Usage.CompletionTokens) + if rateLimitKeyVal, ok := c.Get("rateLimitKey"); ok { + r.loadRateLimiter.RecordOutputTokens(rateLimitKeyVal.(string), resp.Usage.CompletionTokens) + } } // Update access log with output tokens if accessCtx := accesslog.GetAccessLogContext(c); accessCtx != nil { @@ -1110,7 +1126,9 @@ func (r *Router) proxyToPDDisaggregated( // Record output tokens for rate limiting if outputTokens > 0 && r.loadRateLimiter != nil { - r.loadRateLimiter.RecordOutputTokens(ctx.Model, outputTokens) + if rateLimitKeyVal, ok := c.Get("rateLimitKey"); ok { + r.loadRateLimiter.RecordOutputTokens(rateLimitKeyVal.(string), outputTokens) + } } // Record output token metrics From e1aad99699fa7222c0420cf51d07c54c4853a716 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Wed, 3 Jun 2026 18:39:20 +0530 Subject: [PATCH 03/10] added err log Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 628ff79ae..2ba5553d2 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -101,13 +101,16 @@ func NewRouter(store datastore.Store, routerConfigPath string) *Router { tokenizerInstance := tokenizer.NewSimpleEstimateTokenizer() store.RegisterCallback("ModelRoute", func(data datastore.EventData) { + routeKey := fmt.Sprintf("%s/%s", + data.ModelRoute.Namespace, + data.ModelRoute.Name, + ) switch data.EventType { case datastore.EventAdd, datastore.EventUpdate: if data.ModelRoute == nil || data.ModelRoute.Spec.RateLimit == nil { return } // Use namespace/routename as the rate limit key - routeKey := fmt.Sprintf("%s/%s", data.ModelRoute.Namespace, data.ModelRoute.Name) klog.Infof("add or update rate limit for route %s", routeKey) // Configure the unified rate limiter for this route @@ -117,7 +120,6 @@ func NewRouter(store datastore.Store, routerConfigPath string) *Router { case datastore.EventDelete: // Use namespace/routename as the rate limit key - routeKey := fmt.Sprintf("%s/%s", data.ModelRoute.Namespace, data.ModelRoute.Name) klog.Infof("delete rate limit for route %s", routeKey) loadRateLimiter.DeleteLimiter(routeKey) } @@ -283,7 +285,10 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Match ModelRoute to get the route key. Require ModelRoute match only. _, _, modelRoute, _ := r.store.MatchModelServer(modelName, c.Request, gatewayKey) - + if err != nil || modelRoute == nil { + c.AbortWithStatusJSON(http.StatusNotFound, "route not found") + return + } rateLimitKey := fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) // store rateLimitKey in context for later output-token recording c.Set("rateLimitKey", rateLimitKey) From 03c06eddc2c6fdd8a44d4b1027686fafa898696f Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Wed, 3 Jun 2026 22:28:32 +0530 Subject: [PATCH 04/10] solved the issue Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 2ba5553d2..5a3f34396 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -284,7 +284,8 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { gatewayKey := c.GetString(GatewayKey) // Match ModelRoute to get the route key. Require ModelRoute match only. - _, _, modelRoute, _ := r.store.MatchModelServer(modelName, c.Request, gatewayKey) + rateLimitKey, err := r.store.GetRateLimitKey(modelName, c.Request, gatewayKey) + if err != nil || modelRoute == nil { c.AbortWithStatusJSON(http.StatusNotFound, "route not found") return From 98e8ca5b14cf3e14676335c12336d2df3147a32e Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Wed, 3 Jun 2026 22:32:04 +0530 Subject: [PATCH 05/10] . Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 5a3f34396..35beaa6e0 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -283,16 +283,7 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Get gateway key and match ModelRoute to extract route key for rate limiting gatewayKey := c.GetString(GatewayKey) - // Match ModelRoute to get the route key. Require ModelRoute match only. - rateLimitKey, err := r.store.GetRateLimitKey(modelName, c.Request, gatewayKey) - - if err != nil || modelRoute == nil { - c.AbortWithStatusJSON(http.StatusNotFound, "route not found") - return - } - rateLimitKey := fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) - // store rateLimitKey in context for later output-token recording - c.Set("rateLimitKey", rateLimitKey) + rateLimitKey := gatewayKey // Apply rate limiting using the route key if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { From 263ab88e04d0fa046de4c761d19084ec12b36628 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Fri, 5 Jun 2026 21:30:12 +0530 Subject: [PATCH 06/10] changed ratelimit to doloadbalancer Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 99 ++++++++++++++++++++---------- 1 file changed, 68 insertions(+), 31 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 35beaa6e0..74adff477 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -283,36 +283,6 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Get gateway key and match ModelRoute to extract route key for rate limiting gatewayKey := c.GetString(GatewayKey) - rateLimitKey := gatewayKey - - // Apply rate limiting using the route key - if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { - var errorMsg string - var errorType string - var tokenType string - switch err.(type) { - case *ratelimit.InputRateLimitExceededError: - errorMsg = "input token rate limit exceeded" - errorType = "input_rate_limit" - tokenType = metrics.LimitTypeInputTokens - case *ratelimit.OutputRateLimitExceededError: - errorMsg = "output token rate limit exceeded" - errorType = "output_rate_limit" - tokenType = metrics.LimitTypeOutputTokens - default: - errorMsg = "token usage exceeds rate limit" - errorType = "rate_limit" - tokenType = metrics.LimitTypeRequests - } - accesslog.SetError(c, errorType, errorMsg) - - // Record rate limit exceeded - metricsRecorder.RecordRateLimitExceeded(tokenType) - c.AbortWithStatusJSON(http.StatusTooManyRequests, errorMsg) - c.Set("finishReason", "rate_limit") - return - } - requestID := uuid.New().String() if c.Request.Header.Get("x-request-id") == "" { c.Request.Header.Set("x-request-id", requestID) @@ -331,7 +301,7 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { if err := r.handleFairnessScheduling(c, modelRequest, requestID, modelName); err != nil { accesslog.SetError(c, "scheduling", err.Error()) c.Set("finishReason", "scheduling") - return + return } } } @@ -365,6 +335,20 @@ func (r *Router) doLoadbalance(c *gin.Context, modelRequest ModelRequest) { accesslog.SetError(c, "model_server_matching", fmt.Sprintf("can't find corresponding model server: %v", err)) } + // Apply rate limiting NOW that we have modelRoute + if prompt, exists := c.Get(PromptKey); exists { + if p, ok := prompt.(*common.ChatMessage); ok { + promptStr := utils.GetPromptString(p) + if metricsRecorder, exists := c.Get("metricsRecorder"); exists { + if rec, ok := metricsRecorder.(*metrics.RequestMetricsRecorder); ok { + if err := r.applyRateLimit(c, modelRoute, modelName, promptStr, rec); err != nil { + return // Response already written by applyRateLimit + } + } + } + } + } + if err == nil && strings.HasPrefix(c.Request.URL.Path, "/v1/") { // Regular ModelServer request // step 3: Find pods and model server details @@ -1194,3 +1178,56 @@ func (r *Router) handleFairnessScheduling(c *gin.Context, modelRequest ModelRequ return fmt.Errorf("client disconnected while waiting in fairness queue") } } + +// applyRateLimit enforces rate limiting based on route identity +// For ModelRoute paths: uses namespace/routeName (route-scoped) +// For HTTPRoute paths: uses modelName (model-scoped) +func (r *Router) applyRateLimit( + c *gin.Context, + modelRoute *v1alpha1.ModelRoute, + modelName string, + promptStr string, + metricsRecorder *metrics.RequestMetricsRecorder, +) error { + // Determine rate limit key + var rateLimitKey string + + if modelRoute != nil { + // ModelRoute path: use route-scoped limit (fixes isolation bug) + rateLimitKey = fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) + } else { + // HTTPRoute or fallback: use model-scoped + rateLimitKey = modelName + } + + // Apply rate limiting + if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { + var errorMsg string + var errorType string + var tokenType string + + switch err.(type) { + case *ratelimit.InputRateLimitExceededError: + errorMsg = "input token rate limit exceeded" + errorType = "input_rate_limit" + tokenType = metrics.LimitTypeInputTokens + case *ratelimit.OutputRateLimitExceededError: + errorMsg = "output token rate limit exceeded" + errorType = "output_rate_limit" + tokenType = metrics.LimitTypeOutputTokens + default: + errorMsg = "token usage exceeds rate limit" + errorType = "rate_limit" + tokenType = metrics.LimitTypeRequests + } + + accesslog.SetError(c, errorType, errorMsg) + metricsRecorder.RecordRateLimitExceeded(tokenType) + c.AbortWithStatusJSON(http.StatusTooManyRequests, errorMsg) + c.Set("finishReason", "rate_limit") + + return err + } + + return nil +} \ No newline at end of file From 3613a527c0e82eba13ada1fdf3deba9c547521f2 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Fri, 5 Jun 2026 21:34:54 +0530 Subject: [PATCH 07/10] ssolved ci Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 74adff477..23c36b164 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -280,9 +280,6 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Record input tokens immediately metricsRecorder.RecordInputTokens(inputTokens) - // Get gateway key and match ModelRoute to extract route key for rate limiting - gatewayKey := c.GetString(GatewayKey) - requestID := uuid.New().String() if c.Request.Header.Get("x-request-id") == "" { c.Request.Header.Set("x-request-id", requestID) From f2ba014f452dfd040ecea754ea69c99bb1e79428 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Sat, 6 Jun 2026 11:05:30 +0530 Subject: [PATCH 08/10] resolve conflict Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index 23c36b164..cc171b653 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -298,7 +298,7 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { if err := r.handleFairnessScheduling(c, modelRequest, requestID, modelName); err != nil { accesslog.SetError(c, "scheduling", err.Error()) c.Set("finishReason", "scheduling") - return + return } } } @@ -339,13 +339,13 @@ func (r *Router) doLoadbalance(c *gin.Context, modelRequest ModelRequest) { if metricsRecorder, exists := c.Get("metricsRecorder"); exists { if rec, ok := metricsRecorder.(*metrics.RequestMetricsRecorder); ok { if err := r.applyRateLimit(c, modelRoute, modelName, promptStr, rec); err != nil { - return // Response already written by applyRateLimit + return // Response already written by applyRateLimit } } } } } - + if err == nil && strings.HasPrefix(c.Request.URL.Path, "/v1/") { // Regular ModelServer request // step 3: Find pods and model server details @@ -1179,16 +1179,10 @@ func (r *Router) handleFairnessScheduling(c *gin.Context, modelRequest ModelRequ // applyRateLimit enforces rate limiting based on route identity // For ModelRoute paths: uses namespace/routeName (route-scoped) // For HTTPRoute paths: uses modelName (model-scoped) -func (r *Router) applyRateLimit( - c *gin.Context, - modelRoute *v1alpha1.ModelRoute, - modelName string, - promptStr string, - metricsRecorder *metrics.RequestMetricsRecorder, -) error { +func (r *Router) applyRateLimit(c *gin.Context, modelRoute *v1alpha1.ModelRoute, modelName string, promptStr string, metricsRecorder *metrics.RequestMetricsRecorder) error { // Determine rate limit key var rateLimitKey string - + if modelRoute != nil { // ModelRoute path: use route-scoped limit (fixes isolation bug) rateLimitKey = fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) @@ -1196,13 +1190,13 @@ func (r *Router) applyRateLimit( // HTTPRoute or fallback: use model-scoped rateLimitKey = modelName } - + // Apply rate limiting if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { var errorMsg string var errorType string var tokenType string - + switch err.(type) { case *ratelimit.InputRateLimitExceededError: errorMsg = "input token rate limit exceeded" @@ -1217,14 +1211,14 @@ func (r *Router) applyRateLimit( errorType = "rate_limit" tokenType = metrics.LimitTypeRequests } - + accesslog.SetError(c, errorType, errorMsg) metricsRecorder.RecordRateLimitExceeded(tokenType) c.AbortWithStatusJSON(http.StatusTooManyRequests, errorMsg) c.Set("finishReason", "rate_limit") - + return err } - + return nil -} \ No newline at end of file +} From f86bb4d715e335860c463e887a59827d50ee1a48 Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Fri, 12 Jun 2026 11:26:22 +0530 Subject: [PATCH 09/10] remove the test Signed-off-by: nXtCyberNet --- .../filters/ratelimit/global_test.go | 54 ------------------- 1 file changed, 54 deletions(-) diff --git a/pkg/kthena-router/filters/ratelimit/global_test.go b/pkg/kthena-router/filters/ratelimit/global_test.go index 9f079ad7a..d6ccae972 100644 --- a/pkg/kthena-router/filters/ratelimit/global_test.go +++ b/pkg/kthena-router/filters/ratelimit/global_test.go @@ -214,60 +214,6 @@ func TestTokenRateLimiter_RedisConnectionFailure(t *testing.T) { assert.Contains(t, err.Error(), "failed to connect to redis") } -func TestTokenRateLimiter_Isolation(t *testing.T) { - mr, redisConfig := setupMiniRedis(t) - defer mr.Close() - - rl := NewTokenRateLimiter() - keyOne := "namespace-a/modelroute-1" - keyTwo := "namespace-a/modelroute-2" - prompt := "hello world" // Should be ~3 tokens - tokensOne := uint32(3) - tokensTwo := uint32(10) - unit := networkingv1alpha1.Second - - configOne := &networkingv1alpha1.RateLimit{ - InputTokensPerUnit: &tokensOne, - Unit: unit, - Global: &networkingv1alpha1.GlobalRateLimit{ - Redis: redisConfig, - }, - } - configTwo := &networkingv1alpha1.RateLimit{ - InputTokensPerUnit: &tokensTwo, - Unit: unit, - Global: &networkingv1alpha1.GlobalRateLimit{ - Redis: redisConfig, - }, - } - - require.NoError(t, rl.AddOrUpdateLimiter(keyOne, configOne)) - require.NoError(t, rl.AddOrUpdateLimiter(keyTwo, configTwo)) - - // Each limiter should write to a different Redis key even though the namespace is the same. - redisKeyOne := "kthena:ratelimit:namespace-a/modelroute-1:input" - redisKeyTwo := "kthena:ratelimit:namespace-a/modelroute-2:input" - - err := rl.RateLimit(keyOne, prompt) - require.NoError(t, err) - assert.True(t, mr.Exists(redisKeyOne), "Redis key should exist for the first route") - - err = rl.RateLimit(keyTwo, prompt) - require.NoError(t, err) - assert.True(t, mr.Exists(redisKeyTwo), "Redis key should exist for the second route") - - // Exhaust the first route only. - err = rl.RateLimit(keyOne, prompt) - assert.Error(t, err) - assert.IsType(t, &InputRateLimitExceededError{}, err) - - // The second route should still have capacity because it uses an isolated key. - err = rl.RateLimit(keyTwo, prompt) - require.NoError(t, err) - err = rl.RateLimit(keyTwo, prompt) - require.NoError(t, err) -} - // newTestGlobalRateLimiter creates a GlobalRateLimiter directly with a miniredis-backed client. func newTestGlobalRateLimiter(t *testing.T, mr *miniredis.Miniredis, modelName, tokenType string, limit uint32, unit networkingv1alpha1.RateLimitUnit) *GlobalRateLimiter { client := redis.NewClient(&redis.Options{ From 3d7df8d44eb913e5066aa15eb3eeb2df323294db Mon Sep 17 00:00:00 2001 From: nXtCyberNet Date: Thu, 18 Jun 2026 21:54:43 +0530 Subject: [PATCH 10/10] . Signed-off-by: nXtCyberNet --- pkg/kthena-router/router/router.go | 142 ++++++++++++++++------------- 1 file changed, 78 insertions(+), 64 deletions(-) diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index cc171b653..d1f9bb7d4 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -234,6 +234,28 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Store model name in context for metrics middleware c.Set("model", modelName) + // Get gateway key from context if available (set by Gateway listener) + var gatewayKey string + if key, exists := c.Get(GatewayKey); exists { + if k, ok := key.(string); ok { + gatewayKey = k + } + } + if gatewayKey != "" { + accesslog.SetGatewayAPIInfo(c, gatewayKey, "", "") + } + + // Early route matching + matchedModelServerName, matchedIsLora, matchedModelRoute, matchedMatchError := r.store.MatchModelServer(modelName, c.Request, gatewayKey) + c.Set("matchedModelServerName", matchedModelServerName) + c.Set("matchedIsLora", matchedIsLora) + if matchedModelRoute != nil { + c.Set("matchedModelRoute", matchedModelRoute) + } + if matchedMatchError != nil { + c.Set("matchedMatchError", matchedMatchError) + } + // Create metrics recorder for this request path := c.Request.URL.Path metricsRecorder := metrics.NewRequestMetricsRecorder(r.metrics, modelName, path) @@ -280,12 +302,50 @@ func (r *Router) HandlerFunc() gin.HandlerFunc { // Record input tokens immediately metricsRecorder.RecordInputTokens(inputTokens) + // Determine rate limit key + var rateLimitKey string + + if matchedModelRoute != nil { + rateLimitKey = fmt.Sprintf("%s/%s", matchedModelRoute.Namespace, matchedModelRoute.Name) + } else { + // HTTPRoute or fallback: use model-scoped + rateLimitKey = modelName + } + + // Apply rate limiting using the unified rate limiter + if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { + var errorMsg string + var errorType string + var tokenType string + switch err.(type) { + case *ratelimit.InputRateLimitExceededError: + errorMsg = "input token rate limit exceeded" + errorType = "input_rate_limit" + tokenType = metrics.LimitTypeInputTokens + case *ratelimit.OutputRateLimitExceededError: + errorMsg = "output token rate limit exceeded" + errorType = "output_rate_limit" + tokenType = metrics.LimitTypeOutputTokens + default: + errorMsg = "token usage exceeds rate limit" + errorType = "rate_limit" + tokenType = metrics.LimitTypeRequests + } + accesslog.SetError(c, errorType, errorMsg) + + // Record rate limit exceeded + metricsRecorder.RecordRateLimitExceeded(tokenType) + c.AbortWithStatusJSON(http.StatusTooManyRequests, errorMsg) + c.Set("finishReason", "rate_limit") + return + } + requestID := uuid.New().String() if c.Request.Header.Get("x-request-id") == "" { c.Request.Header.Set("x-request-id", requestID) } - // Store metrics recorder in context for use in other functions + // Store metrics recorder in context for use in other functions c.Set("metricsRecorder", metricsRecorder) // step 3.1: load balancing @@ -326,24 +386,25 @@ func (r *Router) doLoadbalance(c *gin.Context, modelRequest ModelRequest) { var isLora bool var err error - // Try to match ModelRoute first - modelServerName, isLora, modelRoute, err = r.store.MatchModelServer(modelName, c.Request, gatewayKey) - if err != nil { - accesslog.SetError(c, "model_server_matching", fmt.Sprintf("can't find corresponding model server: %v", err)) + // Retrieve cached ModelRoute matching results from the context + if cachedServerName, exists := c.Get("matchedModelServerName"); exists { + modelServerName = cachedServerName.(types.NamespacedName) + if cachedIsLora, ok := c.Get("matchedIsLora"); ok { + isLora = cachedIsLora.(bool) + } + if cachedRoute, ok := c.Get("matchedModelRoute"); ok { + modelRoute = cachedRoute.(*v1alpha1.ModelRoute) + } + if cachedErr, ok := c.Get("matchedMatchError"); ok { + err = cachedErr.(error) + } + } else { + // Fallback to match if not cached + modelServerName, isLora, modelRoute, err = r.store.MatchModelServer(modelName, c.Request, gatewayKey) } - // Apply rate limiting NOW that we have modelRoute - if prompt, exists := c.Get(PromptKey); exists { - if p, ok := prompt.(*common.ChatMessage); ok { - promptStr := utils.GetPromptString(p) - if metricsRecorder, exists := c.Get("metricsRecorder"); exists { - if rec, ok := metricsRecorder.(*metrics.RequestMetricsRecorder); ok { - if err := r.applyRateLimit(c, modelRoute, modelName, promptStr, rec); err != nil { - return // Response already written by applyRateLimit - } - } - } - } + if err != nil { + accesslog.SetError(c, "model_server_matching", fmt.Sprintf("can't find corresponding model server: %v", err)) } if err == nil && strings.HasPrefix(c.Request.URL.Path, "/v1/") { @@ -1175,50 +1236,3 @@ func (r *Router) handleFairnessScheduling(c *gin.Context, modelRequest ModelRequ return fmt.Errorf("client disconnected while waiting in fairness queue") } } - -// applyRateLimit enforces rate limiting based on route identity -// For ModelRoute paths: uses namespace/routeName (route-scoped) -// For HTTPRoute paths: uses modelName (model-scoped) -func (r *Router) applyRateLimit(c *gin.Context, modelRoute *v1alpha1.ModelRoute, modelName string, promptStr string, metricsRecorder *metrics.RequestMetricsRecorder) error { - // Determine rate limit key - var rateLimitKey string - - if modelRoute != nil { - // ModelRoute path: use route-scoped limit (fixes isolation bug) - rateLimitKey = fmt.Sprintf("%s/%s", modelRoute.Namespace, modelRoute.Name) - } else { - // HTTPRoute or fallback: use model-scoped - rateLimitKey = modelName - } - - // Apply rate limiting - if err := r.loadRateLimiter.RateLimit(rateLimitKey, promptStr); err != nil { - var errorMsg string - var errorType string - var tokenType string - - switch err.(type) { - case *ratelimit.InputRateLimitExceededError: - errorMsg = "input token rate limit exceeded" - errorType = "input_rate_limit" - tokenType = metrics.LimitTypeInputTokens - case *ratelimit.OutputRateLimitExceededError: - errorMsg = "output token rate limit exceeded" - errorType = "output_rate_limit" - tokenType = metrics.LimitTypeOutputTokens - default: - errorMsg = "token usage exceeds rate limit" - errorType = "rate_limit" - tokenType = metrics.LimitTypeRequests - } - - accesslog.SetError(c, errorType, errorMsg) - metricsRecorder.RecordRateLimitExceeded(tokenType) - c.AbortWithStatusJSON(http.StatusTooManyRequests, errorMsg) - c.Set("finishReason", "rate_limit") - - return err - } - - return nil -}