diff --git a/felix/calc/event_sequencer.go b/felix/calc/event_sequencer.go index 68329bc89fc..168e3fafe0c 100644 --- a/felix/calc/event_sequencer.go +++ b/felix/calc/event_sequencer.go @@ -468,6 +468,12 @@ func ModelWorkloadEndpointToProto(ep *model.WorkloadEndpoint, peerData *Endpoint } func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, preDNATTiers []*proto.TierInfo, forwardTiers []*proto.TierInfo) *proto.HostEndpoint { + var qosPolicies []*proto.QoSPolicy + if ep.QoSControls != nil && ep.QoSControls.DSCP != nil { + qosPolicies = append(qosPolicies, &proto.QoSPolicy{ + Dscp: int32(ep.QoSControls.DSCP.ToUint8()), + }) + } return &proto.HostEndpoint{ Name: ep.Name, ExpectedIpv4Addrs: ipsToStrings(ep.ExpectedIPv4Addrs), @@ -477,6 +483,7 @@ func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, pre UntrackedTiers: untrackedTiers, PreDnatTiers: preDNATTiers, ForwardTiers: forwardTiers, + QosPolicies: qosPolicies, } } diff --git a/felix/calc/event_sequencer_test.go b/felix/calc/event_sequencer_test.go index df71502ec5e..14f79468d63 100644 --- a/felix/calc/event_sequencer_test.go +++ b/felix/calc/event_sequencer_test.go @@ -278,6 +278,37 @@ var _ = DescribeTable("ModelHostEndpointToProto", ProfileIds: []string{"prof1"}, }, ), + Entry("host endpoint with QoSControls", + model.HostEndpoint{ + Name: "eth0", + ExpectedIPv4Addrs: []net.IP{mustParseIP("10.28.0.13"), mustParseIP("10.28.0.14")}, + ExpectedIPv6Addrs: []net.IP{mustParseIP("dead::beef"), mustParseIP("dead::bee5")}, + Labels: uniquelabels.Make(map[string]string{ + "a": "b", + }), + ProfileIDs: []string{"prof1"}, + QoSControls: &model.QoSControls{ + DSCP: &dscp, + }, + }, + []*proto.TierInfo{{Name: "a", IngressPolicies: []string{"b"}}}, + []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"c"}}}, + []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"d"}}}, + &proto.HostEndpoint{ + Name: "eth0", + ExpectedIpv4Addrs: []string{"10.28.0.13", "10.28.0.14"}, + 
ExpectedIpv6Addrs: []string{"dead::beef", "dead::bee5"}, + Tiers: []*proto.TierInfo{{Name: "a", IngressPolicies: []string{"b"}}}, + UntrackedTiers: []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"c"}}}, + ForwardTiers: []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"d"}}}, + ProfileIds: []string{"prof1"}, + QosPolicies: []*proto.QoSPolicy{ + &proto.QoSPolicy{ + Dscp: 38, + }, + }, + }, + ), ) var _ = Describe("ServiceAccount update/remove", func() { diff --git a/felix/dataplane/linux/qos_policy_mgr.go b/felix/dataplane/linux/qos_policy_mgr.go index 4178dbd9d28..108473051c9 100644 --- a/felix/dataplane/linux/qos_policy_mgr.go +++ b/felix/dataplane/linux/qos_policy_mgr.go @@ -28,11 +28,12 @@ import ( type qosPolicyManager struct { ipVersion uint8 ruleRenderer rules.RuleRenderer + mangleTable Table - // QoS policy - mangleTable Table + // QoS policies. + wepPolicies map[types.WorkloadEndpointID]rules.QoSPolicy + hepPolicies map[types.HostEndpointID]rules.QoSPolicy dirty bool - policies map[types.WorkloadEndpointID]rules.QoSPolicy logCxt *logrus.Entry } @@ -43,30 +44,66 @@ func newQoSPolicyManager( ipVersion uint8, ) *qosPolicyManager { return &qosPolicyManager{ - ipVersion: ipVersion, - policies: map[types.WorkloadEndpointID]rules.QoSPolicy{}, - dirty: true, mangleTable: mangleTable, ruleRenderer: ruleRenderer, + ipVersion: ipVersion, + wepPolicies: map[types.WorkloadEndpointID]rules.QoSPolicy{}, + hepPolicies: map[types.HostEndpointID]rules.QoSPolicy{}, + dirty: true, logCxt: logrus.WithField("ipVersion", ipVersion), } } func (m *qosPolicyManager) OnUpdate(msg interface{}) { switch msg := msg.(type) { + case *proto.HostEndpointUpdate: + m.handleHEPUpdates(msg.GetId(), msg) + case *proto.HostEndpointRemove: + m.handleHEPUpdates(msg.GetId(), nil) case *proto.WorkloadEndpointUpdate: - m.handleWlEndpointUpdates(msg.GetId(), msg) + m.handleWEPUpdates(msg.GetId(), msg) case *proto.WorkloadEndpointRemove: - m.handleWlEndpointUpdates(msg.GetId(), nil) + 
m.handleWEPUpdates(msg.GetId(), nil) + } +} + +func (m *qosPolicyManager) handleHEPUpdates(hepID *proto.HostEndpointID, msg *proto.HostEndpointUpdate) { + id := types.ProtoToHostEndpointID(hepID) + if msg == nil || len(msg.Endpoint.QosPolicies) == 0 { + _, exists := m.hepPolicies[id] + if exists { + delete(m.hepPolicies, id) + m.dirty = true + } + return + } + + // We only support one policy per endpoint at this point. + dscp := msg.Endpoint.QosPolicies[0].Dscp + + // This situation must be handled earlier. + if dscp > 63 || dscp < 0 { + logrus.WithField("id", id).Panicf("Invalid DSCP value %v", dscp) + } + ips := msg.Endpoint.ExpectedIpv4Addrs + if m.ipVersion == 6 { + ips = msg.Endpoint.ExpectedIpv6Addrs + } + if len(ips) != 0 { + m.hepPolicies[id] = rules.QoSPolicy{ + SrcAddrs: normaliseSourceAddr(ips), + DSCP: uint8(dscp), + } + m.dirty = true } } -func (m *qosPolicyManager) handleWlEndpointUpdates(wlID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) { - id := types.ProtoToWorkloadEndpointID(wlID) +func (m *qosPolicyManager) handleWEPUpdates(wepID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) { + id := types.ProtoToWorkloadEndpointID(wepID) if msg == nil || len(msg.Endpoint.QosPolicies) == 0 { - _, exists := m.policies[id] + _, exists := m.wepPolicies[id] if exists { - delete(m.policies, id) + delete(m.wepPolicies, id) m.dirty = true } return @@ -84,7 +121,7 @@ func (m *qosPolicyManager) handleWlEndpointUpdates(wlID *proto.WorkloadEndpointI ips = msg.Endpoint.Ipv6Nets } if len(ips) != 0 { - m.policies[id] = rules.QoSPolicy{ + m.wepPolicies[id] = rules.QoSPolicy{ SrcAddrs: normaliseSourceAddr(ips), DSCP: uint8(dscp), } @@ -102,9 +139,12 @@ func normaliseSourceAddr(addrs []string) string { } func (m *qosPolicyManager) CompleteDeferredWork() error { + var policies []rules.QoSPolicy if m.dirty { - var policies []rules.QoSPolicy - for _, p := range m.policies { + for _, p := range m.wepPolicies { + policies = append(policies, p) + } + 
for _, p := range m.hepPolicies { policies = append(policies, p) } sort.Slice(policies, func(i, j int) bool { diff --git a/felix/dataplane/linux/qos_policy_mgr_test.go b/felix/dataplane/linux/qos_policy_mgr_test.go index faed4e7b10d..e74ca76ac64 100644 --- a/felix/dataplane/linux/qos_policy_mgr_test.go +++ b/felix/dataplane/linux/qos_policy_mgr_test.go @@ -57,8 +57,8 @@ func qosPolicyManagerTests(ipVersion uint8) func() { }}}) }) - It("should handle workload updates correctly", func() { - By("sending workload endpoint updates with DSCP annotion") + It("should handle endpoint updates correctly", func() { + By("sending first workload endpoint update with DSCP annotation") endpoint1 := &proto.WorkloadEndpoint{ State: "active", Name: "cali12345-ab", @@ -84,7 +84,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() { }, }}}) - By("sending another workload endpoint updates with DSCP annotion") + By("sending another workload endpoint update with DSCP annotation") endpoint2 := &proto.WorkloadEndpoint{ State: "active", Name: "cali2", @@ -115,7 +115,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() { }, }}}) - By("verifying update to DSCP value takes effect") + By("verifying update to first workload DSCP value") endpoint1.QosPolicies = []*proto.QoSPolicy{{Dscp: 13}} manager.OnUpdate(&proto.WorkloadEndpointUpdate{ Id: &wlEPID1, @@ -140,7 +140,73 @@ func qosPolicyManagerTests(ipVersion uint8) func() { }, }}}) - By("verifying QoS policy rules removed when annotation is removed") + By("sending a host endpoint update with DSCP annotation") + hep1ID := &proto.HostEndpointID{ + EndpointId: "id1", + } + hep1 := &proto.HostEndpoint{ + Name: "eth0", + ExpectedIpv4Addrs: []string{"192.168.1.2", "192.168.2.2"}, + ExpectedIpv6Addrs: []string{"2001:db9:10::2", "dead:beff::20:2"}, + QosPolicies: []*proto.QoSPolicy{{Dscp: 44}}, + } + manager.OnUpdate(&proto.HostEndpointUpdate{ + Id: hep1ID, + Endpoint: hep1, + }) + + err = manager.CompleteDeferredWork() + 
Expect(err).NotTo(HaveOccurred()) + + mangleTable.checkChains([][]*generictables.Chain{{{ + Name: rules.ChainQoSPolicy, + Rules: []generictables.Rule{ + // Rendered policies are sorted. + { + Action: iptables.DSCPAction{Value: 20}, + Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)), + }, + { + Action: iptables.DSCPAction{Value: 13}, + Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint1, ipVersion)), + }, + { + Action: iptables.DSCPAction{Value: 44}, + Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)), + }, + }, + }}}) + + By("verifying update to host endpoint DSCP value") + hep1.QosPolicies = []*proto.QoSPolicy{{Dscp: 30}} + manager.OnUpdate(&proto.HostEndpointUpdate{ + Id: hep1ID, + Endpoint: hep1, + }) + + err = manager.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + mangleTable.checkChains([][]*generictables.Chain{{{ + Name: rules.ChainQoSPolicy, + Rules: []generictables.Rule{ + // Rendered policies are sorted. + { + Action: iptables.DSCPAction{Value: 20}, + Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)), + }, + { + Action: iptables.DSCPAction{Value: 13}, + Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint1, ipVersion)), + }, + { + Action: iptables.DSCPAction{Value: 30}, + Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)), + }, + }, + }}}) + + By("verifying QoS policy rules removed when first workload annotation is removed") endpoint1.QosPolicies = nil manager.OnUpdate(&proto.WorkloadEndpointUpdate{ Id: &wlEPID1, @@ -157,17 +223,37 @@ func qosPolicyManagerTests(ipVersion uint8) func() { Action: iptables.DSCPAction{Value: 20}, Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)), }, + { + Action: iptables.DSCPAction{Value: 30}, + Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)), + }, }, }}}) - By("verifying QoS policy rules removed when workload is removed") + By("verifying QoS policy rules removed 
when second workload is removed") manager.OnUpdate(&proto.WorkloadEndpointRemove{ Id: &wlEPID2, }) err = manager.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) + mangleTable.checkChains([][]*generictables.Chain{{{ + Name: rules.ChainQoSPolicy, + Rules: []generictables.Rule{ + { + Action: iptables.DSCPAction{Value: 30}, + Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)), + }, + }, + }}}) + + By("verifying QoS policy rules removed when host endpoint is removed") + manager.OnUpdate(&proto.HostEndpointRemove{ + Id: hep1ID, + }) + err = manager.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) mangleTable.checkChains([][]*generictables.Chain{{{ Name: rules.ChainQoSPolicy, Rules: nil, @@ -183,3 +269,11 @@ func addrFromWlUpdate(endpoint *proto.WorkloadEndpoint, ipVersion uint8) string } return normaliseSourceAddr(addr) } + +func addrFromHepUpdate(endpoint *proto.HostEndpoint, ipVersion uint8) string { + addr := endpoint.ExpectedIpv4Addrs + if ipVersion == 6 { + addr = endpoint.ExpectedIpv6Addrs + } + return normaliseSourceAddr(addr) +} diff --git a/felix/fv/qos_policy_test.go b/felix/fv/qos_policy_test.go index 3440bf89f4e..cf1c4281f5c 100644 --- a/felix/fv/qos_policy_test.go +++ b/felix/fv/qos_policy_test.go @@ -33,28 +33,28 @@ import ( "github.com/projectcalico/calico/felix/fv/workload" "github.com/projectcalico/calico/libcalico-go/lib/apiconfig" api "github.com/projectcalico/calico/libcalico-go/lib/apis/v3" + client "github.com/projectcalico/calico/libcalico-go/lib/clientv3" + "github.com/projectcalico/calico/libcalico-go/lib/options" ) -var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apiconfig.DatastoreType{apiconfig.EtcdV3, apiconfig.Kubernetes}, func(getInfra infrastructure.InfraFactory) { +var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apiconfig.DatastoreType{apiconfig.Kubernetes}, func(getInfra infrastructure.InfraFactory) { const ( wepPortStr = "8055" ) var ( - infra 
infrastructure.DatastoreInfra - tc infrastructure.TopologyContainers - ep1_1, ep2_1 *workload.Workload // Workloads on Felix0 - ep1_2, ep2_2 *workload.Workload // Dual stack workloads on Felix1 - extClient *containers.Container - cc *connectivity.Checker + infra infrastructure.DatastoreInfra + tc infrastructure.TopologyContainers + client client.Interface + ep1_1, ep2_1, hostw *workload.Workload // Workloads on Felix0 + ep1_2, ep2_2 *workload.Workload // Dual stack workloads on Felix1 + extClient *containers.Container + cc *connectivity.Checker ) BeforeEach(func() { iOpts := []infrastructure.CreateOption{} infra = getInfra(iOpts...) - if (NFTMode() || BPFMode()) && getDataStoreType(infra) == "etcdv3" { - Skip("Skipping NFT / BPF test for etcdv3 backend.") - } // TODO (mazdak): Add support for bpf if BPFMode() { @@ -63,9 +63,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon options := infrastructure.DefaultTopologyOptions() options.IPIPMode = apiv3.IPIPModeNever - options.FelixLogSeverity = "Debug" options.EnableIPv6 = true - tc, _ = infrastructure.StartNNodeTopology(2, options, infra) + tc, client = infrastructure.StartNNodeTopology(2, options, infra) // Install a default profile that allows all ingress and egress, in the absence of any Policy. 
infra.AddDefaultAllow() @@ -77,6 +76,9 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep2_1 = workload.Run(tc.Felixes[0], "ep2-1", "default", "10.65.0.1", wepPortStr, "tcp") ep2_1.ConfigureInInfra(infra) + hostw = workload.Run(tc.Felixes[0], "host0", "", tc.Felixes[0].IP, wepPortStr, "tcp") + hostw.ConfigureInInfra(infra) + // Create workload on host 2 (Felix1) ep1_2Opts := workload.WithIPv6Address("dead:beef::1:0") ep1_2 = workload.Run(tc.Felixes[1], "ep1-2", "default", "10.65.1.0", wepPortStr, "tcp", ep1_2Opts) @@ -113,6 +115,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon } } + hostw.Stop() ep1_1.Stop() ep2_1.Stop() ep1_2.Stop() @@ -126,46 +129,50 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon }) It("should have expected restriction on the rule jumping to QoS policy rules", func() { - detecIptablesRule := func(felix *infrastructure.Felix, ipv6 bool) { + detecIptablesRule := func(felix *infrastructure.Felix, ipVersion uint8) { binary := "iptables-save" - ipsetName := "cali40all-ipam-pools" - if ipv6 { + if ipVersion == 6 { binary = "ip6tables-save" - ipsetName = "cali60all-ipam-pools" } - expectedRule := fmt.Sprintf( - "-m set --match-set %v src -m set ! --match-set %v dst -j cali-qos-policy", ipsetName, ipsetName) + allPoolsIPSet := fmt.Sprintf("cali%v0all-ipam-pools", ipVersion) + allHostsIPSet := fmt.Sprintf("cali%v0all-hosts-net", ipVersion) + tmpl := "-m set --match-set %v src -m set ! --match-set %v dst -m set ! 
--match-set %v dst -j cali-qos-policy" + expectedRule1 := fmt.Sprintf(tmpl, allPoolsIPSet, allPoolsIPSet, allHostsIPSet) + expectedRule2 := fmt.Sprintf(tmpl, allHostsIPSet, allPoolsIPSet, allHostsIPSet) getRules := func() string { output, _ := felix.ExecOutput(binary, "-t", "mangle") return output } - Eventually(getRules, 5*time.Second, 100*time.Millisecond).Should(ContainSubstring(expectedRule)) - Consistently(getRules, 5*time.Second, 100*time.Millisecond).Should(ContainSubstring(expectedRule)) + Eventually(getRules, 5*time.Second, 100*time.Millisecond).Should(ContainSubstring(expectedRule1)) + Consistently(getRules, 3*time.Second, 100*time.Millisecond).Should(ContainSubstring(expectedRule1)) + Consistently(getRules, 3*time.Second, 100*time.Millisecond).Should(ContainSubstring(expectedRule2)) } - detectNftablesRule := func(felix *infrastructure.Felix, ipv6 bool) { - ipsetName := "@cali40all-ipam-pools" + detectNftablesRule := func(felix *infrastructure.Felix, ipVersion uint8) { ipFamily := "ip" - if ipv6 { - ipsetName = "@cali60all-ipam-pools" + if ipVersion == 6 { ipFamily = "ip6" } - pattern := fmt.Sprintf( - "%v saddr %v %v daddr != %v .* jump mangle-cali-qos-policy", ipFamily, ipsetName, ipFamily, ipsetName) + allPoolsIPSet := fmt.Sprintf("@cali%v0all-ipam-pools", ipVersion) + allHostsIPSet := fmt.Sprintf("@cali%v0all-hosts-net", ipVersion) + tmpl := "%v saddr %v %v daddr != %v %v daddr != %v .* jump mangle-cali-qos-policy" + pattern1 := fmt.Sprintf(tmpl, ipFamily, allPoolsIPSet, ipFamily, allPoolsIPSet, ipFamily, allHostsIPSet) + pattern2 := fmt.Sprintf(tmpl, ipFamily, allHostsIPSet, ipFamily, allPoolsIPSet, ipFamily, allHostsIPSet) getRules := func() string { output, _ := felix.ExecOutput("nft", "list", "chain", ipFamily, "calico", "mangle-cali-POSTROUTING") return output } - Eventually(getRules, 5*time.Second, 100*time.Millisecond).Should(MatchRegexp(pattern)) - Consistently(getRules, 5*time.Second, 100*time.Millisecond).Should(MatchRegexp(pattern)) + 
Eventually(getRules, 5*time.Second, 100*time.Millisecond).Should(MatchRegexp(pattern1)) + Consistently(getRules, 3*time.Second, 100*time.Millisecond).Should(MatchRegexp(pattern1)) + Consistently(getRules, 3*time.Second, 100*time.Millisecond).Should(MatchRegexp(pattern2)) } if NFTMode() { - detectNftablesRule(tc.Felixes[0], false) - detectNftablesRule(tc.Felixes[0], true) + detectNftablesRule(tc.Felixes[0], 4) + detectNftablesRule(tc.Felixes[0], 6) } else { - detecIptablesRule(tc.Felixes[0], false) - detecIptablesRule(tc.Felixes[0], true) + detecIptablesRule(tc.Felixes[0], 4) + detecIptablesRule(tc.Felixes[0], 6) } }) @@ -190,6 +197,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon extClient.Exec("ip6tables", "-A", "INPUT", "-m", "dscp", "!", "--dscp", "0x28", "-j", "DROP") cc.ResetExpectations() + cc.ExpectNone(extClient, hostw) cc.ExpectNone(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -201,6 +209,29 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon verifyQoSPolicies(tc.Felixes[0], nil, nil) verifyQoSPolicies(tc.Felixes[1], nil, nil) + By("adding a host endpoint to felix 0") + hep := apiv3.NewHostEndpoint() + hep.Name = "host1-eth0" + hep.Labels = map[string]string{ + "name": hep.Name, + "host-endpoint": "true", + } + hep.Spec.Node = tc.Felixes[0].Hostname + hep.Spec.ExpectedIPs = []string{tc.Felixes[0].IP} + hep.Annotations = map[string]string{ + "qos.projectcalico.org/dscp": "20", + } + _, err := client.HostEndpoints().Create(utils.Ctx, hep, options.SetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + gnp := apiv3.NewGlobalNetworkPolicy() + gnp.Name = "gnp-1" + gnp.Spec.Selector = "host-endpoint=='true'" + gnp.Spec.Ingress = []apiv3.Rule{{Action: apiv3.Allow}} + gnp.Spec.Egress = []apiv3.Rule{{Action: apiv3.Allow}} + _, err = client.GlobalNetworkPolicies().Create(utils.Ctx, gnp, utils.NoOptions) + Expect(err).NotTo(HaveOccurred()) + By("setting the initial DSCP values") 
ep1_1.WorkloadEndpoint.Spec.QoSControls = &api.QoSControls{ DSCP: &dscp20, @@ -213,6 +244,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep2_2.UpdateInInfra(infra) cc.ResetExpectations() + cc.ExpectSome(extClient, hostw) cc.ExpectSome(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -220,7 +252,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon cc.Expect(connectivity.Some, extClient, ep2_2, ccOpts) cc.CheckConnectivity() - verifyQoSPolicies(tc.Felixes[0], []string{"0x14"}, nil) + verifyQoSPolicies(tc.Felixes[0], []string{"0x14", "0x14"}, nil) verifyQoSPolicies(tc.Felixes[1], []string{"0x28"}, []string{"0x28"}) By("updating DSCP values on some workloads") @@ -235,6 +267,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep1_2.UpdateInInfra(infra) cc.ResetExpectations() + cc.ExpectSome(extClient, hostw) cc.ExpectSome(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -242,7 +275,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon cc.Expect(connectivity.Some, extClient, ep2_2, ccOpts) cc.CheckConnectivity() - verifyQoSPolicies(tc.Felixes[0], []string{"0x0", "0x14"}, nil) + verifyQoSPolicies(tc.Felixes[0], []string{"0x0", "0x14", "0x14"}, nil) verifyQoSPolicies(tc.Felixes[1], []string{"0x28", "0x28"}, []string{"0x28", "0x28"}) // 0x28 used by two workloads By("updating DSCP values on other workloads") @@ -257,6 +290,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep1_2.UpdateInInfra(infra) cc.ResetExpectations() + cc.ExpectSome(extClient, hostw) cc.ExpectNone(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -264,7 +298,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon cc.Expect(connectivity.Some, extClient, ep2_2, ccOpts) cc.CheckConnectivity() - verifyQoSPolicies(tc.Felixes[0], []string{"0x0", "0x20"}, nil) + verifyQoSPolicies(tc.Felixes[0], 
[]string{"0x0", "0x14", "0x20"}, nil) verifyQoSPolicies(tc.Felixes[1], []string{"0x14", "0x28"}, []string{"0x14", "0x28"}) By("reverting the DSCP values") @@ -279,6 +313,23 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep1_2.UpdateInInfra(infra) cc.ResetExpectations() + cc.ExpectSome(extClient, hostw) + cc.ExpectSome(extClient, ep1_1) + cc.ExpectNone(extClient, ep2_1) + + cc.Expect(connectivity.Some, extClient, ep1_2, ccOpts) + cc.Expect(connectivity.Some, extClient, ep2_2, ccOpts) + cc.CheckConnectivity() + + verifyQoSPolicies(tc.Felixes[0], []string{"0x0", "0x14", "0x14"}, nil) + verifyQoSPolicies(tc.Felixes[1], []string{"0x28", "0x28"}, []string{"0x28", "0x28"}) // 0x28 used by two workloads + + By("removing host endpoint") + _, err = client.HostEndpoints().Delete(utils.Ctx, hep.Name, options.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + cc.ResetExpectations() + cc.ExpectNone(extClient, hostw) cc.ExpectSome(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -288,6 +339,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon verifyQoSPolicies(tc.Felixes[0], []string{"0x0", "0x14"}, nil) verifyQoSPolicies(tc.Felixes[1], []string{"0x28", "0x28"}, []string{"0x28", "0x28"}) // 0x28 used by two workloads + By("resetting DSCP value on some workloads") ep2_1.WorkloadEndpoint.Spec.QoSControls = &api.QoSControls{} ep2_1.UpdateInInfra(infra) @@ -296,6 +348,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep1_2.UpdateInInfra(infra) cc.ResetExpectations() + cc.ExpectNone(extClient, hostw) cc.ExpectSome(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -314,6 +367,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon ep2_2.RemoveFromInfra(infra) cc.ResetExpectations() + cc.ExpectNone(extClient, hostw) cc.ExpectNone(extClient, ep1_1) cc.ExpectNone(extClient, ep2_1) @@ -387,8 +441,8 @@ func verifyQoSPoliciesWithIPFamily(felix 
*infrastructure.Felix, ipv6 bool, value rulePattern = "DSCP --set-dscp" } - EventuallyWithOffset(1, assertRules, 5*time.Second, 100*time.Millisecond). + EventuallyWithOffset(2, assertRules, 5*time.Second, 100*time.Millisecond). Should(BeTrue()) - ConsistentlyWithOffset(1, assertRules, 3*time.Second, 100*time.Millisecond). + ConsistentlyWithOffset(2, assertRules, 3*time.Second, 100*time.Millisecond). Should(BeTrue()) } diff --git a/felix/proto/felixbackend.pb.go b/felix/proto/felixbackend.pb.go index 1a66a61e46a..82cb51366ea 100644 --- a/felix/proto/felixbackend.pb.go +++ b/felix/proto/felixbackend.pb.go @@ -3606,6 +3606,7 @@ type HostEndpoint struct { ForwardTiers []*TierInfo `protobuf:"bytes,8,rep,name=forward_tiers,json=forwardTiers,proto3" json:"forward_tiers,omitempty"` ExpectedIpv4Addrs []string `protobuf:"bytes,4,rep,name=expected_ipv4_addrs,json=expectedIpv4Addrs,proto3" json:"expected_ipv4_addrs,omitempty"` ExpectedIpv6Addrs []string `protobuf:"bytes,5,rep,name=expected_ipv6_addrs,json=expectedIpv6Addrs,proto3" json:"expected_ipv6_addrs,omitempty"` + QosPolicies []*QoSPolicy `protobuf:"bytes,9,rep,name=qos_policies,json=qosPolicies,proto3" json:"qos_policies,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3696,6 +3697,13 @@ func (x *HostEndpoint) GetExpectedIpv6Addrs() []string { return nil } +func (x *HostEndpoint) GetQosPolicies() []*QoSPolicy { + if x != nil { + return x.QosPolicies + } + return nil +} + type HostEndpointRemove struct { state protoimpl.MessageState `protogen:"open.v1"` Id *HostEndpointID `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` @@ -6637,7 +6645,7 @@ const file_felixbackend_proto_rawDesc = "" + "endpointIdJ\x04\b\x01\x10\x02R\bhostname\"l\n" + "\x12HostEndpointUpdate\x12%\n" + "\x02id\x18\x01 \x01(\v2\x15.felix.HostEndpointIDR\x02id\x12/\n" + - "\bendpoint\x18\x03 \x01(\v2\x13.felix.HostEndpointR\bendpoint\"\xf1\x02\n" + + "\bendpoint\x18\x03 
\x01(\v2\x13.felix.HostEndpointR\bendpoint\"\xa6\x03\n" + "\fHostEndpoint\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1f\n" + "\vprofile_ids\x18\x02 \x03(\tR\n" + @@ -6647,7 +6655,8 @@ const file_felixbackend_proto_rawDesc = "" + "\x0epre_dnat_tiers\x18\a \x03(\v2\x0f.felix.TierInfoR\fpreDnatTiers\x124\n" + "\rforward_tiers\x18\b \x03(\v2\x0f.felix.TierInfoR\fforwardTiers\x12.\n" + "\x13expected_ipv4_addrs\x18\x04 \x03(\tR\x11expectedIpv4Addrs\x12.\n" + - "\x13expected_ipv6_addrs\x18\x05 \x03(\tR\x11expectedIpv6Addrs\";\n" + + "\x13expected_ipv6_addrs\x18\x05 \x03(\tR\x11expectedIpv6Addrs\x123\n" + + "\fqos_policies\x18\t \x03(\v2\x10.felix.QoSPolicyR\vqosPolicies\";\n" + "\x12HostEndpointRemove\x12%\n" + "\x02id\x18\x01 \x01(\v2\x15.felix.HostEndpointIDR\x02id\"\x99\x01\n" + "\bTierInfo\x12\x12\n" + @@ -7082,48 +7091,49 @@ var file_felixbackend_proto_depIdxs = []int32{ 45, // 87: felix.HostEndpoint.untracked_tiers:type_name -> felix.TierInfo 45, // 88: felix.HostEndpoint.pre_dnat_tiers:type_name -> felix.TierInfo 45, // 89: felix.HostEndpoint.forward_tiers:type_name -> felix.TierInfo - 41, // 90: felix.HostEndpointRemove.id:type_name -> felix.HostEndpointID - 41, // 91: felix.HostEndpointStatusUpdate.id:type_name -> felix.HostEndpointID - 49, // 92: felix.HostEndpointStatusUpdate.status:type_name -> felix.EndpointStatus - 41, // 93: felix.HostEndpointStatusRemove.id:type_name -> felix.HostEndpointID - 33, // 94: felix.WorkloadEndpointStatusUpdate.id:type_name -> felix.WorkloadEndpointID - 49, // 95: felix.WorkloadEndpointStatusUpdate.status:type_name -> felix.EndpointStatus - 36, // 96: felix.WorkloadEndpointStatusUpdate.endpoint:type_name -> felix.WorkloadEndpoint - 33, // 97: felix.WorkloadEndpointStatusRemove.id:type_name -> felix.WorkloadEndpointID - 0, // 98: felix.WireguardStatusUpdate.ip_version:type_name -> felix.IPVersion - 94, // 99: felix.HostMetadataV4V6Update.labels:type_name -> felix.HostMetadataV4V6Update.LabelsEntry - 63, // 100: 
felix.IPAMPoolUpdate.pool:type_name -> felix.IPAMPool - 67, // 101: felix.ServiceAccountUpdate.id:type_name -> felix.ServiceAccountID - 95, // 102: felix.ServiceAccountUpdate.labels:type_name -> felix.ServiceAccountUpdate.LabelsEntry - 67, // 103: felix.ServiceAccountRemove.id:type_name -> felix.ServiceAccountID - 70, // 104: felix.NamespaceUpdate.id:type_name -> felix.NamespaceID - 96, // 105: felix.NamespaceUpdate.labels:type_name -> felix.NamespaceUpdate.LabelsEntry - 70, // 106: felix.NamespaceRemove.id:type_name -> felix.NamespaceID - 1, // 107: felix.RouteUpdate.types:type_name -> felix.RouteType - 2, // 108: felix.RouteUpdate.ip_pool_type:type_name -> felix.IPPoolType - 71, // 109: felix.RouteUpdate.tunnel_type:type_name -> felix.TunnelType - 31, // 110: felix.DataplaneStats.protocol:type_name -> felix.Protocol - 78, // 111: felix.DataplaneStats.stats:type_name -> felix.Statistic - 79, // 112: felix.DataplaneStats.rules:type_name -> felix.RuleTrace - 3, // 113: felix.DataplaneStats.action:type_name -> felix.Action - 5, // 114: felix.Statistic.direction:type_name -> felix.Statistic.Direction - 6, // 115: felix.Statistic.relativity:type_name -> felix.Statistic.Relativity - 7, // 116: felix.Statistic.kind:type_name -> felix.Statistic.Kind - 3, // 117: felix.Statistic.action:type_name -> felix.Action - 24, // 118: felix.RuleTrace.policy:type_name -> felix.PolicyID - 20, // 119: felix.RuleTrace.profile:type_name -> felix.ProfileID - 8, // 120: felix.RuleTrace.direction:type_name -> felix.RuleTrace.Direction - 85, // 121: felix.ServiceUpdate.ports:type_name -> felix.ServicePort - 13, // 122: felix.ConfigUpdate.SourceToRawConfigEntry.value:type_name -> felix.RawConfig - 9, // 123: felix.PolicySync.Sync:input_type -> felix.SyncRequest - 77, // 124: felix.PolicySync.Report:input_type -> felix.DataplaneStats - 10, // 125: felix.PolicySync.Sync:output_type -> felix.ToDataplane - 76, // 126: felix.PolicySync.Report:output_type -> felix.ReportResult - 125, // [125:127] 
is the sub-list for method output_type - 123, // [123:125] is the sub-list for method input_type - 123, // [123:123] is the sub-list for extension type_name - 123, // [123:123] is the sub-list for extension extendee - 0, // [0:123] is the sub-list for field type_name + 38, // 90: felix.HostEndpoint.qos_policies:type_name -> felix.QoSPolicy + 41, // 91: felix.HostEndpointRemove.id:type_name -> felix.HostEndpointID + 41, // 92: felix.HostEndpointStatusUpdate.id:type_name -> felix.HostEndpointID + 49, // 93: felix.HostEndpointStatusUpdate.status:type_name -> felix.EndpointStatus + 41, // 94: felix.HostEndpointStatusRemove.id:type_name -> felix.HostEndpointID + 33, // 95: felix.WorkloadEndpointStatusUpdate.id:type_name -> felix.WorkloadEndpointID + 49, // 96: felix.WorkloadEndpointStatusUpdate.status:type_name -> felix.EndpointStatus + 36, // 97: felix.WorkloadEndpointStatusUpdate.endpoint:type_name -> felix.WorkloadEndpoint + 33, // 98: felix.WorkloadEndpointStatusRemove.id:type_name -> felix.WorkloadEndpointID + 0, // 99: felix.WireguardStatusUpdate.ip_version:type_name -> felix.IPVersion + 94, // 100: felix.HostMetadataV4V6Update.labels:type_name -> felix.HostMetadataV4V6Update.LabelsEntry + 63, // 101: felix.IPAMPoolUpdate.pool:type_name -> felix.IPAMPool + 67, // 102: felix.ServiceAccountUpdate.id:type_name -> felix.ServiceAccountID + 95, // 103: felix.ServiceAccountUpdate.labels:type_name -> felix.ServiceAccountUpdate.LabelsEntry + 67, // 104: felix.ServiceAccountRemove.id:type_name -> felix.ServiceAccountID + 70, // 105: felix.NamespaceUpdate.id:type_name -> felix.NamespaceID + 96, // 106: felix.NamespaceUpdate.labels:type_name -> felix.NamespaceUpdate.LabelsEntry + 70, // 107: felix.NamespaceRemove.id:type_name -> felix.NamespaceID + 1, // 108: felix.RouteUpdate.types:type_name -> felix.RouteType + 2, // 109: felix.RouteUpdate.ip_pool_type:type_name -> felix.IPPoolType + 71, // 110: felix.RouteUpdate.tunnel_type:type_name -> felix.TunnelType + 31, // 111: 
felix.DataplaneStats.protocol:type_name -> felix.Protocol + 78, // 112: felix.DataplaneStats.stats:type_name -> felix.Statistic + 79, // 113: felix.DataplaneStats.rules:type_name -> felix.RuleTrace + 3, // 114: felix.DataplaneStats.action:type_name -> felix.Action + 5, // 115: felix.Statistic.direction:type_name -> felix.Statistic.Direction + 6, // 116: felix.Statistic.relativity:type_name -> felix.Statistic.Relativity + 7, // 117: felix.Statistic.kind:type_name -> felix.Statistic.Kind + 3, // 118: felix.Statistic.action:type_name -> felix.Action + 24, // 119: felix.RuleTrace.policy:type_name -> felix.PolicyID + 20, // 120: felix.RuleTrace.profile:type_name -> felix.ProfileID + 8, // 121: felix.RuleTrace.direction:type_name -> felix.RuleTrace.Direction + 85, // 122: felix.ServiceUpdate.ports:type_name -> felix.ServicePort + 13, // 123: felix.ConfigUpdate.SourceToRawConfigEntry.value:type_name -> felix.RawConfig + 9, // 124: felix.PolicySync.Sync:input_type -> felix.SyncRequest + 77, // 125: felix.PolicySync.Report:input_type -> felix.DataplaneStats + 10, // 126: felix.PolicySync.Sync:output_type -> felix.ToDataplane + 76, // 127: felix.PolicySync.Report:output_type -> felix.ReportResult + 126, // [126:128] is the sub-list for method output_type + 124, // [124:126] is the sub-list for method input_type + 124, // [124:124] is the sub-list for extension type_name + 124, // [124:124] is the sub-list for extension extendee + 0, // [0:124] is the sub-list for field type_name } func init() { file_felixbackend_proto_init() } diff --git a/felix/proto/felixbackend.proto b/felix/proto/felixbackend.proto index 453d6aea8ab..71274a13ee4 100644 --- a/felix/proto/felixbackend.proto +++ b/felix/proto/felixbackend.proto @@ -465,6 +465,7 @@ message HostEndpoint { repeated TierInfo forward_tiers = 8; repeated string expected_ipv4_addrs = 4; repeated string expected_ipv6_addrs = 5; + repeated QoSPolicy qos_policies = 9; } message HostEndpointRemove { diff --git a/felix/rules/static.go 
b/felix/rules/static.go index 6d33dbc6db0..18fca049164 100644 --- a/felix/rules/static.go +++ b/felix/rules/static.go @@ -1067,13 +1067,25 @@ func (r *DefaultRuleRenderer) StaticManglePostroutingChain(ipVersion uint8) *gen ipConf := r.ipSetConfig(ipVersion) allIPsSetName := ipConf.NameForMainIPSet(IPSetIDAllPools) - rules = append(rules, generictables.Rule{ - Match: r.NewMatch(). - SourceIPSet(allIPsSetName). - NotDestIPSet(allIPsSetName), - Action: r.Jump(ChainQoSPolicy), - Comment: []string{"QoS policy for traffic leaving cluster"}, - }) + allHostsSetName := ipConf.NameForMainIPSet(IPSetIDAllHostNets) + rules = append( + rules, generictables.Rule{ + Match: r.NewMatch(). + SourceIPSet(allIPsSetName). + NotDestIPSet(allIPsSetName). + NotDestIPSet(allHostsSetName), + Action: r.Jump(ChainQoSPolicy), + Comment: []string{"set dscp for workloads traffic leaving cluster."}, + }, + generictables.Rule{ + Match: r.NewMatch(). + SourceIPSet(allHostsSetName). + NotDestIPSet(allIPsSetName). + NotDestIPSet(allHostsSetName), + Action: r.Jump(ChainQoSPolicy), + Comment: []string{"set dscp for host endpoints traffic leaving cluster."}, + }, + ) // Allow immediately if IptablesMarkAccept is set. Our filter-FORWARD chain sets this for // any packets that reach the end of that chain. The principle is that we don't want to diff --git a/felix/rules/static_test.go b/felix/rules/static_test.go index 1b118ee3abd..3a099a3f6d5 100644 --- a/felix/rules/static_test.go +++ b/felix/rules/static_test.go @@ -41,14 +41,26 @@ var _ = Describe("Static", func() { }) checkManglePostrouting := func(ipVersion uint8, ipvs bool) { - allIPSetName := fmt.Sprintf("cali%v0all-ipam-pools", ipVersion) + allPoolSetName := fmt.Sprintf("cali%v0all-ipam-pools", ipVersion) + allHostsSetName := fmt.Sprintf("cali%v0all-hosts-net", ipVersion) It("should generate expected cali-POSTROUTING chain in the mangle table", func() { expRules := []generictables.Rule{ - // Evaluate QoS policies. + // DSCP rules. 
{ - Match: Match().SourceIPSet(allIPSetName).NotDestIPSet(allIPSetName), + Match: Match(). + SourceIPSet(allPoolSetName). + NotDestIPSet(allPoolSetName). + NotDestIPSet(allHostsSetName), + Action: JumpAction{Target: ChainQoSPolicy}, + Comment: []string{"set dscp for workloads traffic leaving cluster."}, + }, + { + Match: Match(). + SourceIPSet(allHostsSetName). + NotDestIPSet(allPoolSetName). + NotDestIPSet(allHostsSetName), Action: JumpAction{Target: ChainQoSPolicy}, - Comment: []string{"QoS policy for traffic leaving cluster"}, + Comment: []string{"set dscp for host endpoints traffic leaving cluster."}, }, // Accept already accepted. { diff --git a/libcalico-go/lib/backend/model/hostendpoint.go b/libcalico-go/lib/backend/model/hostendpoint.go index 81c8538ea5f..1d47ce3e22f 100644 --- a/libcalico-go/lib/backend/model/hostendpoint.go +++ b/libcalico-go/lib/backend/model/hostendpoint.go @@ -117,6 +117,7 @@ type HostEndpoint struct { Labels uniquelabels.Map `json:"labels,omitempty" validate:"omitempty,labels"` ProfileIDs []string `json:"profile_ids,omitempty" validate:"omitempty,dive,name"` Ports []EndpointPort `json:"ports,omitempty" validate:"dive"` + QoSControls *QoSControls `json:"qosControls,omitempty"` } func (e *HostEndpoint) WorkloadOrHostEndpoint() {} diff --git a/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor.go b/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor.go index 59730e6f83a..6ca889bafd8 100644 --- a/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor.go +++ b/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// Copyright (c) 2017-2025 Tigera, Inc. All rights reserved. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -16,10 +16,14 @@ package updateprocessors import ( "errors" + "fmt" apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" + "github.com/projectcalico/api/pkg/lib/numorstring" + "github.com/sirupsen/logrus" "github.com/projectcalico/calico/lib/std/uniquelabels" + "github.com/projectcalico/calico/libcalico-go/lib/backend/k8s/conversion" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" "github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer" cnet "github.com/projectcalico/calico/libcalico-go/lib/net" @@ -28,11 +32,11 @@ import ( // Create a new SyncerUpdateProcessor to sync HostEndpoint data in v1 format for // consumption by both Felix and the BGP daemon. func NewHostEndpointUpdateProcessor() watchersyncer.SyncerUpdateProcessor { - return NewConflictResolvingCacheUpdateProcessor(apiv3.KindHostEndpoint, convertHostEndpointV2ToV1) + return NewConflictResolvingCacheUpdateProcessor(apiv3.KindHostEndpoint, convertHostEndpointV3ToV1) } // Convert v3 KVPair to the equivalent v1 KVPair. -func convertHostEndpointV2ToV1(kvp *model.KVPair) (*model.KVPair, error) { +func convertHostEndpointV3ToV1(kvp *model.KVPair) (*model.KVPair, error) { // Validate against incorrect key/value kinds. This indicates a code bug rather // than a user error. 
v3key, ok := kvp.Key.(model.ResourceKey) @@ -72,6 +76,12 @@ func convertHostEndpointV2ToV1(kvp *model.KVPair) (*model.KVPair, error) { }) } + qosControls, err := handleQoSControlsAnnotations(v3res.Annotations) + if err != nil { + // If QoSControls can't be parsed, log the error but keep processing the host endpoint + logrus.WithField("hep", v3res.Name).WithError(err).Warn("Error parsing QoSControl annotations") + } + v1value := &model.HostEndpoint{ Name: v3res.Spec.InterfaceName, ExpectedIPv4Addrs: ipv4Addrs, @@ -79,6 +89,7 @@ func convertHostEndpointV2ToV1(kvp *model.KVPair) (*model.KVPair, error) { Labels: uniquelabels.Make(v3res.GetLabels()), ProfileIDs: v3res.Spec.Profiles, Ports: ports, + QoSControls: qosControls, } return &model.KVPair{ @@ -87,3 +98,22 @@ func convertHostEndpointV2ToV1(kvp *model.KVPair) (*model.KVPair, error) { Revision: kvp.Revision, }, nil } + +func handleQoSControlsAnnotations(annotations map[string]string) (*model.QoSControls, error) { + var ( + qosControls *model.QoSControls + errs []error + ) + // Calico DSCP value for egress traffic annotation. + if str, found := annotations[conversion.AnnotationQoSEgressDSCP]; found { + dscp := numorstring.DSCPFromString(str) + err := dscp.Validate() + if err != nil { + errs = append(errs, fmt.Errorf("error parsing DSCP annotation: %w", err)) + } else { + qosControls = &model.QoSControls{DSCP: &dscp} + } + } + + return qosControls, errors.Join(errs...) +} diff --git a/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor_test.go b/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor_test.go index 751d9f0cc94..e154a5625d1 100644 --- a/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor_test.go +++ b/libcalico-go/lib/backend/syncersv1/updateprocessors/hostendpointprocessor_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// Copyright (c) 2017-2025 Tigera, Inc. All rights reserved. 
	// Verifies that the v3->v1 host endpoint update processor converts the
	// "qos.projectcalico.org/dscp" annotation into a model.QoSControls value,
	// while leaving unrelated annotations untouched.
	It("should parse valid QoSControl annotations", func() {
		up := updateprocessors.NewHostEndpointUpdateProcessor()

		By("adding another HostEndpoint with a full configuration")
		// Build a fully-populated v3 HostEndpoint, including one arbitrary
		// annotation (which must be ignored) and the DSCP QoS annotation.
		res := apiv3.NewHostEndpoint()
		res.Name = v3HostEndpointKey2.Name
		res.Labels = map[string]string{"testLabel": "label"}
		res.Spec.Node = hn2
		res.Spec.InterfaceName = name2
		res.Spec.ExpectedIPs = []string{"10.100.10.1"}
		expectedIpv4 := *net.ParseIP("10.100.10.1")
		res.Spec.Profiles = []string{"testProfile"}
		res.Spec.Ports = []apiv3.EndpointPort{
			apiv3.EndpointPort{
				Name:     "portname",
				Protocol: numorstring.ProtocolFromInt(uint8(30)),
				Port:     uint16(8080),
			},
		}
		res.Annotations = map[string]string{
			"arbitrary":                  "annotation",
			"qos.projectcalico.org/dscp": "af43",
		}

		kvps, err := up.Process(&model.KVPair{
			Key:      v3HostEndpointKey2,
			Value:    res,
			Revision: "1234",
		})

		// Expected parsed DSCP value corresponding to the "af43" annotation.
		dscp := numorstring.DSCPFromString("af43")

		Expect(err).NotTo(HaveOccurred())
		// The v1 result must carry the converted QoSControls alongside the
		// usual name/addresses/labels/profiles/ports fields; the arbitrary
		// annotation must not appear anywhere in the output.
		Expect(kvps).To(Equal([]*model.KVPair{
			{
				Key: v1HostEndpointKey2,
				Value: &model.HostEndpoint{
					Name:              name2,
					ExpectedIPv4Addrs: []net.IP{expectedIpv4},
					Labels:            uniquelabels.Make(map[string]string{"testLabel": "label"}),
					ProfileIDs:        []string{"testProfile"},
					Ports: []model.EndpointPort{
						model.EndpointPort{
							Name:     "portname",
							Protocol: numorstring.ProtocolFromInt(uint8(30)),
							Port:     uint16(8080),
						},
					},
					QoSControls: &model.QoSControls{
						DSCP: &dscp,
					},
				},
				Revision: "1234",
			},
		}))

		By("clearing the cache (by starting sync) and failing to delete the second host endpoint")
		// After a resync the processor's conflict-resolution cache is empty,
		// so a delete (KVPair with nil Value) for this key must error.
		up.OnSyncerStarting()
		kvps, err = up.Process(&model.KVPair{
			Key: v3HostEndpointKey2,
		})
		Expect(err).To(HaveOccurred())
	})