Skip to content

Commit

Permalink
Merge pull request #397 from cocowh/fix/endpoint_heartbeat
Browse files Browse the repository at this point in the history
fix:heartbeat not stop
  • Loading branch information
rayzhang0603 committed May 7, 2024
2 parents f4df51e + 7659eba commit 13c3dc9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
15 changes: 9 additions & 6 deletions endpoint/motanCommonEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type MotanCommonEndpoint struct {
channels *ChannelPool
destroyed bool
destroyCh chan struct{}
available bool
available atomic.Value
errorCount uint32
proxy bool
errorCountThreshold int64
Expand Down Expand Up @@ -68,7 +68,7 @@ func (m *MotanCommonEndpoint) GetRuntimeInfo() map[string]interface{} {
}

func (m *MotanCommonEndpoint) setAvailable(available bool) {
m.available = available
m.available.Store(available)
}

func (m *MotanCommonEndpoint) SetSerialization(s motan.Serialization) {
Expand Down Expand Up @@ -306,15 +306,17 @@ func (m *MotanCommonEndpoint) keepalive() {
if atomic.LoadUint32(&m.keepaliveType) == KeepaliveProfile {
m.profile()
} else {
m.heartbeat()
if m.heartbeat() {
return
}
}
case <-m.destroyCh:
return
}
}
}

func (m *MotanCommonEndpoint) heartbeat() {
func (m *MotanCommonEndpoint) heartbeat() bool {
if channel, err := m.channels.Get(); err != nil {
vlog.Infof("[keepalive] failed. url:%s, err:%s", m.url.GetIdentity(), err.Error())
} else {
Expand All @@ -323,10 +325,11 @@ func (m *MotanCommonEndpoint) heartbeat() {
m.setAvailable(true)
m.resetErr()
vlog.Infof("[keepalive] heartbeat success. url: %s", m.url.GetIdentity())
return
return true
}
vlog.Infof("[keepalive] heartbeat failed. url:%s, err:%s", m.url.GetIdentity(), err.Error())
}
return false
}

func (m *MotanCommonEndpoint) profile() {
Expand Down Expand Up @@ -357,7 +360,7 @@ func (m *MotanCommonEndpoint) SetURL(url *motan.URL) {
}

func (m *MotanCommonEndpoint) IsAvailable() bool {
return m.available
return m.available.Load().(bool)
}

type Channel struct {
Expand Down
26 changes: 19 additions & 7 deletions endpoint/motanCommonEndpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/weibocom/motan-go/serialize"
"net"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -55,9 +56,10 @@ func TestV1RecordErrEmptyThreshold(t *testing.T) {
}

func TestV1RecordErrWithErrThreshold(t *testing.T) {
errorCountThreshold := 5
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "100")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.TimeOutKey, "1100")
url.PutParam(motan.ErrorCountThresholdKey, strconv.Itoa(errorCountThreshold))
url.PutParam(motan.ClientConnectionKey, "1")
url.PutParam(motan.AsyncInitConnection, "false")
ep := &MotanCommonEndpoint{}
Expand All @@ -66,15 +68,25 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) {
ep.SetSerialization(&serialize.SimpleSerialization{})
ep.Initialize()
assert.Equal(t, 1, ep.clientConnection)
for j := 0; j < 10; j++ {
for j := 0; j < errorCountThreshold; j++ {
request := &motan.MotanRequest{ServiceName: "test", Method: "test"}
request.Attachment = motan.NewStringMap(0)
ep.Call(request)
if j < 4 {
request.Attachment.Store("exception", "other_exception")
request.Attachment.Store("sleep_time", "200")
resp := ep.Call(request)
assert.NotNil(t, resp)
assert.NotNil(t, resp.GetException())
assert.Equal(t, motan.EUnkonwnMsg, resp.GetException().ErrCode)
if j < errorCountThreshold-1 {
assert.True(t, ep.IsAvailable())
} else {
assert.False(t, ep.IsAvailable())
assert.Equal(t, KeepaliveHeartbeat, atomic.LoadUint32(&ep.keepaliveType))
// wait keepalive goroutine set keepaliveRunning status
time.Sleep(time.Millisecond * 100)
assert.True(t, ep.keepaliveRunning.Load().(bool))
time.Sleep(ep.keepaliveInterval * 2)
assert.True(t, ep.IsAvailable())
assert.False(t, ep.keepaliveRunning.Load().(bool))
}
}
<-ep.channels.channels
Expand All @@ -90,7 +102,7 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) {

func TestNotFoundProviderCircuitBreaker(t *testing.T) {
url := &motan.URL{Port: 8989, Protocol: "motan"}
url.PutParam(motan.TimeOutKey, "2000")
url.PutParam(motan.TimeOutKey, "1200")
url.PutParam(motan.ErrorCountThresholdKey, "5")
url.PutParam(motan.ClientConnectionKey, "10")
url.PutParam(motan.AsyncInitConnection, "false")
Expand Down
31 changes: 22 additions & 9 deletions endpoint/motanEndpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,17 @@ func handle(netListen net.Listener) {
}

func handleConnection(conn net.Conn, timeout int) {
reader := bufio.NewReader(conn)
decodeBuf := make([]byte, 100)
msg, err := protocol.Decode(reader, &decodeBuf)
if err != nil {
time.Sleep(time.Millisecond * 1000)
conn.Close()
return
for {
reader := bufio.NewReader(conn)
decodeBuf := make([]byte, 100)
msg, err := protocol.Decode(reader, &decodeBuf)
if err != nil {
time.Sleep(time.Millisecond * 1000)
conn.Close()
return
}
processMsg(msg, conn)
}
processMsg(msg, conn)
}

func processMsg(msg *protocol.Message, conn net.Conn) {
Expand All @@ -342,7 +344,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) {
if msg.Header.IsHeartbeat() {
res = protocol.BuildHeartbeat(msg.Header.RequestID, protocol.Res)
} else {
time.Sleep(time.Millisecond * 1000)
sleepTimeStr, _ := msg.Metadata.Load("sleep_time")
sleepTimeValue, _ := strconv.Atoi(sleepTimeStr)
if sleepTimeValue <= 0 {
sleepTimeValue = 1000
}
time.Sleep(time.Millisecond * time.Duration(sleepTimeValue))
var resp *motan.MotanResponse
e, _ := msg.Metadata.Load("exception")
switch e {
Expand All @@ -352,6 +359,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) {
ErrCode: motan.EProviderNotExist,
ErrMsg: motan.ProviderNotExistPrefix,
ErrType: motan.ServiceException})
case "other_exception":
resp = motan.BuildExceptionResponse(lastRequestID,
&motan.Exception{
ErrCode: motan.EUnkonwnMsg,
ErrMsg: "exception",
ErrType: motan.ServiceException})
default:
resp = &motan.MotanResponse{
RequestID: lastRequestID,
Expand Down

0 comments on commit 13c3dc9

Please sign in to comment.