Skip to content

Commit

Permalink
feat:support result cache, issue: TencentBlueKing#315
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Dec 24, 2024
1 parent 59d8533 commit 1ed30b1
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 185 deletions.
2 changes: 1 addition & 1 deletion src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func getServerFromConfig(c *commandCli.Context) (string, error) {

func getCacheServerHost(c *commandCli.Context) (string, error) {
if c.IsSet(FlagCacheServer) {
s := c.String(FlagServer)
s := c.String(FlagCacheServer)
blog.Infof("booster-command: use cache server from command line --cache_server specified: %s", s)
return s, nil
}
Expand Down
7 changes: 6 additions & 1 deletion src/backend/booster/bk_dist/common/resultcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (r Record) Valid() bool {

func (r Record) GetResultHashString() string {
v, ok := r[ResultKey]
if !ok {
if ok {
return v
}

Expand Down Expand Up @@ -400,6 +400,7 @@ type RecordGroup struct {
}

func (rg *RecordGroup) PutRecord(record Record) {
blog.Infof("resultcache: ready put record:%v", record)
rg.lock.Lock()
defer rg.lock.Unlock()

Expand All @@ -416,6 +417,7 @@ func (rg *RecordGroup) PutRecord(record Record) {
}

rg.LastStatus = StatusModified
blog.Infof("resultcache: finish put record:%v", record)
}

func (rg *RecordGroup) DeleteRecord(record Record) {
Expand Down Expand Up @@ -523,6 +525,7 @@ func NewTable(dir string) *Table {
// PutRecord adds a new record to the table.
func (t *Table) PutRecord(record Record) error {
if !record.Valid() {
blog.Infof("resultcache: record:%v is invalid", record)
return ErrorRecordInvalid
}

Expand Down Expand Up @@ -551,6 +554,7 @@ func (t *Table) PutRecord(record Record) error {
// DeleteRecord delete one record from the table.
func (t *Table) DeleteRecord(record Record) error {
if !record.Valid() {
blog.Infof("resultcache: record:%v is invalid", record)
return ErrorRecordInvalid
}

Expand Down Expand Up @@ -675,6 +679,7 @@ func (t *Table) Ticker() {
for {
select {
case <-ticker.C:
blog.Infof("resultcache: on ticker now...")
t.lock.RLock()
for _, rg := range t.Data {
if rg.LastStatus == StatusModified {
Expand Down
3 changes: 1 addition & 2 deletions src/backend/booster/bk_dist/common/sdk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type RemoteWorkerHandler interface {
ExecuteReportResultCache(
server *dcProtocol.Host,
attributes map[string]string,
results []*FileDesc,
mgr LockMgr) (*BKReportResultCacheResult, error)
results []*FileDesc) (*BKReportResultCacheResult, error)

ExecuteQueryResultCacheIndex(
server *dcProtocol.Host,
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ func (s *sdk) register(config dcSDK.ControllerRegisterConfig) (dcSDK.ControllerW
Apply: config.Apply,
}, &data)

blog.Infof("sdk: ready register with data:[%s]", string(data))
tmp, _, err := s.request("POST", registerURI, data, config.BatchMode)
if err != nil {
retry := 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"strings"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/resultcache"
dcSDK "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/types"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
)
Expand Down Expand Up @@ -53,11 +55,11 @@ func (e *executor) initResultCacheInfo() {
}

func (e *executor) localCacheEnabled() bool {
return e.cacheType&resultcache.CacheTypeLocal == 1
return e.cacheType&resultcache.CacheTypeLocal != 0
}

func (e *executor) remoteCacheEnabled() bool {
return e.cacheType&resultcache.CacheTypeRemote == 1
return e.cacheType&resultcache.CacheTypeRemote != 0
}

func (e *executor) cacheEnabled() bool {
Expand Down Expand Up @@ -175,6 +177,7 @@ func (e *executor) getLocalResultFiles(c *dcSDK.BKDistCommand) *types.LocalTaskE
}

func (e *executor) getRemoteResultFiles(c *dcSDK.BKDistCommand) *types.LocalTaskExecuteResult {
blog.Debugf("executor: ready get remote result files now")
rs, err := e.mgr.getRemoteResultCacheFile(e.preprocessResultKey)
if err != nil {
return nil
Expand Down Expand Up @@ -224,13 +227,32 @@ func (e *executor) getRemoteResultFiles(c *dcSDK.BKDistCommand) *types.LocalTask
}

for k, v := range resultmap {
// uncompress
data := rs.Resultfiles[v].Buffer
if rs.Resultfiles[v].Compresstype == protocol.CompressLZ4 {
dst := make([]byte, rs.Resultfiles[v].FileSize)
outdata, err := util.Lz4Uncompress(data, dst)
if err != nil {
blog.Errorf("executor: decompress with error: [%v], data len:[%d], "+
"buffer len:[%d], filesize:[%d]",
err, len(data), len(dst),
rs.Resultfiles[v].FileSize)
return nil
}
blog.Infof("executor: uncompressed %s from %d to %d, "+
"expected from %d to %d",
k, len(data), len(outdata),
rs.Resultfiles[v].CompressedSize, rs.Resultfiles[v].FileSize)
data = outdata
}

f, err := os.Create(k)
if err != nil {
blog.Errorf("executor: create file %s with error: %v", k, err)
return nil
}

_, err = f.Write(rs.Resultfiles[v].Buffer)
_, err = f.Write(data)
if err != nil {
f.Close()
blog.Errorf("executor: save file %s with error: %v", k, err)
Expand Down Expand Up @@ -342,6 +364,8 @@ func (e *executor) putLocalResultFiles(r *dcSDK.BKDistResult) error {
}

func (e *executor) putRemoteResult(r *dcSDK.BKDistResult, record resultcache.Record) error {
blog.Debugf("executor: ready put record:%v to remote now", record)

if len(r.Results) != 1 {
return nil
}
Expand All @@ -352,7 +376,8 @@ func (e *executor) putRemoteResult(r *dcSDK.BKDistResult, record resultcache.Rec
resultlen := len(r.Results[0].ResultFiles)
rs := make([]*dcSDK.FileDesc, 0, resultlen)
for _, v := range r.Results[0].ResultFiles {
rs = append(rs, &v)
f := v
rs = append(rs, &f)
}

_, err = e.mgr.reportRemoteResultCache(record, rs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,7 @@ func (m *Mgr) reportRemoteResultCache(
result, err := handler.ExecuteReportResultCache(
m.resultdata.cacheServer,
record,
results,
m.work.LockMgr(),
)
results)

if result != nil {
blog.Infof("local: report result files to remote with retcode:%d,out message:%s,error message:%s",
Expand Down
6 changes: 6 additions & 0 deletions src/backend/booster/bk_dist/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func main() {
}

c.Parse()
if !filepath.IsAbs(c.ResultCacheDir) {
newabs, err := filepath.Abs(c.ResultCacheDir)
if err == nil {
c.ResultCacheDir = newabs
}
}
config.GlobalResultCacheDir = c.ResultCacheDir

if !filepath.IsAbs(c.LogConfig.LogDir) {
Expand Down
118 changes: 4 additions & 114 deletions src/backend/booster/bk_dist/worker/pkg/client/bkcommondist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,91 +742,25 @@ func (r *CommonRemoteHandler) ExecuteQuerySlot(
return client.conn, nil
}

// 这儿是将内存中的数据发送到cache server,无需加锁
func (r *CommonRemoteHandler) ExecuteReportResultCache(
server *dcProtocol.Host,
attributes map[string]string,
results []*dcSDK.FileDesc,
mgr dcSDK.LockMgr) (*dcSDK.BKReportResultCacheResult, error) {
results []*dcSDK.FileDesc) (*dcSDK.BKReportResultCacheResult, error) {

// record the exit status.
defer func() {
r.updateJobStatsFunc()
}()
blog.Infof("remotehandle rrc: start report %d result files to server %s", len(results), server)

// 加内存锁
var totalsize int64
var locksize int64
memorylocked := false
t1 := time.Now()
t2 := t1
tinit := time.Now()
blog.Infof("remotehandle rrc: start report record:%v and %d result files to server %s",
attributes, len(results), server)

var err error
var dlocallock, dencodereq, dmemorylock time.Duration

// 加本地资源锁
locallocked := false
if mgr != nil && len(results) > 0 {
if mgr.LockSlots(dcSDK.JobUsageDefault, 1) {
locallocked = true
blog.Debugf("remotehandle rrc: succeed to get one local lock")
}
}

t2 = time.Now()
dlocallock = t2.Sub(t1)
t1 = t2

if r.slot != nil && len(results) > 0 {
for _, v := range results {
totalsize += v.FileSize
}
// 考虑到文件需要读到内存,然后压缩,以及后续的pb协议打包,需要的内存大小至少是两倍
locksize = totalsize * 3
if locksize > 0 {
if r.slot.Lock(locksize) {
memorylocked = true
blog.Debugf("remotehandle rrc: succeed to get one memory lock")
}
}
}

t2 = time.Now()
dmemorylock = t2.Sub(t1)
t1 = t2

dcSDK.StatsTimeNow(&r.recordStats.ReportCachePackCommonStartTime)
messages, err := encodeReportCacheReq(attributes, results)
dcSDK.StatsTimeNow(&r.recordStats.ReportCachePackCommonEndTime)

t2 = time.Now()
dencodereq = t2.Sub(t1)
t1 = t2

if locallocked {
mgr.UnlockSlots(dcSDK.JobUsageDefault, 1)
blog.Debugf("remotehandle rrc: succeed to release one local lock")
}

if err != nil {
blog.Warnf("remotehandle rrc: error: %v", err)

if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle rrc: succeed to release one memory lock")
}

return nil, err
}

debug.FreeOSMemory() // free memory anyway

// blog.Debugf("remotehandle rrc: success pack-up to server %s", server)

blog.Debugf("remotehandle rrc: finished encode report result cache request with %d files size:%d to server %s",
len(results), totalsize, server.Server)

// send request
dcSDK.StatsTimeNow(&r.recordStats.ReportCacheSendCommonStartTime)
// record the send starting status, sending should be waiting for a while.
Expand All @@ -838,11 +772,6 @@ func (r *CommonRemoteHandler) ExecuteReportResultCache(
if err := client.Connect(getRealServer(server.Server)); err != nil {
blog.Warnf("remotehandle rrc: error: %v", err)

if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle rrc: succeed to release one memory lock")
}

return nil, err
}
d := time.Now().Sub(t)
Expand All @@ -853,41 +782,19 @@ func (r *CommonRemoteHandler) ExecuteReportResultCache(
_ = client.Close()
}()

t2 = time.Now()
dconnect := t2.Sub(t1)
t1 = t2

blog.Debugf("remotehandle rrc: success connect to server %s", server)

err = SendMessages(client, messages)
if memorylocked {
r.slot.Unlock(locksize)
blog.Debugf("remotehandle rrc: succeed to release one memory lock")
}

if err != nil {
blog.Warnf("remotehandle rrc: error: %v", err)
return nil, err
}

t2 = time.Now()
dsend := t2.Sub(t1)
t1 = t2

blog.Debugf("remotehandle rrc: report result cache, total %d files with size:%d to server %s",
len(results), totalsize, server.Server)

debug.FreeOSMemory() // free memory anyway

blog.Debugf("remotehandle rrc: success sent to server %s", server)
// receive result
data, err := receiveReportCacheRsp(client)
dcSDK.StatsTimeNow(&r.recordStats.ReportCacheSendCommonEndTime)

t2 = time.Now()
drecv := t2.Sub(t1)
t1 = t2

if err != nil {
blog.Warnf("remotehandle rrc: error: %v", err)
return nil, err
Expand All @@ -899,23 +806,6 @@ func (r *CommonRemoteHandler) ExecuteReportResultCache(
return nil, err
}

t2 = time.Now()
ddecode := t2.Sub(t1)
t1 = t2

dtotal := t2.Sub(tinit)

blog.Infof("remotehandle rrc: report result cache stat, total %d files size:%d server:%s "+
"memory lock : %f , local lock : %f , "+
"encode req : %f , connect : %f , "+
"send : %f , receive : %f , "+
"decode response : %f , total : %f",
len(results), totalsize, server.Server,
dmemorylock.Seconds(), dlocallock.Seconds(),
dencodereq.Seconds(), dconnect.Seconds(),
dsend.Seconds(), drecv.Seconds(),
ddecode.Seconds(), dtotal.Seconds())

blog.Debugf("remotehandle rrc: report result cache done *")

return result, nil
Expand Down
Loading

0 comments on commit 1ed30b1

Please sign in to comment.