-
Notifications
You must be signed in to change notification settings - Fork 3
/
topic_test.go
152 lines (123 loc) · 3.93 KB
/
topic_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package gokaf
import (
"context"
"testing"
"time"
)
// TestNewTopic checks that newTopic hands back a non-nil topic whose name
// field equals the name it was constructed with.
func TestNewTopic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wantName := "testTopic"
	created := newTopic(ctx, mockLogger, wantName, 0)

	// The cases are mutually exclusive: the name is only inspected when a
	// topic was actually returned.
	switch {
	case created == nil:
		t.Error("Expected non-nil topic, got nil")
	case created.name != wantName:
		t.Errorf("Expected topic name %s, got %s", wantName, created.name)
	}
}
// TestTopicClose verifies that topic.close returns within one second and that
// the topic's context reports context.Canceled afterwards.
func TestTopicClose(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	name := "testTopic"
	topic := newTopic(ctx, mockLogger, name, 0)

	// Signal completion by closing the channel instead of sending on it: a
	// close never blocks, so the goroutine can always exit even if the test
	// has already stopped waiting because of the timeout below.
	done := make(chan struct{})
	go func() {
		defer close(done)
		topic.close()
	}()

	select {
	case <-done:
		// close returned in time.
	case <-time.After(time.Second):
		// close may still be running, so the state checked below would be
		// meaningless; stop here.
		t.Fatal("Timeout waiting for close method to complete")
	}

	// After close has returned, the topic's own context must be canceled.
	if err := topic.ctx.Err(); err != context.Canceled {
		t.Error("Expected canceled context after close, got", err)
	}
}
// TestTopicPublish publishes two messages to a topic, forwards whatever is
// read from the topic's channel into a buffered channel, and checks that the
// two messages are received in publication order.
func TestTopicPublish(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tp := newTopic(ctx, mockLogger, "testTopic", 0)

	// Forward every value read from the topic's channel so the assertions
	// below can inspect them.
	received := make(chan interface{}, 3)
	go func() {
		for m := range tp.channel.ch {
			received <- m
		}
	}()

	message1, message2 := "Message 1", "Message 2"
	if err := tp.publish(message1); err != nil {
		t.Errorf("Error publishing message1: %v", err)
	}
	if err := tp.publish(message2); err != nil {
		t.Errorf("Error publishing message2: %v", err)
	}

	// Shut the topic down and wait on its wait group before verifying.
	// NOTE(review): presumably close also ends the forwarding goroutine's
	// range loop by closing channel.ch — confirm against topic.close.
	tp.close()
	tp.wg.Wait()

	// First value out must be the first message published.
	select {
	case got := <-received:
		if got != message1 {
			t.Errorf("Expected message1 %v, got %v", message1, got)
		}
	case <-time.After(time.Second):
		t.Error("Timeout waiting for message1")
	}

	// Second value out must be the second message published.
	select {
	case got := <-received:
		if got != message2 {
			t.Errorf("Expected message2 %v, got %v", message2, got)
		}
	case <-time.After(time.Second):
		t.Error("Timeout waiting for message2")
	}
}
// TestClosedTopicPublish cancels the context a topic was created with and
// then expects publish to fail with the "already closed" error text.
func TestClosedTopicPublish(t *testing.T) {
	const wantMsg = "Topic testTopic is already closed"

	ctx, cancel := context.WithCancel(context.Background())
	topicName := "testTopic"
	tp := newTopic(ctx, mockLogger, topicName, 0)
	payload := "Hello, world!"

	// Cancel the parent context before publishing; the test treats this as
	// closing the topic.
	cancel()

	err := tp.publish(payload)
	switch {
	case err == nil:
		t.Error("Expected error when publishing to a closed topic, but got none.")
	case err.Error() != wantMsg:
		t.Errorf("Unexpected error message. Expected 'Topic testTopic is already closed', but got '%v'", err.Error())
	}
}
// TestClosedTopicChannelPublish closes a topic's channel directly and then
// expects publish to fail with the "already closed" error text.
func TestClosedTopicChannelPublish(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	topicName := "testTopic"
	tp := newTopic(ctx, mockLogger, topicName, 0)
	payload := "Hello, world!"

	// Close only the underlying channel; the context stays live until the
	// deferred cancel runs.
	tp.channel.Close()

	err := tp.publish(payload)
	if err == nil {
		t.Error("Expected error when publishing to a closed topic, but got none.")
		return
	}
	if got := err.Error(); got != "Topic testTopic is already closed" {
		t.Errorf("Unexpected error message. Expected 'Topic testTopic is already closed', but got '%v'", got)
	}
}