Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Proxy mode for FlowAggregator #6920

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/charts/flow-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Kubernetes: `>= 1.19.0-0`
| image | object | `{"pullPolicy":"IfNotPresent","repository":"antrea/flow-aggregator","tag":""}` | Container image used by Flow Aggregator. |
| inactiveFlowRecordTimeout | string | `"90s"` | Provide the inactive flow record timeout as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| logVerbosity | int | `0` | Log verbosity switch for Flow Aggregator. |
| mode | string | `"Aggregate"` | Mode in which to run the flow aggregator. Must be one of "Aggregate" or "Proxy". In Aggregate mode, flow records received from source and destination are aggregated and sent as one flow record. In Proxy mode, flow records are enhanced with some additional information, then sent directly without buffering or aggregation. |
| recordContents.podLabels | bool | `false` | Determine whether source and destination Pod labels will be included in the flow records. |
| s3Uploader.awsCredentials | object | `{"aws_access_key_id":"changeme","aws_secret_access_key":"changeme","aws_session_token":""}` | Credentials to authenticate to AWS. They will be stored in a Secret and injected into the Pod as environment variables. |
| s3Uploader.bucketName | string | `""` | BucketName is the name of the S3 bucket to which flow records will be uploaded. It is required. |
Expand Down
6 changes: 6 additions & 0 deletions build/charts/flow-aggregator/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# Mode in which to run the flow aggregator. Must be one of "Aggregate" or "Proxy". In Aggregate
# mode, flow records received from source and destination are aggregated and sent as one flow
# record. In Proxy mode, flow records are enhanced with some additional information, then sent
# directly without buffering or aggregation.
mode: {{ .Values.mode }}

# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
5 changes: 5 additions & 0 deletions build/charts/flow-aggregator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ image:
pullPolicy: "IfNotPresent"
tag: ""

# -- Mode in which to run the flow aggregator. Must be one of "Aggregate" or "Proxy". In Aggregate
# mode, flow records received from source and destination are aggregated and sent as one flow
# record. In Proxy mode, flow records are enhanced with some additional information, then sent
# directly without buffering or aggregation.
mode: "Aggregate"
# -- Provide the active flow record timeout as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
activeFlowRecordTimeout: 60s
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ subjects:
apiVersion: v1
data:
flow-aggregator.conf: |
# Mode in which to run the flow aggregator. Must be one of "Aggregate" or "Proxy". In Aggregate
# mode, flow records received from source and destination are aggregated and sent as one flow
# record. In Proxy mode, flow records are enhanced with some additional information, then sent
# directly without buffering or aggregation.
mode: Aggregate

# Provide the active flow record timeout as a duration string. This determines
# how often the flow aggregator exports the active flow records to the flow
# collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
13 changes: 13 additions & 0 deletions pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@ const (
AggregatorTransportProtocolUDP AggregatorTransportProtocol = "UDP"
)

// AggregatorMode selects how the Flow Aggregator processes the flow records it
// receives. Must be one of "Aggregate" or "Proxy".
type AggregatorMode string

const (
	// AggregatorModeAggregate is the mode in which flow records received from
	// source and destination are aggregated and sent as one flow record.
	AggregatorModeAggregate AggregatorMode = "Aggregate"
	// AggregatorModeProxy is the mode in which flow records are enhanced with
	// some additional information, then sent directly without buffering or
	// aggregation.
	AggregatorModeProxy AggregatorMode = "Proxy"
)

type FlowAggregatorConfig struct {
// Mode in which to run the flow aggregator. Must be one of "Aggregate" or "Proxy".
Mode AggregatorMode `yaml:"mode,omitempty"`
// Provide the active flow record timeout as a duration string. This determines
// how often the flow aggregator exports the active flow records to the flow
// collector. Thus, for flows with a continuous stream of packets, a flow record
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
)

