add kvcache aware plugin e2e test#1222
Conversation
Signed-off-by: zhoujinyu <2319109590@qq.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds an end-to-end KV-cache-aware scheduler plugin test chain by introducing a ZMQ bridge and wiring the mock deployment to publish/consume KV events through runtime + Redis.
Changes:
- Added a Python ZMQ bridge helper and created it as a ConfigMap during E2E setup.
- Updated the mock deployment to include
zmq-bridgeandruntimecontainers and enabled the sim’s native ZMQ publishing. - Added a new KV-cache-aware E2E test plus Redis/rollout helper functions.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| test/e2e/router/router-plugins/testdata/zmq-bridge.py | New helper script to forward sim ZMQ events into the runtime topic layout |
| test/e2e/router/router-plugins/testdata/LLM-Mock-plugins.yaml | Adds bridge + runtime sidecars and config/ports to enable full KV-cache-aware chain |
| test/e2e/router/router-plugins/context/context.go | Creates ConfigMap containing the bridge script before deploying mocks |
| test/e2e/router/plugins_test.go | Adds TestSchedulerPluginKVCacheAware full-chain routing preference test |
| test/e2e/router/plugins_helpers.go | Adds Redis port-forward client setup and Redis-wait helpers for KV mappings |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Code Review
This pull request introduces end-to-end testing for the kvcache-aware scheduling plugin, adding a ZMQ bridge helper script, configuring a runtime container in the mock LLM deployment, and implementing the TestSchedulerPluginKVCacheAware test. The review feedback focuses on improving test robustness and preventing flakiness. Key recommendations include refactoring setupRedisClient to return errors instead of asserting inside Eventually, resolving the zmq-bridge.py path dynamically using runtime.Caller to support running tests from package subdirectories, avoiding redundant rolling updates in patchMockRedisHost when the host is already set, and adding a buffer to the log scraping start time to account for potential clock skew.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| func setupRedisClient(t *testing.T, kube kubernetes.Interface, namespace string) (*redis.Client, func()) { | ||
| t.Helper() | ||
| pods := utils.ListReadyPodsByLabel(t, kube, namespace, redisServerAppLabel) | ||
| require.NotEmpty(t, pods, "no ready redis pods in namespace %s", namespace) | ||
|
|
||
| localPort := utils.AllocateLocalPort(t) | ||
| pf, err := utils.SetupPortForwardToPod(namespace, pods[0].Name, localPort, "6379") | ||
| require.NoError(t, err, "port-forward to redis pod %s", pods[0].Name) | ||
|
|
||
| addr := fmt.Sprintf("127.0.0.1:%s", localPort) | ||
| client := redis.NewClient(&redis.Options{Addr: addr}) | ||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| defer cancel() | ||
| require.NoError(t, client.Ping(ctx).Err(), "redis ping via port-forward") | ||
|
|
||
| return client, func() { | ||
| _ = client.Close() | ||
| pf.Close() | ||
| } | ||
| } |
There was a problem hiding this comment.
Using require assertions (which call t.FailNow()) inside a helper function that is executed within require.Eventually defeats the purpose of retrying. If any transient error occurs (such as a temporary port-forwarding failure or a Redis ping timeout while the container is starting up), the test will fail immediately instead of waiting and retrying. Refactor setupRedisClient to return an error so that the caller can handle it gracefully inside Eventually.
func setupRedisClient(t *testing.T, kube kubernetes.Interface, namespace string) (*redis.Client, func(), error) {
t.Helper()
pods := utils.ListReadyPodsByLabel(t, kube, namespace, redisServerAppLabel)
if len(pods) == 0 {
return nil, nil, fmt.Errorf("no ready redis pods in namespace %s", namespace)
}
localPort := utils.AllocateLocalPort(t)
pf, err := utils.SetupPortForwardToPod(namespace, pods[0].Name, localPort, "6379")
if err != nil {
return nil, nil, fmt.Errorf("port-forward to redis pod %s: %w", pods[0].Name, err)
}
addr := fmt.Sprintf("127.0.0.1:%s", localPort)
client := redis.NewClient(&redis.Options{Addr: addr})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
_ = client.Close()
pf.Close()
return nil, nil, fmt.Errorf("redis ping via port-forward: %w", err)
}
return client, func() {
_ = client.Close()
pf.Close()
}, nil
}d50e36a to
b111454
Compare
There was a problem hiding this comment.
Just a compatibility. The ZMQ topic sent by the sim (sim instance) is inconsistent with the topic listened to by the runtime: the sim uses a per-pod topic like kv@<podIP>@<model>, while the runtime only subscribes to kv-events. Without conversion, it won't receive any kv-cache events, and Redis won't be written.
Example: Before processing, the sim sends a message with the topic kv@10.0.1.23@llm-mock, and the runtime, subscribing to kv-events, doesn't consume it at all. After processing, zmq_bridge changes the same message to kv-events and forwards it to the runtime (payload unchanged), allowing the runtime to write to Redis.
|
|
||
| // SetupPluginComponents deploys fast/slow plugin mocks and ModelServers shared by plugin e2e tests. | ||
| func SetupPluginComponents(kubeClient *kubernetes.Clientset, kthenaClient *clientset.Clientset, namespace string) error { | ||
| func SetupPluginComponents(kubeClient *kubernetes.Clientset, kthenaClient *clientset.Clientset, namespace, kthenaNamespace string) error { |
There was a problem hiding this comment.
Write REDIS_HOST into the ConfigMap before deploying SetupPluginComponents . I tried patching mid-deployment, but the patch triggered Pod restarts, interrupted warmup and kv cache writes.
|
|
||
| // DirectChatToPod sends count streaming chat requests directly to a pod via port-forward. | ||
| func DirectChatToPod(t *testing.T, pod corev1.Pod, model, prompt string, count int) { | ||
| func DirectChatToPod(t *testing.T, pod corev1.Pod, model, prompt string, count, maxTokens int) { |
There was a problem hiding this comment.
In the kvcache-aware chain, the kv-cache capacity/block count of sim is very small. During testing, I found that if chat requests generate too many tokens, it can easily fill the kv cache and trigger a 500 error.so I simply added a token control.
|
@hzxuzhonghu ptal |
|
|
||
| route := utils.CreateModelRouteFromFile(t, ctx, testCtx.KthenaClient, plugincontext.TestDataDir, testNamespace, "ModelRoute-plugins.yaml") | ||
| model := route.Spec.ModelName | ||
| // Must fit sim kv-cache-size=8 blocks (block-size=8): prompt blocks + max_tokens blocks <= 8. |
There was a problem hiding this comment.
no , <= 8 refers to the maximum kv-cache size of the sim (kv-cache-size=8), not a requirement of at least 8 blocks. We keep the prompt and max_tokens very small to avoid the prompt blocks + output blocks exceeding 8, which would result in a 500 error.
There was a problem hiding this comment.
maybe need to update comment
What type of PR is this?
What this PR does / why we need it:
After #1199 and #1178 only kvcache-aware plugin left now
Which issue(s) this PR fixes:
Special notes for your reviewer:
Does this PR introduce a user-facing change?: