
session boost queue to optimize multi conversation scenario#1183

Open
YaoZengzeng wants to merge 9 commits into volcano-sh:main from YaoZengzeng:queue-back

Conversation

@YaoZengzeng

Member

What type of PR is this?

/kind enhancement

What this PR does / why we need it:

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Does this PR introduce a user-facing change?:


Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Copilot AI review requested due to automatic review settings June 4, 2026 03:07
@volcano-sh-bot volcano-sh-bot added the kind/enhancement New feature or request label Jun 4, 2026
@volcano-sh-bot

Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please ask for approval from yaozengzeng. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Copilot AI left a comment

Contributor

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds a standalone “session boost” request queue and introduces speculative pre-incrementing of in-flight counters to reduce scheduling TOCTOU, alongside backpressure-aware dequeue behavior and new metrics/docs.

Changes:

  • Add a session boost queue (standalone from fairness scheduling), wired through router/store with new Prometheus metrics and docs.
  • Add backpressure-aware dequeue and inflight-per-pod gating to fairness scheduling queues (plus tests).
  • Add speculative pre-increment of per-pod in-flight request counters to narrow the scoring→dispatch race window.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| pkg/kthena-router/scheduler/scheduler_impl.go | Speculatively increments in-flight counters for top candidates (including PD mode). |
| pkg/kthena-router/scheduler/framework/interface.go | Extends scheduling context with CorrelationID + pre-increment bookkeeping. |
| pkg/kthena-router/router/router.go | Adds session-boost scheduling path, correlation extraction, and avoids double in-flight increments. |
| pkg/kthena-router/metrics/metrics.go | Introduces Prometheus metrics for session boost queue size/duration/cancels/dequeues/inflight. |
| pkg/kthena-router/debug/handlers_test.go | Extends MockStore to satisfy new Store interface methods. |
| pkg/kthena-router/datastore/store.go | Adds session boost queue plumbing + backend waiting checker + changes default scrape interval. |
| pkg/kthena-router/datastore/session_boost_queue.go | New queue implementation (boosting + grace period + backpressure + inflight control). |
| pkg/kthena-router/datastore/session_boost_queue_test.go | New unit tests for session boost queue behavior. |
| pkg/kthena-router/datastore/fairness_queue.go | Adds backpressure mode, inflight gating, session tracker structs, and related APIs. |
| pkg/kthena-router/datastore/fairness_queue_test.go | Adds tests for SessionTracker and new backpressure dequeue behavior. |
| docs/proposal/session-boost-queue.md | Proposal doc describing session boost queue design and configuration. |
| docs/kthena/sidebars.ts | Adds session boost guide to user guide navigation. |
| docs/kthena/docs/user-guide/session-boost.md | User guide explaining how to enable/configure/verify session boost. |


Comment on lines +493 to +501
func createSessionBoostQueueConfigFromEnv() *SessionBoostQueueConfig {
	v := os.Getenv("SESSION_BOOST_ENABLED")
	if v == "" {
		return nil
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil || !enabled {
		return nil
	}
Comment on lines +1173 to 1182
var userId string
if userIdVal, ok := c.Get(common.UserIdKey); ok {
	if s, ok := userIdVal.(string); ok {
		userId = s
	}
}
if userId == "" {
	klog.Warningf("user ID not found in request %s", requestID)
	c.AbortWithStatusJSON(http.StatusBadRequest, "missing userId in request body")
	return fmt.Errorf("missing userId in request body")
}
Comment on lines 455 to 461
ctx := &framework.Context{
	Model:           modelName,
	Prompt:          prompt,
	CorrelationID:   c.Request.Header.Get("X-Correlation-ID"),
	ModelServerName: modelServerName,
	PDGroup:         pdGroup,
	MetricsRecorder: metricsRecorder,
Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond

// Backpressure-aware dequeue (Phase 2)
-sem chan struct{} // Semaphore for capacity-based admission; nil means QPS mode
+sem chan struct{} // Semaphore for capacity-based admission; nil means backpressure mode
Comment on lines +698 to +700
if !hasCapacity {
	klog.Infof("[BackendWaitingChecker] all %d pods busy, totalWaiting=%.0f", podCount, totalWaiting)
}

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a Session Boost Queue to prioritize follow-up requests in multi-turn conversations, maximizing prefix cache hit rates on LLM backends. It also implements speculative pre-increment of on-flight counters in the scheduler to prevent TOCTOU race windows. The review feedback highlights critical thread-safety issues and potential nil pointer dereferences when accessing pod fields directly, recommending thread-safe helper methods. Additionally, the reviewer notes a potential on-flight counter leak on early proxy failures, a select-statement race condition that could reject successfully dequeued requests, and warns against the performance overhead of the newly reduced 50ms default metrics scrape interval.

Comment on lines +187 to +197
for i := range ctx.DecodePods {
	if ctx.PrefillPods[i] != nil && ctx.DecodePods[i] != nil {
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.PrefillPods[i].Pod.Namespace, Name: ctx.PrefillPods[i].Pod.Name})
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.DecodePods[i].Pod.Namespace, Name: ctx.DecodePods[i].Pod.Name})
		ctx.PreIncremented = true
		ctx.PreIncrementedIdx = i
		break
	}
}

Contributor

high

Accessing ctx.PrefillPods[i].Pod.Namespace and ctx.DecodePods[i].Pod.Namespace directly is not thread-safe and can race with concurrent updates to p.Pod in UpdatePod (which is called under a write lock). Additionally, if Pod is nil (e.g., in unit tests or uninitialized states), this will cause a nil pointer dereference panic. Use the thread-safe GetPodNamespacedName() helper method instead, which handles nil checks and acquires a read lock.

Suggested change
-for i := range ctx.DecodePods {
-	if ctx.PrefillPods[i] != nil && ctx.DecodePods[i] != nil {
-		s.store.IncrPodOnFlightRequests(types.NamespacedName{
-			Namespace: ctx.PrefillPods[i].Pod.Namespace, Name: ctx.PrefillPods[i].Pod.Name})
-		s.store.IncrPodOnFlightRequests(types.NamespacedName{
-			Namespace: ctx.DecodePods[i].Pod.Namespace, Name: ctx.DecodePods[i].Pod.Name})
-		ctx.PreIncremented = true
-		ctx.PreIncrementedIdx = i
-		break
-	}
-}
+for i := range ctx.DecodePods {
+	if ctx.PrefillPods[i] != nil && ctx.DecodePods[i] != nil {
+		prefillName := ctx.PrefillPods[i].GetPodNamespacedName()
+		decodeName := ctx.DecodePods[i].GetPodNamespacedName()
+		if prefillName.Name != "" && decodeName.Name != "" {
+			s.store.IncrPodOnFlightRequests(prefillName)
+			s.store.IncrPodOnFlightRequests(decodeName)
+			ctx.PreIncremented = true
+			ctx.PreIncrementedIdx = i
+			break
+		}
+	}
+}

Comment on lines +209 to +215
if len(ctx.BestPods) > 0 {
	pod := ctx.BestPods[0]
	podName := types.NamespacedName{Namespace: pod.Pod.Namespace, Name: pod.Pod.Name}
	s.store.IncrPodOnFlightRequests(podName)
	ctx.PreIncremented = true
	ctx.PreIncrementedIdx = 0
}

Contributor

high

Accessing pod.Pod.Namespace directly is not thread-safe and can race with concurrent updates to p.Pod in UpdatePod. It can also cause a nil pointer dereference panic if Pod is nil. Use the thread-safe GetPodNamespacedName() helper method instead.

Suggested change
-if len(ctx.BestPods) > 0 {
-	pod := ctx.BestPods[0]
-	podName := types.NamespacedName{Namespace: pod.Pod.Namespace, Name: pod.Pod.Name}
-	s.store.IncrPodOnFlightRequests(podName)
-	ctx.PreIncremented = true
-	ctx.PreIncrementedIdx = 0
-}
+if len(ctx.BestPods) > 0 {
+	pod := ctx.BestPods[0]
+	podName := pod.GetPodNamespacedName()
+	if podName.Name != "" {
+		s.store.IncrPodOnFlightRequests(podName)
+		ctx.PreIncremented = true
+		ctx.PreIncrementedIdx = 0
+	}
+}

Comment on lines +687 to +692
totalWaiting += podInfo.RequestWaitingNum
if podInfo.RequestWaitingNum == 0 {
	hasCapacity = true
	return false // stop iterating, found a pod with capacity
}
return true

Contributor

high

Accessing podInfo.RequestWaitingNum directly without holding any lock is a data race, as it is concurrently updated under a lock in updateGaugeMetricsInfo. Use the thread-safe podInfo.GetRequestWaitingNum() getter method instead, which correctly acquires a read lock.

Suggested change
-totalWaiting += podInfo.RequestWaitingNum
-if podInfo.RequestWaitingNum == 0 {
-	hasCapacity = true
-	return false // stop iterating, found a pod with capacity
-}
-return true
+waitingNum := podInfo.GetRequestWaitingNum()
+totalWaiting += waitingNum
+if waitingNum == 0 {
+	hasCapacity = true
+	return false // stop iterating, found a pod with capacity
+}

Comment on lines +764 to +766
if !(ctx.PreIncremented && i == ctx.PreIncrementedIdx) {
	r.store.IncrPodOnFlightRequests(podName)
}

Contributor

high

There is a potential on-flight counter leak if proxy fails to read the request body (e.g., io.ReadAll(req.Body) returns an error) before entering the loop. Since the speculative pre-increment was already performed in Schedule, but the loop is never entered, DecrPodOnFlightRequests is never called, leading to a permanent counter leak. Ensure that any early return path in proxy (or proxyModelEndpoint before calling proxyToPDDisaggregated) decrements the speculatively pre-incremented counters if ctx.PreIncremented is true.

Comment on lines +1294 to +1307
select {
case <-queueReq.NotifyChan:
	if queueReq.Release != nil {
		defer queueReq.Release()
	}
	klog.V(4).Infof("[SessionBoost] request dequeued: reqID=%s user=%s model=%s sessionBoost=%v waitTime=%v",
		requestID, userId, modelName, queueReq.SessionBoost, time.Since(queueReq.RequestTime))
	r.doLoadbalance(c, modelRequest)

	if correlationID != "" {
		r.store.MarkSessionCompleted(modelName, correlationID)
	}
	return nil
case <-reqCtx.Done():

Contributor

medium

When both queueReq.NotifyChan and reqCtx.Done() are ready, the select statement chooses one pseudo-randomly. This can cause a successfully dequeued request to be rejected with a gateway timeout or client disconnected error. To prioritize processing successfully dequeued requests, perform a non-blocking check of NotifyChan first.

	// Prioritize processing successfully dequeued requests over context cancellation/timeout
	select {
	case <-queueReq.NotifyChan:
		if queueReq.Release != nil {
			defer queueReq.Release()
		}
		klog.V(4).Infof("[SessionBoost] request dequeued: reqID=%s user=%s model=%s sessionBoost=%v waitTime=%v",
			requestID, userId, modelName, queueReq.SessionBoost, time.Since(queueReq.RequestTime))
		r.doLoadbalance(c, modelRequest)

		if correlationID != "" {
			r.store.MarkSessionCompleted(modelName, correlationID)
		}
		return nil
	default:
	}

	select {
	case <-queueReq.NotifyChan:
		if queueReq.Release != nil {
			defer queueReq.Release()
		}
		klog.V(4).Infof("[SessionBoost] request dequeued: reqID=%s user=%s model=%s sessionBoost=%v waitTime=%v",
			requestID, userId, modelName, queueReq.SessionBoost, time.Since(queueReq.RequestTime))
		r.doLoadbalance(c, modelRequest)

		if correlationID != "" {
			r.store.MarkSessionCompleted(modelName, correlationID)
		}
		return nil
	case <-reqCtx.Done():

Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond

Contributor

medium

The default metrics scrape interval has been reduced from 1 * time.Second to 50 * time.Millisecond. Polling backend pod metrics every 50ms is extremely aggressive and can cause significant CPU and network overhead on both the router and the backend model servers (e.g., vLLM/SGLang), potentially degrading inference performance. Consider keeping the default at a more conservative value (e.g., 500ms or 1s) and allowing users to tune it via the METRICS_SCRAPE_INTERVAL environment variable if needed.

Suggested change
-defaultMetricsScrapeInterval = 50 * time.Millisecond
+defaultMetricsScrapeInterval = 1 * time.Second

@FAUST-BENCHOU FAUST-BENCHOU left a comment

Member

This seems to duplicate #873.
Maybe we should delete docs/proposal/infer-gateway-session-sticky.md. As I discussed with @hzxuzhonghu, we think the old proposal can be replaced by the kvcache-aware plugin.

Comment thread docs/proposal/session-boost-queue.md Outdated
---
title: Session Boost Queue for Multi-Turn Conversation Prefix Cache Optimization
authors:
- "@kthena-contributors"

Member

nit: a real ID might be better here

Member Author

Haha, you know, AI-generated :)

Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond

Member

That's much faster than before.

Member Author

Based on my tests, this frequency is needed for metric-based score plugins such as gpu-usage to take effect. Otherwise, the load cannot be reflected accurately.

Member

Oh yeah, that makes sense! When I was writing the e2e test for the plugin in #1179, I found that these metrics changed very infrequently, which made testing difficult, so I extended the test duration and request volume. This change should improve things considerably.

@YaoZengzeng

Member Author

> This seems to duplicate #873. Maybe we should delete docs/proposal/infer-gateway-session-sticky.md. As I discussed with @hzxuzhonghu, we think the old proposal can be replaced by the kvcache-aware plugin.

They are actually different. The key idea of this proposal is a priority queue in front of scheduling that prioritizes requests from the same session, together with flow control over the dequeue order, so that the warm prefix cache is fully utilized.

Session affinity is just a score plugin, although it would be even better if the queue in this proposal worked together with session affinity in a score plugin.

Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Copilot AI review requested due to automatic review settings June 4, 2026 11:12

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 8 comments.

Comment on lines +493 to +501
func createSessionBoostQueueConfigFromEnv() *SessionBoostQueueConfig {
	v := os.Getenv("SESSION_BOOST_ENABLED")
	if v == "" {
		return nil
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil || !enabled {
		return nil
	}
Comment on lines 455 to 459
ctx := &framework.Context{
	Model:           modelName,
	Prompt:          prompt,
	CorrelationID:   c.Request.Header.Get("X-Correlation-ID"),
	ModelServerName: modelServerName,
Comment on lines +185 to +198
// Speculative pre-increment for PD mode: bump the on-flight counters
// for the first valid prefill-decode pair so concurrent Schedule()
// calls immediately see the updated load.
for i := range ctx.DecodePods {
	if ctx.PrefillPods[i] != nil && ctx.DecodePods[i] != nil {
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.PrefillPods[i].Pod.Namespace, Name: ctx.PrefillPods[i].Pod.Name})
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.DecodePods[i].Pod.Namespace, Name: ctx.DecodePods[i].Pod.Name})
		ctx.PreIncremented = true
		ctx.PreIncrementedIdx = i
		break
	}
}
Comment on lines +207 to +216
// Speculative pre-increment: immediately bump the on-flight counter for the
// top candidate so that the next concurrent Schedule() call sees the updated
// load, eliminating the TOCTOU race window between scoring and dispatching.
if len(ctx.BestPods) > 0 {
	pod := ctx.BestPods[0]
	podName := types.NamespacedName{Namespace: pod.Pod.Namespace, Name: pod.Pod.Name}
	s.store.IncrPodOnFlightRequests(podName)
	ctx.PreIncremented = true
	ctx.PreIncrementedIdx = 0
}
Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond
Comment thread pkg/kthena-router/datastore/store.go Outdated
Comment on lines +699 to +700
klog.Infof("[BackendWaitingChecker] all %d pods busy, totalWaiting=%.0f", podCount, totalWaiting)
}

| Environment Variable | Purpose | Default | Notes |
| -------------------------------- | -------------------------------------------------------------------------- | ------------------ | ----------------------------------------------------------------------------------------- |
| `ENABLE_SESSION_BOOST` | Enable the session boost queue | `false` | Global feature switch |
Comment on lines +122 to +129
// SessionTracker tracks recently completed sessions for priority boosting.
// It maps correlation IDs to their last completion time, allowing follow-up
// requests in the same session to be prioritized for prefix cache utilization.
type SessionTracker struct {
	mu       sync.RWMutex
	sessions map[string]time.Time // correlationID -> last completion time
	ttl      time.Duration
}
Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Copilot AI review requested due to automatic review settings June 4, 2026 11:36

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.

Comment on lines +483 to +493
// createSessionBoostQueueConfigFromEnv returns a SessionBoostQueueConfig if
// SESSION_BOOST_ENABLED=true, or nil if the standalone session boost queue is disabled.
func createSessionBoostQueueConfigFromEnv() *SessionBoostQueueConfig {
	v := os.Getenv("SESSION_BOOST_ENABLED")
	if v == "" {
		return nil
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil || !enabled {
		return nil
	}
Comment on lines +761 to +766
// Track this request as in-flight to the chosen pod. Skip for the
// pre-incremented candidate; the scheduler already bumped its counter
// to close the TOCTOU window between scoring and dispatching.
if !(ctx.PreIncremented && i == ctx.PreIncrementedIdx) {
	r.store.IncrPodOnFlightRequests(podName)
}
Comment on lines +188 to +197
for i := range ctx.DecodePods {
	if ctx.PrefillPods[i] != nil && ctx.DecodePods[i] != nil {
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.PrefillPods[i].Pod.Namespace, Name: ctx.PrefillPods[i].Pod.Name})
		s.store.IncrPodOnFlightRequests(types.NamespacedName{
			Namespace: ctx.DecodePods[i].Pod.Namespace, Name: ctx.DecodePods[i].Pod.Name})
		ctx.PreIncremented = true
		ctx.PreIncrementedIdx = i
		break
	}
Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond
Comment on lines 455 to 461
ctx := &framework.Context{
	Model:           modelName,
	Prompt:          prompt,
	CorrelationID:   c.Request.Header.Get("X-Correlation-ID"),
	ModelServerName: modelServerName,
	PDGroup:         pdGroup,
	MetricsRecorder: metricsRecorder,
Comment on lines +347 to +353
func (q *SessionBoostQueue) runSessionCleanup(ctx context.Context) {
	cleanupInterval := q.config.SessionBoostTTL
	if cleanupInterval < 10*time.Second {
		cleanupInterval = 10 * time.Second
	}
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()
Comment thread pkg/kthena-router/datastore/store.go Outdated
Comment on lines +664 to +691
func (s *store) makeBackendWaitingChecker() BackendWaitingChecker {
	return func() bool {
		hasCapacity := false
		podCount := 0
		var totalWaiting float64
		s.pods.Range(func(key, value any) bool {
			podInfo, ok := value.(*PodInfo)
			if !ok || podInfo == nil {
				return true
			}
			podCount++
			totalWaiting += podInfo.RequestWaitingNum
			if podInfo.RequestWaitingNum == 0 {
				hasCapacity = true
				return false // stop iterating, found a pod with capacity
			}
			return true
		})
		// If no pods are registered yet, allow dequeue to avoid deadlock
		if podCount == 0 {
			return true
		}
		if !hasCapacity {
			klog.Infof("[BackendWaitingChecker] all %d pods busy, totalWaiting=%.0f", podCount, totalWaiting)
		}
		return hasCapacity
	}
}
Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>

@hzxuzhonghu hzxuzhonghu left a comment

Member

Thanks for the detailed proposal and the clean queue implementation (good test coverage on the queue itself, vet/build pass). My main feedback is about how general the benefit is, plus a few concrete correctness issues. Summary:

  1. Core generality concern: the headline benefit is prefix-cache reuse, which is per-pod. This PR boosts queue ordering but does not influence pod selection, and ctx.CorrelationID plumbed into the scheduler framework.Context is currently read by nobody (the referenced session-affinity plugin is not in this PR). So in the general multi-backend case the boosted follow-up can land on a cold pod and get zero prefix-cache benefit. The reordering only helps the narrow case of a single saturated backend serving sequential multi-turn traffic.

  2. Several changes are global and unrelated to session boost (scrape interval 1s->50ms for everyone; speculative on-flight pre-increment for all requests). These affect every deployment, not just ENABLE_SESSION_BOOST=true users, and should be split out and justified/tested on their own.

  3. Defaults (grace 50ms, InflightPerPod=1) optimize automated pipelines but penalize the human multi-turn case the proposal targets.

Inline comments below. Overall I'd suggest either (a) landing the routing half (session-affinity plugin that consumes CorrelationID) so the cache benefit is real, or (b) reframing the proposal around queue-ordering fairness for saturated single-pool backends, and de-scoping the unrelated global changes.


Without session boost, Turn 2 may be queued behind 10 other requests. By the time it reaches the backend, the KV cache entries for the first 1000 tokens may be evicted. With session boost, Turn 2 is prioritized immediately after Turn 1 completes, hitting the warm prefix cache and only computing attention for the new ~2000 tokens.

**Expected TTFT improvement**: For a 5000-token prompt where 3000 tokens are cached prefix, TTFT is reduced by approximately 60% (only 2000 tokens need computation vs 5000).

Member

[important] This 60% TTFT figure (and the 50-80% claims above) only holds if the boosted follow-up is routed to the same pod that still holds the warm KV cache. Non-Goal #2 explicitly says this queue does not guarantee pod affinity, and in this PR nothing consumes the session id for routing. So for the general case (multiple backend pods, no session-sticky/prefix-cache plugin enabled) the reported benefit is not realized: boosting queue order alone does not make the cache hit. Please clarify that the cache benefit is conditional on also enabling pod-affinity routing, and ideally land that routing piece together so the proposal is self-contained.
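
The dependence on routing can be made explicit with a small calculation. The sketch below makes two simplifying assumptions that do not come from the proposal: prefill time is linear in the number of uncached tokens, and without affinity the follow-up lands on the warm pod with some probability `hitProb` (for example 1/k under uniform selection among k pods). `ttftReduction` is a hypothetical helper.

```go
package main

import "fmt"

// ttftReduction returns the expected fractional TTFT reduction for a prompt
// of promptTokens tokens, of which cachedTokens are a reusable prefix, when
// the request reaches the pod holding that prefix with probability hitProb.
// Assumption: prefill cost is linear in the number of uncached tokens.
func ttftReduction(promptTokens, cachedTokens int, hitProb float64) float64 {
	if promptTokens <= 0 {
		return 0
	}
	return hitProb * float64(cachedTokens) / float64(promptTokens)
}

func main() {
	// The proposal's example: 5000-token prompt, 3000 tokens cached.
	fmt.Printf("warm pod guaranteed: %.0f%%\n", 100*ttftReduction(5000, 3000, 1.0))
	// Assumed scenario: 4 pods, uniform selection, no affinity routing.
	fmt.Printf("4 pods, no affinity: %.0f%%\n", 100*ttftReduction(5000, 3000, 0.25))
}
```

Under these assumptions the 60% figure is the upper bound reached only when the warm pod is always selected, and the expected benefit shrinks in proportion to the hit probability.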

Member Author

Actually, I implemented a session affinity score plugin to compare its effect with that of the prefix cache; see https://github.com/YaoZengzeng/kthena/blob/benchmark/benchmark-report-concurrency-40.md .

As the results show, TTFT is better with the session affinity score plugin, but throughput is better with the prefix cache. So the prefix cache can achieve basically the same effect as session affinity.

To keep the PR focused, I removed the session affinity score plugin; it can be added back in a later PR.


// CorrelationID is the session identifier from the X-Correlation-ID HTTP header.
// Used by session-affinity plugin to route requests from the same session to the same pod.
CorrelationID string

Member

[important] This comment says CorrelationID is "Used by session-affinity plugin to route requests from the same session to the same pod", but no scheduler or backend code in this PR reads ctx.CorrelationID (grep finds zero consumers). As-is this field is dead plumbing and the prefix-cache routing benefit is not delivered. Either include the consuming plugin in this PR or drop the field/comment until it exists.

Member Author

I removed the session affinity score plugin from this PR to keep it focused. The comment here is a leftover.

Comment thread pkg/kthena-router/router/router.go Outdated
ctx := &framework.Context{
	Model:         modelName,
	Prompt:        prompt,
	CorrelationID: c.Request.Header.Get("X-Correlation-ID"),

Member

[blocking] Hardcoded "X-Correlation-ID" here ignores the configurable SESSION_BOOST_HEADER (honored everywhere else via GetSessionIDHeader()). If an operator sets SESSION_BOOST_HEADER=X-Session-ID, ctx.CorrelationID is always empty and any future session-affinity routing silently breaks. Use the configured header name.

Member Author

Good catch!

Comment thread pkg/kthena-router/datastore/store.go Outdated
const (
	// defaultMetricsScrapeInterval is the default polling interval for pod metrics.
-	defaultMetricsScrapeInterval = 1 * time.Second
+	defaultMetricsScrapeInterval = 50 * time.Millisecond

Member

[blocking] Changing the default pod-metrics scrape interval from 1s to 50ms is a global, ungated behavior change that affects every deployment (20x more scrape traffic against backend /metrics endpoints), not just ENABLE_SESSION_BOOST users. This is bundled into a session-boost PR with no opt-out. Please split this out, justify the load impact, and ideally gate it so non-session-boost users keep the prior cadence.

Member Author

ok

hasCapacity := false
podCount := 0
var totalWaiting float64
s.pods.Range(func(key, value any) bool {

Member

[important] makeBackendWaitingChecker (and makePodCounter below) range over ALL pods in the store, but the SessionBoostQueue is created per-model. In a multi-model deployment, model A's backpressure gate and pod count are computed from every model's pods, so A can be admitted because model B has idle pods, flooding A's own backends. The capacity check and pod counter should be scoped to the pods backing this queue's model.
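
The scoping issue can be illustrated with a simplified model. In this sketch `podInfo` and `hasCapacity` are hypothetical, a plain slice replaces the store's pod map, and each pod is assumed to serve exactly one model. The real store would need its own way of resolving which pods back a model.

```go
package main

import "fmt"

// podInfo is a simplified, hypothetical view of a backend pod.
type podInfo struct {
	model   string  // model served by this pod (assumed to be one per pod)
	waiting float64 // number of requests waiting on the backend
}

// hasCapacity reports whether any pod backing model has an empty waiting
// queue. Pods serving other models are ignored. When no pod serves the
// model yet it returns true, mirroring the deadlock avoidance in the PR.
func hasCapacity(pods []podInfo, model string) bool {
	count := 0
	for _, p := range pods {
		if p.model != model {
			continue
		}
		count++
		if p.waiting == 0 {
			return true
		}
	}
	return count == 0
}

func main() {
	pods := []podInfo{
		{model: "A", waiting: 3},
		{model: "A", waiting: 5},
		{model: "B", waiting: 0}, // idle, but serves a different model
	}
	fmt.Println("model A has capacity:", hasCapacity(pods, "A"))
	fmt.Println("model B has capacity:", hasCapacity(pods, "B"))
}
```

An unscoped check over the same pods would admit model A's requests because model B's pod is idle.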

Member Author

good catch, I'll take a look.

scores := s.RunScorePlugins(pods, ctx)
ctx.BestPods = TopNPodInfos(scores, topN)

// Speculative pre-increment: immediately bump the on-flight counter for the

Member

[important] Two concerns. (1) Scope: this speculative on-flight pre-increment applies to all requests and is unrelated to session boost; it belongs in its own PR with dedicated tests. (2) Leak risk: Schedule() now increments BestPods[0] up front, but the matching DecrPodOnFlightRequests only runs inside proxy()'s dispatch loop. Any early return between Schedule() and that loop (e.g. the io.ReadAll(req.Body) error path in proxy, or an abort in doLoadbalance after Schedule) leaves the counter permanently incremented, making the pod look busier forever and skewing future scheduling. The previous Incr/Decr-in-the-same-loop design was self-balancing; please guarantee a Decr on every post-Schedule error path.

Member Author

ok, I'll take a look.

return SessionBoostQueueConfig{
	SessionIDHeader:         "X-Correlation-ID",
	SessionBoostTTL:         60 * time.Second,
	SessionBoostGracePeriod: 50 * time.Millisecond,

Member

[important] These defaults optimize a narrow case. The proposal notes humans take seconds between turns, yet the 50ms grace period holds the dequeue slot idle after every completion before serving unrelated requests, adding latency to the general workload while almost never catching a human follow-up (it only helps programmatic pipelines). Combined with InflightPerPod=1 (line 139), enabling session boost effectively serializes each pool to ~1 concurrent request per pod, which is a severe throughput cap for general traffic. Consider defaulting grace to 0 and documenting that InflightPerPod=1 is throughput-limiting, or make these clearly opt-in.

Member Author

The grace period should be disabled by default. It should only take effect when the user thinks it's necessary.

This is a rather extreme optimization.

Member Author

Also, set the default value of InflightPerPod to a large one. Users need to adjust this value according to their actual situation.

Collaborator

I think we should set the default to 0.

Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>

@hzxuzhonghu hzxuzhonghu left a comment

Member

Second-pass review, focused on the concurrency and throughput behavior of the backpressure dequeue loop (complements my earlier design/generality review; same commit). Good news first: go vet is clean, gofmt is fine, and go test -race on the SessionBoost/SessionTracker tests passes, so there are no data races in the covered paths. The goroutine lifecycles are tied to ctx, and the sync.Once-guarded Release is correct.

The two issues below are about the backpressure loop's wakeup/admission model. They matter because this feature's stated goal is to reduce TTFT, but the current loop can add admission latency and throttle throughput. Both are most visible exactly in the low-traffic, sequential multi-turn case the proposal targets. Inline.

ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
	select {

Member

[important] In backpressure mode this select waits on releaseCh and ticker.C but not on notifyCh, even though PushRequest signals notifyCh on every enqueue. So when the queue is idle (inflight=0, backend has capacity), a newly pushed request is not picked up until the next ticker tick, i.e. up to BackpressurePollInterval (default 100ms) of pure admission latency before scheduling even begins. For a feature whose goal is cutting TTFT, adding up to 100ms to the first turn of each conversation (and any request arriving into an idle queue) is counterproductive. Consider adding case <-q.notifyCh: here so a fresh enqueue triggers an immediate tryBackpressureDequeue, keeping the ticker only as a backstop.
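
A sketch of the suggested loop shape, with the enqueue signal added to the `select`. The function signature and the `tryDequeue` callback are hypothetical simplifications of the queue's internals; only the channel names follow the comment above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// dequeueLoop wakes on enqueue, on release, or on the poll ticker.
// The ticker remains only as a backstop for missed signals.
func dequeueLoop(ctx context.Context, notifyCh, releaseCh <-chan struct{},
	pollInterval time.Duration, tryDequeue func()) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-notifyCh: // fresh enqueue: attempt admission immediately
			tryDequeue()
		case <-releaseCh: // a request finished: a slot may be free
			tryDequeue()
		case <-ticker.C: // periodic re-check of backend capacity
			tryDequeue()
		}
	}
}

// admitsOnNotify reports whether an enqueue signal triggers a dequeue
// attempt long before the (deliberately huge) poll interval elapses.
func admitsOnNotify() bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	notifyCh := make(chan struct{}, 1)
	releaseCh := make(chan struct{})
	attempted := make(chan struct{}, 1)
	go dequeueLoop(ctx, notifyCh, releaseCh, time.Hour, func() {
		select {
		case attempted <- struct{}{}:
		default:
		}
	})
	notifyCh <- struct{}{} // stands in for the signal sent on enqueue
	select {
	case <-attempted:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("dequeue attempted on enqueue:", admitsOnNotify())
}
```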


// tryBackpressureDequeue attempts to dequeue one request if both the inflight limit
// and the backend capacity check pass.
func (q *SessionBoostQueue) tryBackpressureDequeue(ctx context.Context) {

Member

[important] tryBackpressureDequeue admits at most one request per invocation, and invocations are driven only by one release or one ticker tick. When maxInflight = podCount * InflightPerPod is greater than 1 (multiple pods, or InflightPerPod>1) and a burst of requests is queued against idle backends, the queue fills capacity at roughly one request per poll interval rather than filling the available slots immediately. Steady state self-feeds 1-in/1-out via releaseCh, but initial ramp and any spare capacity are underused. Consider looping here to admit while inflight < maxInflight && backendChecker() && queueLen > 0, so the loop saturates available capacity in one pass instead of waiting for the next tick.
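
The saturating loop could look roughly like the sketch below. The `queue` type and its fields are hypothetical; `maxInflight` stands for `podCount * InflightPerPod` as described in the comment.

```go
package main

import "fmt"

// queue is a hypothetical, single-goroutine simplification of the
// session boost queue's admission state.
type queue struct {
	pending     []string // queued request IDs, highest priority first
	inflight    int
	maxInflight int // podCount * InflightPerPod
}

// admitAvailable dequeues requests until the inflight limit is reached,
// the backend reports no capacity, or the queue is empty.
func (q *queue) admitAvailable(backendHasCapacity func() bool) []string {
	var admitted []string
	for q.inflight < q.maxInflight && len(q.pending) > 0 && backendHasCapacity() {
		req := q.pending[0]
		q.pending = q.pending[1:]
		q.inflight++
		admitted = append(admitted, req)
	}
	return admitted
}

// newBurstQueue builds a queue with six pending requests and room for
// four (for example 2 pods with InflightPerPod = 2).
func newBurstQueue() *queue {
	return &queue{
		pending:     []string{"r1", "r2", "r3", "r4", "r5", "r6"},
		maxInflight: 4,
	}
}

func main() {
	q := newBurstQueue()
	admitted := q.admitAvailable(func() bool { return true })
	fmt.Println("admitted:", admitted, "still queued:", q.pending)
}
```

A single wakeup then fills all free slots instead of admitting one request per poll interval.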

Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
Copilot AI review requested due to automatic review settings June 8, 2026 03:10

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.

Comment on lines +103 to +142
// SessionBoostQueueConfig holds configurable parameters for the standalone session boost queue.
type SessionBoostQueueConfig struct {
	// SessionIDHeader is the HTTP header name used to identify conversation sessions.
	// Configured via the SESSION_BOOST_HEADER environment variable. If not set,
	// session identification is disabled.
	SessionIDHeader string

	// SessionBoostTTL is the duration after which a session boost expires.
	// Requests from the same session that arrive within this window after the
	// previous request completed will be boosted.
	SessionBoostTTL time.Duration

	// SessionBoostGracePeriod is the duration to wait after a release before dequeuing
	// the next request in backpressure mode.
	// This gives the same session time to submit a follow-up request that benefits
	// from prefix cache, rather than immediately dispatching an unrelated request.
	// If a session-boosted request arrives during this window, it is dequeued immediately.
	// Defaults to 0, which disables the grace period.
	SessionBoostGracePeriod time.Duration

	// BackpressurePollInterval controls how often the backpressure checker polls
	// backend pod waiting queue status. Defaults to 100ms.
	BackpressurePollInterval time.Duration

	// InflightPerPod is the maximum number of inflight requests allowed per backend pod.
	// The total inflight limit is InflightPerPod * podCount.
	// Defaults to 16.
	InflightPerPod int
}

// DefaultSessionBoostQueueConfig returns default configuration for the session boost queue.
func DefaultSessionBoostQueueConfig() SessionBoostQueueConfig {
	return SessionBoostQueueConfig{
		SessionIDHeader:          "", // Must be set via SESSION_BOOST_HEADER env var
		SessionBoostTTL:          60 * time.Second,
		SessionBoostGracePeriod:  0,
		BackpressurePollInterval: 100 * time.Millisecond,
		InflightPerPod:           16,
	}
}
Comment on lines +483 to +499
// createSessionBoostQueueConfigFromEnv returns a SessionBoostQueueConfig if
// ENABLE_SESSION_BOOST=true, or nil if the standalone session boost queue is disabled.
func createSessionBoostQueueConfigFromEnv() *SessionBoostQueueConfig {
	v := os.Getenv("ENABLE_SESSION_BOOST")
	if v == "" {
		return nil
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil || !enabled {
		return nil
	}

	cfg := DefaultSessionBoostQueueConfig()

	if v := os.Getenv("SESSION_BOOST_HEADER"); v != "" {
		cfg.SessionIDHeader = v
	}
Comment on lines +1196 to 1206
var pri float64
if userId != "" {
	pri = r.calculateRequestPriority(userId, modelName)
}
queueReq := &datastore.Request{
	ReqID:       requestID,
	UserID:      userId,
	ModelName:   modelName,
	SessionID:   sessionID,
	Priority:    pri,
	RequestTime: time.Now(),

Member Author

good catch, fixed.

Comment on lines +685 to +693
	// If no pods are registered yet, allow dequeue to avoid deadlock
	if podCount == 0 {
		return true
	}
	if !hasCapacity {
		klog.Infof("[BackendWaitingChecker] model %s: all %d pods busy, totalWaiting=%.0f", modelName, podCount, totalWaiting)
	}
	return hasCapacity
}
Comment on lines +318 to +320
if req == nil || req.NotifyChan == nil {
continue
}
Comment on lines +92 to +101
ReqID string
UserID string // User ID for fairness scheduling
ModelName string // Target model for per-model fair queuing
SessionID string // Session identifier for multi-turn conversations
Priority float64 // Priority (lower value means higher priority)
SessionBoost bool // Whether this request has session priority boost (recently completed session)
RequestTime time.Time
NotifyChan chan struct{}
CancelCh <-chan struct{} // Request-scoped cancellation signal
Release func() // Set by the queue when a permit is acquired
Comment on lines +164 to +168
| Environment Variable | Default | Description |
| -------------------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------ |
| `ENABLE_SESSION_BOOST` | `false` | Enable session boost queue |
| `SESSION_BOOST_HEADER` | `X-Correlation-ID` | HTTP header used to identify conversation sessions |
| `SESSION_BOOST_TTL` | `60s` | Duration after which a session boost expires |
Signed-off-by: YaoZengzeng <yaozengzeng@huawei.com>
@hzxuzhonghu

Member

Another point from me: the boost queue can be seen as a special case of fairness, so why not reuse the common fairness queue? My main concern is that once we have this many queues, performance will suffer, and in the worst case they may conflict with each other.

@YaoZengzeng

Member Author

Another point from me: the boost queue can be seen as a special case of fairness, so why not reuse the common fairness queue? My main concern is that once we have this many queues, performance will suffer, and in the worst case they may conflict with each other.

I initially implemented it based on the fairness queue, but considering the differences between the fairness queue and the session boost queue, as well as the simplicity of configuration and the independence of the modules, I refactored it when I submitted the PR, separating the two.

@YaoZengzeng

Member Author

/retest


Without session boost, a follow-up request may be queued behind unrelated requests. By the time it reaches the backend, the prefix cache may have been evicted, forcing a full recomputation.

When session boost is enabled, the router does the following:

Member

We need a workflow diagram here.

- **Multi-turn chat applications**: ChatGPT-like interfaces where users have back-and-forth conversations with an LLM. Each turn builds on the previous conversation context.
- **Agentic workflows and RAG chains**: Automated pipelines that issue multiple sequential requests in the same session, where each request depends on the previous response.
- **Low-latency prefix cache optimization**: Workloads where minimizing TTFT is critical and the same session's requests benefit from being processed back-to-back on warm KV cache.
- **Environments without fairness requirements**: When you want prefix cache optimization without the complexity of multi-tenant fairness scheduling.

Member

Not sure what this means.

Session boost is **not** needed when:

- Your workload is single-turn (independent requests with no shared prefix).
- Requests are already routed with session sticky and no queuing contention exists.

Member

Session stickiness does not seem to conflict with session boost.

networking:
kthenaRouter:
sessionBoost:
enabled: true

Member

remove

| -------------------------------- | -------------------------------------------------------------------------- | ------------ | ---------------------------------------------------------------------------------------------------------------------------- |
| `ENABLE_SESSION_BOOST` | Enable the session boost queue | `false` | Global feature switch |
| `SESSION_BOOST_HEADER` | HTTP header used to identify conversation sessions | *(required)* | Must match what your clients send |
| `SESSION_BOOST_TTL` | Duration after which a session's boost expires | `60s` | Longer values help slow human conversations; shorter values suit fast automated pipelines |

Member

Why does it not take effect after this? In reality, how could a user set this correctly?

And by the way, how do you set it when you do benchmarking?

| `SESSION_BOOST_HEADER` | HTTP header used to identify conversation sessions | *(required)* | Must match what your clients send |
| `SESSION_BOOST_TTL` | Duration after which a session's boost expires | `60s` | Longer values help slow human conversations; shorter values suit fast automated pipelines |
| `SESSION_BOOST_GRACE_PERIOD` | Wait time after a request completes for a same-session follow-up to arrive | `0` | Disabled by default. Only enable (e.g., `50ms`) if you understand the latency trade-off for non-boosted requests |
| `SESSION_BOOST_POLL_INTERVAL` | Backend capacity polling interval | `100ms` | Lower values reduce latency but increase polling load |

Member

polling what? metrics?


### Grace Period

The grace period is **disabled by default** (`SESSION_BOOST_GRACE_PERIOD=0`). When disabled, the queue immediately dequeues the next request after a completion without waiting.

Member

As we design this, we should think carefully about how to make it understandable to both contributors and users.

From my point of view, I can guess what this parameter does to some extent, but I am still more confused than clear.
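
For readers trying to picture the parameter, here is one possible reading of the grace period, based on the `SessionBoostGracePeriod` field comment quoted earlier in this review. The `waitGrace` helper and the `boostedCh` channel are hypothetical names, not taken from the PR.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitGrace is called after a release, before dequeuing the next request.
// It returns true if a session-boosted request arrived within the grace
// window. With grace <= 0 the feature is disabled and the caller proceeds
// straight to its normal dequeue.
func waitGrace(ctx context.Context, grace time.Duration, boostedCh <-chan struct{}) bool {
	if grace <= 0 {
		return false // disabled: do not hold the slot idle
	}
	timer := time.NewTimer(grace)
	defer timer.Stop()
	select {
	case <-boostedCh:
		return true // a same-session follow-up showed up in time
	case <-timer.C:
		return false // window expired: serve whatever is queued
	case <-ctx.Done():
		return false
	}
}

// graceDemo exercises the three cases: disabled, follow-up caught,
// and window expired without a follow-up.
func graceDemo() (disabled, caught, expired bool) {
	ctx := context.Background()
	ready := make(chan struct{}, 1)
	ready <- struct{}{}
	disabled = waitGrace(ctx, 0, ready) // signal stays buffered
	caught = waitGrace(ctx, 50*time.Millisecond, ready)
	expired = waitGrace(ctx, 10*time.Millisecond, make(chan struct{}))
	return disabled, caught, expired
}

func main() {
	fmt.Println(graceDemo())
}
```

The trade-off is visible in the third case: every completion without a follow-up costs the full grace window before an unrelated request is served.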

| Goal | Maximize prefix cache hits | Equitable resource allocation |
| Activation | `ENABLE_SESSION_BOOST=true` | `ENABLE_FAIRNESS_SCHEDULING=true` |
| Requires user ID | No | Yes |
| Priority logic | Boosted > non-boosted, FIFO within | Lower recent usage wins |

Member

Suppose there are two sessions, both multi-turn conversations. For example, session A's request is boosted to the front of the queue but not yet dequeued, and then session B's request is enqueued. Will you boost session B's request in front of A's?
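
To make the question concrete, the sketch below applies the rule stated in the table row ("Boosted > non-boosted, FIFO within") literally. Under that rule a later boosted request from session B would queue behind session A's earlier boosted request. Whether the PR's queue behaves this way is an assumption here; the `request` type and `less` comparator are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// request is a hypothetical queue entry; seq is the arrival order.
type request struct {
	session string
	boosted bool
	seq     int
}

// less orders boosted requests before non-boosted ones and keeps
// arrival order (FIFO) within each group.
func less(a, b request) bool {
	if a.boosted != b.boosted {
		return a.boosted
	}
	return a.seq < b.seq
}

// dequeueOrder returns the session names in the order they would be served.
func dequeueOrder(reqs []request) []string {
	sorted := append([]request(nil), reqs...)
	sort.SliceStable(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	out := make([]string, 0, len(sorted))
	for _, r := range sorted {
		out = append(out, r.session)
	}
	return out
}

// sampleQueue holds an unrelated request, then boosted A, then boosted B.
func sampleQueue() []request {
	return []request{
		{session: "C", boosted: false, seq: 1},
		{session: "A", boosted: true, seq: 2},
		{session: "B", boosted: true, seq: 3},
	}
}

func main() {
	fmt.Println(dequeueOrder(sampleQueue()))
}
```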

#### Goals

1. **Simple activation**: Session boost can be enabled via `ENABLE_SESSION_BOOST=true`.
2. **Configurable session identification**: Users can configure which HTTP header identifies conversation sessions via `SESSION_BOOST_HEADER` (defaults to `X-Correlation-ID`).

Member

X-Correlation-ID -> X-Session-ID

Comment on lines +73 to +74
│ │ map[corrID]time │ └─────────────────────────┘ │
│ │ TTL: 60s default │ │

Member

I would suggest using the golang simple LRU lib directly instead of implementing it ourselves.

Labels

kind/enhancement New feature or request size/XXL

6 participants