Skip to content

Commit

Permalink
Merge pull request #405 from weibocom/dev
Browse files Browse the repository at this point in the history
merge dev & release
  • Loading branch information
rayzhang0603 authored Sep 9, 2024
2 parents a6f5d1c + b035daf commit f3ccb8b
Show file tree
Hide file tree
Showing 83 changed files with 3,099 additions and 468 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
testing:
strategy:
matrix:
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
go-version: [1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand All @@ -45,10 +45,10 @@ jobs:
name: codecov
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.15
- name: Set up Go 1.16
uses: actions/setup-go@v3
with:
go-version: 1.15.x
go-version: 1.16.x
id: go

- name: Checkout code
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ zj*
# gdb
*.gdb_history
__*
*.log

# motan-go
*.pid
Expand All @@ -40,4 +41,4 @@ main/magent*
log/log.test*
go.sum
agent_runtime
test/
test/
114 changes: 87 additions & 27 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/provider"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
Expand Down Expand Up @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
return
}
fmt.Println("init agent context success.")
// initialize meta package
meta.Initialize(a.Context)
a.initParam()
a.SetSanpshotConf()
a.initAgentURL()
Expand Down Expand Up @@ -225,7 +229,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {

func (a *Agent) startRegistryFailback() {
vlog.Infoln("start agent failback")
ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond)
ticker := time.NewTicker(time.Duration(registry.GetFailbackInterval()) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
a.registryLock.Lock()
Expand Down Expand Up @@ -265,22 +269,8 @@ func (a *Agent) GetRegistryStatus() []map[string]*motan.RegistryStatus {
}

func (a *Agent) registerStatusSampler() {
metrics.RegisterStatusSampleFunc("memory", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
})
metrics.RegisterStatusSampleFunc("cpu", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
})
metrics.RegisterStatusSampleFunc("memory", GetRssMemory)
metrics.RegisterStatusSampleFunc("cpu", GetCpuPercent)
metrics.RegisterStatusSampleFunc("goroutine_count", func() int64 {
return int64(runtime.NumGoroutine())
})
Expand All @@ -289,6 +279,24 @@ func (a *Agent) registerStatusSampler() {
})
}

func GetRssMemory() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
}

func GetCpuPercent() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
}

