forked from andeya/opay
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopay.go
More file actions
121 lines (106 loc) · 2.68 KB
/
opay.go
File metadata and controls
121 lines (106 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package opay
import (
"fmt"
"sync"
"github.com/jmoiron/sqlx"
)
// Opay is the order-processing engine. Requests submitted through Do are
// placed on queue, and Serve pulls them off and executes them against db.
type Opay struct {
	metas          map[string]*Meta // Meta values indexed by a string key
	queue          Queue            // request queue
	db             *sqlx.DB         // global database operation instance
	*SettleFuncMap                  // global map of SettleFunc; NewOpay points it at globalSettleFuncMap
	*Floater                        // built by NewOpay from numOfDecimalPlaces and handed to every Context in Serve
	metasLock      sync.RWMutex     // NOTE(review): presumably guards metas — confirm against the methods that use it
}
// NewOpay builds an Opay that operates on db.
//
// queueCapacity is forwarded to newOrderChan to create the request queue, and
// numOfDecimalPlaces is forwarded to NewFloater. The returned engine shares
// the package-level globalSettleFuncMap and starts with an empty metas map.
// Call Serve to begin processing requests.
func NewOpay(db *sqlx.DB, queueCapacity int, numOfDecimalPlaces int) *Opay {
	engine := new(Opay)
	engine.SettleFuncMap = globalSettleFuncMap
	engine.db = db
	engine.metas = map[string]*Meta{}
	engine.Floater = NewFloater(numOfDecimalPlaces)

	// The queue keeps a back-reference to the engine, so it can only be
	// created once the engine value exists.
	engine.queue = newOrderChan(queueCapacity, engine)
	return engine
}
// Do handles a request: it pushes req onto the request queue and blocks until
// a response is delivered on the channel returned by the queue.
func (opay *Opay) Do(req Request) *Response {
	respChan := opay.queue.Push(req)
	resp := <-respChan
	return resp
}
// DB returns the database handle this Opay was constructed with.
func (opay *Opay) DB() *sqlx.DB {
	handle := opay.db
	return handle
}
// Serve starts Opay: it runs the request-processing loop and never returns.
//
// It panics if the database cannot be pinged. Afterwards it repeatedly pulls
// a request from the queue, resolves the settle functions for the initiator
// (and the stakeholder, when present) and processes the request in its own
// goroutine. At most a fifth of the queue capacity (but at least one) requests
// are processed concurrently.
//
// When a request carries no transaction, one is opened for it and finished
// here: rolled back if processing failed or panicked, committed otherwise.
// A transaction supplied by the caller is never committed or rolled back.
// The outcome (including a recovered panic or a failed commit) is recorded on
// the request with setError before writeback is called.
func (opay *Opay) Serve() {
	if err := opay.db.Ping(); err != nil {
		panic(err)
	}

	maxRoutine := opay.queue.GetCap() / 5
	if maxRoutine < 1 {
		// Also covers a negative capacity, which would make the make() below panic.
		maxRoutine = 1
	}

	// src is used as a counting semaphore: a send takes an execute
	// permission, a receive gives it back.
	src := make(chan struct{}, maxRoutine)

	for {
		// Gets an execute permission.
		src <- struct{}{}

		// Read a request; waits without limit until one is available.
		req := opay.queue.Pull()

		// Gets the account balance operation functions for the
		// corresponding asset types.
		var (
			err               error
			initiatorSettle   SettleFunc
			stakeholderSettle SettleFunc
		)
		initiatorSettle, err = opay.GetSettleFunc(req.Initiator.GetAid())
		if err == nil && req.Stakeholder != nil {
			stakeholderSettle, err = opay.GetSettleFunc(req.Stakeholder.GetAid())
		}
		if err != nil {
			// The operation interface of the specified asset account does
			// not exist: answer the request right away.
			req.setError(err)
			req.writeback()
			// No goroutine is started for this request, so the permission
			// taken above has to be given back here; otherwise every such
			// failure would permanently shrink the pool until the loop
			// blocks forever.
			<-src
			continue
		}

		// Process the order in its own goroutine.
		go func() {
			var (
				err   error
				ownTx bool // true when the transaction was opened here and must be finished here
			)

			// A single deferred function performs the whole epilogue so the
			// order is explicit: recover first, then finish the transaction,
			// then answer the request. With separate defers the transaction
			// would be finished before the panic is turned into an error,
			// and a panicking handler would get its work committed.
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("opay panic: %v", r)
				}
				if ownTx {
					if err != nil {
						// The original failure is what the requester needs
						// to see; a rollback error would only mask it.
						_ = req.Tx.Rollback()
					} else {
						// A failed commit means nothing was persisted, so
						// it must not be reported as success.
						err = req.Tx.Commit()
					}
				}
				// Close the request, and mark the end of the request processing.
				req.setError(err)
				req.writeback()
				// Frees an execute permission.
				<-src
			}()

			if req.Tx == nil {
				req.Tx, err = opay.db.Beginx()
				if err != nil {
					return
				}
				ownTx = true
			}

			err = req.Initiator.GetMeta().serve(&Context{
				initiatorSettle:   initiatorSettle,
				stakeholderSettle: stakeholderSettle,
				Request:           req,
				Response:          req.response,
				Floater:           opay.Floater,
			})
		}()
	}
}