forked from vcavallo/nostr-hypermedia
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsingleflight.go
More file actions
218 lines (179 loc) · 5.99 KB
/
singleflight.go
File metadata and controls
218 lines (179 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package main
import (
"log/slog"
"strings"
"golang.org/x/sync/singleflight"
"nostr-server/internal/types"
"nostr-server/internal/util"
)
// Singleflight groups for deduplicating concurrent requests.
// When multiple goroutines request the same data simultaneously,
// only one actually fetches while others wait and share the result.
var (
	// relayListGroup dedupes relay-list fetches; keyed by pubkey (see fetchRelayList).
	relayListGroup singleflight.Group
	// reactionsGroup dedupes batched reactions fetches; keyed by buildBatchKey("reactions", ...).
	reactionsGroup singleflight.Group
	// replyCountsGroup dedupes batched reply-count fetches; keyed by buildBatchKey("replies", ...).
	replyCountsGroup singleflight.Group
)
// buildBatchKey creates a stable key for singleflight deduplication.
// Both slices are sorted (on copies, leaving the caller's slices intact)
// so that identical batches produce identical keys regardless of order.
func buildBatchKey(prefix string, relays, ids []string) string {
	sortedRelays := util.SortedCopy(relays)
	sortedIDs := util.SortedCopy(ids)

	var key strings.Builder
	key.WriteString(prefix)
	key.WriteByte(':')
	key.WriteString(strings.Join(sortedRelays, "|"))
	key.WriteByte(':')
	key.WriteString(strings.Join(sortedIDs, ","))
	return key.String()
}
// fetchRelayList fetches a relay list with singleflight deduplication.
// If multiple goroutines request the same pubkey simultaneously,
// only one fetch occurs and all share the result.
// Returns nil when the pubkey has no relay list (including negative cache hits).
func fetchRelayList(pubkey string) *types.RelayList {
	// Check cache first (avoid singleflight overhead for cache hits)
	if relayList, notFound, ok := relayListCache.Get(pubkey); ok {
		IncrementCacheHit()
		if notFound {
			// Negative cache entry: a previous fetch found nothing.
			return nil
		}
		return relayList
	}
	// Record the miss so hit/miss metrics stay consistent with
	// fetchRelayLists and fetchProfilesWithOptions, which both do this.
	IncrementCacheMiss()
	// Use singleflight for cache misses
	result, _, shared := relayListGroup.Do(pubkey, func() (any, error) {
		return fetchRelayListDirect(pubkey), nil
	})
	if shared {
		slog.Debug("singleflight: shared relay list fetch", "pubkey", shortID(pubkey))
	}
	// Comma-ok assertion also handles a typed-nil pointer stored in the
	// interface (result != nil in that case, but the pointer itself is nil).
	rl, _ := result.(*types.RelayList)
	return rl
}
// fetchRelayLists fetches relay lists for multiple pubkeys with singleflight.
// Uses per-pubkey singleflight to deduplicate overlapping concurrent requests.
func fetchRelayLists(pubkeys []string) map[string]*types.RelayList {
	if len(pubkeys) == 0 {
		return nil
	}

	// Serve entirely from cache when possible.
	cached, missing := relayListCache.GetMultiple(pubkeys)
	if len(missing) == 0 {
		IncrementCacheHit()
		return cached
	}
	IncrementCacheMiss()

	// Fetch only the pubkeys the cache didn't have (batched/deduplicated).
	fresh := fetchRelayListsUncached(missing)

	// Merge cached and freshly fetched entries into one map.
	merged := make(map[string]*types.RelayList, len(cached)+len(fresh))
	for _, src := range []map[string]*types.RelayList{cached, fresh} {
		for pk, rl := range src {
			merged[pk] = rl
		}
	}
	return merged
}
// fetchRelayListsUncached fetches relay lists for pubkeys without checking cache.
// Uses batcher for efficient batching of concurrent requests.
// Used internally by fetchRelayLists and warmRelayListsAsync.
func fetchRelayListsUncached(pubkeys []string) map[string]*types.RelayList {
	if len(pubkeys) == 0 {
		return nil
	}
	if relayListBatcher == nil {
		// No batcher configured: fall back to a direct batch fetch.
		return fetchRelayListsBatch(pubkeys)
	}
	// The batcher coalesces concurrent requests into fewer relay queries.
	return relayListBatcher.GetMultiple(pubkeys)
}
// warmRelayListsAsync pre-fetches relay lists for pubkeys in the background.
// This is fire-and-forget - it populates the cache for future requests.
// Processes in batches to avoid overwhelming relays.
func warmRelayListsAsync(pubkeys []string) {
	if len(pubkeys) == 0 {
		return
	}
	// Only warm entries the cache doesn't already hold.
	_, missing := relayListCache.GetMultiple(pubkeys)
	if len(missing) == 0 {
		return
	}
	go func() {
		const batchSize = 50
		for start := 0; start < len(missing); start += batchSize {
			end := min(start+batchSize, len(missing))
			// Result is discarded; the fetch path populates the cache.
			fetchRelayListsUncached(missing[start:end])
		}
		slog.Debug("warmed relay lists", "count", len(missing))
	}()
}
// fetchProfiles fetches kind 0 profiles with singleflight deduplication.
func fetchProfiles(relays []string, pubkeys []string) map[string]*ProfileInfo {
	// Thin wrapper: same as fetchProfilesWithOptions with cacheOnly off.
	const cacheOnly = false
	return fetchProfilesWithOptions(relays, pubkeys, cacheOnly)
}
// fetchProfilesWithOptions fetches profiles with request coalescing.
// Uses a batcher to collect requests over a time window and merge overlapping keys.
// This is more efficient than singleflight for overlapping (not just identical) requests.
// When cacheOnly is true, only cached profiles are returned and no fetch occurs.
func fetchProfilesWithOptions(relays []string, pubkeys []string, cacheOnly bool) map[string]*ProfileInfo {
	if len(pubkeys) == 0 {
		return nil
	}

	// Serve from cache when every requested pubkey is present.
	cached, missing := profileCache.GetMultiple(pubkeys)
	if len(missing) == 0 {
		IncrementCacheHit()
		return cached
	}
	IncrementCacheMiss()
	if cacheOnly {
		// Caller opted out of network fetches; return what we have.
		return cached
	}

	// Resolve the misses: the batcher merges this request with other
	// concurrent callers; otherwise fall back to a direct fetch.
	var fetched map[string]*ProfileInfo
	if profileBatcher != nil {
		fetched = profileBatcher.GetMultiple(missing)
	} else {
		fetched = fetchProfilesWithOptionsDirect(relays, missing, false)
	}

	// Combine cached and fetched profiles into a single map.
	combined := make(map[string]*ProfileInfo, len(cached)+len(fetched))
	for _, src := range []map[string]*ProfileInfo{cached, fetched} {
		for pk, p := range src {
			combined[pk] = p
		}
	}
	return combined
}
// fetchReactions fetches reactions with singleflight deduplication.
// Identical concurrent batches (same relays + event IDs) share one fetch.
func fetchReactions(relays []string, eventIDs []string) map[string]*ReactionsSummary {
	if len(eventIDs) == 0 {
		return nil
	}
	// Key is order-insensitive, so equivalent batches collapse together.
	key := buildBatchKey("reactions", relays, eventIDs)
	fetch := func() (any, error) {
		return fetchReactionsDirect(relays, eventIDs), nil
	}
	result, _, shared := reactionsGroup.Do(key, fetch)
	if shared {
		slog.Debug("singleflight: shared reactions fetch", "count", len(eventIDs))
	}
	return result.(map[string]*ReactionsSummary)
}
// fetchReplyCounts fetches reply counts with singleflight deduplication.
// Identical concurrent batches (same relays + event IDs) share one fetch.
func fetchReplyCounts(relays []string, eventIDs []string) map[string]int {
	if len(eventIDs) == 0 {
		return nil
	}
	// Key is order-insensitive, so equivalent batches collapse together.
	key := buildBatchKey("replies", relays, eventIDs)
	fetch := func() (any, error) {
		return fetchReplyCountsDirect(relays, eventIDs), nil
	}
	result, _, shared := replyCountsGroup.Do(key, fetch)
	if shared {
		slog.Debug("singleflight: shared reply counts fetch", "count", len(eventIDs))
	}
	return result.(map[string]int)
}