Skip to content

Commit

Permalink
[FlowExporter] More efficient IP checks
Browse files Browse the repository at this point in the history
The FlowExporter in the Agent queries the NodeRouteController to
determine whether the source / destination IPs are Pod IPs (NodeIPAM
only). Prior to this change, these checks were expensive, involving an
indexer lookup and conversions between different IP formats. The new
implementation is about 10x faster, and performs no memory allocations.

The new implementation introduces a new set in the NodeRouteController,
dedicated to storing all the PodCIDRs in the cluster. While I considered
removing the dependency of the FlowExporter on the NodeRouteController
altogether, it would have been a much bigger change. Additionally, in
the long term, we could consider removing these checks from the
FlowExporter altogether, and pushing the logic to the FlowAggregator.

We also make a few additional changes to the FlowExporter:
* more consistently ignore connections where the source / destination IP
  is a gateway IP
* classify Pod-to-Service traffic where the destination IP is not a Pod
  IP as Pod-to-External
* log a warning if FlowExporter is enabled alongside AntreaIPAM

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas committed Jan 30, 2025
1 parent fc05c92 commit 6b4b8e2
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 141 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ func (o *Options) validateAntreaProxyConfig(encapMode config.TrafficEncapModeTyp
}

func (o *Options) validateFlowExporterConfig() error {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) {
klog.InfoS("The FlowExporter feature does not support AntreaIPAM Pods")
}
host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowExporter.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport)
if err != nil {
return err
Expand Down
118 changes: 84 additions & 34 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -77,7 +80,13 @@ type Controller struct {
// installedNodes records routes and flows installation states of Nodes.
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
// A node will be in the map after its flows and routes are installed successfully.
installedNodes cache.Indexer
installedNodes cache.Indexer
// podSubnetsMutex protects access to the podSubnets set.
podSubnetsMutex sync.RWMutex
// podSubnets is a set which stores all known PodCIDRs in the cluster as masked netip.Prefix objects.
podSubnets sets.Set[netip.Prefix]
maskSizeV4 int
maskSizeV6 int
wireGuardClient wireguard.Interface
// ipsecCertificateManager is useful for determining whether the ipsec certificate has been configured
// or not when IPsec is enabled with "cert" mode. The NodeRouteController must wait for the certificate
Expand Down Expand Up @@ -124,10 +133,21 @@ func NewNodeRouteController(
},
),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
podSubnets: sets.New[netip.Prefix](),
wireGuardClient: wireguardClient,
ipsecCertificateManager: ipsecCertificateManager,
flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(),
}
if nodeConfig.PodIPv4CIDR != nil {
prefix, _ := cidrToPrefix(nodeConfig.PodIPv4CIDR)
controller.podSubnets.Insert(prefix)
controller.maskSizeV4 = prefix.Bits()
}
if nodeConfig.PodIPv6CIDR != nil {
prefix, _ := cidrToPrefix(nodeConfig.PodIPv6CIDR)
controller.podSubnets.Insert(prefix)
controller.maskSizeV6 = prefix.Bits()
}
registration, _ := nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerDetailedFuncs{
AddFunc: func(cur interface{}, isInInitialList bool) {
Expand Down Expand Up @@ -379,7 +399,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
go func() {
// When the initial list of Nodes has been processed, we decrement flowRestoreCompleteWait.
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
return c.hasProcessedInitialList.HasSynced(), nil
return c.HasSynced(), nil
})
// An error here means the context has been cancelled, which means that the stopCh
// has been closed. While it is still possible for c.hasProcessedInitialList.HasSynced
Expand All @@ -395,6 +415,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
<-stopCh
}

// HasSynced reports whether the controller has finished processing the initial list of Nodes.
// It simply exposes the state tracked by hasProcessedInitialList.
func (c *Controller) HasSynced() bool {
	synced := c.hasProcessedInitialList.HasSynced()
	return synced
}

// worker is a long-running function that will continually call the processNextWorkItem function in
// order to read and process a message on the workqueue.
func (c *Controller) worker() {
Expand Down Expand Up @@ -482,6 +507,12 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
}
c.installedNodes.Delete(obj)
func() {
subnets, _ := cidrsToPrefixes(nodeRouteInfo.podCIDRs)
c.podSubnetsMutex.Lock()
defer c.podSubnetsMutex.Unlock()
c.podSubnets.Delete(subnets...)
}()

if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
interfaceConfig, ok := c.interfaceStore.GetNodeTunnelInterface(nodeName)
Expand Down Expand Up @@ -575,6 +606,13 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
peerNodeIP = peerNodeIPs.IPv6
}

