From f991c42d55799d42f18cb35f55d2a2c745a429d3 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Wed, 8 Jan 2025 11:24:46 -0800 Subject: [PATCH] [FlowAggregator] Add clusterId to aggregated records (#6769) In order to allow external flow collectors to easily identify which cluster the record is coming from (rather than relying on the observationDomainId or the exporter's IP address). We use the cluster UUID (generated by Antrea) to populate this IE. Signed-off-by: Antonin Bas --- ci/kind/test-e2e-kind.sh | 2 +- pkg/flowaggregator/exporter/ipfix.go | 8 ++++++-- pkg/flowaggregator/exporter/ipfix_test.go | 3 +++ pkg/flowaggregator/flowaggregator.go | 16 +++++++++++++++ pkg/flowaggregator/flowaggregator_test.go | 6 ++++++ test/e2e/basic_test.go | 23 +++------------------- test/e2e/flowaggregator_test.go | 17 ++++++++++++++-- test/e2e/framework.go | 24 ++++++++++++++++++++++- 8 files changed, 73 insertions(+), 26 deletions(-) diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 635191076e7..5adc2a8352c 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -253,7 +253,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.40" \ "antrea/nginx:1.21.6-alpine" \ "antrea/toolbox:1.5-1") -FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.11.0" \ +FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.12.0" \ "antrea/clickhouse-operator:0.21.0" \ "antrea/metrics-exporter:0.21.0" \ "antrea/clickhouse-server:23.4") diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index 4826ca3c137..4601632ea60 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -327,12 +327,16 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { } elements = append(elements, ie) } + ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) e.set.ResetSet() if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { return 0, err } - err := e.set.AddRecordV2(elements, templateID) - if err != nil { + if err := e.set.AddRecordV2(elements, templateID); err != nil { return 0, fmt.Errorf("error when adding record to set, error: %v", err) } bytesSent, err := e.exportingProcess.SendSet(e.set) diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index 4d018a4ad7e..e7c374efcb4 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -296,6 +296,9 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } + elemList = append(elemList, createElement("clusterId", ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + return elemList } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 3eb79c7b7ff..a5f4fba4365 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -259,6 +259,8 @@ func (fa *flowAggregator) InitCollectingProcess() error { } cpInput.NumExtraElements = len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) + len(infoelements.AntreaLabelsElementList) + len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList) + // clusterId + cpInput.NumExtraElements += 1 var err error fa.collectingProcess, err = collector.InitCollectingProcess(cpInput) return err @@ -424,6 +426,9 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { fa.fillPodLabels(key, record.Record, *startTime) + if err := fa.fillClusterID(record.Record); err != nil { + klog.ErrorS(err, "Failed to add clusterId") + } fa.aggregationProcess.SetExternalFieldsFilled(record, true) } if fa.ipfixExporter != nil { @@ -559,6 +564,17 @@ func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ip } } +func (fa *flowAggregator) fillClusterID(record ipfixentities.Record) error { + ie, err := fa.registry.GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID) + if err != nil { + return fmt.Errorf("error when getting clusterId InfoElement: %w", err) + } + if err := record.AddInfoElement(ipfixentities.NewStringInfoElement(ie, fa.clusterUUID.String())); err != nil { + return fmt.Errorf("error when adding clusterId InfoElement with value: %w", err) + } + return nil +} + func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []map[string]interface{} { return fa.aggregationProcess.GetRecords(flowKey) } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 3429b480af7..09f1f906753 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -129,8 +129,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + clusterUUID := uuid.New() newFlowAggregator := func(includePodLabels bool) *flowAggregator { return &flowAggregator{ + clusterUUID: clusterUUID, aggregatorTransportProtocol: "tcp", aggregationProcess: mockAggregationProcess, activeFlowRecordTimeout: testActiveTimeout, @@ -186,6 +188,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) + clusterIDElement := ipfixentities.NewInfoElement("clusterId", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(clusterIDElement, nil) + clusterIDIE := ipfixentities.NewStringInfoElement(clusterIDElement, clusterUUID.String()) + mockRecord.EXPECT().AddInfoElement(clusterIDIE).Return(nil) mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true) mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index 0171f5a1937..ed19ff57701 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -36,7 +36,6 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" - "antrea.io/antrea/pkg/clusteridentity" ) // TestBasic is the top-level test which contains some subtests for @@ -867,27 +866,11 @@ func testGratuitousARP(t *testing.T, data *TestData, namespace string) { // testClusterIdentity verifies that the antrea-cluster-identity ConfigMap is // populated correctly by the Antrea Controller. func testClusterIdentity(t *testing.T, data *TestData) { - clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( - antreaNamespace, - clusteridentity.DefaultClusterIdentityConfigMapName, - data.clientset, - ) - - const retryInterval = time.Second const timeout = 10 * time.Second - var clusterUUID uuid.UUID - err := wait.PollUntilContextTimeout(context.Background(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) { - clusterIdentity, _, err := clusterIdentityProvider.Get() - if err != nil { - return false, nil - } - clusterUUID = clusterIdentity.UUID - t.Logf("Cluster UUID: %v", clusterUUID) - return true, nil - }) - - assert.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout) + clusterUUID, err := data.getAntreaClusterUUID(timeout) + require.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout) assert.NotEqual(t, uuid.Nil, clusterUUID) + t.Logf("Cluster UUID: %v", clusterUUID) } func testLogRotate(t *testing.T, data *TestData) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 12f3d7d5c93..fa4f50c469b 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -163,6 +163,9 @@ var ( podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"} podNames = serviceNames + // We use a global variable for this to avoid having to pass it down to all helper functions. + // It will be initialized the first time setupFlowAggregatorTest is called. + antreaClusterUUID = "" ) type testFlow struct { @@ -190,8 +193,15 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (* t.Fatalf("Error when setting up test: %v", err) } teardownFuncs = append(teardownFuncs, func() { teardownTest(t, data) }) - err = setupFlowAggregator(t, data, options) - if err != nil { + // Make sure that antreaClusterUUID is set if this function is called for the first time. + if antreaClusterUUID == "" { + if uuid, err := data.getAntreaClusterUUID(10 * time.Second); err != nil { + t.Fatalf("Error when retrieving Antrea Cluster UUID: %v", err) + } else { + antreaClusterUUID = uuid.String() + } + } + if err := setupFlowAggregator(t, data, options); err != nil { t.Fatalf("Error when setting up FlowAggregator: %v", err) } // Execute teardownFlowAggregator later than teardownTest to ensure that the logs of Flow @@ -203,6 +213,7 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (* func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) + testCases := []struct { flowVisibilityTestOptions name string @@ -1019,6 +1030,8 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) } assert := assert.New(t) + // Check the clusterId field, which should match the ClusterUUID generated by the Antrea Controller. + assert.Contains(record, fmt.Sprintf("clusterId: %s", antreaClusterUUID), "Record does not have the correct clusterId") if checkService { if isIntraNode { assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 576f98db8c3..61456335450 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -33,6 +33,7 @@ import ( "time" "github.com/containernetworking/plugins/pkg/ip" + "github.com/google/uuid" log "github.com/sirupsen/logrus" "golang.org/x/mod/semver" "gopkg.in/yaml.v2" @@ -60,6 +61,7 @@ import ( "antrea.io/antrea/pkg/agent/config" crdclientset "antrea.io/antrea/pkg/client/clientset/versioned" + "antrea.io/antrea/pkg/clusteridentity" agentconfig "antrea.io/antrea/pkg/config/agent" controllerconfig "antrea.io/antrea/pkg/config/controller" flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator" @@ -127,7 +129,7 @@ const ( mcjoinImage = "antrea/mcjoin:v2.9" nginxImage = "antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" - ipfixCollectorImage = "antrea/ipfix-collector:v0.11.0" + ipfixCollectorImage = "antrea/ipfix-collector:v0.12.0" nginxLBService = "nginx-loadbalancer" @@ -3330,3 +3332,23 @@ func (data *TestData) waitForDeploymentReady(t *testing.T, namespace string, nam } return nil } + +func (data *TestData) getAntreaClusterUUID(timeout time.Duration) (uuid.UUID, error) { + clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( + antreaNamespace, + clusteridentity.DefaultClusterIdentityConfigMapName, + data.clientset, + ) + + const retryInterval = 1 * time.Second + var clusterUUID uuid.UUID + err := wait.PollUntilContextTimeout(context.Background(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) { + clusterIdentity, _, err := clusterIdentityProvider.Get() + if err != nil { + return false, nil + } + clusterUUID = clusterIdentity.UUID + return true, nil + }) + return clusterUUID, err +}