From 4c52831ffd4e5fe0dea8b1750acefd6062f59610 Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Sat, 24 Aug 2024 11:23:06 +0200 Subject: [PATCH] Use URL-safe Base64 encoding for the topic name There are some restrictions to what characters are allowed to use in a Grafana Live channel: https://github.com/grafana/grafana-plugin-sdk-go/blob/7470982de35f3b0bb5d17631b4163463153cc204/live/channel.go#L33 To comply with these restrictions, the topic is encoded using URL-safe base64 encoding. (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet) --- pkg/mqtt/client.go | 17 ++++++++++++---- pkg/mqtt/topic.go | 25 +++++++++++++++++------ pkg/mqtt/topic_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++ scripts/test_broker.js | 5 ++++- src/datasource.test.ts | 35 ++++++++++++++++++++++++++++++++ src/datasource.ts | 13 ++++++++++-- 6 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 src/datasource.test.ts diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index f46a7af..a56b1be 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -131,16 +131,20 @@ func (c *client) Subscribe(reqPath string) *Topic { return t } - log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath) + topic, err := decodeTopic(t.Path) + if err != nil { + log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err) + return nil + } - topic := resolveTopic(t.Path) + log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topic) if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) { // by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic // and don't need to regex it against + and #. c.HandleMessage(topicPath, []byte(m.Payload())) }); token.Wait() && token.Error() != nil { - log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error()) + log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error()) } c.topics.Store(t) return t @@ -161,7 +165,12 @@ func (c *client) Unsubscribe(reqPath string) { log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) - topic := resolveTopic(t.Path) + topic, err := decodeTopic(t.Path) + if err != nil { + log.DefaultLogger.Error("Error decoding MQTT topic name", "encodedTopic", t.Path, "error", err) + return + } + if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error()) } diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index 8e7db7f..ab42e5a 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -1,11 +1,12 @@ package mqtt import ( + "encoding/base64" "path" - "strings" "sync" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/data" ) @@ -99,9 +100,21 @@ func (tm *TopicMap) Delete(key string) { tm.Map.Delete(key) } -// replace all __PLUS__ with + and one __HASH__ with # -// Question: Why does grafana not allow + and # in query? -func resolveTopic(topic string) string { - resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+") - return strings.Replace(resolvedTopic, "__HASH__", "#", -1) +// decodeTopic decodes an MQTT topic name from base64 URL encoding. +// +// There are some restrictions to what characters are allowed to use in a Grafana Live channel: +// +// https://github.com/grafana/grafana-plugin-sdk-go/blob/7470982de35f3b0bb5d17631b4163463153cc204/live/channel.go#L33 +// +// To comply with these restrictions, the topic is encoded using URL-safe base64 +// encoding. (RFC 4648; 5. Base 64 Encoding with URL and Filename Safe Alphabet) +func decodeTopic(topic string) (string, error) { + log.DefaultLogger.Debug("Decoding MQTT topic name", "encodedTopic", topic) + decoded, err := base64.RawURLEncoding.DecodeString(topic) + + if err != nil { + return "", err + } + + return string(decoded), nil } diff --git a/pkg/mqtt/topic_test.go b/pkg/mqtt/topic_test.go index f747956..dcc788b 100644 --- a/pkg/mqtt/topic_test.go +++ b/pkg/mqtt/topic_test.go @@ -1,6 +1,7 @@ package mqtt import ( + "encoding/base64" "sync" "testing" "time" @@ -165,3 +166,48 @@ func TestTopicMap_HasSubscription(t *testing.T) { require.False(t, tm.HasSubscription("testing")) }) } + +func TestDecodeTopic(t *testing.T) { + tests := []struct { + name string + input string + expTopic string + expError bool + }{ + { + name: "Valid encoded string", + input: base64.RawURLEncoding.EncodeToString([]byte("$test/topic/#")), + expTopic: "$test/topic/#", + expError: false, + }, + { + name: "Invalid encoded string", + input: "invalid_@_base64", + expError: true, + }, + { + name: "Empty string", + input: "", + expError: false, + }, + { + name: "Valid encoded string with padding", + input: base64.URLEncoding.EncodeToString([]byte("test/topic")), + expTopic: "", + expError: true, // base64.RawURLEncoding does not accept padding + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + topic, err := decodeTopic(tt.input) + + if tt.expError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expTopic, topic) + } + }) + } +} diff --git a/scripts/test_broker.js b/scripts/test_broker.js index 22fdeae..04bd628 100644 --- a/scripts/test_broker.js +++ b/scripts/test_broker.js @@ -27,7 +27,9 @@ const toMillis = { const publishers = {}; const createPublisher = ({ topic, qos }) => { let i = 0; - const [duration, value] = topic.split('/'); + + const parts = topic.split('/'); + const [duration, value] = [parts[parts.length - 2], parts[parts.length - 1]]; const fn = toMillis[duration]; if (!fn || !value || value < 1) { @@ -38,6 +40,7 @@ const createPublisher = ({ topic, qos }) => { const interval = fn(value); if (!publishers[topic]) { + console.log('creating publisher for', topic, 'with interval', interval, 'ms'); publishers[topic] = setInterval(() => { let payload = Math.random(); diff --git a/src/datasource.test.ts b/src/datasource.test.ts new file mode 100644 index 0000000..4da80ac --- /dev/null +++ b/src/datasource.test.ts @@ -0,0 +1,35 @@ +import { ScopedVars } from '@grafana/data'; +import { DataSource } from './datasource'; +import { getTemplateSrv } from '@grafana/runtime'; + +jest.mock('@grafana/runtime', () => ({ + DataSourceWithBackend: class {}, + getTemplateSrv: jest.fn(), +})); + +describe('DataSource', () => { + const mockReplace = jest.fn().mockImplementation((value) => value); + (getTemplateSrv as jest.Mock).mockReturnValue({ + replace: mockReplace, + }); + + const scopedVars: ScopedVars = {}; + let dataSource = new DataSource(); + + const testCases = [ + { + description: 'should apply base64 URL-safe encoding correctly', + query: { topic: 'test/topic+/and:more' }, + expectedResult: 'dGVzdC90b3BpYysvYW5kOm1vcmU', + }, + ]; + + testCases.forEach(({ description, query, expectedReplaced, expectedResult }) => { + it(description, () => { + const result = dataSource.applyTemplateVariables(query, scopedVars); + + expect(mockReplace).toHaveBeenCalledWith(query.topic, scopedVars); + expect(result.topic).toBe(expectedResult); + }); + }); +}); diff --git a/src/datasource.ts b/src/datasource.ts index b5d0b65..e482aa2 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -9,8 +9,7 @@ export class DataSource extends DataSourceWithBackend { let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars); - resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__'); - resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__'); + resolvedTopic = this.base64UrlSafeEncode(resolvedTopic); const resolvedQuery: MqttQuery = { ...query, topic: resolvedTopic, @@ -18,4 +17,14 @@ export class DataSource extends DataSourceWithBackend