Skip to content

Commit

Permalink
Merge pull request #2136 from ffromani/ocp-split-l3-cache
Browse files Browse the repository at this point in the history
OCPBUGS-44786: add support for the LLC alignment cpumanager policy option
  • Loading branch information
openshift-merge-bot[bot] authored Dec 11, 2024
2 parents 8ac36bf + 6fded69 commit 3c62f73
Show file tree
Hide file tree
Showing 11 changed files with 1,394 additions and 351 deletions.
118 changes: 111 additions & 7 deletions pkg/kubelet/cm/cpumanager/cpu_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ func (n *numaFirst) takeFullSecondLevel() {
n.acc.takeFullSockets()
}

// sortAvailableUncoreCaches returns the IDs of all available UncoreCaches,
// grouped by NUMA node (in the order produced by sortAvailableNUMANodes) and
// sorted within each node by the accumulator's configured sort strategy.
func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
	var ordered []int
	for _, numaID := range a.sortAvailableNUMANodes() {
		caches := a.details.UncoreInNUMANodes(numaID).UnsortedList()
		a.sort(caches, a.details.CPUsInUncoreCaches)
		ordered = append(ordered, caches...)
	}
	return ordered
}

// If NUMA nodes are higher in the memory hierarchy than sockets, then just
// sort the NUMA nodes directly, and return them.
func (n *numaFirst) sortAvailableNUMANodes() []int {
Expand Down Expand Up @@ -318,6 +329,12 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
}

// isUncoreCacheFree reports whether the supplied UncoreCache is fully
// available, i.e. every CPU belonging to it is still free in `a.details`.
func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
	availableCPUs := a.details.CPUsInUncoreCaches(uncoreID).Size()
	totalCPUs := a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
	return availableCPUs == totalCPUs
}

// Returns true if the supplied core is fully available in `a.details`.
// "fully available" means that all the CPUs in it are free.
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
Expand Down Expand Up @@ -346,6 +363,17 @@ func (a *cpuAccumulator) freeSockets() []int {
return free
}

// freeUncoreCache returns the IDs of the UncoreCaches that are fully free
// (all of their CPUs unallocated), in the order produced by
// sortAvailableUncoreCaches().
func (a *cpuAccumulator) freeUncoreCache() []int {
	free := []int{}
	for _, uncoreID := range a.sortAvailableUncoreCaches() {
		if !a.isUncoreCacheFree(uncoreID) {
			continue
		}
		free = append(free, uncoreID)
	}
	return free
}

// Returns free core IDs as a slice sorted by sortAvailableCores().
func (a *cpuAccumulator) freeCores() []int {
free := []int{}
Expand Down Expand Up @@ -519,6 +547,62 @@ func (a *cpuAccumulator) takeFullSockets() {
}
}

// takeFullUncore claims every fully-free UncoreCache whose complete CPU set
// still fits within the number of CPUs the accumulator needs; smaller
// remaining requests leave the cache untouched for partial allocation.
func (a *cpuAccumulator) takeFullUncore() {
	for _, uncoreID := range a.freeUncoreCache() {
		uncoreCPUs := a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID)
		if a.needsAtLeast(uncoreCPUs.Size()) {
			klog.V(4).InfoS("takeFullUncore: claiming uncore", "uncore", uncoreID)
			a.take(uncoreCPUs)
		}
	}
}

// takePartialUncore tries to satisfy the remaining CPU request entirely from
// free cores within a single UncoreCache. The CPUs are claimed only when the
// free cores found expand to exactly the number of CPUs still needed;
// otherwise nothing is taken.
func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
	// Translate the remaining CPU (hyperthread) request into whole physical cores.
	numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()

	// Gather up to numCoresNeeded free cores from this UncoreCache, then
	// expand them back into the virtual CPUs they contain.
	freeCores := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
	freeCPUs := a.details.CPUsInCores(freeCores.UnsortedList()...)

	// Claim only on an exact match between the request and the free CPUs found.
	claimed := a.numCPUsNeeded == freeCPUs.Size()
	klog.V(4).InfoS("takePartialUncore: trying to claim partial uncore",
		"uncore", uncoreID,
		"claimed", claimed,
		"needed", a.numCPUsNeeded,
		"cores", freeCores.String(),
		"cpus", freeCPUs.String())
	if claimed {
		a.take(freeCPUs)
	}
}

// First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
// Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
func (a *cpuAccumulator) takeUncoreCache() {
	numCPUsInUncore := a.topo.CPUsPerUncore()
	for _, uncore := range a.sortAvailableUncoreCaches() {
		// take full UncoreCache if the CPUs needed is greater than free UncoreCache size
		// NOTE(review): takeFullUncore itself iterates over every free
		// UncoreCache, so calling it on each loop pass looks redundant after
		// the first call — presumably harmless, but worth confirming/hoisting.
		if a.needsAtLeast(numCPUsInUncore) {
			a.takeFullUncore()
		}

		// Stop as soon as the request is fully satisfied by whole caches.
		if a.isSatisfied() {
			return
		}

		// take partial UncoreCache if the CPUs needed is less than free UncoreCache size
		a.takePartialUncore(uncore)
		if a.isSatisfied() {
			return
		}
	}
}

