Add WatchList support with GenericListWatcher abstraction#10187
liuxu623 wants to merge 1 commit into projectcalico:master
Conversation
D'oh, think we're duplicating some work here; I've been working on the same. #9593 was an enabling PR to be able to add this feature. You're right; it's a big win for us. It actually moves the k8s API to work the same way that Calico's internal "Syncer" API works(!) I need to dust off my work in progress and see if there were any gotchas and compare with your approach.
/sem-approve
I noticed #9593 earlier; the WatchList feature depends on bookmark support. Once bookmarks were supported, a lot of the work here was reduced.
@fasaxc Felix add
Sorry for the radio silence, I finally got a chance to have a look. A few high-level thoughts rather than jumping into line-by-line review:
For reference, here's a gist with my proof-of-concept version: https://gist.github.com/fasaxc/79da02a2ab91c3bcd9644761e6abfc79 It was based on a much earlier version of the watch bookmarks PR and it's hard to rebase now but there might be something worth borrowing from it. Some things I did differently:
Before k8s 1.27, watch did not support `resourceVersionMatch`; `ValidateListOptions` returned an error:

```go
// ValidateListOptions returns all validation errors found while validating the ListOptions.
func ValidateListOptions(options *internalversion.ListOptions) field.ErrorList {
	allErrs := field.ErrorList{}
	if match := options.ResourceVersionMatch; len(match) > 0 {
		if options.Watch {
			allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden for watch"))
		}
		......
	}
	return allErrs
}
```

Since k8s 1.27, `sendInitialEvents` is not supported if the WatchList feature gate is disabled; `ValidateListOptions` returns an error:

```go
func validateWatchOptions(options *internalversion.ListOptions, isWatchListFeatureEnabled bool) field.ErrorList {
	allErrs := field.ErrorList{}
	match := options.ResourceVersionMatch
	if options.SendInitialEvents != nil {
		if match != metav1.ResourceVersionMatchNotOlderThan {
			allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), fmt.Sprintf("sendInitialEvents requires setting resourceVersionMatch to %s", metav1.ResourceVersionMatchNotOlderThan)))
		}
		if !isWatchListFeatureEnabled {
			allErrs = append(allErrs, field.Forbidden(field.NewPath("sendInitialEvents"), "sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled"))
		}
	}
	......
	return allErrs
}
```

So the error is the same for new and old apiservers, and it is an Invalid error; we can use the error type to determine whether the apiserver supports WatchList:

```go
metainternalversion.SetListOptionsDefaults(&opts, utilfeature.DefaultFeatureGate.Enabled(features.WatchList))
if errs := metainternalversionvalidation.ValidateListOptions(&opts, utilfeature.DefaultFeatureGate.Enabled(features.WatchList)); len(errs) > 0 {
	err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
	scope.err(err, w, req)
	return
}
```
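To make that decision concrete, here is a minimal, self-contained sketch of the fallback logic (a stand-in sentinel error replaces the real `apierrors.IsInvalid` check, and `watchList`/`syncOnce` are hypothetical names, not the PR's actual code):

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidListOptions stands in for the apiserver's Invalid error on
// ListOptions; real code would detect it with apierrors.IsInvalid.
var errInvalidListOptions = errors.New("Invalid: ListOptions")

// watchList attempts the WatchList path; an older apiserver (or one
// with the feature gate disabled) rejects it with the Invalid error.
func watchList(serverSupportsWatchList bool) error {
	if !serverSupportsWatchList {
		return errInvalidListOptions
	}
	return nil
}

// syncOnce tries WatchList first and, on the Invalid error, falls back
// to the classic List+Watch pattern.
func syncOnce(serverSupportsWatchList bool) string {
	if err := watchList(serverSupportsWatchList); err != nil {
		if errors.Is(err, errInvalidListOptions) {
			return "list+watch fallback"
		}
		return "error"
	}
	return "watchlist"
}

func main() {
	fmt.Println(syncOnce(true))  // new apiserver
	fmt.Println(syncOnce(false)) // old apiserver: fall back
}
```

Because the error shape is identical on old and new apiservers, a single error-type check covers both cases.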
Yes, I plan to make the etcd watch return the same error as the apiserver if `ResourceVersionMatch` or `SendInitialEvents` is specified:

```go
// Watch entries in the datastore matching the resources specified by the ListInterface.
func (c *etcdV3Client) Watch(cxt context.Context, l model.ListInterface, options api.WatchOptions) (api.WatchInterface, error) {
	allErrs := field.ErrorList{}
	if len(options.ResourceVersionMatch) > 0 {
		allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden for etcdv3 backend"))
	}
	if options.SendInitialEvents != nil && *options.SendInitialEvents {
		allErrs = append(allErrs, field.Forbidden(field.NewPath("sendInitialEvents"), "sendInitialEvents is forbidden for etcdv3 backend"))
	}
	if len(allErrs) > 0 {
		return nil, apierrors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", allErrs)
	}
```
```go
if wc.initialRev == 0 {
	// No initial revision supplied, so perform a list of current configuration
	// which will also get the current revision we will start our watch from.
	var kvps *model.KVPairList
	var err error
	if kvps, err = wc.listCurrent(); err != nil {
		log.Errorf("failed to list current with latest state: %v", err)
		// Error considered as terminating error, hence terminate watcher.
		wc.sendError(err)
		return
	}
	// If we're handling profiles, filter out the default-allow profile.
	if len(kvps.KVPairs) > 0 && (key == profilesKey || key == defaultAllowProfileKey) {
		wc.removeDefaultAllowProfile(kvps)
	}
	// We are sending an initial sync of entries to the watcher to provide current
	// state. From the perspective of the watcher, these are added entries, so set the
	// event type to WatchAdded.
	log.WithField("NumEntries", len(kvps.KVPairs)).Debug("Sending create events for each existing entry")
	wc.sendAddedEvents(kvps)
}
```

We shouldn't use this code to implement WatchList for etcdv3, because there is no point at which to call onInSync.
https://kubernetes.io/blog/2025/05/09/kubernetes-v1-33-streaming-list-responses/ @fasaxc The WatchList feature has been disabled by default in 1.33 because StreamingCollectionEncodingToJSON and StreamingCollectionEncodingToProtobuf appear to work better; maybe we don't need WatchList anymore.
Agree, let's put it on hold. It's a shame; the streaming decode is better for Calico on the client side. We avoid a big resource spike while we parse/validate the very large List object. |
@fasaxc Kubernetes 1.34 has re-promoted the WatchList feature to Beta, and we can resume this work now. |
@fasaxc I apologize for the lack of updates to this PR recently due to work commitments. I've made some refactorings to the ListWatch code; details can be found in the PR description. Looking forward to your feedback. |
This PR is stale because it has been open for 60 days with no activity. |
Introduce Kubernetes WatchList synchronization mode (GA in K8s 1.32), enabling efficient initial sync by streaming data via watch events.

Key changes:
- Extract list-watch logic into GenericListWatcher with ListWatchBackend and EventHandler interfaces for clean separation of concerns
- Add k8s ListWatcher with WatchList mode, bookmark handling, and automatic fallback to List+Watch for older K8s versions
- Add etcd ListWatcher using traditional List+Watch pattern
- Use empty revision for List/Watch to avoid stale revision errors
- Replace oldResources map with resyncEpoch for memory efficiency
- Simplify watcherCache from ~500 to ~278 lines (-45%)

WatchList mode reduces memory pressure by streaming resources incrementally instead of loading all at once via the List API.
Description
This PR introduces Kubernetes WatchList synchronization mode support and refactors the list-watch logic into a reusable `GenericListWatcher` abstraction. WatchList mode provides more efficient initial synchronization by streaming resources through watch events instead of loading all resources at once via the List API.

Motivation
The traditional List+Watch pattern has a significant limitation: during initial synchronization, the entire dataset must be loaded into memory via a single List API call. For large clusters with thousands of resources, this creates a large memory spike on both the API server and the client while the full List response is built, parsed, and validated.
The WatchList feature (GA in Kubernetes 1.32) addresses these issues by streaming initial data incrementally through watch events.
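To make the streaming mode concrete, here is a hedged sketch (local stand-in types rather than client-go's `watch.Event`; not the PR's actual code) of how a WatchList consumer treats every event before the annotated bookmark as part of the initial state:

```go
package main

import "fmt"

// event is a minimal stand-in for a watch event; real code would use
// k8s.io/apimachinery's watch.Event and the initial-events-end
// bookmark annotation.
type event struct {
	kind             string // "Added", "Modified", "Deleted", "Bookmark"
	key              string
	revision         string
	initialEventsEnd bool // set on the bookmark ending the initial stream
}

// consumeWatchList drains a WatchList stream: events before the
// annotated bookmark form the initial state; the bookmark signals
// "in sync" and yields the revision to resume watching from.
func consumeWatchList(events []event) (initial []string, syncRevision string) {
	for _, e := range events {
		if e.kind == "Bookmark" && e.initialEventsEnd {
			return initial, e.revision // initial sync complete
		}
		if e.kind == "Added" {
			initial = append(initial, e.key)
		}
	}
	return initial, "" // stream ended before the sync bookmark
}

func main() {
	events := []event{
		{kind: "Added", key: "pod-a", revision: "100"},
		{kind: "Added", key: "pod-b", revision: "101"},
		{kind: "Bookmark", revision: "101", initialEventsEnd: true},
	}
	initial, rev := consumeWatchList(events)
	fmt.Println(initial, rev)
}
```

The key property is that memory use is bounded by one event at a time rather than by the size of a single List response.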
Refactoring Summary
This PR performs a significant refactoring of the list-watch infrastructure:
Before Refactoring
After Refactoring
Design
Architecture
```mermaid
flowchart TB
    subgraph WatcherSyncer["WatcherSyncer Layer"]
        WS[WatcherSyncer<br/>Coordinates multiple watchers]
        WC1[watcherCache #1<br/>e.g., Pods]
        WC2[watcherCache #2<br/>e.g., Services]
        WC3[watcherCache #N<br/>e.g., NetworkPolicies]
        WS --> WC1
        WS --> WC2
        WS --> WC3
    end
    subgraph EventHandling["Event Handling"]
        EH{{EventHandler Interface}}
        EH1[OnResyncStarted]
        EH2[OnAdd / OnUpdate / OnDelete]
        EH3[OnSync]
        EH4[OnError]
        EH --> EH1
        EH --> EH2
        EH --> EH3
        EH --> EH4
    end
    subgraph ClientInterface["api.Client Interface"]
        LIST[List]
        WATCH[Watch]
        LAW[ListAndWatch<br/>NEW in this PR]
    end
    subgraph K8sBackend["Kubernetes Backend"]
        K8SC[k8s.KubeClient]
        K8SLW[k8s.ListWatcher]
        subgraph K8sModes["Sync Modes"]
            WLM[WatchList Mode<br/>K8s 1.32+ GA]
            LWM[List+Watch Mode<br/>Fallback]
        end
        K8SC --> K8SLW
        K8SLW --> WLM
        K8SLW -.->|fallback| LWM
    end
    subgraph EtcdBackend["etcd Backend"]
        ETCDC[etcdv3.Client]
        ETCDLW[etcdv3.ListWatcher]
        ETCDC --> ETCDLW
        ETCDLW --> LWM2[List+Watch Mode]
    end
    subgraph GenericLW["GenericListWatcher (Shared)"]
        GLW[Common Logic]
        GLW1[Retry Throttling<br/>MinResyncInterval]
        GLW2[Revision Tracking<br/>CurrentRevision]
        GLW3[Error Counting<br/>MaxErrorsPerRevision]
        GLW4[Connection Timeout<br/>WatchRetryTimeout]
        GLW5[Event Loop<br/>LoopReadingFromWatcher]
        GLW --> GLW1
        GLW --> GLW2
        GLW --> GLW3
        GLW --> GLW4
        GLW --> GLW5
    end
    subgraph DataSources["Data Sources"]
        ETCD[(etcd)]
        CACHE[(API Server<br/>Watch Cache)]
    end
    WC1 -.->|implements| EH
    WC2 -.->|implements| EH
    WC3 -.->|implements| EH
    WC1 -->|calls| LAW
    WC2 -->|calls| LAW
    WC3 -->|calls| LAW
    LAW --> K8SC
    LAW --> ETCDC
    K8SLW --> GLW
    ETCDLW --> GLW
    WLM -->|rv=empty| CACHE
    WLM -->|streaming| CACHE
    LWM -->|rv=empty| ETCD
    LWM2 -->|List+Watch| ETCD
    style LAW fill:#9f9,stroke:#393
    style WLM fill:#9cf,stroke:#369
    style GLW fill:#fc9,stroke:#963
```

Data Flow
```mermaid
sequenceDiagram
    participant WS as WatcherSyncer
    participant WC as watcherCache
    participant Client as api.Client
    participant LW as ListWatcher
    participant GLW as GenericListWatcher
    participant Server as K8s/etcd
    WS->>WC: run(ctx)
    WC->>Client: ListAndWatch(ctx, list, handler)
    Client->>LW: create ListWatcher
    LW->>GLW: embed GenericListWatcher
    loop Main Loop
        GLW->>GLW: RetryThrottleC()
        GLW->>LW: PerformInitialSync()
        alt WatchList Mode (K8s)
            LW->>WC: OnResyncStarted()
            LW->>Server: Watch(SendInitialEvents=true)
            loop Streaming Events
                Server-->>LW: WatchAdded/Modified/Deleted
                LW->>GLW: UpdateRevision()
                LW->>WC: OnAdd/OnUpdate/OnDelete
            end
            Server-->>LW: Bookmark(InitialEvents=true)
            LW->>WC: OnSync()
        else List+Watch Mode (etcd/fallback)
            LW->>WC: OnResyncStarted()
            LW->>Server: List()
            Server-->>LW: KVPairList
            loop Send Events
                LW->>WC: OnAdd(kvp)
            end
            LW->>WC: OnSync()
            LW->>Server: Watch(revision)
        end
        loop Watch Loop
            Server-->>LW: Events
            LW->>GLW: HandleBasicWatchEvent()
            GLW->>WC: OnAdd/OnUpdate/OnDelete
        end
        alt Error Occurred
            Server-->>LW: Error
            LW->>GLW: HandleWatchError()
            GLW->>GLW: IncrementErrorCount / ResetForFullResync
        end
    end
```

Key Interfaces
EventHandler Interface
ListWatchBackend Interface
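The interface definitions themselves aren't reproduced here, so the following is a hedged reconstruction from the method names mentioned in this description (`OnResyncStarted`, `OnAdd`/`OnUpdate`/`OnDelete`, `OnSync`, `OnError`, `PerformInitialSync`); the actual signatures in the PR may differ:

```go
package main

import "fmt"

// EventHandler receives the resync lifecycle and per-resource events.
// Parameter types here are simplified stand-ins (the PR uses KVPairs).
type EventHandler interface {
	OnResyncStarted()
	OnAdd(key, value string)
	OnUpdate(key, value string)
	OnDelete(key string)
	OnSync()
	OnError(err error)
}

// ListWatchBackend is implemented once per datastore (k8s, etcdv3)
// and drives the handler through an initial sync.
type ListWatchBackend interface {
	PerformInitialSync(h EventHandler) error
}

// recorder is a toy EventHandler that records the callback order.
type recorder struct{ calls []string }

func (r *recorder) OnResyncStarted()     { r.calls = append(r.calls, "resync") }
func (r *recorder) OnAdd(k, v string)    { r.calls = append(r.calls, "add:"+k) }
func (r *recorder) OnUpdate(k, v string) { r.calls = append(r.calls, "update:"+k) }
func (r *recorder) OnDelete(k string)    { r.calls = append(r.calls, "delete:"+k) }
func (r *recorder) OnSync()              { r.calls = append(r.calls, "sync") }
func (r *recorder) OnError(err error)    { r.calls = append(r.calls, "error") }

// listBackend is a toy List+Watch-style backend: resync, adds, sync.
type listBackend struct{ items map[string]string }

func (b *listBackend) PerformInitialSync(h EventHandler) error {
	h.OnResyncStarted()
	for k, v := range b.items {
		h.OnAdd(k, v)
	}
	h.OnSync()
	return nil
}

func main() {
	r := &recorder{}
	b := &listBackend{items: map[string]string{"pod-a": "v1"}}
	_ = b.PerformInitialSync(r)
	fmt.Println(r.calls)
}
```

The separation lets `GenericListWatcher` own retries and revision tracking while each backend only knows how to produce events.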
Implementation Details
1. GenericListWatcher (`api/listwatcher.go`)

The `GenericListWatcher` provides common functionality for all backend implementations:

- `MaxErrorsPerRevision` (default: 5)

Key state management:
2. Kubernetes ListWatcher (`k8s/listwatcher.go`)

Implements WatchList mode with automatic fallback.

WatchList Mode (default):
- Sets `SendInitialEvents=true` and `ResourceVersionMatch=NotOlderThan` in watch options
- Uses the `InitialEventsAnnotationKey` annotation to detect the bookmark that ends the initial stream

Fallback to List+Watch:
- Triggered by an `IsInvalid` error (older K8s versions)

K8s-specific Error Handling:
3. etcd ListWatcher (`etcdv3/listwatcher.go`)

Implements the traditional List+Watch pattern:
- `PerformListSync()` for initial synchronization
- Embeds `GenericListWatcher`

4. Simplified watcherCache (`watchersyncer/watchercache.go`)

The `watcherCache` now implements the `EventHandler` interface directly.

Before: ~500 lines with embedded list-watch logic
After: ~278 lines (-45% reduction)
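One of the key simplifications listed below replaces the `oldResources` map with an epoch counter; a minimal sketch of that mechanism, with hypothetical names rather than the PR's actual code:

```go
package main

import "fmt"

// cacheEntry pairs a stored value with the epoch of the resync that
// last confirmed it.
type cacheEntry struct {
	value string
	epoch int
}

type cache struct {
	resyncEpoch int
	resources   map[string]cacheEntry
}

// onResyncStarted bumps the epoch instead of copying the whole map.
func (c *cache) onResyncStarted() { c.resyncEpoch++ }

// onAdd stamps the entry with the current epoch.
func (c *cache) onAdd(key, value string) {
	c.resources[key] = cacheEntry{value: value, epoch: c.resyncEpoch}
}

// onSync deletes entries that were not re-confirmed during this
// resync and returns their keys (the implied deletions).
func (c *cache) onSync() (deleted []string) {
	for k, e := range c.resources {
		if e.epoch < c.resyncEpoch {
			deleted = append(deleted, k)
			delete(c.resources, k)
		}
	}
	return deleted
}

func main() {
	c := &cache{resyncEpoch: 1, resources: map[string]cacheEntry{}}
	c.onAdd("pod-a", "v1")
	c.onAdd("pod-b", "v1")
	// Connection lost; pod-b is deleted while we're disconnected.
	c.onResyncStarted()
	c.onAdd("pod-a", "v2")  // re-confirmed during resync
	fmt.Println(c.onSync()) // pod-b is detected as stale
}
```

The epoch costs one int per entry, versus a full second copy of every resource in the old approach.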
Key simplifications:
- Replaced the `oldResources` map with a `resyncEpoch` counter for deletion detection

Resync Epoch Mechanism

On resync start:
- Increment `resyncEpoch`
- After sync completes, delete entries whose `resyncEpoch < current` (stale entries)

This is more memory-efficient than the previous approach of copying all resources to an `oldResources` map.

```mermaid
sequenceDiagram
    participant Cache as watcherCache
    participant Resources as resources map
    participant Handler as EventHandler
    Note over Resources: Initial State (epoch=1)<br/>pod-a:1, pod-b:1, pod-c:1
    Note over Cache: Connection lost!
    Cache->>Cache: OnResyncStarted()<br/>resyncEpoch = 2
    Note over Cache: Receive events during resync<br/>(pod-b was deleted while disconnected)
    Cache->>Resources: OnAdd(pod-a) → epoch=2
    Cache->>Resources: OnAdd(pod-c) → epoch=2
    Note over Resources: pod-a:2, pod-b:1 (STALE!), pod-c:2
    Cache->>Cache: OnSync() called
    Cache->>Resources: Scan for stale entries<br/>(epoch < current)
    Resources-->>Cache: pod-b.epoch(1) < current(2)
    Cache->>Handler: Send delete for pod-b
    Cache->>Resources: Remove pod-b
    Note over Resources: Final State<br/>pod-a:2, pod-c:2
```

ResourceVersion Management
Overview
ResourceVersion (revision) is a critical concept in Kubernetes/etcd that ensures consistency during list-watch operations. This PR improves how revisions are managed, as described below.
Revision Lifecycle
```mermaid
stateDiagram-v2
    [*] --> Empty: NewGenericListWatcher
    Empty --> Syncing: PerformInitialSync
    state Syncing {
        [*] --> WatchList: K8s default
        [*] --> ListWatch: etcd / fallback
        WatchList --> ReceiveEvents: Stream events
        ReceiveEvents --> UpdateRev: Each event updates revision
        UpdateRev --> ReceiveEvents
        UpdateRev --> Synced: Bookmark with annotation
        ListWatch --> ListAll: PerformList
        ListAll --> UpdateRevFromList: Get list revision
        UpdateRevFromList --> Synced: OnSync
    }
    Synced --> Watching: CreateWatch(revision)
    state Watching {
        [*] --> WaitEvent
        WaitEvent --> ProcessEvent: Event received
        ProcessEvent --> UpdateRevision: Update CurrentRevision
        UpdateRevision --> WaitEvent
    }
    Watching --> Empty: Error / Resync needed
    Watching --> Syncing: Connection lost
```

Revision Update Points
""event.New.Revisionorevent.Old.RevisionUpdateRevision()event.New.RevisionUpdateRevision()list.RevisionUpdateRevision()""ResetForFullResync()""ResetForFullResync()""ResetForFullResync()Revision Flow Diagram
```mermaid
sequenceDiagram
    participant GLW as GenericListWatcher
    participant Backend as k8s/etcd Backend
    participant Server as API Server
    Note over GLW: CurrentRevision = ""
    GLW->>Backend: PerformInitialSync()
    alt WatchList Mode
        Backend->>Server: Watch(SendInitialEvents=true)
        loop Initial Events
            Server-->>Backend: WatchAdded(rev=100)
            Backend->>GLW: UpdateRevision("100")
            Server-->>Backend: WatchAdded(rev=101)
            Backend->>GLW: UpdateRevision("101")
            Server-->>Backend: WatchAdded(rev=102)
            Backend->>GLW: UpdateRevision("102")
        end
        Server-->>Backend: Bookmark(rev=102, InitialEvents=true)
        Backend->>GLW: UpdateRevision("102")
        Note over GLW: InitialSyncPending = false
    else List+Watch Mode
        Backend->>Server: List()
        Server-->>Backend: KVPairList(rev=102)
        Backend->>GLW: UpdateRevision("102")
        Note over GLW: InitialSyncPending = false
        Backend->>Server: Watch(revision=102)
    end
    Note over GLW: CurrentRevision = "102"
    loop Watch Events
        Server-->>Backend: WatchModified(rev=103)
        Backend->>GLW: UpdateRevision("103")
        Server-->>Backend: WatchDeleted(rev=104)
        Backend->>GLW: UpdateRevision("104")
        Server-->>Backend: Bookmark(rev=110)
        Backend->>GLW: UpdateRevision("110")
    end
    Note over GLW: CurrentRevision = "110"
    alt Error: ResourceExpired
        Server-->>Backend: Error(410 Gone)
        Backend->>GLW: ResetForFullResync()
        Note over GLW: CurrentRevision = ""
        Note over GLW: InitialSyncPending = true
    end
```

Why Empty Revision Instead of "0"
In the previous implementation, the revision was initialized to `"0"`. This PR changes it to an empty string.

Reasons for using empty revision:
Data source difference (see Kubernetes API Concepts - Resource Versions):
- `""` (empty/unset): a consistent read served from etcd (most recent data)
- `"0"`: any data, typically served from the API server's watch cache (may be stale)

Using an empty resourceVersion ensures we get the most recent data from etcd with strong consistency guarantees, which is critical for the initial sync to be accurate.
Pagination bypass with rv=0: When `resourceVersion=0` is used, the API server returns all data at once from the watch cache, bypassing pagination parameters. This defeats the purpose of memory-efficient streaming.

etcd behavior: According to the etcd client documentation:
This means that in etcd Get/List operations, `WithRev(0)` and not specifying `WithRev` have the same behavior: both return the latest data from etcd. The Calico etcd backend handles this by not passing the `WithRev` option when the revision is empty.

Key difference from Kubernetes: While in Kubernetes `rv=0` vs `rv=""` have different semantics (cache vs etcd), in the native etcd client they are equivalent. The Calico implementation uses the empty string consistently for both backends to request the latest data.

Modern API server optimizations: Recent Kubernetes versions (1.28+) have implemented significant performance improvements for consistent reads that work best with an empty resourceVersion:
- ConsistentListFromCache (KEP-3157): Allows serving consistent List requests directly from the watch cache instead of requiring a quorum read from etcd. When using an empty resourceVersion with `ResourceVersionMatch=NotOlderThan`, the API server can serve the request from cache while still guaranteeing consistency.

- ListFromCacheSnapshot (KEP-3926): Creates a point-in-time snapshot of the watch cache for List operations, reducing memory allocations and improving performance for large lists.
These optimizations are designed around the pattern of requesting "latest" data (empty resourceVersion) rather than a specific version, making empty revision the recommended approach for new implementations.
Todos
Release Note
Reminder for the reviewer
Make sure that this PR has the correct labels and milestone set.
Every PR needs one `docs-*` label.
- `docs-pr-required`: This change requires a change to the documentation that has not been completed yet.
- `docs-completed`: This change has all necessary documentation completed.
- `docs-not-required`: This change has no user-facing impact and requires no docs.

Every PR needs one `release-note-*` label.
- `release-note-required`: This PR has user-facing changes. Most PRs should have this label.
- `release-note-not-required`: This PR has no user-facing changes.

Other optional labels:
- `cherry-pick-candidate`: This PR should be cherry-picked to an earlier release. For bug fixes only.
- `needs-operator-pr`: This PR is related to install and requires a corresponding change to the operator.