From 6b4b8e28f2ebceb123ee178c87ae34d5860a1c8e Mon Sep 17 00:00:00 2001
From: Antonin Bas
Date: Wed, 29 Jan 2025 17:07:06 -0800
Subject: [PATCH] [FlowExporter] More efficient IP checks

The FlowExporter in the Agent queries the NodeRouteController to
determine whether the source / destination IPs are Pod IPs (NodeIPAM
only).

Prior to this change, these checks were expensive, involving an indexer
lookup and conversions between different IP formats. The new
implementation is about 10x faster and performs no memory allocations.

The new implementation introduces a new set in the NodeRouteController,
dedicated to storing all the PodCIDRs in the cluster.

While I considered removing the dependency of the FlowExporter on the
NodeRouteController altogether, it would have been a much bigger change.
Additionally, in the long term, we could consider removing these checks
from the FlowExporter entirely and pushing the logic to the
FlowAggregator.

We also make a few additional changes to the FlowExporter:

* more consistently ignore connections where the source / destination IP is a gateway IP
* classify Pod-to-Service traffic where the destination IP is not a Pod IP as Pod-to-External
* log a warning if FlowExporter is enabled alongside AntreaIPAM

Signed-off-by: Antonin Bas
---
 cmd/antrea-agent/options.go                   |   5 +-
 .../noderoute/node_route_controller.go        | 118 +++++++---
 .../noderoute/node_route_controller_test.go   | 215 +++++++++++-------
 pkg/agent/flowexporter/exporter/exporter.go   |  68 ++++--
 pkg/agent/util/net.go                         |  17 +-
 pkg/agent/util/net_windows.go                 |   3 +-
 6 files changed, 285 insertions(+), 141 deletions(-)

diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go
index 4b08d4435e9..3aa9538ea5c 100644
--- a/cmd/antrea-agent/options.go
+++ b/cmd/antrea-agent/options.go
@@ -275,7 +275,10 @@ func (o *Options) validateAntreaProxyConfig(encapMode config.TrafficEncapModeTyp
 }
 
 func (o *Options) validateFlowExporterConfig() error {
-    if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
+    if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
+        if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
+            klog.InfoS("The FlowExporter feature does not support AntreaIPAM Pods")
+        }
         host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowExporter.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport)
         if err != nil {
             return err
diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go
index e64aa79ef01..ab50fe15017 100644
--- a/pkg/agent/controller/noderoute/node_route_controller.go
+++ b/pkg/agent/controller/noderoute/node_route_controller.go
@@ -18,11 +18,14 @@ import (
     "context"
     "fmt"
     "net"
+    "net/netip"
+    "sync"
     "time"
 
     "github.com/containernetworking/plugins/pkg/ip"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
     corelisters "k8s.io/client-go/listers/core/v1"
@@ -77,7 +80,13 @@ type Controller struct {
     // installedNodes records routes and flows installation states of Nodes.
     // The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
     // A node will be in the map after its flows and routes are installed successfully.
-    installedNodes cache.Indexer
+    installedNodes cache.Indexer
+    // podSubnetsMutex protects access to the podSubnets set.
+ podSubnetsMutex sync.RWMutex + // podSubnets is a set which stores all known PodCIDRs in the cluster as masked netip.Prefix objects. + podSubnets sets.Set[netip.Prefix] + maskSizeV4 int + maskSizeV6 int wireGuardClient wireguard.Interface // ipsecCertificateManager is useful for determining whether the ipsec certificate has been configured // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate @@ -124,10 +133,21 @@ func NewNodeRouteController( }, ), installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), + podSubnets: sets.New[netip.Prefix](), wireGuardClient: wireguardClient, ipsecCertificateManager: ipsecCertificateManager, flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } + if nodeConfig.PodIPv4CIDR != nil { + prefix, _ := cidrToPrefix(nodeConfig.PodIPv4CIDR) + controller.podSubnets.Insert(prefix) + controller.maskSizeV4 = prefix.Bits() + } + if nodeConfig.PodIPv6CIDR != nil { + prefix, _ := cidrToPrefix(nodeConfig.PodIPv6CIDR) + controller.podSubnets.Insert(prefix) + controller.maskSizeV6 = prefix.Bits() + } registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(cur interface{}, isInInitialList bool) { @@ -379,7 +399,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go func() { // When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait. err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { - return c.hasProcessedInitialList.HasSynced(), nil + return c.HasSynced(), nil }) // An error here means the context has been cancelled, which means that the stopCh // has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced @@ -395,6 +415,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) { <-stopCh } +// HasSynced returns true when the initial list of Nodes has been processed by the controller. +func (c *Controller) HasSynced() bool { + return c.hasProcessedInitialList.HasSynced() +} + // worker is a long-running function that will continually call the processNextWorkItem function in // order to read and process a message on the workqueue. func (c *Controller) worker() { @@ -482,6 +507,12 @@ func (c *Controller) deleteNodeRoute(nodeName string) error { return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err) } c.installedNodes.Delete(obj) + func() { + subnets, _ := cidrsToPrefixes(nodeRouteInfo.podCIDRs) + c.podSubnetsMutex.Lock() + defer c.podSubnetsMutex.Unlock() + c.podSubnets.Delete(subnets...) 
+    }()
 
     if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
         interfaceConfig, ok := c.interfaceStore.GetNodeTunnelInterface(nodeName)
@@ -575,6 +606,13 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
         peerNodeIP = peerNodeIPs.IPv6
     }
 
+    func() {
+        subnet, _ := cidrToPrefix(peerPodCIDR)
+        c.podSubnetsMutex.Lock()
+        defer c.podSubnetsMutex.Unlock()
+        c.podSubnets.Insert(subnet)
+    }()
+
     klog.InfoS("Adding route and flow to Node", "Node", nodeName, "podCIDR", podCIDR, "peerNodeIP", peerNodeIP)
 }
 
@@ -773,40 +811,31 @@ func ParseTunnelInterfaceConfig(
     return interfaceConfig
 }
 
-func (c *Controller) IPInPodSubnets(ip net.IP) bool {
-    var ipCIDR *net.IPNet
-    var curNodeCIDRStr string
-    if ip.To4() != nil {
-        var podIPv4CIDRMaskSize int
-        if c.nodeConfig.PodIPv4CIDR != nil {
-            curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
-            podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
-        } else {
-            return false
-        }
-        v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen)
-        ipCIDR = &net.IPNet{
-            IP:   ip.Mask(v4Mask),
-            Mask: v4Mask,
-        }
-
+func (c *Controller) findPodSubnetForIP(ip netip.Addr) (netip.Prefix, bool) {
+    var maskSize int
+    if ip.Is4() {
+        maskSize = c.maskSizeV4
     } else {
-        var podIPv6CIDRMaskSize int
-        if c.nodeConfig.PodIPv6CIDR != nil {
-            curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
-            podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
-        } else {
-            return false
-        }
-        v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen)
-        ipCIDR = &net.IPNet{
-            IP:   ip.Mask(v6Mask),
-            Mask: v6Mask,
-        }
+        maskSize = c.maskSizeV6
     }
-    ipCIDRStr := ipCIDR.String()
-    nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
-    return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
+    if maskSize == 0 {
+        return netip.Prefix{}, false
+    }
+    prefix, _ := ip.Prefix(maskSize)
+    c.podSubnetsMutex.RLock()
+    defer c.podSubnetsMutex.RUnlock()
+    return prefix, c.podSubnets.Has(prefix)
+}
+
+// LookupIPInPodSubnets returns two boolean values. The first one indicates whether the IP can be
+// found in a PodCIDR for one of the cluster Nodes. The second one indicates whether the IP is used
+// as a gateway IP. The second boolean value can only be true if the first one is true.
+func (c *Controller) LookupIPInPodSubnets(ip netip.Addr) (bool, bool) {
+    prefix, ok := c.findPodSubnetForIP(ip)
+    if !ok {
+        return false, false
+    }
+    return ok, ip == util.GetGatewayIPForPodPrefix(prefix)
 }
 
 // getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode.
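
The strategy implemented above distills to: store every PodCIDR as a masked prefix, let all CIDRs of one address family share a single prefix length, and answer each query with one probe of a set keyed by comparable netip values. Below is a minimal standalone sketch of that idea; the podSubnetLookup type and its names are hypothetical simplifications (a plain map in place of the controller's mutex-protected sets.Set[netip.Prefix]), not code from this patch:

    package main

    import (
        "fmt"
        "net/netip"
    )

    // podSubnetLookup is a hypothetical, stripped-down version of the state the
    // controller now keeps: a set of masked prefixes plus the per-family mask
    // size shared by all NodeIPAM PodCIDRs.
    type podSubnetLookup struct {
        subnets    map[netip.Prefix]struct{}
        maskSizeV4 int
        maskSizeV6 int
    }

    // contains masks the queried IP to its family's PodCIDR prefix length and
    // probes the set. netip.Addr and netip.Prefix are comparable value types,
    // so this is a single map lookup with no heap allocations.
    func (l *podSubnetLookup) contains(ip netip.Addr) bool {
        maskSize := l.maskSizeV6
        if ip.Is4() {
            maskSize = l.maskSizeV4
        }
        if maskSize == 0 {
            return false // no PodCIDR configured for this IP family
        }
        prefix, err := ip.Prefix(maskSize) // Prefix masks off the host bits
        if err != nil {
            return false
        }
        _, ok := l.subnets[prefix]
        return ok
    }

    func main() {
        l := &podSubnetLookup{
            subnets:    map[netip.Prefix]struct{}{netip.MustParsePrefix("1.1.1.0/24"): {}},
            maskSizeV4: 24,
        }
        fmt.Println(l.contains(netip.MustParseAddr("1.1.1.42"))) // true
        fmt.Println(l.contains(netip.MustParseAddr("8.8.8.8")))  // false
    }

Relative to the removed IPInPodSubnets, each query avoids the ipCIDR.String() formatting and the indexer round trip, which is where the roughly 10x speedup and the absence of allocations come from.
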
@@ -821,3 +850,24 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) { } return mac, nil } + +func cidrToPrefix(cidr *net.IPNet) (netip.Prefix, error) { + addr, ok := netip.AddrFromSlice(cidr.IP) + if !ok { + return netip.Prefix{}, fmt.Errorf("invalid IP in cidr: %v", cidr) + } + size, _ := cidr.Mask.Size() + return addr.Prefix(size) +} + +func cidrsToPrefixes(cidrs []*net.IPNet) ([]netip.Prefix, error) { + prefixes := make([]netip.Prefix, len(cidrs)) + for idx := range cidrs { + prefix, err := cidrToPrefix(cidrs[idx]) + if err != nil { + return nil, err + } + prefixes[idx] = prefix + } + return prefixes, nil +} diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index 315b359d635..6e5d7c84a0f 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -17,10 +17,10 @@ package noderoute import ( "context" "net" + "net/netip" "testing" "time" - "github.com/containernetworking/plugins/pkg/ip" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -45,27 +45,27 @@ import ( ) var ( - gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") - _, podCIDR, _ = net.ParseCIDR("1.1.1.0/24") - _, podCIDR2, _ = net.ParseCIDR("1.1.2.0/24") - _, podCIDR3, _ = net.ParseCIDR("2001:4860:4860::8888/32") - podCIDRGateway = ip.NextIP(podCIDR.IP) - podCIDR2Gateway = ip.NextIP(podCIDR2.IP) - podCIDR3Gateway = ip.NextIP(podCIDR3.IP) - nodeIP1 = net.ParseIP("10.10.10.10") - dsIPs1 = utilip.DualStackIPs{IPv4: nodeIP1} - nodeIP2 = net.ParseIP("10.10.10.11") - dsIPs2 = utilip.DualStackIPs{IPv4: nodeIP2} - nodeIP3 = net.ParseIP("2001:db8::68") - dsIPs3 = utilip.DualStackIPs{IPv6: nodeIP3} + gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") + // podCIDRs of "local" Node + _, podCIDR, _ = net.ParseCIDR("1.1.0.0/24") + _, podCIDRv6, _ = net.ParseCIDR("2001:4860:0000::/48") + // podCIDRs of "remote" Node + _, podCIDR1, _ = net.ParseCIDR("1.1.1.0/24") + _, podCIDR1v6, _ = net.ParseCIDR("2001:4860:0001::/48") + podCIDR1Gateway = util.GetGatewayIPForPodCIDR(podCIDR1) + podCIDR1v6Gateway = util.GetGatewayIPForPodCIDR(podCIDR1v6) + nodeIP1 = net.ParseIP("10.10.10.10") + dsIPs1 = utilip.DualStackIPs{IPv4: nodeIP1} + nodeIP2 = net.ParseIP("10.10.10.11") + dsIPs2 = utilip.DualStackIPs{IPv4: nodeIP2} node1 = &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String(), podCIDR1v6.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -76,6 +76,15 @@ var ( }, }, } + + nodeConfig = &config.NodeConfig{ + PodIPv4CIDR: podCIDR, + PodIPv6CIDR: podCIDRv6, + GatewayConfig: &config.GatewayConfig{ + IPv4: nil, + MAC: gatewayMAC, + }, + } ) type fakeController struct { @@ -96,7 +105,7 @@ func (f *fakeIPsecCertificateManager) HasSynced() bool { return true } -func newController(t *testing.T, networkConfig *config.NetworkConfig, objects ...runtime.Object) *fakeController { +func newController(t testing.TB, networkConfig *config.NetworkConfig, objects ...runtime.Object) *fakeController { clientset := fake.NewSimpleClientset(objects...) 
informerFactory := informers.NewSharedInformerFactory(clientset, 12*time.Hour) ctrl := gomock.NewController(t) @@ -107,10 +116,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig, objects .. ipsecCertificateManager := &fakeIPsecCertificateManager{} ovsCtlClient := ovsctltest.NewMockOVSCtlClient(ctrl) wireguardClient := wgtest.NewMockInterface(ctrl) - c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ - IPv4: nil, - MAC: gatewayMAC, - }}, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) + c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, nodeConfig, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) + require.Equal(t, 24, c.maskSizeV4) + require.Equal(t, 48, c.maskSizeV6) + // Check that the podSubnets set already includes local PodCIDRs. + require.True(t, c.podSubnets.HasAll(netip.MustParsePrefix(podCIDR.String()), netip.MustParsePrefix(podCIDRv6.String()))) return &fakeController{ Controller: c, clientset: clientset, @@ -136,13 +146,13 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. c.informerFactory.WaitForCacheSync(stopCh) - node2 := &corev1.Node{ + otherNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "node2", + Name: "otherNode", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -160,22 +170,24 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() - // Since node1 is not deleted yet, routes and flows for node2 shouldn't be installed as its PodCIDR is duplicate. - c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) + // Since node1 is not deleted yet, routes and flows for otherNode shouldn't be installed as its PodCIDR is duplicate. + c.clientset.CoreV1().Nodes().Create(context.TODO(), otherNode, metav1.CreateOptions{}) c.processNextWorkItem() // node1 is deleted, its routes and flows should be deleted. c.clientset.CoreV1().Nodes().Delete(context.TODO(), node1.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallNodeFlows("node1").Times(1) - c.routeClient.EXPECT().DeleteRoutes(podCIDR).Times(1) + c.routeClient.EXPECT().DeleteRoutes(podCIDR1).Times(1) + c.routeClient.EXPECT().DeleteRoutes(podCIDR1v6).Times(1) c.processNextWorkItem() - // After node1 is deleted, routes and flows should be installed for node2 successfully. - c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1) + // After node1 is deleted, routes and flows should be installed for otherNode successfully. 
+ c.ofClient.EXPECT().InstallNodeFlows("otherNode", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "otherNode", nodeIP2, podCIDR1Gateway).Times(1) c.processNextWorkItem() }() @@ -186,7 +198,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { } } -func TestIPInPodSubnets(t *testing.T) { +func TestLookupIPInPodSubnets(t *testing.T) { c := newController(t, &config.NetworkConfig{}) defer c.queue.ShutDown() @@ -197,64 +209,92 @@ func TestIPInPodSubnets(t *testing.T) { // in-between list and watch call of an informer. This is because fake clientset doesn't support watching with // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. c.informerFactory.WaitForCacheSync(stopCh) - c.Controller.nodeConfig.PodIPv4CIDR = podCIDR - c.Controller.nodeConfig.PodIPv6CIDR = podCIDR3 - node2 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node2", - }, - Spec: corev1.NodeSpec{ - PodCIDR: podCIDR2.String(), - PodCIDRs: []string{podCIDR2.String()}, + c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) + c.processNextWorkItem() + + testCases := []struct { + name string + ips []string + isInPodSubnets bool + isGwIP bool + }{ + { + name: "local gateway IP", + ips: []string{"1.1.0.1", "2001:4860:0000::1"}, + isInPodSubnets: true, + isGwIP: true, }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: nodeIP2.String(), - }, - }, + { + name: "local Pod IP", + ips: []string{"1.1.0.101", "2001:4860:0000::101"}, + isInPodSubnets: true, + isGwIP: false, }, - } - node3 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node3", + { + name: "remote gateway IP", + ips: []string{"1.1.1.1", "2001:4860:0001::1"}, + isInPodSubnets: true, + isGwIP: true, }, - Spec: corev1.NodeSpec{ - PodCIDR: podCIDR3.String(), - PodCIDRs: []string{podCIDR3.String()}, + { + name: "remote Pod IP", + ips: []string{"1.1.1.101", "2001:4860:0001::101"}, + isInPodSubnets: true, + isGwIP: false, }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: nodeIP3.String(), - }, - }, + { + name: "unknown IPs", + ips: []string{"1.1.10.101", "2001:4860:0010::101"}, + isInPodSubnets: false, + isGwIP: false, }, } - c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) - c.processNextWorkItem() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for _, ip := range tc.ips { + isInPodSubnets, isGwIP := c.Controller.LookupIPInPodSubnets(netip.MustParseAddr(ip)) + assert.Equal(t, tc.isInPodSubnets, isInPodSubnets) + assert.Equal(t, tc.isGwIP, isGwIP) + } + }) + } +} - c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1) - c.processNextWorkItem() +func BenchmarkLookupIPInPodSubnets(b *testing.B) { 
+ c := newController(b, &config.NetworkConfig{}) + defer c.queue.ShutDown() - c.clientset.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node3", gomock.Any(), &dsIPs3, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR3, "node3", nodeIP3, podCIDR3Gateway).Times(1) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + // Must wait for cache sync, otherwise resource creation events will be missing if the resources are created + // in-between list and watch call of an informer. This is because fake clientset doesn't support watching with + // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. + c.informerFactory.WaitForCacheSync(stopCh) + + c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1"))) - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("2001:4860:4860::8889"))) - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.2.1"))) - assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("10.10.10.10"))) - assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("8.8.8.8"))) + localPodIP := netip.MustParseAddr("1.1.0.99") + remotePodIP := netip.MustParseAddr("1.1.1.99") + remoteGatewayIP := netip.MustParseAddr("1.1.1.1") + unknownIP := netip.MustParseAddr("1.1.2.99") + + b.ResetTimer() + for range b.N { + c.Controller.findPodSubnetForIP(localPodIP) + c.Controller.findPodSubnetForIP(remotePodIP) + c.Controller.findPodSubnetForIP(remoteGatewayIP) + c.Controller.findPodSubnetForIP(unknownIP) + } } func setup(t *testing.T, ifaces []*interfacestore.InterfaceConfig, authenticationMode config.IPsecAuthenticationMode) *fakeController { @@ -300,8 +340,8 @@ func TestRemoveStaleTunnelPorts(t *testing.T) { Name: "xyz-k8s-0-1", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -630,7 +670,7 @@ func TestRemoveStaleGatewayRoutes(t *testing.T) { c.informerFactory.Start(stopCh) c.informerFactory.WaitForCacheSync(stopCh) - c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()}) + c.routeClient.EXPECT().Reconcile([]string{podCIDR1.String(), podCIDR1v6.String()}) err := c.removeStaleGatewayRoutes() assert.NoError(t, err) } @@ -690,7 +730,7 @@ func TestDeleteNodeRoute(t *testing.T) { expectedCalls: func(ovsClient *ovsconfigtest.MockOVSBridgeClient, routeClient *routetest.MockInterface, ofClient *oftest.MockClient, wgClient *wgtest.MockInterface) { ovsClient.EXPECT().DeletePort("123") - routeClient.EXPECT().DeleteRoutes(podCIDR) + routeClient.EXPECT().DeleteRoutes(podCIDR1) ofClient.EXPECT().UninstallNodeFlows(node1.Name) }, }, @@ -700,7 +740,7 @@ func TestDeleteNodeRoute(t *testing.T) { mode: config.TrafficEncryptionModeWireGuard, expectedCalls: func(ovsClient *ovsconfigtest.MockOVSBridgeClient, routeClient *routetest.MockInterface, ofClient *oftest.MockClient, wgClient *wgtest.MockInterface) { - 
routeClient.EXPECT().DeleteRoutes(podCIDR) + routeClient.EXPECT().DeleteRoutes(podCIDR1) ofClient.EXPECT().UninstallNodeFlows(nodeWithWireGuard.Name) wgClient.EXPECT().DeletePeer(nodeWithWireGuard.Name) }, @@ -714,7 +754,7 @@ func TestDeleteNodeRoute(t *testing.T) { }, tt.node) c.installedNodes.Add(&nodeRouteInfo{ nodeName: tt.node.Name, - podCIDRs: []*net.IPNet{podCIDR}, + podCIDRs: []*net.IPNet{podCIDR1}, }) defer c.queue.ShutDown() @@ -747,7 +787,8 @@ func TestInitialListHasSynced(t *testing.T) { require.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(100*time.Millisecond)) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() assert.True(t, c.hasProcessedInitialList.HasSynced()) @@ -760,7 +801,7 @@ func TestInitialListHasSyncedStopChClosedEarly(t *testing.T) { c.informerFactory.Start(stopCh) c.informerFactory.WaitForCacheSync(stopCh) - c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()}) + c.routeClient.EXPECT().Reconcile([]string{podCIDR1.String(), podCIDR1v6.String()}) // We close the stopCh right away, then call Run synchronously and wait for it to // return. The workers will not even start, and the initial list of Nodes should never be diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 9abfa5d7600..6974ec41336 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -26,6 +26,7 @@ import ( ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -34,7 +35,6 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" "antrea.io/antrea/pkg/agent/metrics" - "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ipfix" @@ -225,6 +225,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { // Start the goroutine to poll conntrack flows. go exp.conntrackConnStore.Run(stopCh) + if exp.nodeRouteController != nil { + // Wait for NodeRouteController to have processed the initial list of Nodes so that + // the list of Pod subnets is up-to-date. + if !cache.WaitForCacheSync(stopCh, exp.nodeRouteController.HasSynced) { + return + } + } + defaultTimeout := exp.conntrackPriorityQueue.ActiveFlowTimeout expireTimer := time.NewTimer(defaultTimeout) for { @@ -608,10 +616,20 @@ func (exp *FlowExporter) sendDataSet() (int, error) { if err != nil { return 0, fmt.Errorf("error when sending data set: %v", err) } - klog.V(4).InfoS("Data set sent successfully", "Bytes sent", sentBytes) + if klog.V(5).Enabled() { + klog.InfoS("Data set sent successfully", "Bytes sent", sentBytes) + } return sentBytes, nil } +const ( + // flowTypeUnknown indicates that we are unable to determine the flow type. + flowTypeUnknown = uint8(0) + // flowTypeUnsupported indicates that this type of flow is not supported and that we should + // skip exporting it. 
+    flowTypeUnsupported = uint8(0xff)
+)
+
 func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 {
     // TODO: support Pod-To-External flows in network policy only mode.
     if exp.isNetworkPolicyOnly {
@@ -622,21 +640,34 @@
     }
 
     if exp.nodeRouteController == nil {
-        klog.V(4).InfoS("Can't find flowType without nodeRouteController")
-        return 0
+        if klog.V(5).Enabled() {
+            klog.InfoS("Can't find flowType without nodeRouteController")
+        }
+        return flowTypeUnknown
+    }
+    srcIsPod, srcIsGw := exp.nodeRouteController.LookupIPInPodSubnets(conn.FlowKey.SourceAddress)
+    dstIsPod, dstIsGw := exp.nodeRouteController.LookupIPInPodSubnets(conn.FlowKey.DestinationAddress)
+    if srcIsGw || dstIsGw {
+        // This matches what we do in filterAntreaConns, but it is not clear whether this is the
+        // right thing to do: it ignores all flows to / from hostNetwork Pods.
+        if klog.V(5).Enabled() {
+            klog.InfoS("Flows where the source or destination IP is a gateway IP are not exported")
+        }
+        return flowTypeUnsupported
     }
-    if exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.SourceAddress.AsSlice()) {
-        if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress.AsSlice()) {
-            if conn.SourcePodName == "" || conn.DestinationPodName == "" {
-                return ipfixregistry.FlowTypeInterNode
-            }
-            return ipfixregistry.FlowTypeIntraNode
+    if !srcIsPod {
+        if klog.V(5).Enabled() {
+            klog.InfoS("Flows where the source is not a Pod are not exported")
         }
+        return flowTypeUnsupported
+    }
+    if !dstIsPod {
         return ipfixregistry.FlowTypeToExternal
     }
-    // We do not support External-To-Pod flows for now.
- klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", conn.FlowKey.SourceAddress.String()) - return 0 + if conn.SourcePodName == "" || conn.DestinationPodName == "" { + return ipfixregistry.FlowTypeInterNode + } + return ipfixregistry.FlowTypeIntraNode } func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) { @@ -648,11 +679,16 @@ func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) { conn.EgressName = egressName conn.EgressIP = egressIP conn.EgressNodeName = egressNodeName - klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName)) + if klog.V(5).Enabled() { + klog.InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName)) + } } func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error { conn.FlowType = exp.findFlowType(*conn) + if conn.FlowType == flowTypeUnsupported { + return nil + } if conn.FlowType == ipfixregistry.FlowTypeToExternal { if conn.SourcePodNamespace != "" && conn.SourcePodName != "" { exp.fillEgressInfo(conn) @@ -669,7 +705,9 @@ func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error { return err } exp.numDataSetsSent = exp.numDataSetsSent + 1 - klog.V(4).InfoS("Record for connection sent successfully", "flowKey", conn.FlowKey, "connection", conn) + if klog.V(5).Enabled() { + klog.InfoS("Record for connection sent successfully", "flowKey", conn.FlowKey, "connection", conn) + } return nil } diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 14d43757bff..55855faf0b0 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -23,13 +23,15 @@ import ( "io" "math" "net" + "net/netip" "strings" + "github.com/containernetworking/plugins/pkg/ip" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" - "antrea.io/antrea/pkg/util/ip" + utilip "antrea.io/antrea/pkg/util/ip" ) const ( @@ -137,7 +139,7 @@ func listenUnix(address string) (net.Listener, error) { // GetIPNetDeviceFromIP returns local IPs/masks and associated device from IP, and ignores the interfaces which have // names in the ignoredInterfaces. -func GetIPNetDeviceFromIP(localIPs *ip.DualStackIPs, ignoredInterfaces sets.Set[string]) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) { +func GetIPNetDeviceFromIP(localIPs *utilip.DualStackIPs, ignoredInterfaces sets.Set[string]) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) { linkList, err := netInterfaces() if err != nil { return nil, nil, nil, err @@ -438,3 +440,14 @@ func GenerateOVSDatapathID(macString string) string { } return "0000" + strings.Replace(macString, ":", "", -1) } + +// GetGatewayIPForPodCIDR returns the gateway IP for a given Pod CIDR. +func GetGatewayIPForPodCIDR(cidr *net.IPNet) net.IP { + return ip.NextIP(cidr.IP.Mask(cidr.Mask)) +} + +// GetGatewayIPForPodPrefix acts like GetGatewayIPForPodCIDR but takes a netip.Prefix as a parameter +// and returns a netip.Addr value. 
+func GetGatewayIPForPodPrefix(prefix netip.Prefix) netip.Addr { + return prefix.Masked().Addr().Next() +} diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index 8e9c0ca534d..7183fef7c73 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -28,7 +28,6 @@ import ( "github.com/Microsoft/go-winio" "github.com/Microsoft/hcsshim" - "github.com/containernetworking/plugins/pkg/ip" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -64,7 +63,7 @@ func GetNSPath(containerNetNS string) (string, error) { func CreateHNSNetwork(hnsNetName string, subnetCIDR *net.IPNet, nodeIP *net.IPNet, adapter *net.Interface) (*hcsshim.HNSNetwork, error) { adapterMAC := adapter.HardwareAddr adapterName := adapter.Name - gateway := ip.NextIP(subnetCIDR.IP.Mask(subnetCIDR.Mask)) + gateway := GetGatewayIPForPodCIDR(subnetCIDR) network := &hcsshim.HNSNetwork{ Name: hnsNetName, Type: HNSNetworkType,
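
A note on the gateway convention encoded by GetGatewayIPForPodCIDR and GetGatewayIPForPodPrefix, which LookupIPInPodSubnets relies on to flag gateway IPs: the gateway is the first address after the masked network address of the Pod CIDR. The standalone snippet below duplicates the two helper bodies from this patch, for illustration only:

    package main

    import (
        "fmt"
        "net"
        "net/netip"

        "github.com/containernetworking/plugins/pkg/ip"
    )

    // Same body as the GetGatewayIPForPodCIDR helper added in this patch.
    func getGatewayIPForPodCIDR(cidr *net.IPNet) net.IP {
        return ip.NextIP(cidr.IP.Mask(cidr.Mask))
    }

    // Same body as the GetGatewayIPForPodPrefix helper added in this patch.
    func getGatewayIPForPodPrefix(prefix netip.Prefix) netip.Addr {
        return prefix.Masked().Addr().Next()
    }

    func main() {
        _, cidr, _ := net.ParseCIDR("1.1.1.0/24")
        fmt.Println(getGatewayIPForPodCIDR(cidr))                                        // 1.1.1.1
        fmt.Println(getGatewayIPForPodPrefix(netip.MustParsePrefix("2001:4860:1::/48"))) // 2001:4860:1::1
    }

Both variants agree, and the explicit masking means an unmasked input such as 1.1.1.5/24 still yields 1.1.1.1.
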
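Finally, the new findFlowType in the exporter reads as a short decision list: skip gateway traffic, skip flows whose source is not a Pod, classify a non-Pod destination as ToExternal, and otherwise distinguish intra-Node from inter-Node traffic by whether both Pod names were resolved. The freestanding restatement below is illustrative only: the classify signature, the constant values, and the assumption that a missing Pod name denotes a non-local Pod are assumptions made here, not verbatim from the patch:

    package main

    import "fmt"

    // Illustrative stand-ins for the IPFIX registry flow types and the
    // sentinel constants added in exporter.go.
    const (
        flowTypeIntraNode   = uint8(1)
        flowTypeInterNode   = uint8(2)
        flowTypeToExternal  = uint8(3)
        flowTypeUnsupported = uint8(0xff)
    )

    // classify mirrors the decision order of the new findFlowType.
    func classify(srcIsPod, srcIsGw, dstIsPod, dstIsGw bool, srcPodName, dstPodName string) uint8 {
        if srcIsGw || dstIsGw {
            return flowTypeUnsupported // gateway flows are skipped
        }
        if !srcIsPod {
            return flowTypeUnsupported // External-to-Pod is not supported
        }
        if !dstIsPod {
            return flowTypeToExternal // includes Pod-to-Service with a non-Pod destination IP
        }
        // A missing Pod name on either side means that Pod was not resolved locally.
        if srcPodName == "" || dstPodName == "" {
            return flowTypeInterNode
        }
        return flowTypeIntraNode
    }

    func main() {
        fmt.Println(classify(true, false, false, false, "frontend-0", "")) // 3 (ToExternal)
        fmt.Println(classify(true, false, true, false, "frontend-0", ""))  // 2 (InterNode)
        fmt.Println(classify(true, true, true, false, "", ""))             // 255 (skipped)
    }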