diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 4b08d4435e9..3aa9538ea5c 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -275,7 +275,10 @@ func (o *Options) validateAntreaProxyConfig(encapMode config.TrafficEncapModeTyp } func (o *Options) validateFlowExporterConfig() error { - if features.DefaultFeatureGate.Enabled(features.FlowExporter) { + if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable { + if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) { + klog.InfoS("The FlowExporter feature does not support AntreaIPAM Pods") + } host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowExporter.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport) if err != nil { return err diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index e64aa79ef01..a029b8f9119 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -18,11 +18,14 @@ import ( "context" "fmt" "net" + "net/netip" + "sync" "time" "github.com/containernetworking/plugins/pkg/ip" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" corelisters "k8s.io/client-go/listers/core/v1" @@ -77,7 +80,13 @@ type Controller struct { // installedNodes records routes and flows installation states of Nodes. // The key is the host name of the Node, the value is the nodeRouteInfo of the Node. // A node will be in the map after its flows and routes are installed successfully. - installedNodes cache.Indexer + installedNodes cache.Indexer + // podSubnetsMutex protects access to the podSubnets set. + podSubnetsMutex sync.RWMutex + // podSubnets is a set which stores all known PodCIDRs in the cluster as masked netip.Prefix objects. + podSubnets sets.Set[netip.Prefix] + maskSizeV4 int + maskSizeV6 int wireGuardClient wireguard.Interface // ipsecCertificateManager is useful for determining whether the ipsec certificate has been configured // or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate @@ -124,10 +133,21 @@ func NewNodeRouteController( }, ), installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}), + podSubnets: sets.New[netip.Prefix](), wireGuardClient: wireguardClient, ipsecCertificateManager: ipsecCertificateManager, flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(), } + if nodeConfig.PodIPv4CIDR != nil { + prefix, _ := cidrToPrefix(nodeConfig.PodIPv4CIDR) + controller.podSubnets.Insert(prefix) + controller.maskSizeV4 = prefix.Bits() + } + if nodeConfig.PodIPv6CIDR != nil { + prefix, _ := cidrToPrefix(nodeConfig.PodIPv6CIDR) + controller.podSubnets.Insert(prefix) + controller.maskSizeV6 = prefix.Bits() + } registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(cur interface{}, isInInitialList bool) { @@ -379,7 +399,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go func() { // When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait. err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { - return c.hasProcessedInitialList.HasSynced(), nil + return c.HasSynced(), nil }) // An error here means the context has been cancelled, which means that the stopCh // has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced @@ -395,6 +415,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) { <-stopCh } +// HasSynced returns true when the initial list of Nodes has been processed by the controller. +func (c *Controller) HasSynced() bool { + return c.hasProcessedInitialList.HasSynced() +} + // worker is a long-running function that will continually call the processNextWorkItem function in // order to read and process a message on the workqueue. func (c *Controller) worker() { @@ -482,6 +507,12 @@ func (c *Controller) deleteNodeRoute(nodeName string) error { return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err) } c.installedNodes.Delete(obj) + func() { + subnets, _ := cidrsToPrefixes(nodeRouteInfo.podCIDRs) + c.podSubnetsMutex.Lock() + defer c.podSubnetsMutex.Unlock() + c.podSubnets.Delete(subnets...) + }() if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec { interfaceConfig, ok := c.interfaceStore.GetNodeTunnelInterface(nodeName) @@ -575,6 +606,13 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { peerNodeIP = peerNodeIPs.IPv6 } + func() { + subnet, _ := cidrToPrefix(peerPodCIDR) + c.podSubnetsMutex.Lock() + defer c.podSubnetsMutex.Unlock() + c.podSubnets.Insert(subnet) + }() + klog.InfoS("Adding route and flow to Node", "Node", nodeName, "podCIDR", podCIDR, "peerNodeIP", peerNodeIP) } @@ -773,40 +811,31 @@ func ParseTunnelInterfaceConfig( return interfaceConfig } -func (c *Controller) IPInPodSubnets(ip net.IP) bool { - var ipCIDR *net.IPNet - var curNodeCIDRStr string - if ip.To4() != nil { - var podIPv4CIDRMaskSize int - if c.nodeConfig.PodIPv4CIDR != nil { - curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String() - podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size() - } else { - return false - } - v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen) - ipCIDR = &net.IPNet{ - IP: ip.Mask(v4Mask), - Mask: v4Mask, - } - +func (c *Controller) findPodSubnetForIP(ip netip.Addr) (netip.Prefix, bool) { + var maskSize int + if ip.Is4() { + maskSize = c.maskSizeV4 } else { - var podIPv6CIDRMaskSize int - if c.nodeConfig.PodIPv6CIDR != nil { - curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String() - podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size() - } else { - return false - } - v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen) - ipCIDR = &net.IPNet{ - IP: ip.Mask(v6Mask), - Mask: v6Mask, - } + maskSize = c.maskSizeV6 } - ipCIDRStr := ipCIDR.String() - nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr) - return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr + if maskSize == 0 { + return netip.Prefix{}, false + } + prefix, _ := ip.Prefix(maskSize) + c.podSubnetsMutex.RLock() + defer c.podSubnetsMutex.RUnlock() + return prefix, c.podSubnets.Has(prefix) +} + +// LookupIPInPodSubnets returns two boolean values. The first one indicates whether the IP can be +// found in a PodCIDR for one of the cluster Nodes. The second one indicates whether the IP is used +// as a gateway IP. The second boolean value can only be true if the first one is true. +func (c *Controller) LookupIPInPodSubnets(ip netip.Addr) (bool, bool) { + prefix, ok := c.findPodSubnetForIP(ip) + if !ok { + return false, false + } + return ok, ip == util.GetGatewayIPForPodPrefix(prefix) } // getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode. @@ -821,3 +850,24 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) { } return mac, nil } + +func cidrToPrefix(cidr *net.IPNet) (netip.Prefix, error) { + addr, ok := netip.AddrFromSlice(cidr.IP) + if !ok { + return netip.Prefix{}, fmt.Errorf("invalid IP in CIDR: %v", cidr) + } + size, _ := cidr.Mask.Size() + return addr.Prefix(size) +} + +func cidrsToPrefixes(cidrs []*net.IPNet) ([]netip.Prefix, error) { + prefixes := make([]netip.Prefix, len(cidrs)) + for idx := range cidrs { + prefix, err := cidrToPrefix(cidrs[idx]) + if err != nil { + return nil, err + } + prefixes[idx] = prefix + } + return prefixes, nil +} diff --git a/pkg/agent/controller/noderoute/node_route_controller_test.go b/pkg/agent/controller/noderoute/node_route_controller_test.go index 315b359d635..6e5d7c84a0f 100644 --- a/pkg/agent/controller/noderoute/node_route_controller_test.go +++ b/pkg/agent/controller/noderoute/node_route_controller_test.go @@ -17,10 +17,10 @@ package noderoute import ( "context" "net" + "net/netip" "testing" "time" - "github.com/containernetworking/plugins/pkg/ip" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -45,27 +45,27 @@ import ( ) var ( - gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") - _, podCIDR, _ = net.ParseCIDR("1.1.1.0/24") - _, podCIDR2, _ = net.ParseCIDR("1.1.2.0/24") - _, podCIDR3, _ = net.ParseCIDR("2001:4860:4860::8888/32") - podCIDRGateway = ip.NextIP(podCIDR.IP) - podCIDR2Gateway = ip.NextIP(podCIDR2.IP) - podCIDR3Gateway = ip.NextIP(podCIDR3.IP) - nodeIP1 = net.ParseIP("10.10.10.10") - dsIPs1 = utilip.DualStackIPs{IPv4: nodeIP1} - nodeIP2 = net.ParseIP("10.10.10.11") - dsIPs2 = utilip.DualStackIPs{IPv4: nodeIP2} - nodeIP3 = net.ParseIP("2001:db8::68") - dsIPs3 = utilip.DualStackIPs{IPv6: nodeIP3} + gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01") + // podCIDRs of "local" Node + _, podCIDR, _ = net.ParseCIDR("1.1.0.0/24") + _, podCIDRv6, _ = net.ParseCIDR("2001:4860:0000::/48") + // podCIDRs of "remote" Node + _, podCIDR1, _ = net.ParseCIDR("1.1.1.0/24") + _, podCIDR1v6, _ = net.ParseCIDR("2001:4860:0001::/48") + podCIDR1Gateway = util.GetGatewayIPForPodCIDR(podCIDR1) + podCIDR1v6Gateway = util.GetGatewayIPForPodCIDR(podCIDR1v6) + nodeIP1 = net.ParseIP("10.10.10.10") + dsIPs1 = utilip.DualStackIPs{IPv4: nodeIP1} + nodeIP2 = net.ParseIP("10.10.10.11") + dsIPs2 = utilip.DualStackIPs{IPv4: nodeIP2} node1 = &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node1", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String(), podCIDR1v6.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -76,6 +76,15 @@ var ( }, }, } + + nodeConfig = &config.NodeConfig{ + PodIPv4CIDR: podCIDR, + PodIPv6CIDR: podCIDRv6, + GatewayConfig: &config.GatewayConfig{ + IPv4: nil, + MAC: gatewayMAC, + }, + } ) type fakeController struct { @@ -96,7 +105,7 @@ func (f *fakeIPsecCertificateManager) HasSynced() bool { return true } -func newController(t *testing.T, networkConfig *config.NetworkConfig, objects ...runtime.Object) *fakeController { +func newController(t testing.TB, networkConfig *config.NetworkConfig, objects ...runtime.Object) *fakeController { clientset := fake.NewSimpleClientset(objects...) informerFactory := informers.NewSharedInformerFactory(clientset, 12*time.Hour) ctrl := gomock.NewController(t) @@ -107,10 +116,11 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig, objects .. ipsecCertificateManager := &fakeIPsecCertificateManager{} ovsCtlClient := ovsctltest.NewMockOVSCtlClient(ctrl) wireguardClient := wgtest.NewMockInterface(ctrl) - c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{ - IPv4: nil, - MAC: gatewayMAC, - }}, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) + c := NewNodeRouteController(informerFactory.Core().V1().Nodes(), ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, nodeConfig, wireguardClient, ipsecCertificateManager, utilwait.NewGroup()) + require.Equal(t, 24, c.maskSizeV4) + require.Equal(t, 48, c.maskSizeV6) + // Check that the podSubnets set already includes local PodCIDRs. + require.True(t, c.podSubnets.HasAll(netip.MustParsePrefix(podCIDR.String()), netip.MustParsePrefix(podCIDRv6.String()))) return &fakeController{ Controller: c, clientset: clientset, @@ -136,13 +146,13 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. c.informerFactory.WaitForCacheSync(stopCh) - node2 := &corev1.Node{ + otherNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "node2", + Name: "otherNode", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -160,22 +170,24 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() - // Since node1 is not deleted yet, routes and flows for node2 shouldn't be installed as its PodCIDR is duplicate. - c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) + // Since node1 is not deleted yet, routes and flows for otherNode shouldn't be installed as its PodCIDR is duplicate. + c.clientset.CoreV1().Nodes().Create(context.TODO(), otherNode, metav1.CreateOptions{}) c.processNextWorkItem() // node1 is deleted, its routes and flows should be deleted. c.clientset.CoreV1().Nodes().Delete(context.TODO(), node1.Name, metav1.DeleteOptions{}) c.ofClient.EXPECT().UninstallNodeFlows("node1").Times(1) - c.routeClient.EXPECT().DeleteRoutes(podCIDR).Times(1) + c.routeClient.EXPECT().DeleteRoutes(podCIDR1).Times(1) + c.routeClient.EXPECT().DeleteRoutes(podCIDR1v6).Times(1) c.processNextWorkItem() - // After node1 is deleted, routes and flows should be installed for node2 successfully. - c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1) + // After node1 is deleted, routes and flows should be installed for otherNode successfully. + c.ofClient.EXPECT().InstallNodeFlows("otherNode", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "otherNode", nodeIP2, podCIDR1Gateway).Times(1) c.processNextWorkItem() }() @@ -186,7 +198,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) { } } -func TestIPInPodSubnets(t *testing.T) { +func TestLookupIPInPodSubnets(t *testing.T) { c := newController(t, &config.NetworkConfig{}) defer c.queue.ShutDown() @@ -197,64 +209,92 @@ func TestIPInPodSubnets(t *testing.T) { // in-between list and watch call of an informer. This is because fake clientset doesn't support watching with // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. c.informerFactory.WaitForCacheSync(stopCh) - c.Controller.nodeConfig.PodIPv4CIDR = podCIDR - c.Controller.nodeConfig.PodIPv6CIDR = podCIDR3 - node2 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node2", - }, - Spec: corev1.NodeSpec{ - PodCIDR: podCIDR2.String(), - PodCIDRs: []string{podCIDR2.String()}, + c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) + c.processNextWorkItem() + + testCases := []struct { + name string + ips []string + isInPodSubnets bool + isGwIP bool + }{ + { + name: "local gateway IP", + ips: []string{"1.1.0.1", "2001:4860:0000::1"}, + isInPodSubnets: true, + isGwIP: true, }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: nodeIP2.String(), - }, - }, + { + name: "local Pod IP", + ips: []string{"1.1.0.101", "2001:4860:0000::101"}, + isInPodSubnets: true, + isGwIP: false, }, - } - node3 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node3", + { + name: "remote gateway IP", + ips: []string{"1.1.1.1", "2001:4860:0001::1"}, + isInPodSubnets: true, + isGwIP: true, }, - Spec: corev1.NodeSpec{ - PodCIDR: podCIDR3.String(), - PodCIDRs: []string{podCIDR3.String()}, + { + name: "remote Pod IP", + ips: []string{"1.1.1.101", "2001:4860:0001::101"}, + isInPodSubnets: true, + isGwIP: false, }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Type: corev1.NodeInternalIP, - Address: nodeIP3.String(), - }, - }, + { + name: "unknown IPs", + ips: []string{"1.1.10.101", "2001:4860:0010::101"}, + isInPodSubnets: false, + isGwIP: false, }, } - c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) - c.processNextWorkItem() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for _, ip := range tc.ips { + isInPodSubnets, isGwIP := c.Controller.LookupIPInPodSubnets(netip.MustParseAddr(ip)) + assert.Equal(t, tc.isInPodSubnets, isInPodSubnets) + assert.Equal(t, tc.isGwIP, isGwIP) + } + }) + } +} - c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), &dsIPs2, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1) - c.processNextWorkItem() +func BenchmarkLookupIPInPodSubnets(b *testing.B) { + c := newController(b, &config.NetworkConfig{}) + defer c.queue.ShutDown() - c.clientset.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}) - c.ofClient.EXPECT().InstallNodeFlows("node3", gomock.Any(), &dsIPs3, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR3, "node3", nodeIP3, podCIDR3Gateway).Times(1) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + // Must wait for cache sync, otherwise resource creation events will be missing if the resources are created + // in-between list and watch call of an informer. This is because fake clientset doesn't support watching with + // resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created. + c.informerFactory.WaitForCacheSync(stopCh) + + c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1"))) - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("2001:4860:4860::8889"))) - assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.2.1"))) - assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("10.10.10.10"))) - assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("8.8.8.8"))) + localPodIP := netip.MustParseAddr("1.1.0.99") + remotePodIP := netip.MustParseAddr("1.1.1.99") + remoteGatewayIP := netip.MustParseAddr("1.1.1.1") + unknownIP := netip.MustParseAddr("1.1.2.99") + + b.ResetTimer() + for range b.N { + c.Controller.findPodSubnetForIP(localPodIP) + c.Controller.findPodSubnetForIP(remotePodIP) + c.Controller.findPodSubnetForIP(remoteGatewayIP) + c.Controller.findPodSubnetForIP(unknownIP) + } } func setup(t *testing.T, ifaces []*interfacestore.InterfaceConfig, authenticationMode config.IPsecAuthenticationMode) *fakeController { @@ -300,8 +340,8 @@ func TestRemoveStaleTunnelPorts(t *testing.T) { Name: "xyz-k8s-0-1", }, Spec: corev1.NodeSpec{ - PodCIDR: podCIDR.String(), - PodCIDRs: []string{podCIDR.String()}, + PodCIDR: podCIDR1.String(), + PodCIDRs: []string{podCIDR1.String()}, }, Status: corev1.NodeStatus{ Addresses: []corev1.NodeAddress{ @@ -630,7 +670,7 @@ func TestRemoveStaleGatewayRoutes(t *testing.T) { c.informerFactory.Start(stopCh) c.informerFactory.WaitForCacheSync(stopCh) - c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()}) + c.routeClient.EXPECT().Reconcile([]string{podCIDR1.String(), podCIDR1v6.String()}) err := c.removeStaleGatewayRoutes() assert.NoError(t, err) } @@ -690,7 +730,7 @@ func TestDeleteNodeRoute(t *testing.T) { expectedCalls: func(ovsClient *ovsconfigtest.MockOVSBridgeClient, routeClient *routetest.MockInterface, ofClient *oftest.MockClient, wgClient *wgtest.MockInterface) { ovsClient.EXPECT().DeletePort("123") - routeClient.EXPECT().DeleteRoutes(podCIDR) + routeClient.EXPECT().DeleteRoutes(podCIDR1) ofClient.EXPECT().UninstallNodeFlows(node1.Name) }, }, @@ -700,7 +740,7 @@ func TestDeleteNodeRoute(t *testing.T) { mode: config.TrafficEncryptionModeWireGuard, expectedCalls: func(ovsClient *ovsconfigtest.MockOVSBridgeClient, routeClient *routetest.MockInterface, ofClient *oftest.MockClient, wgClient *wgtest.MockInterface) { - routeClient.EXPECT().DeleteRoutes(podCIDR) + routeClient.EXPECT().DeleteRoutes(podCIDR1) ofClient.EXPECT().UninstallNodeFlows(nodeWithWireGuard.Name) wgClient.EXPECT().DeletePeer(nodeWithWireGuard.Name) }, @@ -714,7 +754,7 @@ func TestDeleteNodeRoute(t *testing.T) { }, tt.node) c.installedNodes.Add(&nodeRouteInfo{ nodeName: tt.node.Name, - podCIDRs: []*net.IPNet{podCIDR}, + podCIDRs: []*net.IPNet{podCIDR1}, }) defer c.queue.ShutDown() @@ -747,7 +787,8 @@ func TestInitialListHasSynced(t *testing.T) { require.Error(t, c.flowRestoreCompleteWait.WaitWithTimeout(100*time.Millisecond)) c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), &dsIPs1, uint32(0), nil).Times(1) - c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1, "node1", nodeIP1, podCIDR1Gateway).Times(1) + c.routeClient.EXPECT().AddRoutes(podCIDR1v6, "node1", nil, podCIDR1v6Gateway).Times(1) c.processNextWorkItem() assert.True(t, c.hasProcessedInitialList.HasSynced()) @@ -760,7 +801,7 @@ func TestInitialListHasSyncedStopChClosedEarly(t *testing.T) { c.informerFactory.Start(stopCh) c.informerFactory.WaitForCacheSync(stopCh) - c.routeClient.EXPECT().Reconcile([]string{podCIDR.String()}) + c.routeClient.EXPECT().Reconcile([]string{podCIDR1.String(), podCIDR1v6.String()}) // We close the stopCh right away, then call Run synchronously and wait for it to // return. The workers will not even start, and the initial list of Nodes should never be diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index 9abfa5d7600..21c2eb78e59 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -26,6 +26,7 @@ import ( ipfixregistry "github.com/vmware/go-ipfix/pkg/registry" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -34,7 +35,6 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter/connections" "antrea.io/antrea/pkg/agent/flowexporter/priorityqueue" "antrea.io/antrea/pkg/agent/metrics" - "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/proxy" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/ipfix" @@ -225,6 +225,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { // Start the goroutine to poll conntrack flows. go exp.conntrackConnStore.Run(stopCh) + if exp.nodeRouteController != nil { + // Wait for NodeRouteController to have processed the initial list of Nodes so that + // the list of Pod subnets is up-to-date. + if !cache.WaitForCacheSync(stopCh, exp.nodeRouteController.HasSynced) { + return + } + } + defaultTimeout := exp.conntrackPriorityQueue.ActiveFlowTimeout expireTimer := time.NewTimer(defaultTimeout) for { @@ -608,10 +616,20 @@ func (exp *FlowExporter) sendDataSet() (int, error) { if err != nil { return 0, fmt.Errorf("error when sending data set: %v", err) } - klog.V(4).InfoS("Data set sent successfully", "Bytes sent", sentBytes) + if klog.V(5).Enabled() { + klog.InfoS("Data set sent successfully", "Bytes sent", sentBytes) + } return sentBytes, nil } +const ( + // flowTypeUnknown indicates that we are unable to determine the flow type. + flowTypeUnknown = uint8(0) + // flowTypeUnsupported indicates that this type of flow is not supported and that we should + // skip exporting it. + flowTypeUnsupported = uint8(0xff) +) + func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 { // TODO: support Pod-To-External flows in network policy only mode. if exp.isNetworkPolicyOnly { @@ -622,21 +640,34 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 { } if exp.nodeRouteController == nil { - klog.V(4).InfoS("Can't find flowType without nodeRouteController") - return 0 + if klog.V(5).Enabled() { + klog.InfoS("Can't find flowType without nodeRouteController") + } + return flowTypeUnknown + } + srcIsPod, srcIsGw := exp.nodeRouteController.LookupIPInPodSubnets(conn.FlowKey.SourceAddress) + dstIsPod, dstIsGw := exp.nodeRouteController.LookupIPInPodSubnets(conn.FlowKey.DestinationAddress) + if srcIsGw || dstIsGw { + // This matches what we do in filterAntreaConns but is more general as we consider + // remote gateways as well. + if klog.V(5).Enabled() { + klog.InfoS("Flows where the source or destination IP is a gateway IP will not be exported") + } + return flowTypeUnsupported } - if exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.SourceAddress.AsSlice()) { - if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress.AsSlice()) { - if conn.SourcePodName == "" || conn.DestinationPodName == "" { - return ipfixregistry.FlowTypeInterNode - } - return ipfixregistry.FlowTypeIntraNode + if !srcIsPod { + if klog.V(5).Enabled() { + klog.InfoS("Flows where the source is not a Pod will not be exported") } + return flowTypeUnsupported + } + if !dstIsPod { return ipfixregistry.FlowTypeToExternal } - // We do not support External-To-Pod flows for now. - klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", conn.FlowKey.SourceAddress.String()) - return 0 + if conn.SourcePodName == "" || conn.DestinationPodName == "" { + return ipfixregistry.FlowTypeInterNode + } + return ipfixregistry.FlowTypeIntraNode } func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) { @@ -648,11 +679,16 @@ func (exp *FlowExporter) fillEgressInfo(conn *flowexporter.Connection) { conn.EgressName = egressName conn.EgressIP = egressIP conn.EgressNodeName = egressNodeName - klog.V(4).InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName)) + if klog.V(5).Enabled() { + klog.InfoS("Filling Egress Info for flow", "Egress", conn.EgressName, "EgressIP", conn.EgressIP, "EgressNode", conn.EgressNodeName, "SourcePod", klog.KRef(conn.SourcePodNamespace, conn.SourcePodName)) + } } func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error { conn.FlowType = exp.findFlowType(*conn) + if conn.FlowType == flowTypeUnsupported { + return nil + } if conn.FlowType == ipfixregistry.FlowTypeToExternal { if conn.SourcePodNamespace != "" && conn.SourcePodName != "" { exp.fillEgressInfo(conn) @@ -669,7 +705,9 @@ func (exp *FlowExporter) exportConn(conn *flowexporter.Connection) error { return err } exp.numDataSetsSent = exp.numDataSetsSent + 1 - klog.V(4).InfoS("Record for connection sent successfully", "flowKey", conn.FlowKey, "connection", conn) + if klog.V(5).Enabled() { + klog.InfoS("Record for connection sent successfully", "flowKey", conn.FlowKey, "connection", conn) + } return nil } diff --git a/pkg/agent/util/net.go b/pkg/agent/util/net.go index 14d43757bff..55855faf0b0 100644 --- a/pkg/agent/util/net.go +++ b/pkg/agent/util/net.go @@ -23,13 +23,15 @@ import ( "io" "math" "net" + "net/netip" "strings" + "github.com/containernetworking/plugins/pkg/ip" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" - "antrea.io/antrea/pkg/util/ip" + utilip "antrea.io/antrea/pkg/util/ip" ) const ( @@ -137,7 +139,7 @@ func listenUnix(address string) (net.Listener, error) { // GetIPNetDeviceFromIP returns local IPs/masks and associated device from IP, and ignores the interfaces which have // names in the ignoredInterfaces. -func GetIPNetDeviceFromIP(localIPs *ip.DualStackIPs, ignoredInterfaces sets.Set[string]) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) { +func GetIPNetDeviceFromIP(localIPs *utilip.DualStackIPs, ignoredInterfaces sets.Set[string]) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) { linkList, err := netInterfaces() if err != nil { return nil, nil, nil, err @@ -438,3 +440,14 @@ func GenerateOVSDatapathID(macString string) string { } return "0000" + strings.Replace(macString, ":", "", -1) } + +// GetGatewayIPForPodCIDR returns the gateway IP for a given Pod CIDR. +func GetGatewayIPForPodCIDR(cidr *net.IPNet) net.IP { + return ip.NextIP(cidr.IP.Mask(cidr.Mask)) +} + +// GetGatewayIPForPodPrefix acts like GetGatewayIPForPodCIDR but takes a netip.Prefix as a parameter +// and returns a netip.Addr value. +func GetGatewayIPForPodPrefix(prefix netip.Prefix) netip.Addr { + return prefix.Masked().Addr().Next() +} diff --git a/pkg/agent/util/net_windows.go b/pkg/agent/util/net_windows.go index 8e9c0ca534d..7183fef7c73 100644 --- a/pkg/agent/util/net_windows.go +++ b/pkg/agent/util/net_windows.go @@ -28,7 +28,6 @@ import ( "github.com/Microsoft/go-winio" "github.com/Microsoft/hcsshim" - "github.com/containernetworking/plugins/pkg/ip" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -64,7 +63,7 @@ func GetNSPath(containerNetNS string) (string, error) { func CreateHNSNetwork(hnsNetName string, subnetCIDR *net.IPNet, nodeIP *net.IPNet, adapter *net.Interface) (*hcsshim.HNSNetwork, error) { adapterMAC := adapter.HardwareAddr adapterName := adapter.Name - gateway := ip.NextIP(subnetCIDR.IP.Mask(subnetCIDR.Mask)) + gateway := GetGatewayIPForPodCIDR(subnetCIDR) network := &hcsshim.HNSNetwork{ Name: hnsNetName, Type: HNSNetworkType,