Skip to content

Commit

Permalink
can response submitted shares
Browse files Browse the repository at this point in the history
  • Loading branch information
SwimmingTiger committed Nov 7, 2021
1 parent df62ca4 commit 9c3c13d
Show file tree
Hide file tree
Showing 8 changed files with 431 additions and 50 deletions.
8 changes: 5 additions & 3 deletions Errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ var (

var (
// StratumErrNeedSubscribed 需要订阅
StratumErrNeedSubscribed = NewStratumError(101, "Need Subscribed")
StratumErrNeedSubscribed = NewStratumError(25, "Not subscribed")
// StratumErrIllegalParams 参数非法
StratumErrIllegalParams = NewStratumError(27, "Illegal params")
// StratumErrTooFewParams 参数太少
StratumErrTooFewParams = NewStratumError(27, "Too few params")
// StratumErrDuplicateSubscribed 重复订阅
StratumErrDuplicateSubscribed = NewStratumError(102, "Duplicate Subscribed")
// StratumErrTooFewParams 参数太少
StratumErrTooFewParams = NewStratumError(103, "Too Few Params")
// StratumErrWorkerNameMustBeString 矿工名必须是字符串
StratumErrWorkerNameMustBeString = NewStratumError(104, "Worker Name Must be a String")
// StratumErrWorkerNameStartWrong 矿工名开头错误
Expand Down
11 changes: 11 additions & 0 deletions Event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ type EventStratumSessionBroken struct {
type EventUpSessionBroken struct {
Slot int
}

type EventSubmitShare struct {
ID interface{}
SessionID uint32
Message *ExMessageSubmitShare
}

type EventSubmitResponse struct {
ID interface{}
Status StratumStatus
}
11 changes: 11 additions & 0 deletions ExMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,14 @@ func (msg *ExMessageMiningSetDiff) Unserialize(data []byte) (err error) {
err = binary.Read(buf, binary.LittleEndian, msg.SessionIDs)
return
}

type ExMessageSubmitResponse struct {
Index uint16
Status StratumStatus
}

func (msg *ExMessageSubmitResponse) Unserialize(data []byte) (err error) {
buf := bytes.NewReader(data)
err = binary.Read(buf, binary.LittleEndian, msg)
return
}
10 changes: 5 additions & 5 deletions JSONRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func NewJSONRPCRequest(rpcJSON []byte) (rpcData *JSONRPCRequest, err error) {
return
}

// AddParam 向 JSONRPCRequest 对象添加一个或多个参数
func (rpcData *JSONRPCRequest) AddParam(param ...interface{}) {
// AddParams 向 JSONRPCRequest 对象添加一个或多个参数
func (rpcData *JSONRPCRequest) AddParams(param ...interface{}) {
rpcData.Params = append(rpcData.Params, param...)
}

// SetParam 设置 JSONRPCRequest 对象的参数
// 传递给 SetParam 的参数列表将按顺序复制到 JSONRPCRequest.Params 中
func (rpcData *JSONRPCRequest) SetParam(param ...interface{}) {
// SetParams 设置 JSONRPCRequest 对象的参数
// 传递给 SetParams 的参数列表将按顺序复制到 JSONRPCRequest.Params 中
func (rpcData *JSONRPCRequest) SetParams(param ...interface{}) {
rpcData.Params = param
}

Expand Down
147 changes: 129 additions & 18 deletions StratumSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type StratumSession struct {
stat AuthorizeStat // 认证状态

clientAgent string // 挖矿软件名称
fullWorkerName string // 完整的矿工名
fullName string // 完整的矿工名
subAccountName string // 子账户名部分
workerName string // 矿机名部分
versionMask uint32 // 比特币版本掩码(用于AsicBoost)
Expand Down Expand Up @@ -65,21 +65,16 @@ func (session *StratumSession) IP() string {

func (session *StratumSession) ID() string {
if session.stat == StatAuthorized {
return fmt.Sprintf("%s@%s", session.fullWorkerName, session.IP())
return fmt.Sprintf("%s@%s", session.fullName, session.IP())
}
return session.IP()
}

func (session *StratumSession) writeJSONResponseToClient(jsonData *JSONRPCResponse) (int, error) {
if session.stat == StatDisconnected {
return 0, ErrConnectionClosed
}

func (session *StratumSession) writeJSONResponse(jsonData *JSONRPCResponse) (int, error) {
bytes, err := jsonData.ToJSONBytesLine()
if err != nil {
return 0, err
}

return session.clientConn.Write(bytes)
}

Expand Down Expand Up @@ -107,21 +102,119 @@ func (session *StratumSession) stratumHandleRequest(request *JSONRPCLine, reques
// 让 Init() 函数返回
session.eventLoopRunning = false

glog.Info("miner authorized, session id: ", session.sessionID, ", IP: ", session.IP(), ", worker name: ", session.fullWorkerName)
glog.Info("miner authorized, session id: ", session.sessionID, ", IP: ", session.IP(), ", worker name: ", session.fullName)
}
return

case "mining.configure":
result, err = session.parseConfigureRequest(request)
return

case "mining.submit":
result, err = session.parseMiningSubmit(request)
if err != nil {
glog.Warning("stratum error, IP: ", session.IP(), ", worker: ", session.fullName, ", error: ", err, ", submit: ", string(requestJSON))
}
return

default:
// ignore unimplemented methods
glog.Warning("unknown request, IP: ", session.IP(), ", request: ", string(requestJSON))
return
}
}

func (session *StratumSession) parseMiningSubmit(request *JSONRPCLine) (result interface{}, err *StratumError) {
// params:
// [0] Worker Name
// [1] Job ID
// [2] ExtraNonce2
// [3] Time
// [4] Nonce
// [5] Version Mask

if len(request.Params) < 5 {
err = StratumErrTooFewParams
return
}

var msg ExMessageSubmitShare

// [1] Job ID
jobIDStr, ok := request.Params[1].(string)
if !ok {
err = StratumErrIllegalParams
return
}
jobID, convErr := strconv.ParseUint(jobIDStr, 10, 8)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.Base.JobID = uint8(jobID)

// [2] ExtraNonce2
extraNonce2Hex, ok := request.Params[2].(string)
if !ok {
err = StratumErrIllegalParams
return
}
extraNonce, convErr := strconv.ParseUint(extraNonce2Hex, 16, 32)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.Base.ExtraNonce2 = uint32(extraNonce)

// [3] Time
timeHex, ok := request.Params[3].(string)
if !ok {
err = StratumErrIllegalParams
return
}
time, convErr := strconv.ParseUint(timeHex, 16, 32)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.Time = uint32(time)

// [4] Nonce
nonceHex, ok := request.Params[4].(string)
if !ok {
err = StratumErrIllegalParams
return
}
nonce, convErr := strconv.ParseUint(nonceHex, 16, 32)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.Base.ExtraNonce2 = uint32(nonce)

// [5] Version Mask
if len(request.Params) >= 6 {
versionMaskHex, ok := request.Params[5].(string)
if !ok {
err = StratumErrIllegalParams
return
}
versionMask, convErr := strconv.ParseUint(versionMaskHex, 16, 32)
if convErr != nil {
err = StratumErrIllegalParams
return
}
msg.VersionMask = uint32(versionMask)
}

var e EventSubmitShare
e.ID = request.ID
e.SessionID = session.sessionID
e.Message = &msg
session.upSession.SendEvent(e)
return
}

func (session *StratumSession) parseSubscribeRequest(request *JSONRPCLine) (result interface{}, err *StratumError) {

if len(request.Params) >= 1 {
Expand Down Expand Up @@ -149,15 +242,15 @@ func (session *StratumSession) parseAuthorizeRequest(request *JSONRPCLine) (resu
}

// 矿工名
session.fullWorkerName = FilterWorkerName(fullWorkerName)
session.fullName = FilterWorkerName(fullWorkerName)

if strings.Contains(session.fullWorkerName, ".") {
if strings.Contains(session.fullName, ".") {
// 截取“.”之前的做为子账户名,“.”及之后的做矿机名
pos := strings.Index(session.fullWorkerName, ".")
session.subAccountName = session.fullWorkerName[:pos]
session.workerName = session.fullWorkerName[pos+1:]
pos := strings.Index(session.fullName, ".")
session.subAccountName = session.fullName[:pos]
session.workerName = session.fullName[pos+1:]
} else {
session.subAccountName = session.fullWorkerName
session.subAccountName = session.fullName
session.workerName = ""
}

Expand Down Expand Up @@ -185,8 +278,8 @@ func (session *StratumSession) parseConfigureRequest(request *JSONRPCLine) (resu
}

if options, ok := request.Params[1].(map[string]interface{}); ok {
if versionMaskI, ok := options["version-rolling.mask"]; ok {
if versionMaskStr, ok := versionMaskI.(string); ok {
if obj, ok := options["version-rolling.mask"]; ok {
if versionMaskStr, ok := obj.(string); ok {
versionMask, err := strconv.ParseUint(versionMaskStr, 16, 32)
if err == nil {
session.versionMask = uint32(versionMask)
Expand Down Expand Up @@ -250,7 +343,7 @@ func (session *StratumSession) recvJSONRPC(e EventRecvJSONRPC) {
response.Result = result
response.Error = stratumErr.ToJSONRPCArray(nil)

_, err := session.writeJSONResponseToClient(&response)
_, err := session.writeJSONResponse(&response)

if err != nil {
glog.Error("write JSON response failed, IP: ", session.IP(), ", error: ", err.Error())
Expand All @@ -277,6 +370,22 @@ func (session *StratumSession) sendBytes(e EventSendBytes) {
}
}

func (session *StratumSession) submitResponse(e EventSubmitResponse) {
var response JSONRPCResponse
response.ID = e.ID
if e.Status.IsAccepted() {
response.Result = true
} else {
response.Error = e.Status.ToJSONRPCArray(nil)
}

_, err := session.writeJSONResponse(&response)
if err != nil {
glog.Error("write submit response failed, IP: ", session.IP(), ", error: ", err.Error())
session.close()
}
}

func (session *StratumSession) handleEvent() {
session.eventLoopRunning = true
for session.eventLoopRunning {
Expand All @@ -287,6 +396,8 @@ func (session *StratumSession) handleEvent() {
session.recvJSONRPC(e)
case EventSendBytes:
session.sendBytes(e)
case EventSubmitResponse:
session.submitResponse(e)
case EventConnBroken:
session.close()
default:
Expand Down
4 changes: 2 additions & 2 deletions StratumSessionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (manager *StratumSessionManager) RunStratumSession(conn net.Conn) {
return
}

upManager, ok := manager.upSessionManagers[session.fullWorkerName]
upManager, ok := manager.upSessionManagers[session.fullName]
if !ok {
upManager = NewUpSessionManager(session.subAccountName, manager.config)
go upManager.Run()
manager.upSessionManagers[session.fullWorkerName] = upManager
manager.upSessionManagers[session.fullName] = upManager
// 等待连接就绪
time.Sleep(3 * time.Second)
}
Expand Down
Loading

0 comments on commit 9c3c13d

Please sign in to comment.