Skip to content

Commit

Permalink
[FlowAggregator] Add clusterId to aggregated records (#6769)
Browse files Browse the repository at this point in the history
This allows external flow collectors to easily identify which
cluster a record is coming from (rather than relying on the
observationDomainId or the exporter's IP address).

We use the cluster UUID (generated by Antrea) to populate this IE.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Jan 8, 2025
1 parent 73dbd7b commit f991c42
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 26 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.40" \
"antrea/nginx:1.21.6-alpine" \
"antrea/toolbox:1.5-1")

FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.11.0" \
FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.12.0" \
"antrea/clickhouse-operator:0.21.0" \
"antrea/metrics-exporter:0.21.0" \
"antrea/clickhouse-server:23.4")
Expand Down
8 changes: 6 additions & 2 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,16 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
}
elements = append(elements, ie)
}
ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
e.set.ResetSet()
if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil {
return 0, err
}
err := e.set.AddRecordV2(elements, templateID)
if err != nil {
if err := e.set.AddRecordV2(elements, templateID); err != nil {
return 0, fmt.Errorf("error when adding record to set, error: %v", err)
}
bytesSent, err := e.exportingProcess.SendSet(e.set)
Expand Down
3 changes: 3 additions & 0 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
}
elemList = append(elemList, createElement("clusterId", ipfixregistry.AntreaEnterpriseID))
mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)

return elemList
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (fa *flowAggregator) InitCollectingProcess() error {
}
cpInput.NumExtraElements = len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) + len(infoelements.AntreaLabelsElementList) +
len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList)
// clusterId
cpInput.NumExtraElements += 1
var err error
fa.collectingProcess, err = collector.InitCollectingProcess(cpInput)
return err
Expand Down Expand Up @@ -424,6 +426,9 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
// Even if fa.includePodLabels is false, we still need to add an empty IE to match the template.
if !fa.aggregationProcess.AreExternalFieldsFilled(*record) {
fa.fillPodLabels(key, record.Record, *startTime)
if err := fa.fillClusterID(record.Record); err != nil {
klog.ErrorS(err, "Failed to add clusterId")
}
fa.aggregationProcess.SetExternalFieldsFilled(record, true)
}
if fa.ipfixExporter != nil {
Expand Down Expand Up @@ -559,6 +564,17 @@ func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ip
}
}

// fillClusterID appends the "clusterId" Information Element to record. The
// element definition is looked up in the aggregator's registry under the
// Antrea enterprise ID, and its value is the string form of fa.clusterUUID.
//
// It returns a wrapped error if the registry lookup fails or if the element
// cannot be added to the record; otherwise it returns nil.
func (fa *flowAggregator) fillClusterID(record ipfixentities.Record) error {
	const elementName = "clusterId"

	clusterIDElement, err := fa.registry.GetInfoElement(elementName, ipfixregistry.AntreaEnterpriseID)
	if err != nil {
		return fmt.Errorf("error when getting clusterId InfoElement: %w", err)
	}

	// Pair the element definition with this cluster's UUID before appending it.
	clusterIDValue := ipfixentities.NewStringInfoElement(clusterIDElement, fa.clusterUUID.String())
	err = record.AddInfoElement(clusterIDValue)
	if err != nil {
		return fmt.Errorf("error when adding clusterId InfoElement with value: %w", err)
	}
	return nil
}

// GetFlowRecords returns whatever the aggregation process reports for flowKey.
// The call is forwarded unchanged to fa.aggregationProcess.GetRecords.
func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []map[string]interface{} {
	records := fa.aggregationProcess.GetRecords(flowKey)
	return records
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)
mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl)

clusterUUID := uuid.New()
newFlowAggregator := func(includePodLabels bool) *flowAggregator {
return &flowAggregator{
clusterUUID: clusterUUID,
aggregatorTransportProtocol: "tcp",
aggregationProcess: mockAggregationProcess,
activeFlowRecordTimeout: testActiveTimeout,
Expand Down Expand Up @@ -186,6 +188,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil)
destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels)
mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil)
clusterIDElement := ipfixentities.NewInfoElement("clusterId", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(clusterIDElement, nil)
clusterIDIE := ipfixentities.NewStringInfoElement(clusterIDElement, clusterUUID.String())
mockRecord.EXPECT().AddInfoElement(clusterIDIE).Return(nil)
mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true)
mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6)

Expand Down
23 changes: 3 additions & 20 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/clusteridentity"
)

