Skip to content

Commit

Permalink
Merge pull request #163 from tbs60/dev_yanafu
Browse files Browse the repository at this point in the history
feat: 支持原生k8s集群cpu和内存小量扣除
  • Loading branch information
tming authored Jan 15, 2024
2 parents afec82b + 6d84a16 commit cb35a47
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 44 deletions.
4 changes: 2 additions & 2 deletions src/backend/booster/server/pkg/resource/crm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (rm *resourceManager) recover() error {
return err
}

rm.nodeInfoPool = op.NewNodeInfoPool(rm.conf.BcsCPUPerInstance, rm.conf.BcsMemPerInstance, 1, rm.conf.InstanceType)
rm.nodeInfoPool = op.NewNodeInfoPool(rm.conf)

rm.registeredResourceMapLock.Lock()
defer rm.registeredResourceMapLock.Unlock()
Expand Down Expand Up @@ -1187,7 +1187,7 @@ func (hwu *handlerWithUser) AddBroker(
return hwu.mgr.addBroker(name, hwu.user, strategyType, strategy, param)
}

//GetInstanceType return the instanceType from key
// GetInstanceType return the instanceType from key
func (hwu *handlerWithUser) GetInstanceType(platform string, group string) *config.InstanceType {
retIst := config.InstanceType{
CPUPerInstance: hwu.mgr.conf.BcsCPUPerInstance,
Expand Down
101 changes: 68 additions & 33 deletions src/backend/booster/server/pkg/resource/crm/operator/bcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const (
AttributeKeyPlatform = "Platform"
)

//CheckQueueKey describe the function that get queue key from attributes
// CheckQueueKey describe the function that get queue key from attributes
func (param *BcsLaunchParam) CheckQueueKey(instanceType config.InstanceType) bool {
platform, city := getInstanceKey(param.AttributeCondition)
if instanceType.Group == city && instanceType.Platform == platform {
Expand Down Expand Up @@ -104,7 +104,7 @@ type NodeInfo struct {
Disabled bool
}

func (ni *NodeInfo) figureAvailableInstanceFromFree(cpuPerInstance, memPerInstance, diskPerInstance float64) int {
func (ni *NodeInfo) FigureAvailableInstanceFromFree(cpuPerInstance, memPerInstance, diskPerInstance float64) int {
if cpuPerInstance == 0 || memPerInstance == 0 || diskPerInstance == 0 {
return 0
}
Expand Down Expand Up @@ -133,23 +133,40 @@ func (ni *NodeInfo) valid() bool {
}

// NewNodeInfoPool get a new node info pool
func NewNodeInfoPool(cpu, mem, disk float64, istTypes []config.InstanceType) *NodeInfoPool {
func NewNodeInfoPool(conf *config.ContainerResourceConfig) *NodeInfoPool {
nip := NodeInfoPool{
cpuPerInstance: cpu,
memPerInstance: mem,
diskPerInstance: disk,
nodeBlockMap: make(map[string]*NodeInfoBlock, 1000),
cpuPerInstance: conf.BcsCPUPerInstance,
memPerInstance: conf.BcsMemPerInstance,
cpuPerInstanceOffset: conf.BcsCPUPerInstanceOffset,
memPerInstanceOffset: conf.BcsMemPerInstanceOffset,
diskPerInstance: 1,
nodeBlockMap: make(map[string]*NodeInfoBlock, 1000),
}
for _, istItem := range istTypes {
for _, istItem := range conf.InstanceType {
condition := map[string]string{
AttributeKeyCity: istItem.Group,
AttributeKeyPlatform: istItem.Platform,
}
key := getBlockKey(condition)
nip.nodeBlockMap[key] = &NodeInfoBlock{
CPUPerInstance: istItem.CPUPerInstance,
MemPerInstance: istItem.MemPerInstance,
newBlock := NodeInfoBlock{
CPUPerInstance: conf.BcsCPUPerInstance,
MemPerInstance: conf.BcsMemPerInstance,
CPUPerInstanceOffset: conf.BcsCPUPerInstanceOffset,
MemPerInstanceOffset: conf.BcsMemPerInstanceOffset,
}
if istItem.CPUPerInstance > 0.0 {
newBlock.CPUPerInstance = istItem.CPUPerInstance
}
if istItem.MemPerInstance > 0.0 {
newBlock.MemPerInstance = istItem.MemPerInstance
}
if istItem.CPUPerInstanceOffset > 0.0 && istItem.CPUPerInstanceOffset < istItem.CPUPerInstance {
newBlock.CPUPerInstanceOffset = istItem.CPUPerInstanceOffset
}
if istItem.MemPerInstanceOffset > 0.0 && istItem.MemPerInstanceOffset < istItem.MemPerInstance {
newBlock.MemPerInstanceOffset = istItem.MemPerInstanceOffset
}
nip.nodeBlockMap[key] = &newBlock
}
return &nip
}
Expand All @@ -158,10 +175,12 @@ func NewNodeInfoPool(cpu, mem, disk float64, istTypes []config.InstanceType) *No
type NodeInfoPool struct {
sync.Mutex

cpuPerInstance float64
memPerInstance float64
diskPerInstance float64
lastUpdateTime time.Time
cpuPerInstance float64
memPerInstance float64
cpuPerInstanceOffset float64
memPerInstanceOffset float64
diskPerInstance float64
lastUpdateTime time.Time

nodeBlockMap map[string]*NodeInfoBlock
}
Expand Down Expand Up @@ -198,11 +217,13 @@ func (nip *NodeInfoPool) GetStats() string {
}

message += fmt.Sprintf(
"\nCity: %s[cpuPerInstance: %.2f, memPerInstance:%.2f], available-instance: %d, report-instance: %d, noready-instance: %d "+
"\nCity: %s[cpuPerInstance:%.2f,cpuPerIstOffset:%.2f,memPerInstance:%.2f,memPerIstOffset:%.2f], available-instance: %d, report-instance: %d, noready-instance: %d "+
"CPU-Left: %s, MEM-Left: %s",
city,
block.CPUPerInstance,
block.CPUPerInstanceOffset,
block.MemPerInstance,
block.MemPerInstanceOffset,
block.AvailableInstance-block.noReadyInstance,
block.AvailableInstance, block.noReadyInstance,
cpuLeftStr,
Expand Down Expand Up @@ -243,18 +264,26 @@ func (nip *NodeInfoPool) GetLastUpdateTime() time.Time {
return nip.lastUpdateTime
}

func (nip *NodeInfoPool) getNodeInstance(key string) (float64, float64) {
func (nip *NodeInfoPool) getNodeInstance(key string) (float64, float64, float64, float64) {
cpuPerInstance := nip.cpuPerInstance
memPerInstance := nip.memPerInstance
cpuPerInstanceOffset := nip.cpuPerInstanceOffset
memPerInstanceOffset := nip.memPerInstanceOffset
if _, ok := nip.nodeBlockMap[key]; ok {
if nip.nodeBlockMap[key].CPUPerInstance > 0.0 {
cpuPerInstance = nip.nodeBlockMap[key].CPUPerInstance
}
if nip.nodeBlockMap[key].MemPerInstance > 0.0 {
memPerInstance = nip.nodeBlockMap[key].MemPerInstance
}
if nip.nodeBlockMap[key].CPUPerInstanceOffset > 0.0 {
cpuPerInstanceOffset = nip.nodeBlockMap[key].CPUPerInstanceOffset
}
if nip.nodeBlockMap[key].MemPerInstanceOffset > 0.0 {
memPerInstanceOffset = nip.nodeBlockMap[key].MemPerInstanceOffset
}
}
return cpuPerInstance, memPerInstance
return cpuPerInstance, memPerInstance, cpuPerInstanceOffset, memPerInstanceOffset
}

// UpdateResources 更新资源数据, 给定从operators获取的节点信息列表, 将其信息与当前的资源信息进行整合同步
Expand Down Expand Up @@ -294,14 +323,17 @@ func (nip *NodeInfoPool) UpdateResources(nodeInfoList []*NodeInfo) {
newBlock.MemLeft += NodeInfo.MemLeft
newBlock.CPULeft += NodeInfo.CPULeft
//inherit the instance model if exist
cpuPerInstance, memPerInstance := nip.getNodeInstance(key)
newBlock.AvailableInstance += NodeInfo.figureAvailableInstanceFromFree(
cpuPerInstance, memPerInstance, cpuPerInstanceOffset, memPerInstanceOffset := nip.getNodeInstance(key)
newBlock.AvailableInstance += NodeInfo.FigureAvailableInstanceFromFree(
cpuPerInstance,
memPerInstance,
nip.diskPerInstance,
)
newBlock.CPUPerInstance = cpuPerInstance
newBlock.MemPerInstance = memPerInstance
newBlock.CPUPerInstanceOffset = cpuPerInstanceOffset
newBlock.MemPerInstanceOffset = memPerInstanceOffset

// inherit the no-ready instance records
if _, ok := nip.nodeBlockMap[key]; ok {
newBlock.noReadyInstance = nip.nodeBlockMap[key].noReadyInstance
Expand All @@ -326,6 +358,8 @@ func (nip *NodeInfoPool) UpdateResources(nodeInfoList []*NodeInfo) {
nodeBlock.CPULeft = newBlock.CPULeft
nodeBlock.CPUPerInstance = newBlock.CPUPerInstance
nodeBlock.MemPerInstance = newBlock.MemPerInstance
nodeBlock.CPUPerInstanceOffset = newBlock.CPUPerInstanceOffset
nodeBlock.MemPerInstanceOffset = newBlock.MemPerInstanceOffset
nodeBlock.AvailableInstance = newBlock.AvailableInstance
nodeBlock.noReadyInstance = newBlock.noReadyInstance
}
Expand Down Expand Up @@ -383,19 +417,20 @@ func (nip *NodeInfoPool) ReleaseNoReadyInstance(key string, instance int) {
// NodeInfoBlock 描述了一个特定区域的资源信息, 通常由多个区域组成一个完整的资源池NodeInfoPool
// 例如 shenzhen区, shanghai区, projectA区等等, 同一个NodeInfoBlock内的资源是统一处理的, 拥有共同的noReady计数
type NodeInfoBlock struct {
DiskTotal float64
MemTotal float64
CPUTotal float64
DiskUsed float64
MemUsed float64
CPUUsed float64
DiskLeft float64
MemLeft float64
CPULeft float64
CPUPerInstance float64
MemPerInstance float64

AvailableInstance int
DiskTotal float64
MemTotal float64
CPUTotal float64
DiskUsed float64
MemUsed float64
CPUUsed float64
DiskLeft float64
MemLeft float64
CPULeft float64
CPUPerInstance float64
MemPerInstance float64
CPUPerInstanceOffset float64
MemPerInstanceOffset float64
AvailableInstance int

noReadyInstance int
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,21 +234,45 @@ func (o *operator) getResource(clusterID string) ([]*op.NodeInfo, error) {
dl, _ := node.Labels[disableLabel]
disabled := dl == "true"

memTotal := float64(node.Status.Capacity.Memory().Value()) / 1024 / 1024
cpuTotal := float64(node.Status.Capacity.Cpu().Value())
memUsed := float64(allocatedResource.Memory().Value()) / 1024 / 1024
cpuUsed := float64(allocatedResource.Cpu().Value())
diskUsed := float64(allocatedResource.StorageEphemeral().Value())
diskTotal := float64(node.Status.Capacity.StorageEphemeral().Value())
for _, ist := range o.conf.InstanceType {
if ist.Group == node.Labels[o.cityLabelKey] && ist.Platform == node.Labels[o.platformLabelKey] {
if ist.CPUPerInstanceOffset > 0.0 || ist.MemPerInstanceOffset > 0.0 {
//通过offset计算实际可用的instance数量,并矫正cpu和内存总量
n := op.NodeInfo{
MemTotal: memTotal,
CPUTotal: cpuTotal,
MemUsed: memUsed,
CPUUsed: cpuUsed,
DiskTotal: diskTotal,
DiskUsed: diskUsed,
}
availableNum := n.FigureAvailableInstanceFromFree(ist.CPUPerInstance-ist.CPUPerInstanceOffset, ist.MemPerInstance-ist.MemPerInstanceOffset, 1)
cpuTotal = cpuUsed + float64(availableNum)*ist.CPUPerInstance
memTotal = memUsed + float64(availableNum)*ist.MemPerInstance
}
break
}
}
// use city-label-key value and platform-label-key to overwrite the city and platform
node.Labels[op.AttributeKeyCity], _ = node.Labels[o.cityLabelKey]
node.Labels[op.AttributeKeyPlatform], _ = node.Labels[o.platformLabelKey]
nodeInfoList = append(nodeInfoList, &op.NodeInfo{
IP: ip,
Hostname: node.Name,
DiskTotal: float64(node.Status.Capacity.StorageEphemeral().Value()),
MemTotal: float64(node.Status.Capacity.Memory().Value()) / 1024 / 1024,
CPUTotal: float64(node.Status.Capacity.Cpu().Value()),
DiskUsed: float64(allocatedResource.StorageEphemeral().Value()),
MemUsed: float64(allocatedResource.Memory().Value()) / 1024 / 1024,
CPUUsed: float64(allocatedResource.Cpu().Value()),
DiskTotal: diskTotal,
MemTotal: memTotal,
CPUTotal: cpuTotal,
DiskUsed: diskUsed,
MemUsed: memUsed,
CPUUsed: cpuUsed,
Attributes: node.Labels,

Disabled: disabled,
Disabled: disabled,
})
}

Expand Down Expand Up @@ -413,8 +437,8 @@ func (o *operator) getFederationResource(clusterID string) ([]*op.NodeInfo, erro
IP: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
Hostname: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
DiskLeft: totalIst,
MemLeft: totalIst * ist.MemPerInstance,
CPULeft: totalIst * ist.CPUPerInstance,
MemLeft: totalIst * ist.MemPerInstance,
Attributes: map[string]string{
op.AttributeKeyPlatform: ist.Platform,
op.AttributeKeyCity: ist.Group,
Expand Down

0 comments on commit cb35a47

Please sign in to comment.