Skip to content

Commit 58307aa

Browse files
committed
feat(mq): open telemetry support
1 parent a03758d commit 58307aa

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

mq/producer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/md5"
66
"fmt"
77
amqp "github.com/rabbitmq/amqp091-go"
8+
"go.opentelemetry.io/otel"
89
)
910

1011
type Producer struct {
@@ -86,6 +87,10 @@ func parseOptions(options []string) map[string]string {
8687

8788
func (p *Producer) publish(ctx context.Context, key string, msg []byte, opts map[string]string) error {
8889

90+
carrier := RabbitMQHeaderCarrier{}
91+
otel.GetTextMapPropagator().Inject(ctx, carrier)
92+
headers := amqp.Table(carrier)
93+
8994
err := p.Channel.PublishWithContext(
9095
ctx,
9196
exchangeName,
@@ -99,6 +104,7 @@ func (p *Producer) publish(ctx context.Context, key string, msg []byte, opts map
99104
AppId: p.appId,
100105
UserId: opts[UserIdKey],
101106
MessageId: fmt.Sprintf("%x", md5.Sum(msg)),
107+
Headers: headers,
102108
})
103109

104110
return err

mq/producer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ func TestSendNotify(t *testing.T) {
1717
SetData("keyword4", "通知内容", "") // 通知内容
1818
notice.Receiver("12345678")
1919

20-
p, err := NewProducer("sdk_test", "amqpURL")
20+
p, err := NewProducer("sdk_test", "amqpURL") // 创建一个MQ连接负责消息生产
2121
if err != nil {
2222
t.Error(err)
2323
}
2424

25-
err = p.PublishNotice(context.Background(), notice)
25+
err = p.PublishNotice(context.Background(), notice) // 发送消息
2626
if err != nil {
2727
t.Error(err)
2828
}

mq/propagator.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package mq
2+
3+
import (
4+
amqp "github.com/rabbitmq/amqp091-go"
5+
)
6+
7+
// RabbitMQHeaderCarrier adapts http.Header to satisfy the TextMapCarrier interface.
8+
type RabbitMQHeaderCarrier amqp.Table
9+
10+
// Get returns the value associated with the passed key.
11+
func (hc RabbitMQHeaderCarrier) Get(key string) string {
12+
return hc.Get(key)
13+
}
14+
15+
// Set stores the key-value pair.
16+
func (hc RabbitMQHeaderCarrier) Set(key string, value string) {
17+
hc.Set(key, value)
18+
}
19+
20+
// Keys lists the keys stored in this carrier.
21+
func (hc RabbitMQHeaderCarrier) Keys() []string {
22+
keys := make([]string, 0, len(hc))
23+
for k := range hc {
24+
keys = append(keys, k)
25+
}
26+
return keys
27+
}

0 commit comments

Comments
 (0)