func (a *cpuAccumulator) takeFullCores() {
for _, core := range a.freeCores() {
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
Expand Down Expand Up @@ -637,6 +721,14 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
// or the remaining number of CPUs to take after having taken full sockets and NUMA nodes is less
// than a whole NUMA node, the function tries to take whole physical cores (cores).
//
// If `PreferAlignByUncoreCache` is enabled, the function will try to optimally assign Uncorecaches.
// If `numCPUs` is larger than or equal to the total number of CPUs in a Uncorecache, and there are
// free (i.e. all CPUs within the Uncorecache are free) Uncorecaches, the function takes as many entire
// cores from free Uncorecaches as possible. If/Once `numCPUs` is smaller than the total number of
// CPUs in a free Uncorecache, the function scans each Uncorecache index in numerical order to assign
// cores that will fit within the Uncorecache. If `numCPUs` cannot fit within any Uncorecache, the
// function tries to take whole physical cores.
//
// If `numCPUs` is bigger than the total number of CPUs in a core, and there are
// free (i.e. all CPUs in them are free) cores, the function takes as many entire free cores as possible.
// The cores are taken from one socket at a time, and the sockets are considered by
Expand All @@ -658,7 +750,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
// the least amount of free CPUs to the one with the highest amount of free CPUs.
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
if acc.isSatisfied() {
return acc.result, nil
Expand All @@ -681,7 +773,17 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
return acc.result, nil
}

// 2. Acquire whole cores, if available and the container requires at least
// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
// if available and the container requires at least an UncoreCache's worth
// of CPUs. Otherwise, acquire CPUs from the fewest UncoreCaches possible.
if preferAlignByUncoreCache {
acc.takeUncoreCache()
if acc.isSatisfied() {
return acc.result, nil
}
}

// 3. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
// If `CPUSortingStrategySpread` is specified, skip taking the whole core.
if cpuSortingStrategy != CPUSortingStrategySpread {
Expand All @@ -691,7 +793,7 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
}
}

// 3. Acquire single threads, preferring to fill partially-allocated cores
// 4. Acquire single threads, preferring to fill partially-allocated cores
// on the same sockets as the whole cores we have already taken in this
// allocation.
acc.takeRemainingCPUs()
Expand Down Expand Up @@ -769,8 +871,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// If the number of CPUs requested cannot be handed out in chunks of
// 'cpuGroupSize', then we just call out the packing algorithm since we
// can't distribute CPUs in this chunk size.
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
// Support for PreferAlignByUncoreCache to be done at beta release.
if (numCPUs % cpuGroupSize) != 0 {
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
}

// Otherwise build an accumulator to start allocating CPUs from.
Expand Down Expand Up @@ -953,7 +1057,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// size 'cpuGroupSize' from 'bestCombo'.
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
for _, numa := range bestCombo {
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false)
acc.take(cpus)
}

Expand All @@ -968,7 +1072,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
continue
}
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false)
acc.take(cpus)
remainder -= cpuGroupSize
}
Expand All @@ -992,5 +1096,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu

// If we never found a combination of NUMA nodes that we could properly
// distribute CPUs across, fall back to the packing algorithm.
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
}
77 changes: 75 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,79 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
"",
mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
},
// Test cases for PreferAlignByUncoreCache
{
"take cpus from two full UncoreCaches and partial from a single UncoreCache",
topoUncoreSingleSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-15"),
10,
"",
cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
"take one cpu from dual socket with HT - core from Socket 0",
topoDualSocketHT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
1,
"",
cpuset.New(2),
},
{
"take first available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "0-15"),
4,
"",
cpuset.New(0, 1, 2, 3),
},
{
"take all available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-15"),
6,
"",
cpuset.New(2, 3, 4, 5, 6, 7),
},
{
"take first available UncoreCache from second socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "8-15"),
4,
"",
cpuset.New(8, 9, 10, 11),
},
{
"take first available UncoreCache from available NUMA",
topoUncoreSingleSocketMultiNuma,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "3,4-8,12"),
2,
"",
cpuset.New(4, 5),
},
{
"take cpus from best available UncoreCache group of multi uncore cache single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-3,10-11,4-7,12-15"),
6,
"",
cpuset.New(4, 5, 6, 12, 13, 14),
},
{
"take cpus from multiple UncoreCache of single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-7,9-15"),
10,
"",
mustParseCPUSet(t, "4-7,12-15,1,9"),
},
}...)

for _, tc := range testCases {
Expand All @@ -677,7 +750,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
strategy = CPUSortingStrategySpread
}

result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
}
Expand Down Expand Up @@ -778,7 +851,7 @@ func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
if tc.opts.DistributeCPUsAcrossCores {
strategy = CPUSortingStrategySpread
}
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
if tc.expErr != "" && err.Error() != tc.expErr {
t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,20 +651,24 @@ func TestCPUManagerGenerate(t *testing.T) {
{
Cores: []cadvisorapi.Core{
{
Id: 0,
Threads: []int{0},
Id: 0,
Threads: []int{0},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 1,
Threads: []int{1},
Id: 1,
Threads: []int{1},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 2,
Threads: []int{2},
Id: 2,
Threads: []int{2},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 3,
Threads: []int{3},
Id: 3,
Threads: []int{3},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
},
},
Expand Down
Loading

0 comments on commit 3c62f73

Please sign in to comment.