Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3056247
add dscp values
mazdakn Aug 5, 2025
774aae3
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 6, 2025
39f7258
fix
mazdakn Aug 6, 2025
d70aa12
update tests
mazdakn Aug 6, 2025
4f5b44c
add more UTs
mazdakn Aug 6, 2025
43c866d
more tests
mazdakn Aug 11, 2025
e9fc95c
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 11, 2025
74b2132
fix
mazdakn Aug 12, 2025
4df1027
Add UT for DSCP type
mazdakn Aug 12, 2025
92511c9
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 14, 2025
2a3a4f2
first commit
mazdakn Aug 15, 2025
9bea8d0
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 15, 2025
2813b4b
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 15, 2025
70dc72e
more tsts
mazdakn Aug 15, 2025
18f50fa
Fix
mazdakn Aug 15, 2025
df78fc8
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 15, 2025
85c93e1
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 15, 2025
1c909a8
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 18, 2025
61ce4be
do not enable in bpf
mazdakn Aug 18, 2025
3796ed1
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 18, 2025
020e8a0
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 19, 2025
26f505c
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 19, 2025
03b1281
Merge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 20, 2025
ec54d1b
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 20, 2025
9ba6786
gMerge remote-tracking branch 'open-source/master' into dscp-dp
mazdakn Aug 20, 2025
0d9b24b
Merge branch 'dscp-dp' into dscp-dp-hep
mazdakn Aug 20, 2025
50ee900
add fvs
mazdakn Aug 20, 2025
f035200
Merge remote-tracking branch 'open-source/master' into dscp-dp-hep
mazdakn Aug 20, 2025
92f3cbd
fix merg
mazdakn Aug 20, 2025
6552f52
fix
mazdakn Aug 20, 2025
90c670b
update proto
mazdakn Aug 20, 2025
919c08a
fix fv
mazdakn Aug 21, 2025
b8a9003
Merge remote-tracking branch 'open-source/master' into dscp-dp-hep
mazdakn Aug 21, 2025
02badf4
markup
mazdakn Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions felix/calc/event_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ func ModelWorkloadEndpointToProto(ep *model.WorkloadEndpoint, peerData *Endpoint
}

func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, preDNATTiers []*proto.TierInfo, forwardTiers []*proto.TierInfo) *proto.HostEndpoint {
var qosPolicies []*proto.QoSPolicy
if ep.QoSControls != nil && ep.QoSControls.DSCP != nil {
qosPolicies = append(qosPolicies, &proto.QoSPolicy{
Dscp: int32(ep.QoSControls.DSCP.ToUint8()),
})
}
return &proto.HostEndpoint{
Name: ep.Name,
ExpectedIpv4Addrs: ipsToStrings(ep.ExpectedIPv4Addrs),
Expand All @@ -477,6 +483,7 @@ func ModelHostEndpointToProto(ep *model.HostEndpoint, tiers, untrackedTiers, pre
UntrackedTiers: untrackedTiers,
PreDnatTiers: preDNATTiers,
ForwardTiers: forwardTiers,
QosPolicies: qosPolicies,
}
}

Expand Down
31 changes: 31 additions & 0 deletions felix/calc/event_sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,37 @@ var _ = DescribeTable("ModelHostEndpointToProto",
ProfileIds: []string{"prof1"},
},
),
Entry("host endpoint with QoSControls",
model.HostEndpoint{
Name: "eth0",
ExpectedIPv4Addrs: []net.IP{mustParseIP("10.28.0.13"), mustParseIP("10.28.0.14")},
ExpectedIPv6Addrs: []net.IP{mustParseIP("dead::beef"), mustParseIP("dead::bee5")},
Labels: uniquelabels.Make(map[string]string{
"a": "b",
}),
ProfileIDs: []string{"prof1"},
QoSControls: &model.QoSControls{
DSCP: &dscp,
},
},
[]*proto.TierInfo{{Name: "a", IngressPolicies: []string{"b"}}},
[]*proto.TierInfo{{Name: "a", EgressPolicies: []string{"c"}}},
[]*proto.TierInfo{{Name: "a", EgressPolicies: []string{"d"}}},
&proto.HostEndpoint{
Name: "eth0",
ExpectedIpv4Addrs: []string{"10.28.0.13", "10.28.0.14"},
ExpectedIpv6Addrs: []string{"dead::beef", "dead::bee5"},
Tiers: []*proto.TierInfo{{Name: "a", IngressPolicies: []string{"b"}}},
UntrackedTiers: []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"c"}}},
ForwardTiers: []*proto.TierInfo{{Name: "a", EgressPolicies: []string{"d"}}},
ProfileIds: []string{"prof1"},
QosPolicies: []*proto.QoSPolicy{
&proto.QoSPolicy{
Dscp: 38,
},
},
},
),
)

