Skip to content

Commit 4ef2524

Browse files
Use URL-safe Base64 encoding for the topic name
There are some restrictions to what characters are allowed to use in a Grafana Live channel: https://github.com/grafana/grafana-plugin-sdk-go/blob/7470982de35f3b0bb5d17631b4163463153cc204/live/channel.go#L33 To comply with these restrictions, the topic is encoded using URL-safe base64 encoding. (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet)
1 parent 3ec8fa3 commit 4ef2524

File tree

5 files changed

+124
-12
lines changed

5 files changed

+124
-12
lines changed

pkg/mqtt/client.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,20 @@ func (c *client) Subscribe(reqPath string) *Topic {
131131
return t
132132
}
133133

134-
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath)
134+
topic, err := decodeTopic(t.Path)
135+
if err != nil {
136+
log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err)
137+
return nil
138+
}
135139

136-
topic := resolveTopic(t.Path)
140+
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topic)
137141

138142
if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) {
139143
// by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic
140144
// and don't need to regex it against + and #.
141145
c.HandleMessage(topicPath, []byte(m.Payload()))
142146
}); token.Wait() && token.Error() != nil {
143-
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error())
147+
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
144148
}
145149
c.topics.Store(t)
146150
return t
@@ -161,7 +165,12 @@ func (c *client) Unsubscribe(reqPath string) {
161165

162166
log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)
163167

164-
topic := resolveTopic(t.Path)
168+
topic, err := decodeTopic(t.Path)
169+
if err != nil {
170+
log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err)
171+
return
172+
}
173+
165174
if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
166175
log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error())
167176
}

pkg/mqtt/topic.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package mqtt
22

33
import (
4+
"encoding/base64"
45
"path"
5-
"strings"
66
"sync"
77
"time"
88

9+
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
910
"github.com/grafana/grafana-plugin-sdk-go/data"
1011
)
1112

@@ -99,9 +100,21 @@ func (tm *TopicMap) Delete(key string) {
99100
tm.Map.Delete(key)
100101
}
101102

102-
// replace all __PLUS__ with + and one __HASH__ with #
103-
// Question: Why does grafana not allow + and # in query?
104-
func resolveTopic(topic string) string {
105-
resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+")
106-
return strings.Replace(resolvedTopic, "__HASH__", "#", -1)
103+
// decodeTopic decodes an MQTT topic name from base64 URL encoding.
104+
//
105+
// There are some restrictions to what characters are allowed to use in a Grafana Live channel:
106+
//
107+
// https://github.com/grafana/grafana-plugin-sdk-go/blob/7470982de35f3b0bb5d17631b4163463153cc204/live/channel.go#L33
108+
//
109+
// To comply with these restrictions, the topic is encoded using URL-safe base64
110+
// encoding. (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet)
111+
func decodeTopic(topic string) (string, error) {
112+
log.DefaultLogger.Debug("Decoding MQTT topic name", "encodedTopic", topic)
113+
decoded, err := base64.RawURLEncoding.DecodeString(topic)
114+
115+
if err != nil {
116+
return "", err
117+
}
118+
119+
return string(decoded), nil
107120
}

pkg/mqtt/topic_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mqtt
22

33
import (
4+
"encoding/base64"
45
"sync"
56
"testing"
67
"time"
@@ -165,3 +166,48 @@ func TestTopicMap_HasSubscription(t *testing.T) {
165166
require.False(t, tm.HasSubscription("testing"))
166167
})
167168
}
169+
170+
func TestDecodeTopic(t *testing.T) {
171+
tests := []struct {
172+
name string
173+
input string
174+
expTopic string
175+
expError bool
176+
}{
177+
{
178+
name: "Valid encoded string",
179+
input: base64.RawURLEncoding.EncodeToString([]byte("$test/topic/#")),
180+
expTopic: "$test/topic/#",
181+
expError: false,
182+
},
183+
{
184+
name: "Invalid encoded string",
185+
input: "invalid_@_base64",
186+
expError: true,
187+
},
188+
{
189+
name: "Empty string",
190+
input: "",
191+
expError: false,
192+
},
193+
{
194+
name: "Valid encoded string with padding",
195+
input: base64.URLEncoding.EncodeToString([]byte("test/topic")),
196+
expTopic: "",
197+
expError: true, // base64.RawURLEncoding does not accept padding
198+
},
199+
}
200+
201+
for _, tt := range tests {
202+
t.Run(tt.name, func(t *testing.T) {
203+
topic, err := decodeTopic(tt.input)
204+
205+
if tt.expError {
206+
require.Error(t, err)
207+
} else {
208+
require.NoError(t, err)
209+
require.Equal(t, tt.expTopic, topic)
210+
}
211+
})
212+
}
213+
}

src/datasource.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { ScopedVars } from '@grafana/data';
2+
import { DataSource } from './datasource';
3+
import { getTemplateSrv } from '@grafana/runtime';
4+
5+
jest.mock('@grafana/runtime', () => ({
6+
DataSourceWithBackend: class {},
7+
getTemplateSrv: jest.fn(),
8+
}));
9+
10+
describe('DataSource', () => {
11+
const mockReplace = jest.fn().mockImplementation((value) => value);
12+
(getTemplateSrv as jest.Mock).mockReturnValue({
13+
replace: mockReplace,
14+
});
15+
16+
const scopedVars: ScopedVars = {};
17+
let dataSource = new DataSource();
18+
19+
const testCases = [
20+
{
21+
description: 'should apply base64 URL-safe encoding correctly',
22+
query: { topic: 'test/topic+/and:more' },
23+
expectedResult: 'dGVzdC90b3BpYysvYW5kOm1vcmU',
24+
},
25+
];
26+
27+
testCases.forEach(({ description, query, expectedReplaced, expectedResult }) => {
28+
it(description, () => {
29+
const result = dataSource.applyTemplateVariables(query, scopedVars);
30+
31+
expect(mockReplace).toHaveBeenCalledWith(query.topic, scopedVars);
32+
expect(result.topic).toBe(expectedResult);
33+
});
34+
});
35+
});

src/datasource.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,22 @@ export class DataSource extends DataSourceWithBackend<MqttQuery, MqttDataSourceO
99

1010
applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record<string, any> {
1111
let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars);
12-
resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__');
13-
resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__');
12+
resolvedTopic = this.base64UrlSafeEncode(resolvedTopic);
1413
const resolvedQuery: MqttQuery = {
1514
...query,
1615
topic: resolvedTopic,
1716
};
1817

1918
return resolvedQuery;
2019
}
20+
21+
// There are some restrictions to what characters are allowed to use in a Grafana Live channel:
22+
//
23+
// https://github.com/grafana/grafana-plugin-sdk-go/blob/7470982de35f3b0bb5d17631b4163463153cc204/live/channel.go#L33
24+
//
25+
// To comply with these restrictions, the topic is encoded using URL-safe base64 encoding.
26+
// (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet)
27+
private base64UrlSafeEncode(input: string): string {
28+
return btoa(input).replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
29+
}
2130
}

0 commit comments

Comments
 (0)