-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathevents_test.go
94 lines (75 loc) · 1.99 KB
/
events_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package http
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"strings"
"testing"
"time"
_ "embed"
"github.com/r3labs/sse"
)
// testcasesEvents holds the raw bytes of testcases/events.json, embedded into
// the test binary at build time. TestEvents decodes it as a JSON list of
// event name / data pairs.
//
//go:embed testcases/events.json
var testcasesEvents []byte

// testStreamId is the name of the sse stream that TestEvents creates on the
// local sse server, forces onto incoming requests, and publishes events to.
var testStreamId = "stream"
// TestEvents verifies that the client's Events subscription hands every event
// published on a local sse stream to the supplied callback.
//
// The test starts an r3labs/sse server behind the mock http server, subscribes
// with the client in a background goroutine, waits until the subscription
// request reaches the handler, and then publishes each case from
// testcases/events.json one at a time, requiring the callback to fire within
// one second per event.
func TestEvents(t *testing.T) {
	// events cannot be tested with prism.Mock since it does
	// not support sse streams.
	server := sse.New()
	server.CreateStream(testStreamId)

	// doneCh signals that the subscription request reached the handler. It is
	// buffered and written with a non-blocking send so the handler can run
	// more than once (e.g. if the client reconnects) without panicking on a
	// double close and without blocking the request.
	doneCh := make(chan struct{}, 1)

	handler := func(m *http.ServeMux) {
		m.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
			// r3labs/sse requires a url field 'stream' with the name of the stream to subscribe to.
			// Unsure if that is part of the official sse spec or something specific to r3labs/sse.
			q := r.URL.Query()
			q.Set("stream", testStreamId)
			r.URL.RawQuery = q.Encode()

			select {
			case doneCh <- struct{}{}:
			default:
				// already signalled by an earlier request
			}
			server.HTTPHandler(w, r)
		})
	}

	addr := newMockHttpServer(t, handler)
	clt := New("http://"+addr, WithLogger(log.New(os.Stdout, "", 0)), WithUntrackedKeys())

	// Cancel the subscription when the test returns so the background
	// goroutine is given a chance to stop instead of outliving the test.
	// NOTE(review): this assumes clt.Events honours context cancellation.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// errCh is buffered so the goroutine can always deliver the final error
	// and exit, even when the test has already stopped receiving.
	errCh := make(chan error, 1)
	objCh := make(chan interface{})
	go func() {
		errCh <- clt.Events(ctx, []string{"head"}, func(obj interface{}) {
			// Do not block forever on delivery once the test is finished.
			select {
			case objCh <- obj:
			case <-ctx.Done():
			}
		})
	}()

	// wait for the request to be made
	select {
	case <-doneCh:
	case <-time.After(1 * time.Second):
		t.Fatal("timeout to ping sse server")
	}

	var cases []struct {
		Event string
		Data  json.RawMessage
	}
	if err := json.Unmarshal(testcasesEvents, &cases); err != nil {
		t.Fatal(err)
	}

	// we need to remove any '\n' and '\t' since they are used by sse to split
	// the message data during transport. The replacer is built once and
	// strips both in a single pass.
	stripper := strings.NewReplacer("\n", "", "\t", "")

	for indx, c := range cases {
		server.Publish(testStreamId, &sse.Event{
			Event: []byte(c.Event),
			Data:  []byte(stripper.Replace(string(c.Data))),
		})

		select {
		case err := <-errCh:
			// Events returned while events were still expected; include the
			// error so the failure is diagnosable.
			t.Fatalf("sse client closed unexpectedly: %v", err)
		case <-objCh:
		case <-time.After(1 * time.Second):
			t.Fatalf("message %d not received", indx)
		}
	}
}