From 1c6aebef6c4215a141f3454d36a3cfd67a8393ba Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 16:55:09 -0600 Subject: [PATCH 1/6] Reject Matrix with 200 and rejected pushkey instead of returning 5xx remove other complex logic --- server/server.go | 10 ++-------- server/server_matrix.go | 9 --------- server/server_test.go | 12 ++++++------ 3 files changed, 8 insertions(+), 23 deletions(-) diff --git a/server/server.go b/server/server.go index c0f3b641d..78a4ab661 100644 --- a/server/server.go +++ b/server/server.go @@ -848,18 +848,12 @@ func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v * if err != nil { minc(metricMessagesPublishedFailure) minc(metricMatrixPublishedFailure) - if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode { - topic, err := fromContext[*topic](r, contextTopic) - if err != nil { - return err - } + if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPNoSubscriberUnifiedPush.HTTPCode { pushKey, err := fromContext[string](r, contextMatrixPushKey) if err != nil { return err } - if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter { - return writeMatrixResponse(w, pushKey) - } + return writeMatrixResponse(w, pushKey) } return err } diff --git a/server/server_matrix.go b/server/server_matrix.go index f99bea8fe..6613b591e 100644 --- a/server/server_matrix.go +++ b/server/server_matrix.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "strings" - "time" ) // Matrix Push Gateway / UnifiedPush / ntfy integration: @@ -72,14 +71,6 @@ type matrixResponse struct { Rejected []string `json:"rejected"` } -const ( - // matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter is the time after which a Matrix response - // will return an HTTP 200 with the push key (i.e. "rejected":[""]}), if no rate visitor has been set on - // the topic. Rejecting the push key will instruct the Matrix server to invalidate the pushkey and stop sending - // messages to it. This must be longer than topicExpungeAfter. See https://spec.matrix.org/v1.6/push-gateway-api/ - matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter = 12 * time.Hour -) - // errMatrixPushkeyRejected represents an error when handing Matrix gateway messages // // If the push key is set, the app server will remove it and will never send messages using the same diff --git a/server/server_test.go b/server/server_test.go index 5f2bac6a8..ed41059a9 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1456,8 +1456,8 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber(t *testing.T) { s := newTestServer(t, c) notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}` response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil) - require.Equal(t, 507, response.Code) - require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`+"\n", response.Body.String()) } func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing.T) { @@ -1466,16 +1466,16 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber_After13Hours(t *testing. s := newTestServer(t, c) notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}` - // No success if no rate visitor set (this also creates the topic in memory) + // Simply reject if no rate visitor set (this creates the topic in memory) response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil) - require.Equal(t, 507, response.Code) - require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String())) require.Nil(t, s.topics["mytopic"].rateVisitor) // Fake: This topic has been around for 13 hours without a rate visitor s.topics["mytopic"].lastAccess = time.Now().Add(-13 * time.Hour) - // Same request should now return HTTP 200 with a rejected pushkey + // Same request should still return an HTTP 200 with a rejected pushkey response = request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil) require.Equal(t, 200, response.Code) require.Equal(t, `{"rejected":["http://127.0.0.1:12345/mytopic?up=1"]}`, strings.TrimSpace(response.Body.String())) From 072520eacc6c78bd11262ba8b141312dfbfb9cc7 Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 16:57:34 -0600 Subject: [PATCH 2/6] rename UnifiedPush can't publish error to 404 --- server/errors.go | 4 ++-- server/server.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/server/errors.go b/server/errors.go index 072bdc011..543a21eac 100644 --- a/server/errors.go +++ b/server/errors.go @@ -117,9 +117,10 @@ var ( errHTTPBadRequestWebPushSubscriptionInvalid = &errHTTP{40038, http.StatusBadRequest, "invalid request: web push payload malformed", "", nil} errHTTPBadRequestWebPushEndpointUnknown = &errHTTP{40039, http.StatusBadRequest, "invalid request: web push endpoint unknown", "", nil} errHTTPBadRequestWebPushTopicCountTooHigh = &errHTTP{40040, http.StatusBadRequest, "invalid request: too many web push topic subscriptions", "", nil} - errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil} errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil} errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil} + errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil} + errHTTPNoSubscriberUnifiedPush = &errHTTP{40401, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil} errHTTPConflictUserExists = &errHTTP{40901, http.StatusConflict, "conflict: user already exists", "", nil} errHTTPConflictTopicReserved = &errHTTP{40902, http.StatusConflict, "conflict: access control entry for topic or topic pattern already exists", "", nil} errHTTPConflictSubscriptionExists = &errHTTP{40903, http.StatusConflict, "conflict: topic subscription already exists", "", nil} @@ -142,5 +143,4 @@ var ( errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil} errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil} errHTTPInternalErrorWebPushUnableToPublish = &errHTTP{50004, http.StatusInternalServerError, "internal server error: unable to publish web push message", "", nil} - errHTTPInsufficientStorageUnifiedPush = &errHTTP{50701, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil} ) diff --git a/server/server.go b/server/server.go index 78a4ab661..92ea503d7 100644 --- a/server/server.go +++ b/server/server.go @@ -744,10 +744,9 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e } if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil { // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see - // Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove - // the subscription as invalid if any 400-499 code (except 429/408) is returned. - // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46 - return nil, errHTTPInsufficientStorageUnifiedPush.With(t) + // Rate-Topics header). The 404 response might remove the push subscription from application servers, + // but the client should resubscribe them when sent the new_topic parameter. + return nil, errHTTPNoSubscriberUnifiedPush.With(t) } else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() { return nil, errHTTPTooManyRequestsLimitMessages.With(t) } else if email != "" && !vrate.EmailAllowed() { From 8f5213ab1a41f7e2db30bdcae84b52cf8cd1b606 Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 16:58:06 -0600 Subject: [PATCH 3/6] keep track of topics that have never been subscribed to --- server/topic.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/server/topic.go b/server/topic.go index 49def94b9..b78ea0965 100644 --- a/server/topic.go +++ b/server/topic.go @@ -19,11 +19,12 @@ const ( // topic represents a channel to which subscribers can subscribe, and publishers // can publish a message type topic struct { - ID string - subscribers map[int]*topicSubscriber - rateVisitor *visitor - lastAccess time.Time - mu sync.RWMutex + ID string + subscribers map[int]*topicSubscriber + rateVisitor *visitor + lastAccess time.Time + neverSubscribed bool + mu sync.RWMutex } type topicSubscriber struct { @@ -41,6 +42,7 @@ func newTopic(id string) *topic { ID: id, subscribers: make(map[int]*topicSubscriber), lastAccess: time.Now(), + neverSubscribed: true, } } @@ -61,6 +63,7 @@ func (t *topic) Subscribe(s subscriber, userID string, cancel func()) (subscribe cancel: cancel, } t.lastAccess = time.Now() + t.neverSubscribed = false return subscriberID } @@ -79,6 +82,12 @@ func (t *topic) LastAccess() time.Time { return t.lastAccess } +func (t *topic) NeverSubscribed() bool { + t.mu.RLock() + defer t.mu.RUnlock() + return t.neverSubscribed +} + func (t *topic) SetRateVisitor(v *visitor) { t.mu.Lock() defer t.mu.Unlock() From 9bbb1ac064d474d89e643a11e859140617c58172 Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 16:58:28 -0600 Subject: [PATCH 4/6] add the "new_topic" param to the open message --- server/message_cache_test.go | 4 ++-- server/server_firebase_test.go | 2 +- server/types.go | 9 +++++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 79b7fc54c..d51294e98 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -32,8 +32,8 @@ func testCacheMessages(t *testing.T, c *messageCache) { require.Nil(t, c.AddMessage(m2)) // Adding invalid - require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added! - require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added! + require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added! + require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example", false))) // These should not be added! // mytopic: count counts, err := c.MessageCounts() diff --git a/server/server_firebase_test.go b/server/server_firebase_test.go index 9b653a29c..2740fc85c 100644 --- a/server/server_firebase_test.go +++ b/server/server_firebase_test.go @@ -92,7 +92,7 @@ func TestToFirebaseMessage_Keepalive(t *testing.T) { } func TestToFirebaseMessage_Open(t *testing.T) { - m := newOpenMessage("mytopic") + m := newOpenMessage("mytopic", false) fbm, err := toFirebaseMessage(m, nil) require.Nil(t, err) require.Equal(t, "mytopic", fbm.Topic) diff --git a/server/types.go b/server/types.go index fb08fb054..83b0ca9ba 100644 --- a/server/types.go +++ b/server/types.go @@ -17,6 +17,7 @@ const ( keepaliveEvent = "keepalive" messageEvent = "message" pollRequestEvent = "poll_request" + createdNewTopicParameter = "new_topic" ) const ( @@ -123,8 +124,12 @@ func newMessage(event, topic, msg string) *message { } // newOpenMessage is a convenience method to create an open message -func newOpenMessage(topic string) *message { - return newMessage(openEvent, topic, "") +func newOpenMessage(topic string, createdNewTopics bool) *message { + msg := "" + if createdNewTopics { // can expand this to a comma seperated string for more future parameters + msg = createdNewTopicParameter + } + return newMessage(openEvent, topic, msg) } // newKeepaliveMessage is a convenience method to create a keepalive message From 0d607c7107ae42afb47c9d8e2d39e5455556b71a Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 16:59:23 -0600 Subject: [PATCH 5/6] Return new_topic in open message when a fresh topic is subscribed to Associated tests --- server/server.go | 11 +++++++-- server/server_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/server/server.go b/server/server.go index 92ea503d7..d3d972c4d 100644 --- a/server/server.go +++ b/server/server.go @@ -1218,8 +1218,11 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v * } ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + createdNewTopics := false subscriberIDs := make([]int, 0) for _, t := range topics { + createdNewTopics = createdNewTopics || t.NeverSubscribed() subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel)) } defer func() { @@ -1227,7 +1230,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v * topics[i].Unsubscribe(subscriberID) // Order! } }() - if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message + if err := sub(v, newOpenMessage(topicsStr, createdNewTopics)); err != nil { // Send out open message return err } if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil { @@ -1367,8 +1370,11 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi } return s.sendOldMessages(topics, since, scheduled, v, sub) } + + createdNewTopic := false subscriberIDs := make([]int, 0) for _, t := range topics { + createdNewTopic = createdNewTopic || t.NeverSubscribed() subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel)) } defer func() { @@ -1376,7 +1382,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi topics[i].Unsubscribe(subscriberID) // Order! } }() - if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message + + if err := sub(v, newOpenMessage(topicsStr, createdNewTopic)); err != nil { // Send out open message return err } if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil { diff --git a/server/server_test.go b/server/server_test.go index ed41059a9..a1bfdd862 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -136,7 +136,7 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) { require.Equal(t, openEvent, messages[0].Event) require.Equal(t, "mytopic", messages[0].Topic) - require.Equal(t, "", messages[0].Message) + require.Equal(t, "new_topic", messages[0].Message) require.Equal(t, "", messages[0].Title) require.Equal(t, 0, messages[0].Priority) require.Nil(t, messages[0].Tags) @@ -147,6 +147,56 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) { require.Equal(t, "", messages[1].Title) require.Equal(t, 0, messages[1].Priority) require.Nil(t, messages[1].Tags) + + // The next time subscribing to the same topic will not result in new_topic on open + rr = httptest.NewRecorder() + ctx, cancel = context.WithCancel(context.Background()) + req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic/json", nil) + if err != nil { + t.Fatal(err) + } + go func() { + s.handle(rr, req) + doneChan <- true + }() + time.Sleep(300 * time.Millisecond) + cancel() + <-doneChan + + messages = toMessages(t, rr.Body.String()) + require.Equal(t, 1, len(messages)) + + require.Equal(t, openEvent, messages[0].Event) + require.Equal(t, "mytopic", messages[0].Topic) + require.Equal(t, "", messages[0].Message) + require.Equal(t, "", messages[0].Title) + require.Equal(t, 0, messages[0].Priority) + require.Nil(t, messages[0].Tags) + + // Subscribing to any new topic again will result in new_topic being sent + rr = httptest.NewRecorder() + ctx, cancel = context.WithCancel(context.Background()) + req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic,topic2/json", nil) + if err != nil { + t.Fatal(err) + } + go func() { + s.handle(rr, req) + doneChan <- true + }() + time.Sleep(300 * time.Millisecond) + cancel() + <-doneChan + + messages = toMessages(t, rr.Body.String()) + require.Equal(t, 1, len(messages)) + + require.Equal(t, openEvent, messages[0].Event) + require.Equal(t, "mytopic,topic2", messages[0].Topic) + require.Equal(t, "new_topic", messages[0].Message) + require.Equal(t, "", messages[0].Title) + require.Equal(t, 0, messages[0].Priority) + require.Nil(t, messages[0].Tags) } func TestServer_PublishAndSubscribe(t *testing.T) { From ec7977891e09271e7a9085d6ab124fc9b194f0cf Mon Sep 17 00:00:00 2001 From: Karmanyaah Malhotra Date: Sun, 17 Dec 2023 17:11:41 -0600 Subject: [PATCH 6/6] format --- server/topic.go | 6 +++--- server/types.go | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/topic.go b/server/topic.go index b78ea0965..aafaee30c 100644 --- a/server/topic.go +++ b/server/topic.go @@ -39,9 +39,9 @@ type subscriber func(v *visitor, msg *message) error // newTopic creates a new topic func newTopic(id string) *topic { return &topic{ - ID: id, - subscribers: make(map[int]*topicSubscriber), - lastAccess: time.Now(), + ID: id, + subscribers: make(map[int]*topicSubscriber), + lastAccess: time.Now(), neverSubscribed: true, } } diff --git a/server/types.go b/server/types.go index 83b0ca9ba..9b69962d4 100644 --- a/server/types.go +++ b/server/types.go @@ -13,11 +13,11 @@ import ( // List of possible events const ( - openEvent = "open" - keepaliveEvent = "keepalive" - messageEvent = "message" - pollRequestEvent = "poll_request" - createdNewTopicParameter = "new_topic" + openEvent = "open" + openCreatedNewTopic = "new_topic" + keepaliveEvent = "keepalive" + messageEvent = "message" + pollRequestEvent = "poll_request" ) const ( @@ -127,7 +127,7 @@ func newMessage(event, topic, msg string) *message { func newOpenMessage(topic string, createdNewTopics bool) *message { msg := "" if createdNewTopics { // can expand this to a comma seperated string for more future parameters - msg = createdNewTopicParameter + msg = openCreatedNewTopic } return newMessage(openEvent, topic, msg) }