1
1
import Gist from '../gist' ;
2
2
import { log } from "../utilities/log" ;
3
- import { getUserToken } from "./user-manager" ;
4
- import { getUserQueue , userQueueNextPullCheckLocalStoreName } from "../services/queue-service" ;
3
+ import { getUserToken , isAnonymousUser } from "./user-manager" ;
4
+ import { getUserQueue , getQueueSSEEndpoint , userQueueNextPullCheckLocalStoreName } from "../services/queue-service" ;
5
5
import { showMessage , embedMessage } from "./message-manager" ;
6
6
import { resolveMessageProperties } from "./gist-properties-manager" ;
7
7
import { getKeyFromLocalStore } from '../utilities/local-storage' ;
8
8
import { updateBroadcastsLocalStore , getEligibleBroadcasts , isShowAlwaysBroadcast } from './message-broadcast-manager' ;
9
9
import { updateQueueLocalStore , getMessagesFromLocalStore , isMessageLoading , setMessageLoading } from './message-user-queue-manager' ;
10
+ import { settings } from '../services/settings' ;
10
11
11
12
var sleep = time => new Promise ( resolve => setTimeout ( resolve , time ) )
12
13
var poll = ( promiseFn , time ) => promiseFn ( ) . then ( sleep ( time ) . then ( ( ) => poll ( promiseFn , time ) ) ) ;
13
14
var pollingSetup = false ;
15
+ let sseSource = null ;
14
16
15
17
export async function startQueueListener ( ) {
16
18
if ( ! pollingSetup ) {
@@ -22,7 +24,7 @@ export async function startQueueListener() {
22
24
log ( "User token not setup, queue not started." ) ;
23
25
}
24
26
} else {
25
- checkMessageQueue ( ) ;
27
+ await checkMessageQueue ( ) ;
26
28
}
27
29
}
28
30
@@ -42,7 +44,7 @@ export async function checkMessageQueue() {
42
44
async function handleMessage ( message ) {
43
45
var messageProperties = resolveMessageProperties ( message ) ;
44
46
if ( messageProperties . hasRouteRule ) {
45
- var currentUrl = Gist . currentRoute
47
+ var currentUrl = Gist . currentRoute ;
46
48
if ( currentUrl == null ) {
47
49
currentUrl = new URL ( window . location . href ) . pathname ;
48
50
}
@@ -75,6 +77,23 @@ async function handleMessage(message) {
75
77
}
76
78
77
79
export async function pullMessagesFromQueue ( ) {
80
+ // If SSE connection is already active, just check the local queue
81
+ if ( settings . hasActiveSSEConnection ( ) ) {
82
+ await checkMessageQueue ( ) ;
83
+ return ;
84
+ }
85
+
86
+ // If SSE is enabled and user is not anonymous, set up SSE listener
87
+ if ( settings . useSSE ( ) && ! isAnonymousUser ( ) ) {
88
+ await setupSSEQueueListener ( ) ;
89
+ return ;
90
+ }
91
+
92
+ // Fall back to polling
93
+ await checkQueueThroughPolling ( ) ;
94
+ }
95
+
96
+ async function checkQueueThroughPolling ( ) {
78
97
if ( getUserToken ( ) ) {
79
98
if ( Gist . isDocumentVisible ) {
80
99
// We're using the TTL as a way to determine if we should check the queue, so if the key is not there, we check the queue.
@@ -87,8 +106,7 @@ export async function pullMessagesFromQueue() {
87
106
responseData = response . data ;
88
107
updateQueueLocalStore ( responseData ) ;
89
108
updateBroadcastsLocalStore ( responseData ) ;
90
- }
91
- else if ( response . status === 304 ) {
109
+ } else if ( response . status === 304 ) {
92
110
log ( "304 response, using local store." ) ;
93
111
}
94
112
await checkMessageQueue ( ) ;
@@ -99,9 +117,66 @@ export async function pullMessagesFromQueue() {
99
117
log ( `Next queue pull scheduled for later.` ) ;
100
118
}
101
119
} else {
102
- log ( `Document not visible, skipping queue check.` ) ;
120
+ log ( `Document not visible, skipping queue check.` ) ;
103
121
}
104
122
} else {
105
123
log ( `User token reset, skipping queue check.` ) ;
106
124
}
107
- }
125
+ }
126
+
127
+ async function setupSSEQueueListener ( ) {
128
+ const sseURL = getQueueSSEEndpoint ( ) ;
129
+ if ( sseURL === null ) {
130
+ log ( "SSE endpoint not available, falling back to polling." ) ;
131
+ await checkQueueThroughPolling ( ) ;
132
+ return ;
133
+ }
134
+ log ( `Starting SSE queue listener on ${ sseURL } ` ) ;
135
+ sseSource = new EventSource ( sseURL ) ;
136
+ settings . setActiveSSEConnection ( ) ;
137
+
138
+ sseSource . addEventListener ( "connected" , async ( event ) => {
139
+ log ( "SSE connection received:" , event ) ;
140
+ settings . setActiveSSEConnection ( ) ;
141
+ settings . setUseSSEFlag ( true ) ;
142
+ } ) ;
143
+
144
+ sseSource . addEventListener ( "messages" , async ( event ) => {
145
+ try {
146
+ var messages = JSON . parse ( event . data ) ;
147
+ log ( "SSE message received:" , messages ) ;
148
+ await updateQueueLocalStore ( messages ) ;
149
+ await updateBroadcastsLocalStore ( messages ) ;
150
+ await checkMessageQueue ( ) ;
151
+ } catch ( e ) {
152
+ log ( "Failed to parse SSE message" , e ) ;
153
+ stopSSEListener ( ) ;
154
+ }
155
+ } ) ;
156
+
157
+ sseSource . addEventListener ( "error" , async ( event ) => {
158
+ log ( "SSE error received:" , event ) ;
159
+ stopSSEListener ( ) ;
160
+ } ) ;
161
+
162
+ sseSource . addEventListener ( "heartbeat" , async ( event ) => {
163
+ log ( "SSE heartbeat received:" , event ) ;
164
+ settings . setActiveSSEConnection ( ) ;
165
+ settings . setUseSSEFlag ( true ) ;
166
+ } ) ;
167
+ }
168
+
169
+ export function stopSSEListener ( ) {
170
+ // No active SSE connection to stop
171
+ if ( ! sseSource ) {
172
+ return ;
173
+ }
174
+
175
+ // Close the connection and clean up
176
+ log ( "Stopping SSE queue listener..." ) ;
177
+ sseSource . close ( ) ;
178
+ sseSource = null ;
179
+
180
+ // Update settings to reflect disconnected state
181
+ settings . setUseSSEFlag ( false ) ;
182
+ }
0 commit comments