Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,36 @@ import (
"github.com/projectcalico/calico/felix/types"
)

type qosPolicyManager struct {
type dscpManager struct {
ipVersion uint8
ruleRenderer rules.RuleRenderer
mangleTable Table

// QoS policies.
wepPolicies map[types.WorkloadEndpointID]rules.QoSPolicy
hepPolicies map[types.HostEndpointID]rules.QoSPolicy
wepPolicies map[types.WorkloadEndpointID]rules.DSCPRule
hepPolicies map[types.HostEndpointID]rules.DSCPRule
dirty bool

logCxt *logrus.Entry
}

func newQoSPolicyManager(
func newDSCPManager(
mangleTable Table,
ruleRenderer rules.RuleRenderer,
ipVersion uint8,
) *qosPolicyManager {
return &qosPolicyManager{
) *dscpManager {
return &dscpManager{
mangleTable: mangleTable,
ruleRenderer: ruleRenderer,
ipVersion: ipVersion,
wepPolicies: map[types.WorkloadEndpointID]rules.QoSPolicy{},
hepPolicies: map[types.HostEndpointID]rules.QoSPolicy{},
wepPolicies: map[types.WorkloadEndpointID]rules.DSCPRule{},
hepPolicies: map[types.HostEndpointID]rules.DSCPRule{},
dirty: true,
logCxt: logrus.WithField("ipVersion", ipVersion),
}
}

func (m *qosPolicyManager) OnUpdate(msg interface{}) {
func (m *dscpManager) OnUpdate(msg interface{}) {
switch msg := msg.(type) {
case *proto.HostEndpointUpdate:
m.handleHEPUpdates(msg.GetId(), msg)
Expand All @@ -67,7 +67,7 @@ func (m *qosPolicyManager) OnUpdate(msg interface{}) {
}
}

func (m *qosPolicyManager) handleHEPUpdates(hepID *proto.HostEndpointID, msg *proto.HostEndpointUpdate) {
func (m *dscpManager) handleHEPUpdates(hepID *proto.HostEndpointID, msg *proto.HostEndpointUpdate) {
id := types.ProtoToHostEndpointID(hepID)
if msg == nil || len(msg.Endpoint.QosPolicies) == 0 {
_, exists := m.hepPolicies[id]
Expand All @@ -90,15 +90,15 @@ func (m *qosPolicyManager) handleHEPUpdates(hepID *proto.HostEndpointID, msg *pr
ips = msg.Endpoint.ExpectedIpv6Addrs
}
if len(ips) != 0 {
m.hepPolicies[id] = rules.QoSPolicy{
m.hepPolicies[id] = rules.DSCPRule{
SrcAddrs: normaliseSourceAddr(ips),
DSCP: uint8(dscp),
Value: uint8(dscp),
}
m.dirty = true
}
}

func (m *qosPolicyManager) handleWEPUpdates(wepID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) {
func (m *dscpManager) handleWEPUpdates(wepID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) {
id := types.ProtoToWorkloadEndpointID(wepID)
if msg == nil || len(msg.Endpoint.QosPolicies) == 0 {
_, exists := m.wepPolicies[id]
Expand All @@ -121,9 +121,9 @@ func (m *qosPolicyManager) handleWEPUpdates(wepID *proto.WorkloadEndpointID, msg
ips = msg.Endpoint.Ipv6Nets
}
if len(ips) != 0 {
m.wepPolicies[id] = rules.QoSPolicy{
m.wepPolicies[id] = rules.DSCPRule{
SrcAddrs: normaliseSourceAddr(ips),
DSCP: uint8(dscp),
Value: uint8(dscp),
}
m.dirty = true
}
Expand All @@ -138,20 +138,20 @@ func normaliseSourceAddr(addrs []string) string {
return strings.Join(trimmedSources, ",")
}

func (m *qosPolicyManager) CompleteDeferredWork() error {
var policies []rules.QoSPolicy
func (m *dscpManager) CompleteDeferredWork() error {
var dscpRules []rules.DSCPRule
if m.dirty {
for _, p := range m.wepPolicies {
policies = append(policies, p)
for _, r := range m.wepPolicies {
dscpRules = append(dscpRules, r)
}
for _, p := range m.hepPolicies {
policies = append(policies, p)
for _, r := range m.hepPolicies {
dscpRules = append(dscpRules, r)
}
sort.Slice(policies, func(i, j int) bool {
return policies[i].SrcAddrs < policies[j].SrcAddrs
sort.Slice(dscpRules, func(i, j int) bool {
return dscpRules[i].SrcAddrs < dscpRules[j].SrcAddrs
})

chain := m.ruleRenderer.EgressQoSPolicyChain(policies)
chain := m.ruleRenderer.EgressDSCPChain(dscpRules)
m.mangleTable.UpdateChain(chain)
m.dirty = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"github.com/projectcalico/calico/felix/rules"
)

var _ = Describe("QoS policy manager IPv4", qosPolicyManagerTests(4))
var _ = Describe("QoS policy manager IPv6", qosPolicyManagerTests(6))
var _ = Describe("DSCP manager IPv4", dscpManagerTests(4))
var _ = Describe("DSCP manager IPv6", dscpManagerTests(6))

func qosPolicyManagerTests(ipVersion uint8) func() {
func dscpManagerTests(ipVersion uint8) func() {
return func() {
var (
manager *qosPolicyManager
manager *dscpManager
mangleTable *mockTable
ruleRenderer rules.RuleRenderer
)
Expand All @@ -45,14 +45,14 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
MarkDrop: 0x10,
MarkEndpoint: 0x11110000,
})
manager = newQoSPolicyManager(mangleTable, ruleRenderer, ipVersion)
manager = newDSCPManager(mangleTable, ruleRenderer, ipVersion)
})

It("should program QoS policy chain with no rule", func() {
It("should program DSCP chain with no rule", func() {
err := manager.CompleteDeferredWork()
Expect(err).ToNot(HaveOccurred())
mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: nil,
}}})
})
Expand All @@ -75,7 +75,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
{
Action: iptables.DSCPAction{Value: 44},
Expand All @@ -101,7 +101,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Expand All @@ -126,7 +126,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Expand Down Expand Up @@ -159,7 +159,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Expand Down Expand Up @@ -188,7 +188,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Expand All @@ -206,7 +206,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("verifying QoS policy rules removed when first workload annotation is removed")
By("verifying DSCP rule removed when first workload annotation is removed")
endpoint1.QosPolicies = nil
manager.OnUpdate(&proto.WorkloadEndpointUpdate{
Id: &wlEPID1,
Expand All @@ -217,7 +217,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
{
Action: iptables.DSCPAction{Value: 20},
Expand All @@ -230,15 +230,15 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("verifying QoS policy rules removed when second workload is removed")
By("verifying DSCP rule removed when second workload is removed")
manager.OnUpdate(&proto.WorkloadEndpointRemove{
Id: &wlEPID2,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: []generictables.Rule{
{
Action: iptables.DSCPAction{Value: 30},
Expand All @@ -247,15 +247,15 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("verifying QoS policy rules removed when host endpoint is removed")
By("verifying DSCP rule removed when host endpoint is removed")
manager.OnUpdate(&proto.HostEndpointRemove{
Id: hep1ID,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Name: rules.ChainEgressDSCP,
Rules: nil,
}}})
})
Expand Down
4 changes: 2 additions & 2 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.RegisterManager(newHostsIPSetManager(ipSetsV4, 4, config))

if !config.BPFEnabled {
dp.RegisterManager(newQoSPolicyManager(mangleTableV4, ruleRenderer, 4))
dp.RegisterManager(newDSCPManager(mangleTableV4, ruleRenderer, 4))
}

if config.RulesConfig.IPIPEnabled {
Expand Down Expand Up @@ -1324,7 +1324,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.RegisterManager(newHostsIPSetManager(ipSetsV6, 6, config))

if !config.BPFEnabled {
dp.RegisterManager(newQoSPolicyManager(mangleTableV6, ruleRenderer, 6))
dp.RegisterManager(newDSCPManager(mangleTableV6, ruleRenderer, 6))
}

// Add a manager for IPv6 wireguard configuration. This is added irrespective of whether wireguard is actually enabled
Expand Down
10 changes: 5 additions & 5 deletions felix/fv/qos_policy_test.go → felix/fv/dscp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/projectcalico/calico/libcalico-go/lib/options"
)

var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apiconfig.DatastoreType{apiconfig.Kubernetes}, func(getInfra infrastructure.InfraFactory) {
var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ dscp tests", []apiconfig.DatastoreType{apiconfig.Kubernetes}, func(getInfra infrastructure.InfraFactory) {
const (
wepPortStr = "8055"
)
Expand Down Expand Up @@ -128,15 +128,15 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon
extClient.Stop()
})

It("should have expected restriction on the rule jumping to QoS policy rules", func() {
It("should have expected restriction on the rule jumping to DSCP chain static rules", func() {
detecIptablesRule := func(felix *infrastructure.Felix, ipVersion uint8) {
binary := "iptables-save"
if ipVersion == 6 {
binary = "ip6tables-save"
}
allPoolsIPSet := fmt.Sprintf("cali%v0all-ipam-pools", ipVersion)
allHostsIPSet := fmt.Sprintf("cali%v0all-hosts-net", ipVersion)
tmpl := "-m set --match-set %v src -m set ! --match-set %v dst -m set ! --match-set %v dst -j cali-qos-policy"
tmpl := "-m set --match-set %v src -m set ! --match-set %v dst -m set ! --match-set %v dst -j cali-egress-dscp"
expectedRule1 := fmt.Sprintf(tmpl, allPoolsIPSet, allPoolsIPSet, allHostsIPSet)
expectedRule2 := fmt.Sprintf(tmpl, allHostsIPSet, allPoolsIPSet, allHostsIPSet)
getRules := func() string {
Expand All @@ -155,7 +155,7 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ qos policy tests", []apicon
}
allPoolsIPSet := fmt.Sprintf("@cali%v0all-ipam-pools", ipVersion)
allHostsIPSet := fmt.Sprintf("@cali%v0all-hosts-net", ipVersion)
tmpl := "%v saddr %v %v daddr != %v %v daddr != %v .* jump mangle-cali-qos-policy"
tmpl := "%v saddr %v %v daddr != %v %v daddr != %v .* jump mangle-cali-egress-dscp"
pattern1 := fmt.Sprintf(tmpl, ipFamily, allPoolsIPSet, ipFamily, allPoolsIPSet, ipFamily, allHostsIPSet)
pattern2 := fmt.Sprintf(tmpl, ipFamily, allHostsIPSet, ipFamily, allPoolsIPSet, ipFamily, allHostsIPSet)
getRules := func() string {
Expand Down Expand Up @@ -430,7 +430,7 @@ func verifyQoSPoliciesWithIPFamily(felix *infrastructure.Felix, ipv6 bool, value
if ipv6 {
ipFamily = "ip6"
}
cmd = []string{"nft", "-n", "list", "chain", ipFamily, "calico", "mangle-cali-qos-policy"}
cmd = []string{"nft", "-n", "list", "chain", ipFamily, "calico", "mangle-cali-egress-dscp"}
rulePattern = fmt.Sprintf("%v dscp set", ipFamily)
} else {
binary := "iptables-save"
Expand Down
22 changes: 11 additions & 11 deletions felix/rules/qos.go → felix/rules/dscp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ import (
"github.com/projectcalico/calico/felix/generictables"
)

type QoSPolicy struct {
type DSCPRule struct {
SrcAddrs string
DSCP uint8
Value uint8
}

func (r *DefaultRuleRenderer) EgressQoSPolicyChain(policies []QoSPolicy) *generictables.Chain {
var rules []generictables.Rule
// Policies is sorted and validated by QoS policy manager.
for _, p := range policies {
rules = append(rules, generictables.Rule{
Match: r.NewMatch().SourceNet(p.SrcAddrs),
Action: r.DSCP(p.DSCP),
func (r *DefaultRuleRenderer) EgressDSCPChain(rules []DSCPRule) *generictables.Chain {
var renderedRules []generictables.Rule
// Rules are sorted and validated by DSCP manager.
for _, rule := range rules {
renderedRules = append(renderedRules, generictables.Rule{
Match: r.NewMatch().SourceNet(rule.SrcAddrs),
Action: r.DSCP(rule.Value),
})
}

return &generictables.Chain{
Name: ChainQoSPolicy,
Rules: rules,
Name: ChainEgressDSCP,
Rules: renderedRules,
}
}
Comment thread
mazdakn marked this conversation as resolved.
Outdated
Loading