Skip to content

Commit

Permalink
refactor for always_keep_downconn
Browse files Browse the repository at this point in the history
  • Loading branch information
SwimmingTiger committed Nov 18, 2021
1 parent fb128bc commit de33474
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 35 deletions.
2 changes: 2 additions & 0 deletions Const.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ const (

var UpSessionTLSConf = &tls.Config{
InsecureSkipVerify: true}

const FakeJobNotifyInterval = 30 * time.Second
2 changes: 2 additions & 0 deletions Errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
)

var (
// StratumErrJobNotFound 任务不存在
StratumErrJobNotFound = NewStratumError(21, "Job not found (=stale)")
// StratumErrNeedAuthorized 需要认证
StratumErrNeedAuthorized = NewStratumError(24, "Unauthorized worker")
// StratumErrNeedSubscribed 需要订阅
Expand Down
12 changes: 12 additions & 0 deletions Event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type EventUpSessionInitFailed struct {
Slot int
}

type EventSetUpSession struct {
Session EventInterface
}

type EventAddStratumSession struct {
Session *StratumSession
}
Expand Down Expand Up @@ -62,3 +66,11 @@ type EventSendUpdateMinerNum struct{}
type EventStopUpSessionManager struct {
SubAccount string
}

type EventUpdateFakeJob struct {
FakeJob StratumJob
}

type EventTransferStratumSessions struct{}

type EventSendFakeNotify struct{}
2 changes: 2 additions & 0 deletions ExMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type ExMessageSubmitShare struct {

Time uint32
VersionMask uint32

IsFakeJob bool
}

func (msg *ExMessageSubmitShare) Serialize() []byte {
Expand Down
150 changes: 150 additions & 0 deletions FakeUpSession.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"time"

"github.com/golang/glog"
)

type FakeUpSession struct {
manager *UpSessionManager
stratumSessions map[uint32]*StratumSession
eventChannel chan interface{}

fakeJob *StratumJob
exitChannel chan bool
}

func NewFakeUpSession(manager *UpSessionManager) (up *FakeUpSession) {
up = new(FakeUpSession)
up.manager = manager
up.stratumSessions = make(map[uint32]*StratumSession)
up.eventChannel = make(chan interface{}, UpSessionChannelCache)
up.exitChannel = make(chan bool, 1)
return
}

func (up *FakeUpSession) Run() {
if up.manager.config.AlwaysKeepDownconn {
go up.fakeNotifyTicker()
}

up.handleEvent()
}

func (up *FakeUpSession) SendEvent(event interface{}) {
up.eventChannel <- event
}

func (up *FakeUpSession) addStratumSession(e EventAddStratumSession) {
up.stratumSessions[e.Session.sessionID] = e.Session

if up.manager.config.AlwaysKeepDownconn && up.fakeJob != nil {
up.fakeJob.ToNewFakeJob()
bytes, err := up.fakeJob.ToNotifyLine(true)
if err == nil {
e.Session.SendEvent(EventSendBytes{bytes})
} else {
glog.Warning("create notify bytes failed, ", err.Error(), ", fake job: ", up.fakeJob)
}
}
}

func (up *FakeUpSession) transferStratumSessions() {
for _, session := range up.stratumSessions {
up.manager.SendEvent(EventAddStratumSession{session})
}
// 清空map
up.stratumSessions = make(map[uint32]*StratumSession)
}

func (up *FakeUpSession) exit() {
if up.manager.config.AlwaysKeepDownconn {
up.exitChannel <- true
}

for _, session := range up.stratumSessions {
go session.SendEvent(EventExit{})
}
}

func (up *FakeUpSession) sendSubmitResponse(sessionID uint32, id interface{}, status StratumStatus) {
session, ok := up.stratumSessions[sessionID]
if !ok {
// 客户端已断开,忽略
glog.Info("cannot find session ", sessionID)
return
}
go session.SendEvent(EventSubmitResponse{id, status})
}

func (up *FakeUpSession) handleSubmitShare(e EventSubmitShare) {
up.sendSubmitResponse(uint32(e.Message.Base.SessionID), e.ID, STATUS_ACCEPT)
}

func (up *FakeUpSession) stratumSessionBroken(e EventStratumSessionBroken) {
delete(up.stratumSessions, e.SessionID)
}

func (up *FakeUpSession) updateFakeJob(e EventUpdateFakeJob) {
up.fakeJob = &e.FakeJob
}

func (up *FakeUpSession) fakeNotifyTicker() {
ticker := time.NewTicker(FakeJobNotifyInterval)
defer ticker.Stop()

for {
select {
case <-up.exitChannel:
return
case <-ticker.C:
up.SendEvent(EventSendFakeNotify{})
}
}
}

func (up *FakeUpSession) sendFakeNotify() {
if up.fakeJob == nil || len(up.stratumSessions) < 1 {
return
}

up.fakeJob.ToNewFakeJob()

bytes, err := up.fakeJob.ToNotifyLine(false)
if err != nil {
glog.Warning("create notify bytes failed, ", err.Error(), ", fake job: ", up.fakeJob)
return
}

for _, session := range up.stratumSessions {
go session.SendEvent(EventSendBytes{bytes})
}
}

func (up *FakeUpSession) handleEvent() {
for {
event := <-up.eventChannel

switch e := event.(type) {
case EventAddStratumSession:
up.addStratumSession(e)
case EventSubmitShare:
up.handleSubmitShare(e)
case EventStratumSessionBroken:
up.stratumSessionBroken(e)
case EventTransferStratumSessions:
up.transferStratumSessions()
case EventUpdateFakeJob:
up.updateFakeJob(e)
case EventSendFakeNotify:
up.sendFakeNotify()
case EventExit:
up.exit()
return

default:
glog.Error("Unknown event: ", e)
}
}
}
5 changes: 5 additions & 0 deletions Interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package main

type EventInterface interface {
SendEvent(e interface{})
}
18 changes: 18 additions & 0 deletions StratumJob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"time"
)

