Skip to content

Commit 94b4c23

Browse files
committed
feat: worker选取及恢复机制优化 TencentBlueKing#311
1 parent 8bd2aef commit 94b4c23

File tree

3 files changed

+3
-68
lines changed

3 files changed

+3
-68
lines changed

src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -39,8 +39,6 @@ const (
3939
corkMaxSize = 1024 * 1024 * 10
4040
// corkMaxSize = 1024 * 1024 * 1024
4141
largeFileSize = 1024 * 1024 * 100 // 100MB
42-
43-
//fileMaxFailCount = 5
4442
)
4543

4644
// NewMgr get a new Remote Mgr
@@ -1070,8 +1068,7 @@ func (m *Mgr) ensureFiles(
10701068
if v.info.SendStatus == types.FileSendSucceed {
10711069
wg <- nil
10721070
continue
1073-
} else if v.info.SendStatus == types.FileSendFailed ||
1074-
v.info.SendStatus == types.FileSendUnknown {
1071+
} else if v.info.SendStatus == types.FileSendFailed {
10751072
wg <- types.ErrSendFileFailed
10761073
continue
10771074
}
@@ -1436,7 +1433,7 @@ type matchResult struct {
14361433

14371434
// checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false
14381435
func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult {
1439-
if len(descs) == 0 || !descs[0].Retry { // 普通的文件
1436+
if len(descs) == 0 || !descs[0].Retry { // 第一次发送的文件
14401437
m.fileSendMutex.Lock()
14411438
target, ok := m.fileSendMap[server]
14421439
if !ok {
@@ -1446,7 +1443,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat
14461443
m.fileSendMutex.Unlock()
14471444

14481445
return target.matchOrInserts(descs)
1449-
} else { // 失败的文件
1446+
} else { // 失败重试的文件
14501447
m.fileSendMutex.Lock()
14511448
target, ok := m.failFileSendMap[server]
14521449
if !ok {

src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go

Lines changed: 0 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -282,36 +282,6 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) {
282282
blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host)
283283
}
284284

285-
/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool {
286-
if host == nil {
287-
return false
288-
}
289-
290-
wo.workerLock.Lock()
291-
defer wo.workerLock.Unlock()
292-
293-
for _, wk := range wo.worker {
294-
if !wk.host.Equal(host) {
295-
continue
296-
}
297-
298-
if wk.dead {
299-
blog.Infof("remote slot: host:%v is already dead,do nothing now", host)
300-
return false
301-
}
302-
303-
if wk.status == Retrying {
304-
blog.Infof("remote slot: host:%v is retrying,do nothing now", host)
305-
return true
306-
}
307-
blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
308-
wk.status = Retrying
309-
return false
310-
}
311-
312-
return false
313-
}*/
314-
315285
func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) {
316286
wo.workerLock.Lock()
317287
defer wo.workerLock.Unlock()

src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go

Lines changed: 0 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ type RemoteSlotMgr interface {
2828
RecoverDeadWorker(w *worker)
2929
DisableWorker(host *dcProtocol.Host)
3030
EnableWorker(host *dcProtocol.Host)
31-
//CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying
3231
SetWorkerStatus(host *dcProtocol.Host, status Status)
3332
Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host
3433
Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host)
@@ -299,37 +298,6 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) {
299298
blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host)
300299
}
301300

302-
/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool {
303-
if host == nil {
304-
return false
305-
}
306-
307-
wr.workerLock.Lock()
308-
defer wr.workerLock.Unlock()
309-
for _, wk := range wr.worker {
310-
if !wk.host.Equal(host) {
311-
continue
312-
}
313-
314-
if wk.dead {
315-
blog.Infof("remote slot: host:%v is already dead, do nothing now", host)
316-
return false
317-
}
318-
if !wk.disabled {
319-
return false
320-
}
321-
if wk.status == Retrying {
322-
blog.Infof("remote slot: host:%v is retrying, do nothing now", host)
323-
return false
324-
}
325-
blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying)
326-
wk.status = Retrying
327-
return true
328-
}
329-
330-
return false
331-
}*/
332-
333301
func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) {
334302
wr.workerLock.Lock()
335303
defer wr.workerLock.Unlock()

0 commit comments

Comments
 (0)