var _ = Describe("ServiceAccount update/remove", func() {
Expand Down
70 changes: 55 additions & 15 deletions felix/dataplane/linux/qos_policy_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
type qosPolicyManager struct {
ipVersion uint8
ruleRenderer rules.RuleRenderer
mangleTable Table

// QoS policy
mangleTable Table
// QoS policies.
wepPolicies map[types.WorkloadEndpointID]rules.QoSPolicy
hepPolicies map[types.HostEndpointID]rules.QoSPolicy
dirty bool
policies map[types.WorkloadEndpointID]rules.QoSPolicy

logCxt *logrus.Entry
}
Expand All @@ -43,30 +44,66 @@ func newQoSPolicyManager(
ipVersion uint8,
) *qosPolicyManager {
return &qosPolicyManager{
ipVersion: ipVersion,
policies: map[types.WorkloadEndpointID]rules.QoSPolicy{},
dirty: true,
mangleTable: mangleTable,
ruleRenderer: ruleRenderer,
ipVersion: ipVersion,
wepPolicies: map[types.WorkloadEndpointID]rules.QoSPolicy{},
hepPolicies: map[types.HostEndpointID]rules.QoSPolicy{},
dirty: true,
logCxt: logrus.WithField("ipVersion", ipVersion),
}
}

func (m *qosPolicyManager) OnUpdate(msg interface{}) {
switch msg := msg.(type) {
case *proto.HostEndpointUpdate:
m.handleHEPUpdates(msg.GetId(), msg)
case *proto.HostEndpointRemove:
m.handleHEPUpdates(msg.GetId(), nil)
case *proto.WorkloadEndpointUpdate:
m.handleWlEndpointUpdates(msg.GetId(), msg)
m.handleWEPUpdates(msg.GetId(), msg)
case *proto.WorkloadEndpointRemove:
m.handleWlEndpointUpdates(msg.GetId(), nil)
m.handleWEPUpdates(msg.GetId(), nil)
}
}

func (m *qosPolicyManager) handleHEPUpdates(hepID *proto.HostEndpointID, msg *proto.HostEndpointUpdate) {
id := types.ProtoToHostEndpointID(hepID)
if msg == nil || len(msg.Endpoint.QosPolicies) == 0 {
_, exists := m.hepPolicies[id]
if exists {
delete(m.hepPolicies, id)
m.dirty = true
}
return
}

// We only support one policy per endpoint at this point.
dscp := msg.Endpoint.QosPolicies[0].Dscp

// This situation must be handled earlier.
if dscp > 63 || dscp < 0 {
logrus.WithField("id", id).Panicf("Invalid DSCP value %v", dscp)
}
ips := msg.Endpoint.ExpectedIpv4Addrs
if m.ipVersion == 6 {
ips = msg.Endpoint.ExpectedIpv6Addrs
}
if len(ips) != 0 {
m.hepPolicies[id] = rules.QoSPolicy{
SrcAddrs: normaliseSourceAddr(ips),
DSCP: uint8(dscp),
}
m.dirty = true
}
}

func (m *qosPolicyManager) handleWlEndpointUpdates(wlID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) {
id := types.ProtoToWorkloadEndpointID(wlID)
func (m *qosPolicyManager) handleWEPUpdates(wepID *proto.WorkloadEndpointID, msg *proto.WorkloadEndpointUpdate) {
id := types.ProtoToWorkloadEndpointID(wepID)
if msg == nil || len(msg.Endpoint.QosPolicies) == 0 {
_, exists := m.policies[id]
_, exists := m.wepPolicies[id]
if exists {
delete(m.policies, id)
delete(m.wepPolicies, id)
m.dirty = true
}
return
Expand All @@ -84,7 +121,7 @@ func (m *qosPolicyManager) handleWlEndpointUpdates(wlID *proto.WorkloadEndpointI
ips = msg.Endpoint.Ipv6Nets
}
if len(ips) != 0 {
m.policies[id] = rules.QoSPolicy{
m.wepPolicies[id] = rules.QoSPolicy{
SrcAddrs: normaliseSourceAddr(ips),
DSCP: uint8(dscp),
}
Expand All @@ -102,9 +139,12 @@ func normaliseSourceAddr(addrs []string) string {
}

func (m *qosPolicyManager) CompleteDeferredWork() error {
var policies []rules.QoSPolicy
if m.dirty {
var policies []rules.QoSPolicy
for _, p := range m.policies {
for _, p := range m.wepPolicies {
policies = append(policies, p)
}
for _, p := range m.hepPolicies {
policies = append(policies, p)
}
sort.Slice(policies, func(i, j int) bool {
Expand Down
106 changes: 100 additions & 6 deletions felix/dataplane/linux/qos_policy_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
}}})
})

It("should handle workload updates correctly", func() {
By("sending workload endpoint updates with DSCP annotion")
It("should handle endpoint updates correctly", func() {
By("sending first workload endpoint update with DSCP annotation")
endpoint1 := &proto.WorkloadEndpoint{
State: "active",
Name: "cali12345-ab",
Expand All @@ -84,7 +84,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("sending another workload endpoint updates with DSCP annotion")
By("sending another workload endpoint update with DSCP annotation")
endpoint2 := &proto.WorkloadEndpoint{
State: "active",
Name: "cali2",
Expand Down Expand Up @@ -115,7 +115,7 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("verifying update to DSCP value takes effect")
By("verifying update to first workload DSCP value")
endpoint1.QosPolicies = []*proto.QoSPolicy{{Dscp: 13}}
manager.OnUpdate(&proto.WorkloadEndpointUpdate{
Id: &wlEPID1,
Expand All @@ -140,7 +140,73 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
},
}}})

By("verifying QoS policy rules removed when annotation is removed")
By("sending a host endpoint update with DSCP annotation")
hep1ID := &proto.HostEndpointID{
EndpointId: "id1",
}
hep1 := &proto.HostEndpoint{
Name: "eth0",
ExpectedIpv4Addrs: []string{"192.168.1.2", "192.168.2.2"},
ExpectedIpv6Addrs: []string{"2001:db9:10::2", "dead:beff::20:2"},
QosPolicies: []*proto.QoSPolicy{{Dscp: 44}},
}
manager.OnUpdate(&proto.HostEndpointUpdate{
Id: hep1ID,
Endpoint: hep1,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Action: iptables.DSCPAction{Value: 20},
Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)),
},
{
Action: iptables.DSCPAction{Value: 13},
Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint1, ipVersion)),
},
{
Action: iptables.DSCPAction{Value: 44},
Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)),
},
},
}}})