type StratumJob struct {
Expand Down Expand Up @@ -50,3 +51,20 @@ func (job *StratumJob) ToNotifyLine(firstJob bool) (bytes []byte, err error) {

return job.ToJSONBytesLine()
}

func IsFakeJobID(id string) bool {
return len(id) < 1 || id[0] == 'f'
}

func (job *StratumJob) ToNewFakeJob() {
now := uint64(time.Now().Unix())
job.ID = fmt.Sprintf("f%04d", now)

coinbase1, _ := job.Params[2].(string)
pos := len(coinbase1) - 8
if pos < 0 {
pos = 0
}

job.Params[2] = coinbase1[:pos] + Uint64ToHex(now)
}
29 changes: 21 additions & 8 deletions StratumSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type StratumSession struct {
manager *StratumSessionManager // 会话管理器
upSession *UpSession // 所属的服务器会话
upSession EventInterface // 所属的服务器会话

sessionID uint32 // 会话ID
clientConn net.Conn // 到矿机的TCP连接
Expand Down Expand Up @@ -142,6 +142,11 @@ func (session *StratumSession) parseMiningSubmit(request *JSONRPCLine) (result i
return
}

if session.upSession == nil {
err = StratumErrJobNotFound
return
}

// params:
// [0] Worker Name
// [1] Job ID
Expand All @@ -163,12 +168,17 @@ func (session *StratumSession) parseMiningSubmit(request *JSONRPCLine) (result i
err = StratumErrIllegalParams
return
}
jobID, convErr := strconv.ParseUint(jobIDStr, 10, 8)
if convErr != nil {
err = StratumErrIllegalParams
return

if IsFakeJobID(jobIDStr) {
msg.IsFakeJob = true
} else {
jobID, convErr := strconv.ParseUint(jobIDStr, 10, 8)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.Base.JobID = uint8(jobID)
}
msg.Base.JobID = uint8(jobID)

// [2] ExtraNonce2
extraNonce2Hex, ok := request.Params[2].(string)
Expand Down Expand Up @@ -354,8 +364,9 @@ func (session *StratumSession) versionMaskStr() string {
return fmt.Sprintf("%08x", session.versionMask)
}

func (session *StratumSession) SetUpSession(upSession *UpSession) {
session.upSession = upSession
func (session *StratumSession) setUpSession(e EventSetUpSession) {
session.upSession = e.Session
session.upSession.SendEvent(EventAddStratumSession{session})
}

func (session *StratumSession) handleRequest() {
Expand Down Expand Up @@ -446,6 +457,8 @@ func (session *StratumSession) handleEvent() {
event := <-session.eventChannel

switch e := event.(type) {
case EventSetUpSession:
session.setUpSession(e)
case EventRecvJSONRPC:
session.recvJSONRPC(e)
case EventSendBytes:
Expand Down
4 changes: 3 additions & 1 deletion StratumSessionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (manager *StratumSessionManager) RunStratumSession(conn net.Conn) {
return
}

go session.Run()

manager.SendEvent(EventAddStratumSession{session})
}

Expand All @@ -109,7 +111,7 @@ func (manager *StratumSessionManager) addStratumSession(e EventAddStratumSession
go upManager.Run()
manager.upSessionManagers[e.Session.subAccountName] = upManager
}
upManager.SendEvent(EventAddStratumSession{e.Session})
upManager.SendEvent(e)
}

func (manager *StratumSessionManager) stopUpSessionManager(e EventStopUpSessionManager) {
Expand Down
23 changes: 15 additions & 8 deletions UpSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (up *UpSession) sendInitRequest() (err error) {
return
}

func (up *UpSession) exit() {
up.stat = StatExit
up.close()
}

func (up *UpSession) close() {
if up.stat == StatAuthorized {
up.manager.SendEvent(EventUpSessionBroken{up.slot})
Expand Down Expand Up @@ -393,9 +398,6 @@ func (up *UpSession) addStratumSession(e EventAddStratumSession) {
up.stratumSessions[e.Session.sessionID] = e.Session
up.registerWorker(e.Session)

e.Session.SetUpSession(up)
go e.Session.Run()

if up.rpcSetVersionMask != nil && e.Session.versionMask != 0 {
e.Session.SendEvent(EventSendBytes{up.rpcSetVersionMask})
}
Expand Down Expand Up @@ -476,6 +478,11 @@ func (up *UpSession) recvJSONRPC(e EventRecvJSONRPC) {
}

func (up *UpSession) handleSubmitShare(e EventSubmitShare) {
if e.Message.IsFakeJob {
up.sendSubmitResponse(uint32(e.Message.Base.SessionID), e.ID, STATUS_ACCEPT)
return
}

data := e.Message.Serialize()
_, err := up.serverConn.Write(data)

Expand Down Expand Up @@ -562,18 +569,18 @@ func (up *UpSession) handleEvent() {
up.addStratumSession(e)
case EventSubmitShare:
up.handleSubmitShare(e)
case EventStratumSessionBroken:
up.stratumSessionBroken(e)
case EventSendUpdateMinerNum:
up.sendUpdateMinerNum()
case EventRecvJSONRPC:
up.recvJSONRPC(e)
case EventRecvExMessage:
up.recvExMessage(e)
case EventConnBroken:
up.close()
case EventExit:
up.close()
case EventStratumSessionBroken:
up.stratumSessionBroken(e)
case EventSendUpdateMinerNum:
up.sendUpdateMinerNum()
up.exit()
default:
glog.Error("Unknown event: ", e)
}
Expand Down
Loading

0 comments on commit de33474

Please sign in to comment.