func() {
subnet, _ := cidrToPrefix(peerPodCIDR)
c.podSubnetsMutex.Lock()
defer c.podSubnetsMutex.Unlock()
c.podSubnets.Insert(subnet)
}()

klog.InfoS("Adding route and flow to Node", "Node", nodeName, "podCIDR", podCIDR,
"peerNodeIP", peerNodeIP)
}
Expand Down Expand Up @@ -773,40 +811,31 @@ func ParseTunnelInterfaceConfig(
return interfaceConfig
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
var ipCIDR *net.IPNet
var curNodeCIDRStr string
if ip.To4() != nil {
var podIPv4CIDRMaskSize int
if c.nodeConfig.PodIPv4CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
} else {
return false
}
v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, utilip.V4BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v4Mask),
Mask: v4Mask,
}

func (c *Controller) findPodSubnetForIP(ip netip.Addr) (netip.Prefix, bool) {
var maskSize int
if ip.Is4() {
maskSize = c.maskSizeV4
} else {
var podIPv6CIDRMaskSize int
if c.nodeConfig.PodIPv6CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
} else {
return false
}
v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, utilip.V6BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v6Mask),
Mask: v6Mask,
}
maskSize = c.maskSizeV6
}
ipCIDRStr := ipCIDR.String()
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
if maskSize == 0 {
return netip.Prefix{}, false
}
prefix, _ := ip.Prefix(maskSize)
c.podSubnetsMutex.RLock()
defer c.podSubnetsMutex.RUnlock()
return prefix, c.podSubnets.Has(prefix)
}

// LookupIPInPodSubnets classifies the provided IP with respect to the PodCIDRs known to the
// controller. It returns two boolean values:
//   - the first one is true if findPodSubnetForIP locates a known Pod subnet for the IP;
//   - the second one is true if the IP is equal to the gateway IP computed for that subnet by
//     util.GetGatewayIPForPodPrefix.
//
// The second value can only be true when the first one is true.
func (c *Controller) LookupIPInPodSubnets(ip netip.Addr) (bool, bool) {
	podSubnet, found := c.findPodSubnetForIP(ip)
	if found {
		isGateway := util.GetGatewayIPForPodPrefix(podSubnet) == ip
		return true, isGateway
	}
	return false, false
}

// getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode.
Expand All @@ -821,3 +850,24 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) {
}
return mac, nil
}

// cidrToPrefix converts a *net.IPNet into the equivalent masked netip.Prefix.
//
// An IPv4 network may be stored in a net.IPNet with either a 4-byte or a 16-byte IP. When the
// mask is an IPv4 mask (32 bits), the address is unmapped so that the returned prefix is always
// a plain IPv4 prefix (e.g. 10.0.1.0/24) and never an IPv4-mapped IPv6 one.
//
// An error is returned if cidr is nil, if its IP is not a valid 4-byte or 16-byte address, if
// its mask is empty or not in canonical form (ones followed by zeros), or if the mask length does
// not match the address family. On error, the returned prefix is the zero netip.Prefix.
func cidrToPrefix(cidr *net.IPNet) (netip.Prefix, error) {
	if cidr == nil {
		return netip.Prefix{}, fmt.Errorf("invalid cidr: nil")
	}
	addr, ok := netip.AddrFromSlice(cidr.IP)
	if !ok {
		return netip.Prefix{}, fmt.Errorf("invalid IP in cidr: %v", cidr)
	}
	// Mask.Size returns 0, 0 when the mask is empty or not canonical. Without this check, such
	// a mask would silently be turned into a /0 prefix.
	size, bits := cidr.Mask.Size()
	if bits == 0 {
		return netip.Prefix{}, fmt.Errorf("invalid mask in cidr: %v", cidr)
	}
	if bits == 32 {
		// AddrFromSlice yields an IPv4-mapped IPv6 address for a 16-byte IPv4 net.IP. Unmap
		// is a no-op for any other address.
		addr = addr.Unmap()
	}
	if addr.BitLen() != bits {
		return netip.Prefix{}, fmt.Errorf("mismatched IP and mask in cidr: %v", cidr)
	}
	return addr.Prefix(size)
}

// cidrsToPrefixes converts every CIDR in the provided slice with cidrToPrefix, preserving the
// input order. The conversion stops at the first failure, in which case a nil slice and the
// corresponding error are returned.
func cidrsToPrefixes(cidrs []*net.IPNet) ([]netip.Prefix, error) {
	converted := make([]netip.Prefix, 0, len(cidrs))
	for _, cidr := range cidrs {
		p, err := cidrToPrefix(cidr)
		if err != nil {
			return nil, err
		}
		converted = append(converted, p)
	}
	return converted, nil
}
Loading

0 comments on commit 6b4b8e2

Please sign in to comment.