Skip to content

Commit f4dc1ff

Browse files
committed
support use WatchList for k8s
1 parent be8b199 commit f4dc1ff

16 files changed

Lines changed: 280 additions & 150 deletions

File tree

libcalico-go/lib/apiconfig/apiconfig.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type KubeConfig struct {
8383
K8sClientQPS float32 `json:"k8sClientQPS"`
8484
// K8sCurrentContext provides a context override for kubeconfig.
8585
K8sCurrentContext string `json:"k8sCurrentContext" envconfig:"K8S_CURRENT_CONTEXT" default:""`
86+
// K8sUseWatchList controls whether use of WatchList.
87+
K8sUseWatchList bool `json:"k8sUseWatchList" envconfig:"K8S_USE_WATCH_LIST" default:"false"`
8688
}
8789

8890
// NewCalicoAPIConfig creates a new (zeroed) CalicoAPIConfig struct with the

libcalico-go/lib/backend/api/api.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sync"
2121

2222
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2324
)
2425

2526
// SyncStatus represents the overall state of the datastore.
@@ -133,8 +134,10 @@ type StatusClient interface {
133134
}
134135

135136
type WatchOptions struct {
136-
Revision string
137-
AllowWatchBookmarks bool
137+
Revision string
138+
AllowWatchBookmarks bool
139+
SendInitialEvents *bool
140+
ResourceVersionMatch metav1.ResourceVersionMatch
138141
}
139142

140143
type Syncer interface {

libcalico-go/lib/backend/k8s/resources/resources.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,10 @@ func ConvertK8sResourceToCalicoResource(res Resource) error {
360360

361361
func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions {
362362
return metav1.ListOptions{
363-
ResourceVersion: wo.Revision,
364-
Watch: true,
365-
AllowWatchBookmarks: wo.AllowWatchBookmarks,
363+
ResourceVersion: wo.Revision,
364+
Watch: true,
365+
AllowWatchBookmarks: wo.AllowWatchBookmarks,
366+
SendInitialEvents: wo.SendInitialEvents,
367+
ResourceVersionMatch: wo.ResourceVersionMatch,
366368
}
367369
}

libcalico-go/lib/backend/k8s/resources/watcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
174174
return []*api.WatchEvent{{
175175
Type: api.WatchBookmark,
176176
New: &model.KVPair{
177+
Value: k8sRes,
177178
Revision: revision,
178179
},
179180
}}

libcalico-go/lib/backend/syncersv1/bgpsyncer/bgpsyncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ func New(client api.Client, callbacks api.SyncerCallbacks, node string, cfg apic
5757
})
5858
}
5959

60-
return watchersyncer.New(client, resourceTypes, callbacks)
60+
return watchersyncer.New(client, resourceTypes, callbacks, watchersyncer.WithUseWatchList(cfg.K8sUseWatchList))
6161
}

libcalico-go/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,5 +137,6 @@ func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.Syn
137137
client,
138138
resourceTypes,
139139
callbacks,
140+
watchersyncer.WithUseWatchList(cfg.K8sUseWatchList),
140141
)
141142
}

libcalico-go/lib/backend/syncersv1/nodestatussyncer/nodestatussyncer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@ package nodestatussyncer
1717
import (
1818
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
1919

20+
"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
2021
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
2122
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
2223
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
2324
)
2425

