Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge dev & release #405

Merged
merged 36 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f1309c3
add weighted_lb feature
Apr 2, 2024
06ac7ee
add weighted_lb feature
Apr 2, 2024
afb36c2
add weighted_lb feature
Apr 2, 2024
2ca3289
fix_rpc_async_call
Dec 27, 2023
938ec49
Merge pull request #390 from cocowh/feat/runtime_info_wh
rayzhang0603 Apr 3, 2024
7394008
add weighted_lb feature
Apr 3, 2024
399a671
Merge branch 'dev' into weighted_lb
Apr 3, 2024
48275d7
add weighted_lb feature
Apr 3, 2024
1a28f02
add weighted_lb feature
Apr 3, 2024
7af8144
add weighted_lb feature
Apr 3, 2024
cae619f
add weighted_lb feature
Apr 3, 2024
a4484e4
add weighted_lb feature
Apr 3, 2024
ad5ae82
add weighted_lb feature
Apr 3, 2024
ea5fe03
beautify code
Apr 7, 2024
4004aeb
beautify code
Apr 7, 2024
bc604a1
Merge pull request #394 from Hoofffman/weighted_lb
rayzhang0603 Apr 7, 2024
e545679
modify canServe and make motanV1Compatible compatible with motan2, mo…
Apr 7, 2024
45ad8bd
Merge pull request #395 from Hoofffman/can_serve
rayzhang0603 Apr 8, 2024
1f620a4
export MSContext's context with GetContext
Apr 8, 2024
3115d01
ep refresher finish chan add close protection
Apr 8, 2024
adcf990
meta get remote meta compatible with empty map
Apr 8, 2024
8aff236
remove motanV1Compatible endpoint
Apr 9, 2024
1537870
context parseRefers compatible with motanV1Compatible
Apr 9, 2024
f4df51e
Merge pull request #396 from Hoofffman/export_get_context
rayzhang0603 Apr 10, 2024
7659eba
fix:heartbeat not stop
May 6, 2024
13c3dc9
Merge pull request #397 from cocowh/fix/endpoint_heartbeat
rayzhang0603 May 7, 2024
cbff1ba
fix:all groups are appended with RPC_REG_GROUP_SUFFIX
Jun 5, 2024
db5aa0a
Merge pull request #399 from cocowh/fix/reg_group_suffix
rayzhang0603 Jun 11, 2024
30745c1
fix motanCommonEndpoint.IsAvailable()
Jun 19, 2024
9a0f42f
Merge pull request #400 from cocowh/fix/endpoint
rayzhang0603 Jun 19, 2024
7877647
fix
Aug 29, 2024
b63a532
Merge pull request #403 from cocowh/fix_syncpoll_reuse
rayzhang0603 Aug 30, 2024
88d97eb
add mport access log
Sep 5, 2024
fa31bb4
Merge pull request #404 from weibocom/opt/mport_log
rayzhang0603 Sep 5, 2024
12aa8ad
Merge branch 'master' into dev
Sep 9, 2024
b035daf
update version
Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
testing:
strategy:
matrix:
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
go-version: [1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand All @@ -45,10 +45,10 @@ jobs:
name: codecov
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.15
- name: Set up Go 1.16
uses: actions/setup-go@v3
with:
go-version: 1.15.x
go-version: 1.16.x
id: go

- name: Checkout code
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ zj*
# gdb
*.gdb_history
__*
*.log

# motan-go
*.pid
Expand All @@ -40,4 +41,4 @@ main/magent*
log/log.test*
go.sum
agent_runtime
test/
test/
114 changes: 87 additions & 27 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/provider"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
Expand Down Expand Up @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
return
}
fmt.Println("init agent context success.")
// initialize meta package
meta.Initialize(a.Context)
a.initParam()
a.SetSanpshotConf()
a.initAgentURL()
Expand Down Expand Up @@ -225,7 +229,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {

func (a *Agent) startRegistryFailback() {
vlog.Infoln("start agent failback")
ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond)
ticker := time.NewTicker(time.Duration(registry.GetFailbackInterval()) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
a.registryLock.Lock()
Expand Down Expand Up @@ -265,22 +269,8 @@ func (a *Agent) GetRegistryStatus() []map[string]*motan.RegistryStatus {
}

func (a *Agent) registerStatusSampler() {
metrics.RegisterStatusSampleFunc("memory", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
})
metrics.RegisterStatusSampleFunc("cpu", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
})
metrics.RegisterStatusSampleFunc("memory", GetRssMemory)
metrics.RegisterStatusSampleFunc("cpu", GetCpuPercent)
metrics.RegisterStatusSampleFunc("goroutine_count", func() int64 {
return int64(runtime.NumGoroutine())
})
Expand All @@ -289,6 +279,24 @@ func (a *Agent) registerStatusSampler() {
})
}

