Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-44786: add support for the LLC alignment cpumanager policy option #2136

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 111 additions & 7 deletions pkg/kubelet/cm/cpumanager/cpu_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ func (n *numaFirst) takeFullSecondLevel() {
n.acc.takeFullSockets()
}

// Sort the UncoreCaches within the NUMA nodes.
func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
var result []int
for _, numa := range a.sortAvailableNUMANodes() {
uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
a.sort(uncore, a.details.CPUsInUncoreCaches)
result = append(result, uncore...)
}
return result
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then just
// sort the NUMA nodes directly, and return them.
func (n *numaFirst) sortAvailableNUMANodes() []int {
Expand Down Expand Up @@ -318,6 +329,12 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
}

// Returns true if the supplied UnCoreCache is fully available,
// "fully available" means that all the CPUs in it are free.
func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
}

// Returns true if the supplied core is fully available in `a.details`.
// "fully available" means that all the CPUs in it are free.
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
Expand Down Expand Up @@ -346,6 +363,17 @@ func (a *cpuAccumulator) freeSockets() []int {
return free
}

// Returns free UncoreCache IDs as a slice sorted by sortAvailableUnCoreCache().
func (a *cpuAccumulator) freeUncoreCache() []int {
free := []int{}
for _, uncore := range a.sortAvailableUncoreCaches() {
if a.isUncoreCacheFree(uncore) {
free = append(free, uncore)
}
}
return free
}

// Returns free core IDs as a slice sorted by sortAvailableCores().
func (a *cpuAccumulator) freeCores() []int {
free := []int{}
Expand Down Expand Up @@ -519,6 +547,62 @@ func (a *cpuAccumulator) takeFullSockets() {
}
}

func (a *cpuAccumulator) takeFullUncore() {
for _, uncore := range a.freeUncoreCache() {
cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
if !a.needsAtLeast(cpusInUncore.Size()) {
continue
}
klog.V(4).InfoS("takeFullUncore: claiming uncore", "uncore", uncore)
a.take(cpusInUncore)
}
}

func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()

// determine the N number of free cores (physical cpus) within the UncoreCache, then
// determine the M number of free cpus (virtual cpus) that correspond with the free cores
freeCores := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
freeCPUs := a.details.CPUsInCores(freeCores.UnsortedList()...)

// claim the cpus if the free cpus within the UncoreCache can satisfy the needed cpus
claimed := (a.numCPUsNeeded == freeCPUs.Size())
klog.V(4).InfoS("takePartialUncore: trying to claim partial uncore",
"uncore", uncoreID,
"claimed", claimed,
"needed", a.numCPUsNeeded,
"cores", freeCores.String(),
"cpus", freeCPUs.String())
if !claimed {
return

}
a.take(freeCPUs)
}

// First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
// Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
func (a *cpuAccumulator) takeUncoreCache() {
numCPUsInUncore := a.topo.CPUsPerUncore()
for _, uncore := range a.sortAvailableUncoreCaches() {
// take full UncoreCache if the CPUs needed is greater than free UncoreCache size
if a.needsAtLeast(numCPUsInUncore) {
a.takeFullUncore()
}

if a.isSatisfied() {
return
}

// take partial UncoreCache if the CPUs needed is less than free UncoreCache size
a.takePartialUncore(uncore)
if a.isSatisfied() {
return
}
}
}

