diff --git a/endpoint/motanCommonEndpoint.go b/endpoint/motanCommonEndpoint.go index 8f6a87a0..be0bfa91 100644 --- a/endpoint/motanCommonEndpoint.go +++ b/endpoint/motanCommonEndpoint.go @@ -37,7 +37,7 @@ type MotanCommonEndpoint struct { channels *ChannelPool destroyed bool destroyCh chan struct{} - available bool + available atomic.Value errorCount uint32 proxy bool errorCountThreshold int64 @@ -68,7 +68,7 @@ func (m *MotanCommonEndpoint) GetRuntimeInfo() map[string]interface{} { } func (m *MotanCommonEndpoint) setAvailable(available bool) { - m.available = available + m.available.Store(available) } func (m *MotanCommonEndpoint) SetSerialization(s motan.Serialization) { @@ -306,7 +306,9 @@ func (m *MotanCommonEndpoint) keepalive() { if atomic.LoadUint32(&m.keepaliveType) == KeepaliveProfile { m.profile() } else { - m.heartbeat() + if m.heartbeat() { + return + } } case <-m.destroyCh: return @@ -314,7 +316,7 @@ func (m *MotanCommonEndpoint) keepalive() { } } -func (m *MotanCommonEndpoint) heartbeat() { +func (m *MotanCommonEndpoint) heartbeat() bool { if channel, err := m.channels.Get(); err != nil { vlog.Infof("[keepalive] failed. url:%s, err:%s", m.url.GetIdentity(), err.Error()) } else { @@ -323,10 +325,11 @@ func (m *MotanCommonEndpoint) heartbeat() { m.setAvailable(true) m.resetErr() vlog.Infof("[keepalive] heartbeat success. url: %s", m.url.GetIdentity()) - return + return true } vlog.Infof("[keepalive] heartbeat failed. url:%s, err:%s", m.url.GetIdentity(), err.Error()) } + return false } func (m *MotanCommonEndpoint) profile() { @@ -357,7 +360,7 @@ func (m *MotanCommonEndpoint) SetURL(url *motan.URL) { } func (m *MotanCommonEndpoint) IsAvailable() bool { - return m.available + return m.available.Load().(bool) } type Channel struct { diff --git a/endpoint/motanCommonEndpoint_test.go b/endpoint/motanCommonEndpoint_test.go index 47d97540..edb70c98 100644 --- a/endpoint/motanCommonEndpoint_test.go +++ b/endpoint/motanCommonEndpoint_test.go @@ -8,6 +8,7 @@ import ( "github.com/weibocom/motan-go/serialize" "net" "runtime" + "strconv" "sync/atomic" "testing" "time" @@ -55,9 +56,10 @@ func TestV1RecordErrEmptyThreshold(t *testing.T) { } func TestV1RecordErrWithErrThreshold(t *testing.T) { + errorCountThreshold := 5 url := &motan.URL{Port: 8989, Protocol: "motan"} - url.PutParam(motan.TimeOutKey, "100") - url.PutParam(motan.ErrorCountThresholdKey, "5") + url.PutParam(motan.TimeOutKey, "1100") + url.PutParam(motan.ErrorCountThresholdKey, strconv.Itoa(errorCountThreshold)) url.PutParam(motan.ClientConnectionKey, "1") url.PutParam(motan.AsyncInitConnection, "false") ep := &MotanCommonEndpoint{} @@ -66,15 +68,25 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) { ep.SetSerialization(&serialize.SimpleSerialization{}) ep.Initialize() assert.Equal(t, 1, ep.clientConnection) - for j := 0; j < 10; j++ { + for j := 0; j < errorCountThreshold; j++ { request := &motan.MotanRequest{ServiceName: "test", Method: "test"} request.Attachment = motan.NewStringMap(0) - ep.Call(request) - if j < 4 { + request.Attachment.Store("exception", "other_exception") + request.Attachment.Store("sleep_time", "200") + resp := ep.Call(request) + assert.NotNil(t, resp) + assert.NotNil(t, resp.GetException()) + assert.Equal(t, motan.EUnkonwnMsg, resp.GetException().ErrCode) + if j < errorCountThreshold-1 { assert.True(t, ep.IsAvailable()) } else { - assert.False(t, ep.IsAvailable()) assert.Equal(t, KeepaliveHeartbeat, atomic.LoadUint32(&ep.keepaliveType)) + // wait keepalive goroutine set keepaliveRunning status + time.Sleep(time.Millisecond * 100) + assert.True(t, ep.keepaliveRunning.Load().(bool)) + time.Sleep(ep.keepaliveInterval * 2) + assert.True(t, ep.IsAvailable()) + assert.False(t, ep.keepaliveRunning.Load().(bool)) } } <-ep.channels.channels @@ -90,7 +102,7 @@ func TestV1RecordErrWithErrThreshold(t *testing.T) { func TestNotFoundProviderCircuitBreaker(t *testing.T) { url := &motan.URL{Port: 8989, Protocol: "motan"} - url.PutParam(motan.TimeOutKey, "2000") + url.PutParam(motan.TimeOutKey, "1200") url.PutParam(motan.ErrorCountThresholdKey, "5") url.PutParam(motan.ClientConnectionKey, "10") url.PutParam(motan.AsyncInitConnection, "false") diff --git a/endpoint/motanEndpoint_test.go b/endpoint/motanEndpoint_test.go index 3897a9fa..545a0d88 100644 --- a/endpoint/motanEndpoint_test.go +++ b/endpoint/motanEndpoint_test.go @@ -319,15 +319,17 @@ func handle(netListen net.Listener) { } func handleConnection(conn net.Conn, timeout int) { - reader := bufio.NewReader(conn) - decodeBuf := make([]byte, 100) - msg, err := protocol.Decode(reader, &decodeBuf) - if err != nil { - time.Sleep(time.Millisecond * 1000) - conn.Close() - return + for { + reader := bufio.NewReader(conn) + decodeBuf := make([]byte, 100) + msg, err := protocol.Decode(reader, &decodeBuf) + if err != nil { + time.Sleep(time.Millisecond * 1000) + conn.Close() + return + } + processMsg(msg, conn) } - processMsg(msg, conn) } func processMsg(msg *protocol.Message, conn net.Conn) { @@ -342,7 +344,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) { if msg.Header.IsHeartbeat() { res = protocol.BuildHeartbeat(msg.Header.RequestID, protocol.Res) } else { - time.Sleep(time.Millisecond * 1000) + sleepTimeStr, _ := msg.Metadata.Load("sleep_time") + sleepTimeValue, _ := strconv.Atoi(sleepTimeStr) + if sleepTimeValue <= 0 { + sleepTimeValue = 1000 + } + time.Sleep(time.Millisecond * time.Duration(sleepTimeValue)) var resp *motan.MotanResponse e, _ := msg.Metadata.Load("exception") switch e { @@ -352,6 +359,12 @@ func processMsg(msg *protocol.Message, conn net.Conn) { ErrCode: motan.EProviderNotExist, ErrMsg: motan.ProviderNotExistPrefix, ErrType: motan.ServiceException}) + case "other_exception": + resp = motan.BuildExceptionResponse(lastRequestID, + &motan.Exception{ + ErrCode: motan.EUnkonwnMsg, + ErrMsg: "exception", + ErrType: motan.ServiceException}) default: resp = &motan.MotanResponse{ RequestID: lastRequestID,