[release-4.16] WIP: NOMERGE: DNM: consume u/s LLC alignment support #2128

Open · wants to merge 5 commits into base: release-4.16
113 changes: 105 additions & 8 deletions pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -125,6 +125,17 @@ func (n *numaFirst) takeFullSecondLevel() {
n.acc.takeFullSockets()
}

// Sort the UncoreCaches within the NUMA nodes.
func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
var result []int
for _, numa := range a.sortAvailableNUMANodes() {
uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
a.sort(uncore, a.details.CPUsInUncoreCaches)
result = append(result, uncore...)
}
return result
}
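UncoreInNUMANodes and CPUsInUncoreCaches come from the upstream topology changes and are not part of this diff. A minimal sketch of what UncoreInNUMANodes could look like, assuming CPUDetails maps CPU IDs to a CPUInfo that carries NUMANodeID and UncoreCacheID fields (names outside this diff are assumptions):

// Hypothetical sketch only, not part of this diff: collect the UncoreCache
// IDs that appear on the given NUMA nodes.
func (d CPUDetails) UncoreInNUMANodes(ids ...int) cpuset.CPUSet {
	var uncoreIDs []int
	for _, id := range ids {
		for _, info := range d {
			if info.NUMANodeID == id {
				uncoreIDs = append(uncoreIDs, info.UncoreCacheID)
			}
		}
	}
	return cpuset.New(uncoreIDs...)
}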

// If NUMA nodes are higher in the memory hierarchy than sockets, then just
// sort the NUMA nodes directly, and return them.
func (n *numaFirst) sortAvailableNUMANodes() []int {
@@ -238,7 +249,14 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
}

// Returns true if the supplied core is fully available in `topoDetails`.
// Returns true if the supplied UncoreCache is fully available.
// "fully available" means that all the CPUs in it are free.
func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
}

// Returns true if the supplied core is fully available in `a.details`.
// "fully available" means that all the CPUs in it are free.
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
}
@@ -265,6 +283,17 @@ func (a *cpuAccumulator) freeSockets() []int {
return free
}

// Returns free UncoreCache IDs as a slice sorted by sortAvailableUncoreCaches().
func (a *cpuAccumulator) freeUncoreCache() []int {
free := []int{}
for _, uncore := range a.sortAvailableUncoreCaches() {
if a.isUncoreCacheFree(uncore) {
free = append(free, uncore)
}
}
return free
}

// Returns free core IDs as a slice sorted by sortAvailableCores().
func (a *cpuAccumulator) freeCores() []int {
free := []int{}
@@ -358,6 +387,62 @@ func (a *cpuAccumulator) takeFullSockets() {
}
}

func (a *cpuAccumulator) takeFullUncore() {
for _, uncore := range a.freeUncoreCache() {
cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
if !a.needs(cpusInUncore.Size()) {
continue
}
klog.V(4).InfoS("takeFullUncore: claiming uncore", "uncore", uncore)
a.take(cpusInUncore)
}
}

func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()

// determine the N number of free cores (physical cpus) within the UncoreCache, then
// determine the M number of free cpus (virtual cpus) that correspond with the free cores
freeCores := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
freeCPUs := a.details.CPUsInCores(freeCores.UnsortedList()...)

// claim the CPUs only if the free CPUs within the UncoreCache exactly satisfy the request
claimed := (a.numCPUsNeeded == freeCPUs.Size())
klog.V(4).InfoS("takePartialUncore: trying to claim partial uncore",
"uncore", uncoreID,
"claimed", claimed,
"needed", a.numCPUsNeeded,
"cores", freeCores.String(),
"cpus", freeCPUs.String())
if !claimed {
return
}
a.take(freeCPUs)
}
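CoresNeededInUncoreCache is another topology helper this diff consumes without defining. A rough sketch of its assumed behavior; CoresInUncoreCaches is likewise an assumed helper that lists the core IDs still present in the given UncoreCache:

// Hypothetical sketch only: return up to numCoresNeeded core IDs that belong
// to the given UncoreCache, drawn from the currently available details.
func (d CPUDetails) CoresNeededInUncoreCache(numCoresNeeded int, uncoreIDs ...int) cpuset.CPUSet {
	cores := d.CoresInUncoreCaches(uncoreIDs...).List()
	if len(cores) > numCoresNeeded {
		cores = cores[:numCoresNeeded]
	}
	return cpuset.New(cores...)
}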

// First, try to take whole UncoreCaches, if available and the request is at
// least the size of an UncoreCache group. Then try to take a partial
// UncoreCache if the remaining request can fit within a single UncoreCache.
func (a *cpuAccumulator) takeUncoreCache() {
numCPUsInUncore := a.topo.CPUsPerUncore()
for _, uncore := range a.sortAvailableUncoreCaches() {
// take a full UncoreCache if the number of CPUs still needed is at least the UncoreCache size
if a.needs(numCPUsInUncore) {
a.takeFullUncore()
}

if a.isSatisfied() {
return
}

// take a partial UncoreCache if the remaining CPUs needed are fewer than the UncoreCache size
a.takePartialUncore(uncore)
if a.isSatisfied() {
return
}
}
}
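takeUncoreCache and takePartialUncore lean on the accumulator's pre-existing needs and isSatisfied helpers, which this diff does not touch. Their semantics, as assumed here, sketched for reference:

// Sketch of the assumed pre-existing helpers (not part of this diff):
// needs reports whether at least n more CPUs are still required;
// isSatisfied reports whether the request has been fully met.
func (a *cpuAccumulator) needs(n int) bool {
	return a.numCPUsNeeded >= n
}

func (a *cpuAccumulator) isSatisfied() bool {
	return a.numCPUsNeeded < 1
}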

func (a *cpuAccumulator) takeFullCores() {
for _, core := range a.freeCores() {
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
@@ -447,7 +532,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
helper(n, k, 0, []int{}, f)
}

func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
if acc.isSatisfied() {
return acc.result, nil
@@ -470,14 +555,24 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
return acc.result, nil
}

// 2. Acquire whole cores, if available and the container requires at least
// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
// if available and the container requires at least an UncoreCache's worth
// of CPUs. Otherwise, acquire CPUs from as few UncoreCaches as possible.
if preferAlignByUncoreCache {
acc.takeUncoreCache()
if acc.isSatisfied() {
return acc.result, nil
}
}

// 3. Acquire whole cores, if available and the container requires at least
// a core's-worth of CPUs.
acc.takeFullCores()
if acc.isSatisfied() {
return acc.result, nil
}

// 3. Acquire single threads, preferring to fill partially-allocated cores
// 4. Acquire single threads, preferring to fill partially-allocated cores
// on the same sockets as the whole cores we have already taken in this
// allocation.
acc.takeRemainingCPUs()
@@ -555,8 +650,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// If the number of CPUs requested cannot be handed out in chunks of
// 'cpuGroupSize', then we just call out the packing algorithm since we
// can't distribute CPUs in this chunk size.
// The PreferAlignByUncoreCache feature is not implemented in this distributed
// algorithm yet, so it is hard-coded to false here. Support is planned for the
// beta release.
if (numCPUs % cpuGroupSize) != 0 {
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
}

// Otherwise build an accumulator to start allocating CPUs from.
@@ -739,7 +836,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
// size 'cpuGroupSize' from 'bestCombo'.
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
for _, numa := range bestCombo {
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, false)
acc.take(cpus)
}

@@ -754,7 +851,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
continue
}
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, false)
acc.take(cpus)
remainder -= cpuGroupSize
}
@@ -778,5 +875,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu

// If we never found a combination of NUMA nodes that we could properly
// distribute CPUs across, fall back to the packing algorithm.
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
}
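Since the diff changes the signature of takeByTopologyNUMAPacked, its caller in policy_static.go (not shown in this PR view) has to thread the new flag through. A hedged sketch of what that call site might look like; field and method names outside this diff are assumptions:

// Hypothetical call-site sketch only; the real wiring lives in policy_static.go.
func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
	if p.options.DistributeCPUsAcrossNUMA {
		cpuGroupSize := 1
		if p.options.FullPhysicalCPUsOnly {
			cpuGroupSize = p.topology.CPUsPerCore()
		}
		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
	}
	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, p.options.PreferAlignByUncoreCacheOption)
}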
101 changes: 100 additions & 1 deletion pkg/kubelet/cm/cpumanager/cpu_assignment_test.go
@@ -655,7 +655,106 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, false)
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
}
if !result.Equals(tc.expResult) {
t.Errorf("expected result [%s] to equal [%s]", result, tc.expResult)
}
})
}
}
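The new test cases below rely on mustParseCPUSet and on several topoUncore* fixtures defined elsewhere in the test file. A plausible sketch of the helper, assuming it simply wraps cpuset.Parse and fails the test on error:

// Hypothetical sketch of the assumed test helper.
func mustParseCPUSet(t *testing.T, s string) cpuset.CPUSet {
	t.Helper()
	set, err := cpuset.Parse(s)
	if err != nil {
		t.Fatalf("unable to parse %q as a CPUSet: %v", s, err)
	}
	return set
}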

type takeByTopologyUncoreTestCase struct {
description string
topo *topology.CPUTopology
opts StaticPolicyOptions
availableCPUs cpuset.CPUSet
numCPUs int
expErr string
expResult cpuset.CPUSet
}

func TestTakeByTopologyUncore(t *testing.T) {
testCases := []takeByTopologyUncoreTestCase{
{
"take cpus from two full UncoreCaches and partial from a single UncoreCache",
topoUncoreSingleSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-15"),
10,
"",
cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
"take one cpu from dual socket with HT - core from Socket 0",
topoDualSocketHT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
1,
"",
cpuset.New(2),
},
{
"take first available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "0-15"),
4,
"",
cpuset.New(0, 1, 2, 3),
},
{
"take all available UncoreCache from first socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-15"),
6,
"",
cpuset.New(2, 3, 4, 5, 6, 7),
},
{
"take first available UncoreCache from second socket",
topoUncoreDualSocketNoSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "8-15"),
4,
"",
cpuset.New(8, 9, 10, 11),
},
{
"take first available UncoreCache from available NUMA",
topoUncoreSingleSocketMultiNuma,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "3,4-8,12"),
2,
"",
cpuset.New(4, 5),
},
{
"take cpus from best available UncoreCache group of multi uncore cache single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "2-3,10-11,4-7,12-15"),
6,
"",
cpuset.New(4, 5, 6, 12, 13, 14),
},
{
"take cpus from multiple UncoreCache of single socket - SMT enabled",
topoUncoreSingleSocketSMT,
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
mustParseCPUSet(t, "1-7,9-15"),
10,
"",
mustParseCPUSet(t, "4-7,12-15,1,9"),
},
}

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, tc.opts.PreferAlignByUncoreCacheOption)
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
}
20 changes: 12 additions & 8 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -651,20 +651,24 @@ func TestCPUManagerGenerate(t *testing.T) {
{
Cores: []cadvisorapi.Core{
{
Id: 0,
Threads: []int{0},
Id: 0,
Threads: []int{0},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 1,
Threads: []int{1},
Id: 1,
Threads: []int{1},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 2,
Threads: []int{2},
Id: 2,
Threads: []int{2},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
{
Id: 3,
Threads: []int{3},
Id: 3,
Threads: []int{3},
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
},
},
},
16 changes: 16 additions & 0 deletions pkg/kubelet/cm/cpumanager/policy_options.go
@@ -32,6 +32,7 @@ const (
FullPCPUsOnlyOption string = "full-pcpus-only"
DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
AlignBySocketOption string = "align-by-socket"
PreferAlignByUnCoreCacheOption string = "prefer-align-cpus-by-uncorecache"
)

var (
@@ -41,6 +42,7 @@ var (
)
betaOptions = sets.New[string](
FullPCPUsOnlyOption,
PreferAlignByUnCoreCacheOption,
)
stableOptions = sets.New[string]()
)
@@ -80,6 +82,9 @@ type StaticPolicyOptions struct {
// Flag to ensure CPUs are considered aligned at socket boundary rather than
// NUMA boundary
AlignBySocket bool
// Flag that makes a best-effort attempt to align CPUs to an uncorecache boundary.
// Pods are still admitted as long as CPUs are available, even if the alignment cannot be met.
PreferAlignByUncoreCacheOption bool
}

// NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@@ -109,12 +114,23 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.AlignBySocket = optValue
case PreferAlignByUnCoreCacheOption:
optValue, err := strconv.ParseBool(value)
if err != nil {
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.PreferAlignByUncoreCacheOption = optValue
default:
// this should never be reached, we already detect unknown options,
// but we keep it as further safety.
return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value)
}
}

if opts.PreferAlignByUncoreCacheOption && opts.DistributeCPUsAcrossNUMA {
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", PreferAlignByUnCoreCacheOption, DistributeCPUsAcrossNUMAOption)
}

return opts, nil
}
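A small usage sketch of the new option key and the mutual-exclusion check added above, assuming the beta policy-options feature gate that guards this option is enabled:

// Illustrative sketch only; error values depend on feature-gate configuration.
func ExampleNewStaticPolicyOptions() {
	// Enabling uncorecache alignment on its own parses cleanly.
	opts, err := NewStaticPolicyOptions(map[string]string{
		PreferAlignByUnCoreCacheOption: "true",
	})
	fmt.Println(opts.PreferAlignByUncoreCacheOption, err) // true <nil>

	// Combining it with NUMA distribution trips the new check.
	_, err = NewStaticPolicyOptions(map[string]string{
		PreferAlignByUnCoreCacheOption: "true",
		DistributeCPUsAcrossNUMAOption: "true",
	})
	fmt.Println(err != nil) // true
}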
