Skip to content

Commit 7efa5f1

Browse files
committed
chore: Hook up flag change listeners to FDv2
1 parent 6973749 commit 7efa5f1

10 files changed

+794
-56
lines changed

internal/datasourcev2/streaming_data_source_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ func TestStreamingProcessorDoesNotUseConfiguredTimeoutAsReadTimeout(t *testing.T
115115
}}).
116116
WithPutObjects(ldservicesv2.NewServerSDKData().ToPutObjects()).
117117
WithTransferred(1)
118-
streamHandler, stream := ldservices.ServerSideStreamingV2ServiceHandler(protocol.Next())
119-
protocol.Enqueue(stream)
118+
streamHandler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol)
120119

121120
dd := mocks.NewMockDataDestination(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers()))
122121

@@ -154,7 +153,12 @@ func TestStreamProcessorRecoverableErrorsCauseStreamRestart(t *testing.T) {
154153
expectRestart := func(t *testing.T, p streamingTestParams) {
155154
// Allow time for a reconnect (which sends the initial payload (server-intent), then we can queue up the transferred event.
156155
<-time.After(300 * time.Millisecond)
157-
p.protocol.WithTransferred(1).Enqueue(p.stream)
156+
p.protocol.WithIntent(fdv2proto.ServerIntent{
157+
Payload: fdv2proto.Payload{
158+
ID: "fake-id", Target: 0, Code: "none", Reason: "caughtup",
159+
}}).
160+
WithTransferred(1).
161+
Enqueue(p.stream)
158162

159163
<-p.requests
160164
th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
@@ -251,8 +255,7 @@ func runStreamingTest(
251255
WithTransferred(1)
252256

253257
events := make(chan eventsource.Event, 1000)
254-
streamHandler, stream := ldservices.ServerSideStreamingV2ServiceHandler(protocol.Next())
255-
protocol.Enqueue(stream)
258+
streamHandler, stream := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol)
256259

257260
handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
258261

@@ -307,8 +310,7 @@ func testStreamProcessorRecoverableHTTPError(t *testing.T, statusCode int) {
307310
}}).
308311
WithPutObjects(data.ToPutObjects()).
309312
WithTransferred(1)
310-
streamHandler, streamSender := ldservices.ServerSideStreamingV2ServiceHandler(protocol.Next())
311-
protocol.Enqueue(streamSender)
313+
streamHandler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol)
312314

313315
sequentialHandler := httphelpers.SequentialHandler(
314316
httphelpers.HandlerWithStatus(statusCode), // fails the first time
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package datasystem
2+
3+
import (
4+
"github.com/launchdarkly/go-server-sdk/v7/internal/toposort"
5+
st "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
6+
)
7+
8+
// Maintains a bidirectional dependency graph that can be updated whenever an item has changed.
9+
//
10+
// This type is duplicated from internal.datasource. This duplication will be
11+
// removed when FDv1 is removed.
12+
type dependencyTracker struct {
13+
dependenciesFrom toposort.AdjacencyList
14+
dependenciesTo toposort.AdjacencyList
15+
}
16+
17+
func newDependencyTracker() *dependencyTracker {
18+
return &dependencyTracker{
19+
make(toposort.AdjacencyList),
20+
make(toposort.AdjacencyList),
21+
}
22+
}
23+
24+
// Updates the dependency graph when an item has changed.
25+
func (d *dependencyTracker) updateDependenciesFrom(
26+
kind st.DataKind,
27+
fromKey string,
28+
fromItem st.ItemDescriptor,
29+
) {
30+
fromWhat := toposort.NewVertex(kind, fromKey)
31+
updatedDependencies := toposort.GetNeighbors(kind, fromItem)
32+
33+
oldDependencySet := d.dependenciesFrom[fromWhat]
34+
for oldDep := range oldDependencySet {
35+
depsToThisOldDep := d.dependenciesTo[oldDep]
36+
if depsToThisOldDep != nil {
37+
delete(depsToThisOldDep, fromWhat)
38+
}
39+
}
40+
41+
d.dependenciesFrom[fromWhat] = updatedDependencies
42+
for newDep := range updatedDependencies {
43+
depsToThisNewDep := d.dependenciesTo[newDep]
44+
if depsToThisNewDep == nil {
45+
depsToThisNewDep = make(toposort.Neighbors)
46+
d.dependenciesTo[newDep] = depsToThisNewDep
47+
}
48+
depsToThisNewDep.Add(fromWhat)
49+
}
50+
}
51+
52+
func (d *dependencyTracker) reset() {
53+
d.dependenciesFrom = make(toposort.AdjacencyList)
54+
d.dependenciesTo = make(toposort.AdjacencyList)
55+
}
56+
57+
// Populates the given set with the union of the initial item and all items that directly or indirectly
58+
// depend on it (based on the current state of the dependency graph).
59+
func (d *dependencyTracker) addAffectedItems(itemsOut toposort.Neighbors, initialModifiedItem toposort.Vertex) {
60+
if !itemsOut.Contains(initialModifiedItem) {
61+
itemsOut.Add(initialModifiedItem)
62+
affectedItems := d.dependenciesTo[initialModifiedItem]
63+
for affectedItem := range affectedItems {
64+
d.addAffectedItems(itemsOut, affectedItem)
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)