// TestBasic is the top-level test which contains some subtests for
Expand Down Expand Up @@ -867,27 +866,11 @@ func testGratuitousARP(t *testing.T, data *TestData, namespace string) {
// testClusterIdentity verifies that the antrea-cluster-identity ConfigMap is
// populated correctly by the Antrea Controller.
func testClusterIdentity(t *testing.T, data *TestData) {
clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
antreaNamespace,
clusteridentity.DefaultClusterIdentityConfigMapName,
data.clientset,
)

const retryInterval = time.Second
const timeout = 10 * time.Second
var clusterUUID uuid.UUID
err := wait.PollUntilContextTimeout(context.Background(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) {
clusterIdentity, _, err := clusterIdentityProvider.Get()
if err != nil {
return false, nil
}
clusterUUID = clusterIdentity.UUID
t.Logf("Cluster UUID: %v", clusterUUID)
return true, nil
})

assert.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout)
clusterUUID, err := data.getAntreaClusterUUID(timeout)
require.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout)
assert.NotEqual(t, uuid.Nil, clusterUUID)
t.Logf("Cluster UUID: %v", clusterUUID)
}

func testLogRotate(t *testing.T, data *TestData) {
Expand Down
17 changes: 15 additions & 2 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ var (
podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs
serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"}
podNames = serviceNames
// We use a global variable for this to avoid having to pass it down to all helper functions.
// It will be initialized the first time setupFlowAggregatorTest is called.
antreaClusterUUID = ""
)

type testFlow struct {
Expand Down Expand Up @@ -190,8 +193,15 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*
t.Fatalf("Error when setting up test: %v", err)
}
teardownFuncs = append(teardownFuncs, func() { teardownTest(t, data) })
err = setupFlowAggregator(t, data, options)
if err != nil {
// Make sure that antreaClusterUUID is set if this function is called for the first time.
if antreaClusterUUID == "" {
if uuid, err := data.getAntreaClusterUUID(10 * time.Second); err != nil {
t.Fatalf("Error when retrieving Antrea Cluster UUID: %v", err)
} else {
antreaClusterUUID = uuid.String()
}
}
if err := setupFlowAggregator(t, data, options); err != nil {
t.Fatalf("Error when setting up FlowAggregator: %v", err)
}
// Execute teardownFlowAggregator later than teardownTest to ensure that the logs of Flow
Expand All @@ -203,6 +213,7 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*
func TestFlowAggregatorSecureConnection(t *testing.T) {
skipIfNotFlowVisibilityTest(t)
skipIfHasWindowsNodes(t)

testCases := []struct {
flowVisibilityTestOptions
name string
Expand Down Expand Up @@ -1019,6 +1030,8 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s
checkFlowType(t, record, ipfixregistry.FlowTypeInterNode)
}
assert := assert.New(t)
// Check the clusterId field, which should match the ClusterUUID generated by the Antrea Controller.
assert.Contains(record, fmt.Sprintf("clusterId: %s", antreaClusterUUID), "Record does not have the correct clusterId")
if checkService {
if isIntraNode {
assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name")
Expand Down
24 changes: 23 additions & 1 deletion test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"github.com/containernetworking/plugins/pkg/ip"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"golang.org/x/mod/semver"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -60,6 +61,7 @@ import (

"antrea.io/antrea/pkg/agent/config"
crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/clusteridentity"
agentconfig "antrea.io/antrea/pkg/config/agent"
controllerconfig "antrea.io/antrea/pkg/config/controller"
flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
Expand Down Expand Up @@ -127,7 +129,7 @@ const (
mcjoinImage = "antrea/mcjoin:v2.9"
nginxImage = "antrea/nginx:1.21.6-alpine"
iisImage = "mcr.microsoft.com/windows/servercore/iis"
ipfixCollectorImage = "antrea/ipfix-collector:v0.11.0"
ipfixCollectorImage = "antrea/ipfix-collector:v0.12.0"

nginxLBService = "nginx-loadbalancer"

Expand Down Expand Up @@ -3330,3 +3332,23 @@ func (data *TestData) waitForDeploymentReady(t *testing.T, namespace string, nam
}
return nil
}

// getAntreaClusterUUID retrieves the cluster UUID through a cluster identity
// provider built from antreaNamespace, the default cluster identity ConfigMap
// name and the test clientset.
//
// The provider is polled once per second (the first attempt is immediate)
// until it returns an identity or timeout elapses. Errors from individual
// attempts are treated as "not ready yet" and only trigger a retry. On
// success the UUID and a nil error are returned; otherwise the zero UUID is
// returned together with the error reported by the poll helper.
func (data *TestData) getAntreaClusterUUID(timeout time.Duration) (uuid.UUID, error) {
	const pollInterval = 1 * time.Second

	provider := clusteridentity.NewClusterIdentityProvider(
		antreaNamespace,
		clusteridentity.DefaultClusterIdentityConfigMapName,
		data.clientset,
	)

	var result uuid.UUID
	fetchIdentity := func(ctx context.Context) (bool, error) {
		identity, _, getErr := provider.Get()
		if getErr != nil {
			// Not available yet: keep polling instead of aborting.
			return false, nil
		}
		result = identity.UUID
		return true, nil
	}

	pollErr := wait.PollUntilContextTimeout(context.Background(), pollInterval, timeout, true, fetchIdentity)
	return result, pollErr
}

0 comments on commit f991c42

Please sign in to comment.