-
Notifications
You must be signed in to change notification settings - Fork 8
/
bindings_test.go
101 lines (75 loc) · 2.32 KB
/
bindings_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package amqprpc
import (
"context"
"fmt"
"testing"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestTopic verifies that a handler bound with TopicBinding (queue "my_topic",
// pattern "foo.#") is invoked when a request with routing key "foo.bar.baz" is
// sent to the "amq.topic" exchange.
func TestTopic(t *testing.T) {
	s, c, start, stop := initTest(t)
	defer stop()

	// Capacity 1 plus a non-blocking send means the handler can never get stuck
	// on this channel. With an unbuffered channel, a delivery that arrives after
	// the timeout below (or any additional delivery) would block the handler
	// forever because nothing is receiving any more.
	wasCalled := make(chan struct{}, 1)

	s.Bind(TopicBinding("my_topic", "foo.#", func(_ context.Context, _ *ResponseWriter, _ amqp.Delivery) {
		select {
		case wasCalled <- struct{}{}:
		default:
			// A signal is already pending; one is all the test needs.
		}
	}))

	start()

	// The handler writes no reply, so the request is sent with
	// WithResponse(false) and the returned delivery is ignored.
	_, err := c.Send(NewRequest().
		WithRoutingKey("foo.bar.baz").
		WithExchange("amq.topic").
		WithResponse(false))
	require.NoError(t, err)

	select {
	case <-wasCalled:
		// The handler ran, so the topic pattern matched.
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for request")
	}
}
// TestHeaders checks that a request sent to the "amq.match" exchange with the
// header foo=bar reaches a handler bound through HeadersBinding, and that the
// handler's reply comes back to the client as the response body.
func TestHeaders(t *testing.T) {
	server, client, start, stop := initTest(t)
	defer stop()

	// Arguments for the headers binding. NOTE(review): per RabbitMQ headers
	// exchange semantics "x-match": "all" should require every listed header to
	// match; confirm against the broker documentation.
	bindingHeaders := amqp.Table{
		"x-match": "all",
		"foo":     "bar",
	}

	server.Bind(HeadersBinding("my_queue", bindingHeaders, func(_ context.Context, rw *ResponseWriter, _ amqp.Delivery) {
		fmt.Fprint(rw, "Hello, world")
	}))

	start()

	// Send a request carrying the header the binding was registered with.
	request := NewRequest().
		WithExchange("amq.match").
		WithHeaders(amqp.Table{"foo": "bar"})

	response, err := client.Send(request)
	require.NoError(t, err, "no errors occurred")
	assert.Equal(t, []byte("Hello, world"), response.Body, "correct request body")
}
// TestSkipQueueDeclare serves one request through a regular DirectBinding,
// stops that server, and then serves a second request on the same queue name
// from a new server whose binding sets WithSkipQueueDeclare(true) together
// with a queue declare argument the first binding did not use.
func TestSkipQueueDeclare(t *testing.T) {
	firstServer, client, start, stopFirst := initTest(t)
	// NOTE(review): stopFirst is also called explicitly further down, so this
	// deferred call runs it a second time at exit; this assumes the stop
	// function is safe to call twice.
	defer stopFirst()

	queue := "test-skip-queue-declare"

	reply := func(_ context.Context, rw *ResponseWriter, _ amqp.Delivery) {
		fmt.Fprint(rw, "Hello, world")
	}

	// roundTrip sends one request routed to the queue and requires the canned
	// reply to come back.
	roundTrip := func() {
		response, err := client.Send(NewRequest().WithRoutingKey(queue))
		require.NoError(t, err, "no errors occurred")
		require.Equal(t, []byte("Hello, world"), response.Body, "correct request body")
	}

	// First pass: plain binding on the server provided by initTest.
	firstServer.Bind(DirectBinding(queue, reply))
	start()
	roundTrip()
	stopFirst()

	// Second pass: a fresh server binds the same queue name but adds a queue
	// declare argument. Declaring with different arguments would normally fail;
	// with SkipQueueDeclare set the declaration is skipped, so the mismatch
	// does not matter.
	secondServer := NewServer(testURL)
	secondServer.Bind(
		DirectBinding(queue, reply).
			WithSkipQueueDeclare(true).
			WithQueueDeclareArg("x-expires", 33),
	)

	stopSecond := startAndWait(secondServer)
	defer stopSecond()

	roundTrip()
}