By("verifying update to host endpoint DSCP value")
hep1.QosPolicies = []*proto.QoSPolicy{{Dscp: 30}}
manager.OnUpdate(&proto.HostEndpointUpdate{
Id: hep1ID,
Endpoint: hep1,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())

mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Rules: []generictables.Rule{
// Rendered policies are sorted.
{
Action: iptables.DSCPAction{Value: 20},
Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)),
},
{
Action: iptables.DSCPAction{Value: 13},
Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint1, ipVersion)),
},
{
Action: iptables.DSCPAction{Value: 30},
Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)),
},
},
}}})

By("verifying QoS policy rules removed when first workload annotation is removed")
endpoint1.QosPolicies = nil
manager.OnUpdate(&proto.WorkloadEndpointUpdate{
Id: &wlEPID1,
Expand All @@ -157,17 +223,37 @@ func qosPolicyManagerTests(ipVersion uint8) func() {
Action: iptables.DSCPAction{Value: 20},
Match: iptables.Match().SourceNet(addrFromWlUpdate(endpoint2, ipVersion)),
},
{
Action: iptables.DSCPAction{Value: 30},
Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)),
},
},
}}})

By("verifying QoS policy rules removed when workload is removed")
By("verifying QoS policy rules removed when second workload is removed")
manager.OnUpdate(&proto.WorkloadEndpointRemove{
Id: &wlEPID2,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Rules: []generictables.Rule{
{
Action: iptables.DSCPAction{Value: 30},
Match: iptables.Match().SourceNet(addrFromHepUpdate(hep1, ipVersion)),
},
},
}}})

By("verifying QoS policy rules removed when host endpoint is removed")
manager.OnUpdate(&proto.HostEndpointRemove{
Id: hep1ID,
})

err = manager.CompleteDeferredWork()
Expect(err).NotTo(HaveOccurred())
mangleTable.checkChains([][]*generictables.Chain{{{
Name: rules.ChainQoSPolicy,
Rules: nil,
Expand All @@ -183,3 +269,11 @@ func addrFromWlUpdate(endpoint *proto.WorkloadEndpoint, ipVersion uint8) string
}
return normaliseSourceAddr(addr)
}

func addrFromHepUpdate(endpoint *proto.HostEndpoint, ipVersion uint8) string {
addr := endpoint.ExpectedIpv4Addrs
if ipVersion == 6 {
addr = endpoint.ExpectedIpv6Addrs
}
return normaliseSourceAddr(addr)
}
Loading
Loading