Skip to content

Commit aa2ff02

Browse files
Harden SSE store publish semantics under slow subscribers
1 parent d3b61f9 commit aa2ff02

File tree

2 files changed

+112
-7
lines changed

2 files changed

+112
-7
lines changed

cli/internal/server/store.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,10 @@ func (s *Store) publish(event Event) {
362362
s.subsMu.RUnlock()
363363

364364
for _, ch := range global {
365-
nonBlockingSend(ch, event)
365+
sendWithBackpressure(ch, event)
366366
}
367367
for _, ch := range session {
368-
nonBlockingSend(ch, event)
368+
sendWithBackpressure(ch, event)
369369
}
370370
}
371371

@@ -405,11 +405,8 @@ func (s *Store) persistEventLocked(event Event) {
405405
}
406406
}
407407

408-
func nonBlockingSend(ch chan Event, event Event) {
409-
select {
410-
case ch <- event:
411-
default:
412-
}
408+
func sendWithBackpressure(ch chan Event, event Event) {
409+
ch <- event
413410
}
414411

415412
func needsAttention(annotation Annotation) bool {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package server
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestStoreSessionSubscriptionAppliesBackpressure(t *testing.T) {
10+
t.Setenv("AGENTATION_STORE", "memory")
11+
store := NewStore()
12+
session := store.CreateSession("http://example.com", "")
13+
14+
events, unsubscribe := store.SubscribeSession(session.ID)
15+
defer unsubscribe()
16+
17+
const burstCount = 96
18+
publishDone, publishErr := publishAnnotationsAsync(store, session.ID, burstCount)
19+
20+
select {
21+
case <-publishDone:
22+
t.Fatal("expected publisher to block once subscriber buffer is full")
23+
case <-time.After(50 * time.Millisecond):
24+
}
25+
26+
received := drainEvents(t, events, burstCount, 3*time.Second)
27+
if received != burstCount {
28+
t.Fatalf("drained %d events, want %d", received, burstCount)
29+
}
30+
31+
waitForPublishCompletion(t, publishDone, publishErr, 2*time.Second)
32+
}
33+
34+
func TestStoreGlobalSubscriptionDeliversBurstWithoutSilentDrop(t *testing.T) {
35+
t.Setenv("AGENTATION_STORE", "memory")
36+
store := NewStore()
37+
session := store.CreateSession("http://example.com", "")
38+
39+
events, unsubscribe := store.SubscribeAll()
40+
defer unsubscribe()
41+
42+
const burstCount = 96
43+
publishDone, publishErr := publishAnnotationsAsync(store, session.ID, burstCount)
44+
45+
time.Sleep(50 * time.Millisecond)
46+
received := drainEvents(t, events, burstCount, 3*time.Second)
47+
if received != burstCount {
48+
t.Fatalf("received %d events, want %d", received, burstCount)
49+
}
50+
51+
waitForPublishCompletion(t, publishDone, publishErr, 2*time.Second)
52+
}
53+
54+
func nonBlockingSend(ch chan Event, event Event) {
55+
select {
56+
case ch <- event:
57+
default:
58+
}
59+
}
60+
61+
func publishAnnotationsAsync(store *Store, sessionID string, count int) (<-chan struct{}, <-chan error) {
62+
done := make(chan struct{})
63+
errCh := make(chan error, 1)
64+
go func() {
65+
defer close(done)
66+
for i := range count {
67+
annotation := Annotation{
68+
Comment: fmt.Sprintf("annotation-%d", i),
69+
Element: "button",
70+
ElementPath: "body > button",
71+
}
72+
if _, ok := store.AddAnnotation(sessionID, annotation); !ok {
73+
errCh <- fmt.Errorf("AddAnnotation failed at index %d", i)
74+
return
75+
}
76+
}
77+
}()
78+
return done, errCh
79+
}
80+
81+
func drainEvents(t *testing.T, events <-chan Event, want int, timeout time.Duration) int {
82+
t.Helper()
83+
deadline := time.After(timeout)
84+
received := 0
85+
for received < want {
86+
select {
87+
case <-events:
88+
received++
89+
case <-deadline:
90+
t.Fatalf("timed out draining events: received=%d want=%d", received, want)
91+
}
92+
}
93+
return received
94+
}
95+
96+
func waitForPublishCompletion(t *testing.T, done <-chan struct{}, errCh <-chan error, timeout time.Duration) {
97+
t.Helper()
98+
select {
99+
case <-done:
100+
select {
101+
case err := <-errCh:
102+
t.Fatalf("publish failed: %v", err)
103+
default:
104+
}
105+
case <-time.After(timeout):
106+
t.Fatal("publisher did not complete after drain")
107+
}
108+
}

0 commit comments

Comments
 (0)