-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtransport.go
166 lines (152 loc) · 4.13 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package main
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/valyala/fasthttp"
)
// Header name and values used for the control requests sent to the agent.
const (
// gciHeader is the name of the request header that carries the command.
gciHeader = "gci"
// checkHeapHeader is the gciHeader value sent by callAgentCH to request a
// heap check.
checkHeapHeader = "ch"
// gcHeader holds "gc", the gciHeader value sent by callAgentGC.
gcHeader = "gc"
)
// transport forwards incoming requests to a target host and, between
// requests, asks an agent for heap usage and triggers a GC when needed.
// Field descriptions below reflect how the methods in this file use them.
type transport struct {
// isAvailable is checked by RoundTrip, which answers 503 while it is false;
// checkHeapAndGC clears it for the duration of a GC. Accessed under mu.
isAvailable bool
// client is the HTTP client RoundTrip uses to forward requests.
client *fasthttp.HostClient
// protocolClient is the HTTP client used for heap-check and GC commands.
protocolClient *fasthttp.HostClient
// protocolTarget is the URI the heap-check and GC commands are sent to.
protocolTarget string
// waiter is notified when requests arrive and finish, and is used to wait
// for pending requests before a GC.
waiter pendingWaiter
// window provides size(): RoundTrip schedules a heap check whenever the
// finished-request count is a multiple of it.
window sampleWindow
// st yields (via NextValue) the heap-usage threshold above which
// checkHeapAndGC triggers a GC.
st sheddingThreshold
// printGC makes checkHeapAndGC print one comma-separated line per check.
printGC bool
// mu guards isAvailable and checking.
mu sync.Mutex
// checking is set by RoundTrip before it starts a checkHeapAndGC goroutine
// and cleared when that goroutine returns. Accessed under mu.
checking bool
}
// timeMillis returns the current wall-clock time as milliseconds since the
// Unix epoch.
func timeMillis() int64 {
	sinceEpoch := time.Duration(time.Now().UnixNano())
	return int64(sinceEpoch / time.Millisecond)
}
// RoundTrip proxies the request held by ctx to the target host and writes the
// target's response back into ctx.
//
// Unless GCI is disabled, the request is rejected with 503 while the
// transport is marked unavailable; otherwise its arrival and completion are
// reported to the waiter, and a heap check is scheduled in the background
// each time the finished-request count is a multiple of the window size.
// It panics if the call to the target fails.
func (t *transport) RoundTrip(ctx *fasthttp.RequestCtx) {
	if !*disableGCI {
		// Availability check and arrival accounting happen under the same lock.
		t.mu.Lock()
		available := t.isAvailable
		if available {
			t.waiter.requestArrived()
		}
		t.mu.Unlock()
		if !available {
			ctx.Error("", fasthttp.StatusServiceUnavailable)
			return
		}
	}

	ctx.Request.Header.Del("Connection")
	err := t.client.Do(&ctx.Request, &ctx.Response)
	if err != nil {
		panic(fmt.Sprintf("Problem calling proxy:%q", err))
	}
	ctx.Response.Header.Del("Connection")

	if *disableGCI {
		return
	}
	if t.waiter.requestFinished()%t.window.size() != 0 {
		return
	}

	// Several requests may reach this point before the service is marked
	// unavailable; the checking flag keeps a single check goroutine in flight.
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.checking {
		return
	}
	t.checking = true
	go t.checkHeapAndGC()
}
// callAgentCH asks the agent to check the heap and returns the usage it
// reports.
//
// The request goes to t.protocolTarget with the gci header set to
// checkHeapHeader; the response body is parsed as a base-10 int64.
// It panics if the request fails, the status is not 200, or the body is not
// a valid integer.
func (t *transport) callAgentCH() int64 {
	// Release the pooled objects on every exit path, including panics.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI(t.protocolTarget)
	req.Header.Set(gciHeader, checkHeapHeader)
	if err := t.protocolClient.Do(req, resp); err != nil {
		panic(fmt.Sprintf("Err trying to check heap:%q\n", err))
	}
	if resp.StatusCode() != fasthttp.StatusOK {
		panic(fmt.Sprintf("Heap check returned status code which is not 200:%v\n", resp.StatusCode()))
	}
	// string(...) copies the body, so the value stays valid after resp is
	// returned to the pool.
	usage, err := strconv.ParseInt(string(resp.Body()), 10, 64)
	if err != nil {
		panic(fmt.Sprintf("Heap check returned buffer which could not be converted to int:%q\n", err))
	}
	return usage
}
// callAgentGC asks the agent to run a garbage collection.
//
// The request goes to t.protocolTarget with the gci header set to gcHeader.
// It panics if the request fails or the status is not 200.
func (t *transport) callAgentGC() {
	// Release the pooled objects on every exit path, including panics.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI(t.protocolTarget)
	req.Header.Set(gciHeader, gcHeader)
	if err := t.protocolClient.Do(req, resp); err != nil {
		panic(fmt.Sprintf("Err trying to GC:%q\n", err))
	}
	if resp.StatusCode() != fasthttp.StatusOK {
		panic(fmt.Sprintf("GC returned status code which is not 200:%v\n", resp.StatusCode()))
	}
}
// checkHeapAndGC asks the agent for the current heap usage and compares it
// with the next shedding threshold. When usage exceeds the threshold it marks
// the transport unavailable, waits for pending requests, triggers a GC on the
// agent, updates the threshold/window/waiter state and marks the transport
// available again.
//
// When printGC is set, one comma-separated line is printed per call:
// timestamp (ms), whether a GC ran, finished requests, heap usage, threshold,
// time waiting for pending requests (ms), GC time (ms).
func (t *transport) checkHeapAndGC() {
	// Clear the checking flag on exit so a new check can be scheduled.
	defer func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		t.checking = false
	}()

	setAvailable := func(available bool) {
		t.mu.Lock()
		t.isAvailable = available
		t.mu.Unlock()
	}

	var pendingTime, gcTime int64
	finished := t.waiter.finishedCount()
	heapUsed := t.callAgentCH()
	threshold := t.st.NextValue()
	shouldGC := heapUsed > threshold

	if shouldGC {
		// Make the microservice unavailable.
		setAvailable(false)

		// Wait for pending requests.
		start := time.Now()
		finished = t.waiter.waitPending()
		pendingTime = int64(time.Since(start) / time.Millisecond)

		// Trigger GC.
		start = time.Now()
		t.callAgentGC()
		gcTime = int64(time.Since(start) / time.Millisecond)

		// Update internal structures.
		t.st.GC()
		t.window.update(finished)
		t.waiter.reset()

		// Make the microservice available again.
		setAvailable(true)
	}

	if t.printGC {
		fmt.Printf("%d,%t,%d,%d,%d,%d,%d\n", timeMillis(), shouldGC, finished, heapUsed, threshold, pendingTime, gcTime)
	}
}
// newTransport builds a transport that proxies requests to target and sends
// heap-check/GC commands to http://<gciTarget>/<gciCmdPath>.
//
// An empty gciTarget falls back to target. yGen is passed to the shedding
// threshold constructor, and printGC enables the per-check output line.
// The transport starts out available.
func newTransport(target string, yGen int64, printGC bool, gciTarget, gciCmdPath string) *transport {
	const ioTimeout = 120 * time.Second

	// Both clients share the same dialer and timeouts; only the address differs.
	newClient := func(addr string) *fasthttp.HostClient {
		return &fasthttp.HostClient{
			Addr:         addr,
			Dial:         fasthttp.Dial,
			ReadTimeout:  ioTimeout,
			WriteTimeout: ioTimeout,
		}
	}

	agentAddr := gciTarget
	if agentAddr == "" {
		agentAddr = target
	}

	return &transport{
		isAvailable:    true,
		client:         newClient(target),
		protocolClient: newClient(agentAddr),
		protocolTarget: "http://" + agentAddr + "/" + gciCmdPath,
		window:         newSampleWindow(time.Now().UnixNano()),
		st:             newSheddingThreshold(time.Now().UnixNano(), yGen),
		printGC:        printGC,
	}
}