Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive Subjects #408

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# RxGo
# RxGo (Fork with Reactive Subjects)
![CI](https://github.com/ReactiveX/RxGo/actions/workflows/ci.yml/badge.svg)
[![Go Report Card](https://goreportcard.com/badge/github.com/reactivex/rxgo)](https://goreportcard.com/report/github.com/reactivex/rxgo)
[![Join the chat at https://gitter.im/ReactiveX/RxGo](https://badges.gitter.im/ReactiveX/RxGo.svg)](https://gitter.im/ReactiveX/RxGo?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Expand Down Expand Up @@ -502,6 +502,9 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
* [Errors](doc/errors.md) — return all the errors thrown by an observable
* [ToMap](doc/tomap.md)/[ToMapWithValueSelector](doc/tomapwithvalueselector.md)/[ToSlice](doc/toslice.md) — convert an Observable into another object or data structure

## Subjects
This Fork contains an implementation of Reactive Subjects. Details see [Subjects](doc/subjects.md).

## Contributing

All contributions are very welcome! Be sure you check out the [contributing guidelines](CONTRIBUTING.md) first. Newcomers can take a look at ongoing issues and check for the `help needed` label.
Expand Down
50 changes: 50 additions & 0 deletions behaviorsubject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package rxgo

import (
"sync"
)

// BehaviorSubject subject which returns the last received item to new subscribers
type BehaviorSubject struct {
Subject
lastValue interface{}
lastValueLock sync.Mutex
}

// NewBehaviorSubject Creates a new behavior subject
func NewBehaviorSubject(opts ...Option) *BehaviorSubject {
res := BehaviorSubject{
Subject: *NewSubject(opts...), // subscriber must be able to receive last item and new items
lastValueLock: sync.Mutex{},
}

return &res
}

// Next shadows base next function to capture the last item.
func (s *BehaviorSubject) Next(value interface{}) {
s.lastValueLock.Lock()
defer s.lastValueLock.Unlock()

s.lastValue = value

s.Subject.Next(value)
}

// Subscribe shadows base subscribe function to replay the last captured item.
func (s *BehaviorSubject) Subscribe() (Subscription, Observable) {
s.Lock()
defer s.Unlock()

// create buffered channel to hold last item
sub, obs := s.createSubscription(1)
subChan := s.subscribers[sub.GetId()]

if s.lastValue != nil {
s.lastValueLock.Lock()
subChan <- Of(s.lastValue)
s.lastValueLock.Unlock()
}

return sub, obs
}
36 changes: 36 additions & 0 deletions behaviorsubject_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package rxgo

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestBehaviorSubject verifies that two subscribers started at different times receive the same result
func TestBehaviorSubject(t *testing.T) {
subject := NewBehaviorSubject()

// obs1 no last value
_, obs1 := subject.Subscribe()
values1 := make([]int, 0)
obs1.DoOnNext(func(i interface{}) {
values1 = append(values1, i.(int))
})

subject.Next(1)

// obs2 receives last value and new
_, obs2 := subject.Subscribe()
values2 := make([]int, 0)
obs2.DoOnNext(func(i interface{}) {
values2 = append(values2, i.(int))
})

subject.Next(2)

time.Sleep(10 * time.Millisecond)

assert.Equal(t, []int{1, 2}, values1)
assert.Equal(t, []int{1, 2}, values2)
}
44 changes: 44 additions & 0 deletions doc/subjects.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
## Subjects
A detailed description of Subjects can be found here [Subjects](https://reactivex.io/documentation/subject.html).

Available Subject types:
* Subject - a simple fan-out with the ability to subscribe and unsubscribe any time
* BehaviorSubject - a subject which replays the last published item to every new subscriber
* ReplaySubject - a subject which replays the last n published items to every new subscriber

### Design
Subjects are created with a set of Observable options. Every subject subscriber receives a Subscription and an Observable Object. The Subscription can be used to unsubscribe from the Subject. The Observable is used to receive items from the Subject. Each Observable is a cold Observable with its own event source channel.

> [!NOTE]
> Even though Subjects accept all options to create new Subscriber Observables, not all combinations make sense. For example, because each observer has its own Observable there is no point using connectable Observer options.

### Simple Subject Example
The code below shows a simple Subject example:
```go
// new subject with default options
subject := NewSubject()

sub, obs := subject.Subscribe()
obs.DoOnNext(func(i interface{}) {
// handle items
})

// call unsubscribe when done
sub.Unsubscribe()
```

### Subject with BackPressure Strategy
By default a slow Subscriber would block all other Subscribers. This can be changed by creating Subscribers with BackPressure Strategy Drop:
```go
subject := NewSubject(WithBackPressureStrategy(Drop))
```

### Behavior and Replay Subject Design
Both Behavior and Replay Subjects publish one or more stored items to new subscribers before publishing new items. To achieve this, these subjects use buffered Go channels to create hot Observables. The buffer size equals to max number of replay items. This is to ensure that new Observers can create Subscriptions without a deadlock or blocking out other Subscribers. The entire Subject will be locked until a new Subscriber consumed all replay items.

> [!NOTE]
> Even though Behavior and Replay Subjects accept all options to create new Subscriber Observables, not all combinations make sense. BackPressure strategy Drop should only used with care.

### Replay Subject Construction
The ReplaySubject constructor has an additional parameter "maxReplayItems". This parameter controls how many items are held in buffer for new subscribers.

40 changes: 40 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand All @@ -23,35 +32,64 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 h1:BLNsFR8l/hj/oGjnJXkd4Vi3s4kQD3/3x8HSAE4bzN0=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775/go.mod h1:XUZ4x3oGhWfiOnUvTslnKKs39AWUct3g3yJvXTQSJOQ=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
Expand All @@ -62,3 +100,5 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
mvdan.cc/gofumpt v0.5.0 h1:0EQ+Z56k8tXjj/6TQD25BFNKQXpCvT0rnansIc7Ug5E=
mvdan.cc/gofumpt v0.5.0/go.mod h1:HBeVDtMKRZpXyxFciAirzdKklDlGu8aAy1wEbH5Y9js=
64 changes: 64 additions & 0 deletions replaysubject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package rxgo

import (
"container/list"
"sync"
)

// ReplaySubject subject which replays the last received items to new subscribers
type ReplaySubject struct {
Subject
buffer *list.List
bufferLock sync.Mutex
maxReplayItems int
}

// NewReplaySubject creates a new replay subject
func NewReplaySubject(maxReplayItems int, opts ...Option) *ReplaySubject {
res := ReplaySubject{
Subject: *NewSubject(opts...), // subscriber must be able to received current buffer and new items
maxReplayItems: maxReplayItems,
buffer: list.New(),
bufferLock: sync.Mutex{},
}

return &res
}

// Next shadows base next function to capture the item history
func (s *ReplaySubject) Next(value interface{}) {
s.bufferLock.Lock()
defer s.bufferLock.Unlock()

// add to buffer
s.buffer.PushBack(value)
// check for max length
if s.buffer.Len() > s.maxReplayItems {
// remove oldest item at the front
s.buffer.Remove(s.buffer.Front())
}

s.Subject.Next(value)
}

// Subscribe shadows base subscribe function to replay the item history
func (s *ReplaySubject) Subscribe() (Subscription, Observable) {
s.Lock()
defer s.Unlock()

// create buffered channel to hold all current replay items
sub, obs := s.createSubscription(s.buffer.Len())
subChan := s.subscribers[sub.GetId()]

s.bufferLock.Lock()
defer s.bufferLock.Unlock()

// replay buffered items
elem := s.buffer.Front()
for elem != nil {
subChan <- Of(elem.Value)
elem = elem.Next()
}

return sub, obs
}
63 changes: 63 additions & 0 deletions replaysubject_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package rxgo

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestReplaySubject verifies that a new subscriber receives the entire history
func TestReplaySubject(t *testing.T) {
subject := NewReplaySubject(10)

// load buffer
for i := 0; i < 3; i++ {
subject.Next(i)
}

_, obs := subject.Subscribe()

values := make([]int, 0)
obs.DoOnNext(func(i interface{}) {
values = append(values, i.(int))
})

// add more
for i := 3; i < 5; i++ {
subject.Next(i)
// slow down to let subscriber read from buffer
time.Sleep(10 * time.Millisecond)
}

assert.Equal(t, []int{0, 1, 2, 3, 4}, values)
fmt.Printf("values: %v", values)
}

// TestMaxItemsReplay verifies only the last n elements are kept in replay buffer
func TestMaxItemsReplay(t *testing.T) {
subject := NewReplaySubject(2)

// load buffer, expect to keep 2,3 in buffer
for i := 0; i < 4; i++ {
subject.Next(i)
}

_, obs := subject.Subscribe()

values := make([]int, 0)
obs.DoOnNext(func(i interface{}) {
values = append(values, i.(int))
})

// add more
for i := 4; i < 6; i++ {
subject.Next(i)
// slow down to let subscriber read from buffer
time.Sleep(10 * time.Millisecond)
}

assert.Equal(t, []int{2, 3, 4, 5}, values)
fmt.Printf("values: %v", values)
}
Loading