2526
// New creates a new CalicoNodeStatus v1 Syncer.
26-
func New(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
27+
func New(client api.Client, callbacks api.SyncerCallbacks, cfg apiconfig.CalicoAPIConfigSpec) api.Syncer {
2728
resourceTypes := []watchersyncer.ResourceType{
2829
{
2930
ListInterface: model.ResourceListOptions{Kind: apiv3.KindCalicoNodeStatus},
3031
},
3132
}
3233

33-
return watchersyncer.New(client, resourceTypes, callbacks)
34+
return watchersyncer.New(client, resourceTypes, callbacks, watchersyncer.WithUseWatchList(cfg.K8sUseWatchList))
3435
}

libcalico-go/lib/backend/syncersv1/nodestatussyncer/nodestatussyncer_e2e_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var _ = testutils.E2eDatastoreDescribe("Calico node status syncer tests", testut
5555
// Create a SyncerTester to receive the Calico node status syncer callback events and to allow us
5656
// to assert state.
5757
syncTester := testutils.NewSyncerTester()
58-
syncer := nodestatussyncer.New(be, syncTester)
58+
syncer := nodestatussyncer.New(be, syncTester, config.Spec)
5959
syncer.Start()
6060
expectedCacheSize := 0
6161

@@ -113,7 +113,7 @@ var _ = testutils.E2eDatastoreDescribe("Calico node status syncer tests", testut
113113
// We need to create a new syncTester and syncer.
114114
current := syncTester.GetCacheEntries()
115115
syncTester = testutils.NewSyncerTester()
116-
syncer = nodestatussyncer.New(be, syncTester)
116+
syncer = nodestatussyncer.New(be, syncTester, config.Spec)
117117
syncer.Start()
118118

119119
// Verify the data is the same as the data from the previous cache. We got the cache in the previous

libcalico-go/lib/backend/syncersv1/tunnelipsyncer/tunnelipsyncer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package tunnelipsyncer
1717
import (
1818
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
1919

20+
"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
2021
libapiv3 "github.com/projectcalico/calico/libcalico-go/lib/apis/v3"
2122
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
2223
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
@@ -26,7 +27,7 @@ import (
2627

2728
// New creates a new tunnel IP allocation v1 Syncer. An optional node name may be supplied. If set, the syncer only
2829
// watches the specified node rather than all nodes.
29-
func New(client api.Client, callbacks api.SyncerCallbacks, node string) api.Syncer {
30+
func New(client api.Client, callbacks api.SyncerCallbacks, node string, cfg apiconfig.CalicoAPIConfigSpec) api.Syncer {
3031
resourceTypes := []watchersyncer.ResourceType{
3132
{
3233
ListInterface: model.ResourceListOptions{Kind: apiv3.KindIPPool},
@@ -37,5 +38,5 @@ func New(client api.Client, callbacks api.SyncerCallbacks, node string) api.Sync
3738
},
3839
}
3940

40-
return watchersyncer.New(client, resourceTypes, callbacks)
41+
return watchersyncer.New(client, resourceTypes, callbacks, watchersyncer.WithUseWatchList(cfg.K8sUseWatchList))
4142
}

libcalico-go/lib/backend/syncersv1/tunnelipsyncer/tunnelipsyncer_e2e_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var _ = testutils.E2eDatastoreDescribe("Tunnel IP allocation syncer tests", test
5757
// Create a SyncerTester to receive the tunnel IP allocation syncer callback events and to allow us
5858
// to assert state.
5959
syncTester := testutils.NewSyncerTester()
60-
syncer := tunnelipsyncer.New(be, syncTester, "127.0.0.1")
60+
syncer := tunnelipsyncer.New(be, syncTester, "127.0.0.1", config.Spec)
6161
syncer.Start()
6262
expectedCacheSize := 0
6363

@@ -115,7 +115,7 @@ var _ = testutils.E2eDatastoreDescribe("Tunnel IP allocation syncer tests", test
115115
// We need to create a new syncTester and syncer.
116116
current := syncTester.GetCacheEntries()
117117
syncTester = testutils.NewSyncerTester()
118-
syncer = tunnelipsyncer.New(be, syncTester, "127.0.0.1")
118+
syncer = tunnelipsyncer.New(be, syncTester, "127.0.0.1", config.Spec)
119119
syncer.Start()
120120

121121
// Verify the data is the same as the data from the previous cache. We got the cache in the previous

0 commit comments

Comments
 (0)