func (a *cpuAccumulator) takeFullCores() {
for _, core := range a.freeCores() {
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
Expand Down Expand Up @@ -637,6 +721,14 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
// or the remaining number of CPUs to take after having taken full sockets and NUMA nodes is less
// than a whole NUMA node, the function tries to take whole physical cores (cores).
//
// If `PreferAlignByUncoreCache` is enabled, the function will try to optimally assign Uncorecaches.
// If `numCPUs` is larger than or equal to the total number of CPUs in a Uncorecache, and there are
// free (i.e. all CPUs within the Uncorecache are free) Uncorecaches, the function takes as many entire
// cores from free Uncorecaches as possible. If/Once `numCPUs` is smaller than the total number of
// CPUs in a free Uncorecache, the function scans each Uncorecache index in numerical order to assign
// cores that will fit within the Uncorecache. If `numCPUs` cannot fit within any Uncorecache, the
// function tries to take whole physical cores.
//
// If `numCPUs` is bigger than the total number of CPUs in a core, and there are
// free (i.e. all CPUs in them are free) cores, the function takes as many entire free cores as possible.
// The cores are taken from one socket at a time, and the sockets are considered by
Expand All @@ -658,7 +750,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
// the least amount of free CPUs to the one with the highest amount of free CPUs.
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
if acc.isSatisfied() {
return acc.result, nil
Expand All @@ -681,7 +773,17 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
return acc.result, nil
}

// 2. Acquire whole cores, if available and the container requires at least
// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
// if available and the container requires at least a UncoreCache's-worth
// of CPUs. Otherwise, acquire CPUs from the least amount of UncoreCaches.
if preferAlignByUncoreCache {
acc.takeUncoreCache()
if acc.isSatisfied() {
return acc.result, nil
}
}

// 3. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
// If `CPUSortingStrategySpread` is specified, skip taking the whole core.
if cpuSortingStrategy != CPUSortingStrategySpread {
Expand All @@ -691,7 +793,7 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
}
}

// 3. Acquire single threads, preferring to fill partially-allocated cores
// 4. Acquire single threads, preferring to fill partially-allocated cores
// on the same sockets as the whole cores we have already taken in this
// allocation.
acc.takeRemainingCPUs()
Expand Down Expand Up @@ -769,8 +871,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// If the number of CPUs requested cannot be handed out in chunks of
// 'cpuGroupSize', then we just call out the packing algorithm since we
// can't distribute CPUs in this chunk size.
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
// Support for PreferAlignByUncoreCache to be done at beta release.
if (numCPUs % cpuGroupSize) != 0 {
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
}

// Otherwise build an accumulator to start allocating CPUs from.
Expand Down Expand Up @@ -953,7 +1057,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// size 'cpuGroupSize' from 'bestCombo'.
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
for _, numa := range bestCombo {
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false)
acc.take(cpus)
}

Expand All @@ -968,7 +1072,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
continue
}
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false)
acc.take(cpus)
remainder -= cpuGroupSize
}
Expand All @@ -992,5 +1096,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu

// If we never found a combination of NUMA nodes that we could properly
// distribute CPUs across, fall back to the packing algorithm.
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
}
77 changes: 75 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,79 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
"",
mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
},
// Test cases for PreferAlignByUncoreCache
{
"take cpus from two full UncoreCaches and partial from a single UncoreCache",
topoUncoreSingleSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-15"),
10,
"",
cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
"take one cpu from dual socket with HT - core from Socket 0",
topoDualSocketHT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
1,
"",
cpuset.New(2),
},
{
"take first available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "0-15"),
4,
"",
cpuset.New(0, 1, 2, 3),
},
{
"take all available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-15"),
6,
"",
cpuset.New(2, 3, 4, 5, 6, 7),
},
{
"take first available UncoreCache from second socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "8-15"),
4,
"",
cpuset.New(8, 9, 10, 11),
},
{
"take first available UncoreCache from available NUMA",
topoUncoreSingleSocketMultiNuma,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "3,4-8,12"),
2,
"",
cpuset.New(4, 5),
},
{
"take cpus from best available UncoreCache group of multi uncore cache single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-3,10-11,4-7,12-15"),
6,
"",
cpuset.New(4, 5, 6, 12, 13, 14),
},
{
"take cpus from multiple UncoreCache of single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-7,9-15"),
10,
"",
mustParseCPUSet(t, "4-7,12-15,1,9"),
},
}...)

for _, tc := range testCases {
Expand All @@ -677,7 +750,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
strategy = CPUSortingStrategySpread
}

result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
}
Expand Down Expand Up @@ -778,7 +851,7 @@ func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
if tc.opts.DistributeCPUsAcrossCores {
strategy = CPUSortingStrategySpread
}
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
if tc.expErr != "" && err.Error() != tc.expErr {
t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,20 +651,24 @@ func TestCPUManagerGenerate(t *testing.T) {
{
Cores: []cadvisorapi.Core{
{
Id: 0,
Threads: []int{0},
Id: 0,
Threads: []int{0},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 1,
Threads: []int{1},
Id: 1,
Threads: []int{1},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 2,
Threads: []int{2},
Id: 2,
Threads: []int{2},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 3,
Threads: []int{3},
Id: 3,
Threads: []int{3},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
},
},
Expand Down
Loading