This repository has been archived by the owner on Sep 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
main.go
265 lines (237 loc) · 7.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package main
import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"syscall"
"github.com/markbates/pkger"
flag "github.com/spf13/pflag"
"github.com/ztrue/shutdown"
)
const (
verboseCredentialErrors = true
)
var (
// Version is injected at build time via the '-X' linker flag
// https://golang.org/cmd/link/#hdr-Command_Line
Version string
flags struct {
version bool
listenPort int
defaultTargetPort int
batchSize int
template string
extension string
debug bool
streamRoleArn string
streamName string
}
client = &http.Client{}
telemetry telemetryAgent
)
func check(err error) {
if err != nil {
log.Fatal(err)
}
}
func logErr(err error) {
if err != nil {
log.Println(err)
}
}
func logWarn(msg string, v ...interface{}) {
if len(v) > 0 {
println(msg)
log.Printf("[WARN] "+msg+"\n", v...)
} else {
log.Printf("[WARN] " + msg + "\n")
}
}
func logDebug(msg string, v ...interface{}) {
if flags.debug {
if len(v) > 0 {
log.Printf("[DEBUG] "+msg+"\n", v...)
} else {
log.Printf("[DEBUG] " + msg + "\n")
}
}
}
func readFlags() {
// Overrides the default message of "pflag: help requested" in the case of -h / --help
flag.ErrHelp = errors.New("")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of replay-zero:\n")
flag.PrintDefaults()
}
flag.BoolVarP(&flags.version, "version", "V", false, "Print version info and exit")
flag.IntVarP(&flags.listenPort, "listen-port", "l", 9000, "The port the Replay Zero proxy will listen on")
flag.IntVarP(&flags.defaultTargetPort, "target-port", "p", 8080, "The port the Replay Zero proxy will forward to on localhost")
flag.BoolVar(&flags.debug, "debug", false, "Set logging to also print debug messages")
flag.IntVarP(&flags.batchSize, "batch-size", "b", 1, "Buffer events before writing out to a file")
flag.StringVarP(&flags.template, "template", "t", "karate", "Either [karate] or [gatling] or [path/to/custom/template]")
flag.StringVarP(&flags.extension, "extension", "e", "", "For custom output template")
flag.StringVarP(&flags.streamName, "stream-name", "s", "", "AWS Kinesis Stream name (streaming mode only)")
flag.StringVarP(&flags.streamRoleArn, "stream-role-arn", "r", "", "AWS Kinesis Stream ARN (streaming mode only)")
flag.Parse()
if flags.version {
fmt.Printf("replay-zero version %s\n", Version)
os.Exit(0)
}
if flags.batchSize == 0 {
log.Println("Batch size cannot be zero! Using batch=1")
flags.batchSize = 1
} else if flags.batchSize > 1 {
log.Printf("Using fixed batch size of %d\n", flags.batchSize)
}
// check if template exist at the provided path
// TODO: add validation for correctness of template
if !(flags.template == "karate" || flags.template == "gatling") {
_, err := ioutil.ReadFile(flags.template)
if err != nil {
log.Printf("Failed to load template")
panic(err)
}
if flags.extension == "" {
log.Fatal("For custom template, output extension is expected to be passed using --extension or -e flag.")
os.Exit(0)
}
}
}
// get embeded template file content
func getPkgTemplate(filePath string) string {
f, err := pkger.Open(filePath)
if err != nil {
panic(err)
}
defer f.Close()
info, err := f.Stat()
if err != nil {
panic(err)
}
b := make([]byte, info.Size())
content, err := f.Read(b)
if err != nil {
panic(err)
}
return string(b[:content])
}
func getFormat(template string, extension string) outputFormat {
switch template {
case "karate":
return outputFormat{
template: getPkgTemplate("/templates/karate_default.template"),
extension: "feature",
}
case "gatling":
return outputFormat{
template: getPkgTemplate("/templates/gatling_default.template"),
extension: "scala",
}
default:
dat, err := ioutil.ReadFile(template)
if err != nil {
panic(err)
}
return outputFormat{
template: string(dat),
extension: extension,
}
}
}
func buildNewTargetURL(req *http.Request) string {
scheme := req.URL.Scheme
if scheme == "" {
scheme = "http"
}
host := req.URL.Host
if host == "" {
host = fmt.Sprintf("localhost:%d", flags.defaultTargetPort)
}
return fmt.Sprintf("%s://%s%s", scheme, host, req.URL.Path)
}
// A higher-order function that accepts an HTTPEvent handler and
// returns a network interceptor that passes parses + builds an HTTPEvent
// and passes that on to the parametrized handler for further processing.
func createServerHandler(h eventHandler) func(http.ResponseWriter, *http.Request) {
return func(wr http.ResponseWriter, originalRequest *http.Request) {
// 1. Construct proxy request
newURL := buildNewTargetURL(originalRequest)
defer originalRequest.Body.Close()
originalBody, err := ioutil.ReadAll(originalRequest.Body)
if err != nil {
log.Printf("[ERROR] Could not read original request body: %v\n", err)
return
}
originalBodyString := string(originalBody)
request, err := http.NewRequest(originalRequest.Method, newURL, strings.NewReader(originalBodyString))
if err != nil {
log.Printf("[ERROR] Could not build the outgoing request: %v\n", err)
return
}
request.Header = originalRequest.Header
// 2. Execute proxy request
response, err := client.Do(request)
if err != nil {
log.Printf("[ERROR] Could not process HTTP request to target: %v\n", err)
return
}
// 3. Copy data for proxy response
for k, v := range response.Header {
wr.Header().Set(k, strings.Join(v, ","))
}
wr.WriteHeader(response.StatusCode)
defer response.Body.Close()
originalRespBody, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Printf("[ERROR] Could not read response body: %v\n", err)
return
}
_, err = io.Copy(wr, strings.NewReader(string(originalRespBody)))
if err != nil {
log.Printf("[ERROR] Could not create copy of response body: %v\n", err)
return
}
originalRespBodyString := string(originalRespBody)
// 4. Parse request + response data and pass on to event handler
event, err := convertRequestResponse(request, response, originalBodyString, originalRespBodyString)
if err != nil {
log.Printf("[ERROR] Could not convert request and response: %v\n", err)
return
}
log.Printf("Saw event:\n%s %s %s", event.PairID, event.HTTPMethod, event.Endpoint)
h.handleEvent(event)
}
}
func main() {
readFlags()
telemetry = getTelemetryAgent()
go telemetry.logUsage(telemetryUsageOpen)
var h eventHandler
if len(flags.streamName) > 0 {
if len(flags.streamRoleArn) == 0 {
log.Println("AWS Kinesis Stream ARN and name required for streaming mode")
os.Exit(1)
}
log.Println("Running ONLINE, sending recorded events to Kinesis")
h = getOnlineHandler(flags.streamName, flags.streamRoleArn)
} else {
log.Printf("Running OFFLINE, writing out events to %s files\n", flags.template)
h = getOfflineHandler(flags.template, flags.extension)
}
shutdown.Add(func() {
log.Println("Cleaning up...")
h.flushBuffer()
})
http.HandleFunc("/", createServerHandler(h))
listenAddr := fmt.Sprintf("localhost:%d", flags.listenPort)
log.Println("Proxy listening on " + listenAddr)
go func() {
log.Fatal(http.ListenAndServe(listenAddr, nil))
}()
shutdown.Listen(syscall.SIGINT)
}