func GetRssMemory() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
memInfo, err := p.MemoryInfo()
if err != nil {
return 0
}
return int64(memInfo.RSS >> 20)
}

func GetCpuPercent() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
cpuPercent, err := p.CPUPercent()
if err != nil {
return 0
}
return int64(cpuPercent)
}

func (a *Agent) initStatus() {
if a.recover {
a.recoverStatus()
Expand Down Expand Up @@ -371,7 +379,7 @@ func (a *Agent) initParam() {
port = defaultPort
}

mPort := *motan.Mport
mPort := motan.GetMport()
if mPort == 0 {
if envMPort, ok := os.LookupEnv("mport"); ok {
if envMPortInt, err := strconv.Atoi(envMPort); err == nil {
Expand Down Expand Up @@ -629,6 +637,7 @@ func (a *Agent) initAgentURL() {
} else {
agentURL.Parameters[motan.ApplicationKey] = agentURL.Group
}
motan.SetApplication(agentURL.Parameters[motan.ApplicationKey])
if agentURL.Group == "" {
agentURL.Group = defaultAgentGroup
agentURL.Parameters[motan.ApplicationKey] = defaultAgentGroup
Expand All @@ -652,8 +661,9 @@ func (a *Agent) initAgentURL() {
func (a *Agent) startAgent() {
url := a.agentURL.Copy()
url.Port = a.port
url.Protocol = mserver.Motan2
handler := &agentMessageHandler{agent: a}
server := &mserver.MotanServer{URL: url}
server := defaultExtFactory.GetServer(url)
server.SetMessageHandler(handler)
vlog.Infof("Motan agent is started. port:%d", a.port)
fmt.Println("Motan agent start.")
Expand All @@ -673,7 +683,7 @@ func (a *Agent) registerAgent() {
if agentURL.Host == "" {
agentURL.Host = motan.GetLocalIP()
}
if registryURL, regexit := a.Context.RegistryURLs[reg]; regexit {
if registryURL, regExist := a.Context.RegistryURLs[reg]; regExist {
registry := a.extFactory.GetRegistry(registryURL)
if registry != nil {
vlog.Infof("agent register in registry:%s, agent url:%s", registry.GetURL().GetIdentity(), agentURL.GetIdentity())
Expand All @@ -697,6 +707,16 @@ type agentMessageHandler struct {
agent *Agent
}

func (a *agentMessageHandler) GetName() string {
return "agentMessageHandler"
}

func (a *agentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = a.GetName()
return info
}

func (a *agentMessageHandler) clusterCall(request motan.Request, ck string, motanCluster *cluster.MotanCluster) (res motan.Response) {
// fill default request info
fillDefaultReqInfo(request, motanCluster.GetURL())
Expand Down Expand Up @@ -747,8 +767,8 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
}
httpRequest := fasthttp.AcquireRequest()
httpResponse := fasthttp.AcquireResponse()
// do not release http response
defer fasthttp.ReleaseRequest(httpRequest)
defer fasthttp.ReleaseResponse(httpResponse)
httpRequest.Header.Del("Host")
httpRequest.SetHost(originalService)
httpRequest.URI().SetPath(request.GetMethod())
Expand Down Expand Up @@ -886,6 +906,9 @@ func (a *Agent) unavailableAllServices() {
func (a *Agent) doExportService(url *motan.URL) {
a.svcLock.Lock()
defer a.svcLock.Unlock()
if _, ok := a.serviceExporters.Load(url.GetIdentityWithRegistry()); ok {
return
}

globalContext := a.Context
exporter := &mserver.DefaultExporter{}
Expand Down Expand Up @@ -932,11 +955,38 @@ func (a *Agent) doExportService(url *motan.URL) {
}

type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
providers *motan.CopyOnWriteMap
frameworkProviders *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) GetName() string {
return "serverAgentMessageHandler"
}

func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} {
info := map[string]interface{}{}
info[motan.RuntimeMessageHandlerTypeKey] = sa.GetName()
providersInfo := map[string]interface{}{}
sa.providers.Range(func(k, v interface{}) bool {
provider, ok := v.(motan.Provider)
if !ok {
return true
}
providersInfo[k.(string)] = provider.GetRuntimeInfo()
return true
})
info[motan.RuntimeProvidersKey] = providersInfo
return info
}

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
sa.frameworkProviders = motan.NewCopyOnWriteMap()
sa.initFrameworkServiceProvider()
}

func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() {
sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{})
}

func getServiceKey(group, path string) string {
Expand All @@ -954,15 +1004,22 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
group = request.GetAttachment(motan.GroupKey)
}
serviceKey := getServiceKey(group, request.GetServiceName())
if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" {
if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok {
return fp.(motan.Provider).Call(request)
}
//throw specific exception to avoid triggering forced fusing on the client side。
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException})
}
if p := sa.providers.LoadOrNil(serviceKey); p != nil {
p := p.(motan.Provider)
res = p.Call(request)
res.GetRPCContext(true).GzipSize = int(p.GetURL().GetIntValue(motan.GzipSizeKey, 0))
return res
}
vlog.Errorf("not found provider for %s", motan.GetReqInfo(request))
vlog.Errorf("%s%s, %s", motan.ProviderNotExistPrefix, serviceKey, motan.GetReqInfo(request))
atomic.AddInt64(&notFoundProviderCount, 1)
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "not found provider for " + serviceKey, ErrType: motan.ServiceException})
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: motan.EProviderNotExist, ErrMsg: motan.ProviderNotExistPrefix + serviceKey, ErrType: motan.ServiceException})
}

func (sa *serverAgentMessageHandler) AddProvider(p motan.Provider) error {
Expand Down Expand Up @@ -1119,13 +1176,12 @@ func (a *Agent) startMServer() {
for kk, vv := range v {
handlers[kk] = vv
}

}
}
for k, v := range handlers {
a.mhandle(k, v)
}

var mPort int
var managementListener net.Listener
if managementUnixSockAddr := a.agentURL.GetParam(motan.ManagementUnixSockKey, ""); managementUnixSockAddr != "" {
listener, err := motan.ListenUnixSock(managementUnixSockAddr)
Expand Down Expand Up @@ -1159,19 +1215,21 @@ func (a *Agent) startMServer() {
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
break
}
mPort = a.mport
if managementListener == nil {
vlog.Warningf("start management server failed for port range %s", startAndPortStr)
return
}
} else {
listener, err := net.Listen("tcp", ":"+strconv.Itoa(a.mport))
mPort = a.mport
if err != nil {
vlog.Infof("listen manage port %d failed:%s", a.mport, err.Error())
return
}
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
}

motan.SetMport(mPort)
vlog.Infof("start listen manage for address: %s", managementListener.Addr().String())
err := http.Serve(managementListener, nil)
if err != nil {
Expand All @@ -1191,8 +1249,10 @@ func (a *Agent) mhandle(k string, h http.Handler) {
setAgentLock.Unlock()
}
http.HandleFunc(k, func(w http.ResponseWriter, r *http.Request) {
vlog.Infof("mport request: %s, address: %s", r.URL.Path, r.RemoteAddr)
if !PermissionCheck(r) {
w.Write([]byte("need permission!"))
vlog.Warningf("mport request no permission: %s, address: %s", r.URL.Path, r.RemoteAddr)
return
}
defer func() {
Expand Down
Loading
Loading