Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlowExporter] More efficient IP checks #6960

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ func (o *Options) validateAntreaProxyConfig(encapMode config.TrafficEncapModeTyp
}

func (o *Options) validateFlowExporterConfig() error {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
klog.InfoS("The FlowExporter feature does not support AntreaIPAM Pods")
}
host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowExporter.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport)
if err != nil {
return err
Expand Down
118 changes: 84 additions & 34 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -77,7 +80,13 @@ type Controller struct {
// installedNodes records routes and flows installation states of Nodes.
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
// A node will be in the map after its flows and routes are installed successfully.
installedNodes cache.Indexer
installedNodes cache.Indexer
// podSubnetsMutex protects access to the podSubnets set.
podSubnetsMutex sync.RWMutex
// podSubnets is a set which stores all known PodCIDRs in the cluster as masked netip.Prefix objects.
podSubnets sets.Set[netip.Prefix]
maskSizeV4 int
maskSizeV6 int
wireGuardClient wireguard.Interface
// ipsecCertificateManager is useful for determining whether the ipsec certificate has been configured
// or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate
Expand Down Expand Up @@ -124,10 +133,21 @@ func NewNodeRouteController(
},
),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
podSubnets: sets.New[netip.Prefix](),
wireGuardClient: wireguardClient,
ipsecCertificateManager: ipsecCertificateManager,
flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(),
}
if nodeConfig.PodIPv4CIDR != nil {
prefix, _ := cidrToPrefix(nodeConfig.PodIPv4CIDR)
controller.podSubnets.Insert(prefix)
controller.maskSizeV4 = prefix.Bits()
}
if nodeConfig.PodIPv6CIDR != nil {
prefix, _ := cidrToPrefix(nodeConfig.PodIPv6CIDR)
controller.podSubnets.Insert(prefix)
controller.maskSizeV6 = prefix.Bits()
}
registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerDetailedFuncs{
AddFunc: func(cur interface{}, isInInitialList bool) {
Expand Down Expand Up @@ -379,7 +399,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
go func() {
// When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait.
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
return c.hasProcessedInitialList.HasSynced(), nil
return c.HasSynced(), nil
})
// An error here means the context has been cancelled, which means that the stopCh
// has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced
Expand All @@ -395,6 +415,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
<-stopCh
}

// HasSynced returns true when the initial list of Nodes has been processed by the controller.
func (c *Controller) HasSynced() bool {
return c.hasProcessedInitialList.HasSynced()
}

// worker is a long-running function that will continually call the processNextWorkItem function in
// order to read and process a message on the workqueue.
func (c *Controller) worker() {
Expand Down Expand Up @@ -482,6 +507,12 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
}
c.installedNodes.Delete(obj)
func() {
subnets, _ := cidrsToPrefixes(nodeRouteInfo.podCIDRs)
c.podSubnetsMutex.Lock()
defer c.podSubnetsMutex.Unlock()
c.podSubnets.Delete(subnets...)
}()

if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
interfaceConfig, ok := c.interfaceStore.GetNodeTunnelInterface(nodeName)
Expand Down Expand Up @@ -575,6 +606,13 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
peerNodeIP = peerNodeIPs.IPv6
}

func() {
subnet, _ := cidrToPrefix(peerPodCIDR)
c.podSubnetsMutex.Lock()
defer c.podSubnetsMutex.Unlock()
c.podSubnets.Insert(subnet)
}()

klog.InfoS("Adding route and flow to Node", "Node", nodeName, "podCIDR", podCIDR,
"peerNodeIP", peerNodeIP)
}
Expand Down Expand Up @@ -773,40 +811,31 @@ func ParseTunnelInterfaceConfig(
return interfaceConfig
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
var ipCIDR *net.IPNet
var curNodeCIDRStr string
if ip.To4() != nil {
var podIPv4CIDRMaskSize int
if c.nodeConfig.PodIPv4CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
} else {
return false
}
v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v4Mask),
Mask: v4Mask,
}

func (c *Controller) findPodSubnetForIP(ip netip.Addr) (netip.Prefix, bool) {
var maskSize int
if ip.Is4() {
maskSize = c.maskSizeV4
} else {
var podIPv6CIDRMaskSize int
if c.nodeConfig.PodIPv6CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
} else {
return false
}
v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v6Mask),
Mask: v6Mask,
}
maskSize = c.maskSizeV6
}
ipCIDRStr := ipCIDR.String()
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
if maskSize == 0 {
return netip.Prefix{}, false
}
prefix, _ := ip.Prefix(maskSize)
c.podSubnetsMutex.RLock()
defer c.podSubnetsMutex.RUnlock()
return prefix, c.podSubnets.Has(prefix)
}

// LookupIPInPodSubnets returns two boolean values. The first one indicates whether the IP can be
// found in a PodCIDR for one of the cluster Nodes. The second one indicates whether the IP is used
// as a gwateway IP. The second boolean value can only be true if the first one is true.
func (c *Controller) LookupIPInPodSubnets(ip netip.Addr) (bool, bool) {
prefix, ok := c.findPodSubnetForIP(ip)
if !ok {
return false, false
}
return ok, ip == util.GetGatewayIPForPodPrefix(prefix)
}

// getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode.
Expand All @@ -821,3 +850,24 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) {
}
return mac, nil
}

func cidrToPrefix(cidr *net.IPNet) (netip.Prefix, error) {
addr, ok := netip.AddrFromSlice(cidr.IP)
if !ok {
return netip.Prefix{}, fmt.Errorf("invalid IP in cidr: %v", cidr)
}
size, _ := cidr.Mask.Size()
return addr.Prefix(size)
}

func cidrsToPrefixes(cidrs []*net.IPNet) ([]netip.Prefix, error) {
prefixes := make([]netip.Prefix, len(cidrs))
for idx := range cidrs {
prefix, err := cidrToPrefix(cidrs[idx])
if err != nil {
return nil, err
}
prefixes[idx] = prefix
}
return prefixes, nil
}
Loading
Loading