From d25d27e70c2c72fa7e28dfdb327d2fb95339861c Mon Sep 17 00:00:00 2001 From: yanafu Date: Wed, 20 Nov 2024 16:39:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/slots.go | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 81975a54..b4ad8406 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -649,9 +649,7 @@ func (wr *resource) handleLock(ctx context.Context) { case <-wr.emptyChan: wr.onSlotEmpty() case <-ticker.C: - if wr.waitingList.Len() > 0 { - go wr.occupyWaitList() - } + wr.occupyWaitList() } } } @@ -733,7 +731,7 @@ func (wr *resource) putSlot(msg lockWorkerMessage) { } else if !hasAvailableWorker { msg.result <- nil wr.waitingList.Remove(e) - blog.Debugf("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList) + blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList) } else { blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList) } @@ -754,23 +752,25 @@ func (wr *resource) onSlotEmpty() { } func (wr *resource) occupyWaitList() { - for e := wr.waitingList.Front(); e != nil; e = e.Next() { - msg := e.Value.(*lockWorkerMessage) - set := wr.getUsageSet(msg.jobUsage) - if wr.isIdle(set) { - h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList) - if h != nil { - set.occupied++ - wr.occupiedSlots++ - msg.result <- h - wr.waitingList.Remove(e) - blog.Debugf("remote slot: occupy waiting list") - } else if !hasAvailableWorker { // no slot available for ban worker list, turn it local - blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList) - msg.result <- nil - wr.waitingList.Remove(e) - } else { - blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList) + if wr.waitingList.Len() > 0 { + for e := wr.waitingList.Front(); e != nil; e = e.Next() { + msg := e.Value.(*lockWorkerMessage) + set := wr.getUsageSet(msg.jobUsage) + if wr.isIdle(set) { + h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList) + if h != nil { + set.occupied++ + wr.occupiedSlots++ + msg.result <- h + wr.waitingList.Remove(e) + blog.Debugf("remote slot: occupy waiting list") + } else if !hasAvailableWorker { // no slot available for ban worker list, turn it local + blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList) + msg.result <- nil + wr.waitingList.Remove(e) + } else { + blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList) + } } } }