diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 635191076e7..5adc2a8352c 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -253,7 +253,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.40" \ "antrea/nginx:1.21.6-alpine" \ "antrea/toolbox:1.5-1") -FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.11.0" \ +FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.12.0" \ "antrea/clickhouse-operator:0.21.0" \ "antrea/metrics-exporter:0.21.0" \ "antrea/clickhouse-server:23.4") diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index 4826ca3c137..4601632ea60 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -327,12 +327,16 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { } elements = append(elements, ie) } + ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) + if err != nil { + return 0, err + } + elements = append(elements, ie) e.set.ResetSet() if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { return 0, err } - err := e.set.AddRecordV2(elements, templateID) - if err != nil { + if err := e.set.AddRecordV2(elements, templateID); err != nil { return 0, fmt.Errorf("error when adding record to set, error: %v", err) } bytesSent, err := e.exportingProcess.SendSet(e.set) diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index 4d018a4ad7e..e7c374efcb4 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -296,6 +296,9 @@ func createElementList(isIPv6 bool, mockIPFIXRegistry *ipfixtesting.MockIPFIXReg elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID)) mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) } + elemList = append(elemList, createElement("clusterId", ipfixregistry.AntreaEnterpriseID)) + mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil) + return elemList } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 3eb79c7b7ff..a5f4fba4365 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -259,6 +259,8 @@ func (fa *flowAggregator) InitCollectingProcess() error { } cpInput.NumExtraElements = len(infoelements.AntreaSourceStatsElementList) + len(infoelements.AntreaDestinationStatsElementList) + len(infoelements.AntreaLabelsElementList) + len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList) + // clusterId + cpInput.NumExtraElements += 1 var err error fa.collectingProcess, err = collector.InitCollectingProcess(cpInput) return err @@ -424,6 +426,9 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor // Even if fa.includePodLabels is false, we still need to add an empty IE to match the template. if !fa.aggregationProcess.AreExternalFieldsFilled(*record) { fa.fillPodLabels(key, record.Record, *startTime) + if err := fa.fillClusterID(record.Record); err != nil { + klog.ErrorS(err, "Failed to add clusterId") + } fa.aggregationProcess.SetExternalFieldsFilled(record, true) } if fa.ipfixExporter != nil { @@ -559,6 +564,17 @@ func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ip } } +func (fa *flowAggregator) fillClusterID(record ipfixentities.Record) error { + ie, err := fa.registry.GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID) + if err != nil { + return fmt.Errorf("error when getting clusterId InfoElement: %w", err) + } + if err := record.AddInfoElement(ipfixentities.NewStringInfoElement(ie, fa.clusterUUID.String())); err != nil { + return fmt.Errorf("error when adding clusterId InfoElement with value: %w", err) + } + return nil +} + func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []map[string]interface{} { return fa.aggregationProcess.GetRecords(flowKey) } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 3429b480af7..09f1f906753 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -129,8 +129,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + clusterUUID := uuid.New() newFlowAggregator := func(includePodLabels bool) *flowAggregator { return &flowAggregator{ + clusterUUID: clusterUUID, aggregatorTransportProtocol: "tcp", aggregationProcess: mockAggregationProcess, activeFlowRecordTimeout: testActiveTimeout, @@ -186,6 +188,10 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) { mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil) destinationPodLabelsIE := ipfixentities.NewStringInfoElement(destinationPodLabelsElement, podLabels) mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil) + clusterIDElement := ipfixentities.NewInfoElement("clusterId", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0) + mockIPFIXRegistry.EXPECT().GetInfoElement("clusterId", ipfixregistry.AntreaEnterpriseID).Return(clusterIDElement, nil) + clusterIDIE := ipfixentities.NewStringInfoElement(clusterIDElement, clusterUUID.String()) + mockRecord.EXPECT().AddInfoElement(clusterIDIE).Return(nil) mockAggregationProcess.EXPECT().SetExternalFieldsFilled(flowRecord, true) mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*flowRecord).Return(!tc.isIPv6) diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index 0171f5a1937..ed19ff57701 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -36,7 +36,6 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" - "antrea.io/antrea/pkg/clusteridentity" ) // TestBasic is the top-level test which contains some subtests for @@ -867,27 +866,11 @@ func testGratuitousARP(t *testing.T, data *TestData, namespace string) { // testClusterIdentity verifies that the antrea-cluster-identity ConfigMap is // populated correctly by the Antrea Controller. func testClusterIdentity(t *testing.T, data *TestData) { - clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( - antreaNamespace, - clusteridentity.DefaultClusterIdentityConfigMapName, - data.clientset, - ) - - const retryInterval = time.Second const timeout = 10 * time.Second - var clusterUUID uuid.UUID - err := wait.PollUntilContextTimeout(context.Background(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) { - clusterIdentity, _, err := clusterIdentityProvider.Get() - if err != nil { - return false, nil - } - clusterUUID = clusterIdentity.UUID - t.Logf("Cluster UUID: %v", clusterUUID) - return true, nil - }) - - assert.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout) + clusterUUID, err := data.getAntreaClusterUUID(timeout) + require.NoError(t, err, "Failed to retrieve cluster identity information within %v", timeout) assert.NotEqual(t, uuid.Nil, clusterUUID) + t.Logf("Cluster UUID: %v", clusterUUID) } func testLogRotate(t *testing.T, data *TestData) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 12f3d7d5c93..fa4f50c469b 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -163,6 +163,9 @@ var ( podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"} podNames = serviceNames + // We use a global variable for this to avoid having to pass it down to all helper functions. + // It will be initialized the first time setupFlowAggregatorTest is called. + antreaClusterUUID = "" ) type testFlow struct { @@ -190,8 +193,15 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (* t.Fatalf("Error when setting up test: %v", err) } teardownFuncs = append(teardownFuncs, func() { teardownTest(t, data) }) - err = setupFlowAggregator(t, data, options) - if err != nil { + // Make sure that antreaClusterUUID is set if this function is called for the first time. + if antreaClusterUUID == "" { + if uuid, err := data.getAntreaClusterUUID(10 * time.Second); err != nil { + t.Fatalf("Error when retrieving Antrea Cluster UUID: %v", err) + } else { + antreaClusterUUID = uuid.String() + } + } + if err := setupFlowAggregator(t, data, options); err != nil { t.Fatalf("Error when setting up FlowAggregator: %v", err) } // Execute teardownFlowAggregator later than teardownTest to ensure that the logs of Flow @@ -203,6 +213,7 @@ func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (* func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) + testCases := []struct { flowVisibilityTestOptions name string @@ -1019,6 +1030,8 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) } assert := assert.New(t) + // Check the clusterId field, which should match the ClusterUUID generated by the Antrea Controller. + assert.Contains(record, fmt.Sprintf("clusterId: %s", antreaClusterUUID), "Record does not have the correct clusterId") if checkService { if isIntraNode { assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 576f98db8c3..61456335450 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -33,6 +33,7 @@ import ( "time" "github.com/containernetworking/plugins/pkg/ip" + "github.com/google/uuid" log "github.com/sirupsen/logrus" "golang.org/x/mod/semver" "gopkg.in/yaml.v2" @@ -60,6 +61,7 @@ import ( "antrea.io/antrea/pkg/agent/config" crdclientset "antrea.io/antrea/pkg/client/clientset/versioned" + "antrea.io/antrea/pkg/clusteridentity" agentconfig "antrea.io/antrea/pkg/config/agent" controllerconfig "antrea.io/antrea/pkg/config/controller" flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator" @@ -127,7 +129,7 @@ const ( mcjoinImage = "antrea/mcjoin:v2.9" nginxImage = "antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" - ipfixCollectorImage = "antrea/ipfix-collector:v0.11.0" + ipfixCollectorImage = "antrea/ipfix-collector:v0.12.0" nginxLBService = "nginx-loadbalancer" @@ -3330,3 +3332,23 @@ func (data *TestData) waitForDeploymentReady(t *testing.T, namespace string, nam } return nil } + +func (data *TestData) getAntreaClusterUUID(timeout time.Duration) (uuid.UUID, error) { + clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider( + antreaNamespace, + clusteridentity.DefaultClusterIdentityConfigMapName, + data.clientset, + ) + + const retryInterval = 1 * time.Second + var clusterUUID uuid.UUID + err := wait.PollUntilContextTimeout(context.Background(), retryInterval, timeout, true, func(ctx context.Context) (bool, error) { + clusterIdentity, _, err := clusterIdentityProvider.Get() + if err != nil { + return false, nil + } + clusterUUID = clusterIdentity.UUID + return true, nil + }) + return clusterUUID, err +}