func (a *Agent) initStatus() {
if a.recover {
a.recoverStatus()
Expand Down Expand Up @@ -371,7 +379,7 @@ func (a *Agent) initParam() {
port = defaultPort
}

mPort := *motan.Mport
mPort := motan.GetMport()
if mPort == 0 {
if envMPort, ok := os.LookupEnv("mport"); ok {
if envMPortInt, err := strconv.Atoi(envMPort); err == nil {
Expand Down Expand Up @@ -629,6 +637,7 @@ func (a *Agent) initAgentURL() {
} else {
agentURL.Parameters[motan.ApplicationKey] = agentURL.Group
}
motan.SetApplication(agentURL.Parameters[motan.ApplicationKey])
if agentURL.Group == "" {
agentURL.Group = defaultAgentGroup
agentURL.Parameters[motan.ApplicationKey] = defaultAgentGroup
Expand All @@ -652,8 +661,9 @@ func (a *Agent) initAgentURL() {
func (a *Agent) startAgent() {
url := a.agentURL.Copy()
url.Port = a.port
url.Protocol = mserver.Motan2
handler := &agentMessageHandler{agent: a}
server := &mserver.MotanServer{URL: url}
server := defaultExtFactory.GetServer(url)
server.SetMessageHandler(handler)
vlog.Infof("Motan agent is started. port:%d", a.port)
fmt.Println("Motan agent start.")
Expand All @@ -673,7 +683,7 @@ func (a *Agent) registerAgent() {
if agentURL.Host == "" {
agentURL.Host = motan.GetLocalIP()
}
if registryURL, regexit := a.Context.RegistryURLs[reg]; regexit {
if registryURL, regExist := a.Context.RegistryURLs[reg]; regExist {
registry := a.extFactory.GetRegistry(registryURL)
if registry != nil {
vlog.Infof("agent register in registry:%s, agent url:%s", registry.GetURL().GetIdentity(), agentURL.GetIdentity())
Expand All @@ -697,6 +707,16 @@ type agentMessageHandler struct {
agent *Agent
}

func (a *agentMessageHandler) GetName() string {
return "agentMessageHandler"
}

func (a *agentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = a.GetName()
return info
}

func (a *agentMessageHandler) clusterCall(request motan.Request, ck string, motanCluster *cluster.MotanCluster) (res motan.Response) {
// fill default request info
fillDefaultReqInfo(request, motanCluster.GetURL())
Expand Down Expand Up @@ -747,8 +767,8 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
}
httpRequest := fasthttp.AcquireRequest()
httpResponse := fasthttp.AcquireResponse()
// do not release http response
defer fasthttp.ReleaseRequest(httpRequest)
defer fasthttp.ReleaseResponse(httpResponse)
httpRequest.Header.Del("Host")
httpRequest.SetHost(originalService)
httpRequest.URI().SetPath(request.GetMethod())
Expand Down Expand Up @@ -886,6 +906,9 @@ func (a *Agent) unavailableAllServices() {
func (a *Agent) doExportService(url *motan.URL) {
a.svcLock.Lock()
defer a.svcLock.Unlock()
if _, ok := a.serviceExporters.Load(url.GetIdentityWithRegistry()); ok {
return
}

globalContext := a.Context
exporter := &mserver.DefaultExporter{}
Expand Down Expand Up @@ -932,11 +955,38 @@ func (a *Agent) doExportService(url *motan.URL) {
}

type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
providers *motan.CopyOnWriteMap
frameworkProviders *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) GetName() string {
return "serverAgentMessageHandler"
}

func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = sa.GetName()
providersInfo := map[string]interface{}{}
sa.providers.Range(func(k, v interface{}) bool {
provider, ok := v.(motan.Provider)
if !ok {
return true
}
providersInfo[k.(string)] = provider.GetRuntimeInfo()
return true
})
info[motan.RuntimeProvidersKey] = providersInfo
return info
}

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
sa.frameworkProviders = motan.NewCopyOnWriteMap()
sa.initFrameworkServiceProvider()
}

func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() {
sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{})
}

func getServiceKey(group, path string) string {
Expand All @@ -954,15 +1004,22 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
group = request.GetAttachment(motan.GroupKey)
}
serviceKey := getServiceKey(group, request.GetServiceName())
if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" {
if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok {
return fp.(motan.Provider).Call(request)
}
//throw specific exception to avoid triggering forced fusing on the client side。
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException})
}
if p := sa.providers.LoadOrNil(serviceKey); p != nil {
p := p.(motan.Provider)
res = p.Call(request)
res.GetRPCContext(true).GzipSize = int(p.GetURL().GetIntValue(motan.GzipSizeKey, 0))
return res
}
vlog.Errorf("not found provider for %s", motan.GetReqInfo(request))
vlog.Errorf("%s%s, %s", motan.ProviderNotExistPrefix, serviceKey, motan.GetReqInfo(request))
atomic.AddInt64(&notFoundProviderCount, 1)
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "not found provider for " + serviceKey, ErrType: motan.ServiceException})
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: motan.EProviderNotExist, ErrMsg: motan.ProviderNotExistPrefix + serviceKey, ErrType: motan.ServiceException})
}

func (sa *serverAgentMessageHandler) AddProvider(p motan.Provider) error {
Expand Down Expand Up @@ -1119,13 +1176,12 @@ func (a *Agent) startMServer() {
for kk, vv := range v {
handlers[kk] = vv
}

}
}
for k, v := range handlers {
a.mhandle(k, v)
}

var mPort int
var managementListener net.Listener
if managementUnixSockAddr := a.agentURL.GetParam(motan.ManagementUnixSockKey, ""); managementUnixSockAddr != "" {
listener, err := motan.ListenUnixSock(managementUnixSockAddr)
Expand Down Expand Up @@ -1159,19 +1215,21 @@ func (a *Agent) startMServer() {
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
break
}
mPort = a.mport
if managementListener == nil {
vlog.Warningf("start management server failed for port range %s", startAndPortStr)
return
}
} else {
listener, err := net.Listen("tcp", ":"+strconv.Itoa(a.mport))
mPort = a.mport
if err != nil {
vlog.Infof("listen manage port %d failed:%s", a.mport, err.Error())
return
}
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
}

motan.SetMport(mPort)
vlog.Infof("start listen manage for address: %s", managementListener.Addr().String())
err := http.Serve(managementListener, nil)
if err != nil {
Expand All @@ -1191,8 +1249,10 @@ func (a *Agent) mhandle(k string, h http.Handler) {
setAgentLock.Unlock()
}
http.HandleFunc(k, func(w http.ResponseWriter, r *http.Request) {
vlog.Infof("mport request: %s, address: %s", r.URL.Path, r.RemoteAddr)
if !PermissionCheck(r) {
w.Write([]byte("need permission!"))
vlog.Warningf("mport request no permission: %s, address: %s", r.URL.Path, r.RemoteAddr)
return
}
defer func() {
Expand Down
Loading

0 comments on commit f3ccb8b

Please sign in to comment.