func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.Mode == "" {
flowAggregatorConf.Mode = AggregatorModeAggregate
}
if flowAggregatorConf.ActiveFlowRecordTimeout == "" {
flowAggregatorConf.ActiveFlowRecordTimeout = DefaultActiveFlowRecordTimeout
}
Expand Down
97 changes: 59 additions & 38 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type IPFIXExporter struct {
externalFlowCollectorProto string
exportingProcess ipfix.IPFIXExportingProcess
sendJSONRecord bool
aggregatorMode flowaggregatorconfig.AggregatorMode
observationDomainID uint32
templateRefreshTimeout time.Duration
templateIDv4 uint16
Expand Down Expand Up @@ -88,6 +89,7 @@ func NewIPFIXExporter(
externalFlowCollectorAddr: opt.ExternalFlowCollectorAddr,
externalFlowCollectorProto: opt.ExternalFlowCollectorProto,
sendJSONRecord: sendJSONRecord,
aggregatorMode: opt.AggregatorMode,
observationDomainID: observationDomainID,
templateRefreshTimeout: opt.TemplateRefreshTimeout,
registry: registry,
Expand Down Expand Up @@ -274,66 +276,85 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) {
}
elements = append(elements, ie)
}
// The order of source and destination stats elements needs to match the order specified in
// addFieldsForStatsAggregation method in go-ipfix aggregation process.
for i := range infoelements.StatsElementList {
// Add Antrea source stats fields
ieName := infoelements.AntreaSourceStatsElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
if e.aggregatorMode == flowaggregatorconfig.AggregatorModeAggregate {
// The order of source and destination stats elements needs to match the order specified in
// addFieldsForStatsAggregation method in go-ipfix aggregation process.
for i := range infoelements.StatsElementList {
// Add Antrea source stats fields
ieName := infoelements.AntreaSourceStatsElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add Antrea destination stats fields
ieName = infoelements.AntreaDestinationStatsElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
elements = append(elements, ie)
// Add Antrea destination stats fields
ieName = infoelements.AntreaDestinationStatsElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
for _, ie := range infoelements.AntreaFlowEndSecondsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for i := range infoelements.AntreaThroughputElementList {
// Add common throughput fields
ieName := infoelements.AntreaThroughputElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add source node specific throughput fields
ieName = infoelements.AntreaSourceThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add destination node specific throughput fields
ieName = infoelements.AntreaDestinationThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
elements = append(elements, ie)
}
for _, ie := range infoelements.AntreaFlowEndSecondsElementList {
for _, ie := range infoelements.AntreaLabelsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for i := range infoelements.AntreaThroughputElementList {
// Add common throughput fields
ieName := infoelements.AntreaThroughputElementList[i]
ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add source node specific throughput fields
ieName = infoelements.AntreaSourceThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy {
ie, err := e.createInfoElementForTemplateSet("originalObservationDomainId", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
// Add destination node specific throughput fields
ieName = infoelements.AntreaDestinationThroughputElementList[i]
ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv4Address", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
for _, ie := range infoelements.AntreaLabelsElementList {
ie, err := e.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
ie, err = e.createInfoElementForTemplateSet("originalExporterIPv6Address", ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
}
ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID)
if err != nil {
return 0, err
}
elements = append(elements, ie)
e.set.ResetSet()
if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil {
return 0, err
Expand Down
25 changes: 18 additions & 7 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) {
templateIDv6: testTemplateIDv6,
registry: mockIPFIXRegistry,
set: mockTempSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
elemList := createElementList(isIPv6, mockIPFIXRegistry)
Expand Down Expand Up @@ -118,13 +119,14 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) {
RecordFormat: "IPFIX",
},
}
ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
config: config.FlowCollector,
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -182,12 +184,13 @@ func TestIPFIXExporter_AddRecord(t *testing.T) {
initIPFIXExportingProcess = initIPFIXExportingProcessSaved
}()

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -216,9 +219,10 @@ func TestIPFIXExporter_initIPFIXExportingProcess_Error(t *testing.T) {
initIPFIXExportingProcess = initIPFIXExportingProcessSaved
}()

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}

assert.Error(t, ipfixExporter.AddRecord(mockRecord, false))
Expand All @@ -231,13 +235,14 @@ func TestIPFIXExporter_sendRecord_Error(t *testing.T) {
mockSet := ipfixentitiestesting.NewMockSet(ctrl)
mockRecord := ipfixentitiestesting.NewMockRecord(ctrl)

ipfixExporter := IPFIXExporter{
ipfixExporter := &IPFIXExporter{
externalFlowCollectorAddr: "",
externalFlowCollectorProto: "",
exportingProcess: mockIPFIXExpProc,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
set: mockSet,
aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
observationDomainID: testObservationDomainID,
}
testTemplateID := testTemplateIDv4
Expand Down Expand Up @@ -308,7 +313,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("tcp success", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
listener, err := net.Listen("tcp", "127.0.0.1:0")
Expand All @@ -326,7 +333,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("udp success", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
Expand All @@ -346,7 +355,9 @@ func TestInitExportingProcess(t *testing.T) {
t.Run("tcp failure", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl)
opt := &options.Options{}
opt := &options.Options{
AggregatorMode: flowaggregatorconfig.AggregatorModeAggregate,
}
opt.Config = &flowaggregatorconfig.FlowAggregatorConfig{}
flowaggregatorconfig.SetConfigDefaults(opt.Config)
// dialing this address is guaranteed to fail (we use 0 as the port number)
Expand Down
Loading
Loading