Skip to content

Commit

Permalink
Random tests (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah authored Apr 6, 2021
1 parent a24c4a2 commit ad3b00e
Show file tree
Hide file tree
Showing 64 changed files with 2,827 additions and 4,754 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ jobs:
uses: golangci/[email protected]
with:
version: v1.29
- name: Test
run: go clean -testcache ./... && go test -race -timeout 10s ./...
- name: test
run: make test
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
test:
go clean -testcache ./...
go test -race -timeout 10s ./... --tags=all
go test -timeout 10s -run TestLeak
19 changes: 11 additions & 8 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// to emit an item or notification.
func Amb(observables []Observable, opts ...Option) Observable {
option := parseOptions(opts...)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)
next := option.buildChannel()
once := sync.Once{}

Expand Down Expand Up @@ -65,7 +65,7 @@ func Amb(observables []Observable, opts ...Option) Observable {
// and emit items based on the results of this function.
func CombineLatest(f FuncN, observables []Observable, opts ...Option) Observable {
option := parseOptions(opts...)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)
next := option.buildChannel()

go func() {
Expand Down Expand Up @@ -130,7 +130,7 @@ func CombineLatest(f FuncN, observables []Observable, opts ...Option) Observable
// Concat emits the emissions from two or more Observables without interleaving them.
func Concat(observables []Observable, opts ...Option) Observable {
option := parseOptions(opts...)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)
next := option.buildChannel()

go func() {
Expand Down Expand Up @@ -186,7 +186,10 @@ func Empty() Observable {

// FromChannel creates a cold observable from a channel.
func FromChannel(next <-chan Item, opts ...Option) Observable {
option := parseOptions(opts...)
ctx := option.buildContext(emptyContext)
return &ObservableImpl{
parent: ctx,
iterable: newChannelIterable(next, opts...),
}
}
Expand All @@ -196,7 +199,7 @@ func FromEventSource(next <-chan Item, opts ...Option) Observable {
option := parseOptions(opts...)

return &ObservableImpl{
iterable: newEventSourceIterable(option.buildContext(), next, option.getBackPressureStrategy()),
iterable: newEventSourceIterable(option.buildContext(emptyContext), next, option.getBackPressureStrategy()),
}
}

Expand All @@ -205,7 +208,7 @@ func FromEventSource(next <-chan Item, opts ...Option) Observable {
func Interval(interval Duration, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
i := 0
Expand Down Expand Up @@ -246,7 +249,7 @@ func JustItem(item interface{}, opts ...Option) Single {
// Merge combines multiple Observables into one by merging their emissions
func Merge(observables []Observable, opts ...Option) Observable {
option := parseOptions(opts...)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)
next := option.buildChannel()
wg := sync.WaitGroup{}
wg.Add(len(observables))
Expand Down Expand Up @@ -311,7 +314,7 @@ func Range(start, count int, opts ...Option) Observable {
func Start(fs []Supplier, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
defer close(next)
Expand Down Expand Up @@ -343,7 +346,7 @@ func Thrown(err error) Observable {
func Timer(d Duration, opts ...Option) Observable {
option := parseOptions(opts...)
next := make(chan Item, 1)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
defer close(next)
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ module github.com/reactivex/rxgo/v2
go 1.13

require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/cenkalti/backoff/v4 v4.0.0
github.com/emirpasic/gods v1.12.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.4.0
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
)
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU=
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
Expand All @@ -13,9 +13,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775 h1:BLNsFR8l/hj/oGjnJXkd4Vi3s4kQD3/3x8HSAE4bzN0=
github.com/teivah/onecontext v0.0.0-20200513185103-40f981bfd775/go.mod h1:XUZ4x3oGhWfiOnUvTslnKKs39AWUct3g3yJvXTQSJOQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -32,9 +34,9 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion iterable_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (i *channelIterable) Observe(opts ...Option) <-chan Item {
}

if option.isConnectOperation() {
i.connect(option.buildContext())
i.connect(option.buildContext(emptyContext))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions iterable_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type createIterable struct {
func newCreateIterable(fs []Producer, opts ...Option) Iterable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
defer close(next)
Expand All @@ -40,7 +40,7 @@ func (i *createIterable) Observe(opts ...Option) <-chan Item {
}

if option.isConnectOperation() {
i.connect(option.buildContext())
i.connect(option.buildContext(emptyContext))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion iterable_defer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newDeferIterable(f []Producer, opts ...Option) Iterable {
func (i *deferIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
defer close(next)
Expand Down
2 changes: 1 addition & 1 deletion iterable_just.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func (i *justIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go SendItems(option.buildContext(), next, CloseChannel, i.items)
go SendItems(option.buildContext(emptyContext), next, CloseChannel, i.items)
return next
}
2 changes: 1 addition & 1 deletion iterable_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newRangeIterable(start, count int, opts ...Option) Iterable {

func (i *rangeIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)
next := option.buildChannel()

go func() {
Expand Down
2 changes: 1 addition & 1 deletion iterable_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func newSliceIterable(items []Item, opts ...Option) Iterable {
func (i *sliceIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(emptyContext)

go func() {
for _, item := range i.items {
Expand Down
41 changes: 19 additions & 22 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Observable interface {

// ObservableImpl implements Observable.
type ObservableImpl struct {
parent context.Context
iterable Iterable
}

Expand All @@ -100,22 +101,19 @@ func defaultErrorFuncOperator(ctx context.Context, item Item, dst chan<- Item, o
operatorOptions.stop()
}

func customObservableOperator(f func(ctx context.Context, next chan Item, option Option, opts ...Option), opts ...Option) Observable {
func customObservableOperator(parent context.Context, f func(ctx context.Context, next chan Item, option Option, opts ...Option), opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext(parent)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, opts...)
return &ObservableImpl{iterable: newChannelIterable(next)}
}

return &ObservableImpl{
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option := parseOptions(mergedOptions...)
next := option.buildChannel()
ctx := option.buildContext()
go f(ctx, next, option, mergedOptions...)
return next
}),
Expand All @@ -129,13 +127,13 @@ type operator interface {
gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions)
}

func observable(iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) Observable {
func observable(parent context.Context, iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) Observable {
option := parseOptions(opts...)
parallel, _ := option.getPool()

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(parent)
if forceSeq || !parallel {
runSequential(ctx, next, iterable, operatorFactory, option, opts...)
} else {
Expand All @@ -151,7 +149,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
option := parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(parent)
runSequential(ctx, next, iterable, operatorFactory, option, mergedOptions...)
return next
}),
Expand All @@ -167,7 +165,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
option := parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(parent)
observe := iterable.Observe(opts...)
go func() {
select {
Expand All @@ -186,7 +184,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
return next
}),
}
return obs.serialize(fromCh, f)
return obs.serialize(parent, fromCh, f)
}

return &ObservableImpl{
Expand All @@ -195,20 +193,20 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
option := parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(parent)
runParallel(ctx, next, iterable.Observe(mergedOptions...), operatorFactory, bypassGather, option, mergedOptions...)
return next
}),
}
}

func single(iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) Single {
func single(parent context.Context, iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) Single {
option := parseOptions(opts...)
parallel, _ := option.getPool()
next := option.buildChannel()
ctx := option.buildContext(parent)

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
if forceSeq || !parallel {
runSequential(ctx, next, iterable, operatorFactory, option, opts...)
} else {
Expand All @@ -222,8 +220,6 @@ func single(iterable Iterable, operatorFactory func() operator, forceSeq, bypass
mergedOptions := append(opts, propagatedOptions...)
option = parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
if forceSeq || !parallel {
runSequential(ctx, next, iterable, operatorFactory, option, mergedOptions...)
} else {
Expand All @@ -234,13 +230,13 @@ func single(iterable Iterable, operatorFactory func() operator, forceSeq, bypass
}
}

func optionalSingle(iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) OptionalSingle {
func optionalSingle(parent context.Context, iterable Iterable, operatorFactory func() operator, forceSeq, bypassGather bool, opts ...Option) OptionalSingle {
option := parseOptions(opts...)
ctx := option.buildContext(parent)
parallel, _ := option.getPool()

if option.isEagerObservation() {
next := option.buildChannel()
ctx := option.buildContext()
if forceSeq || !parallel {
runSequential(ctx, next, iterable, operatorFactory, option, opts...)
} else {
Expand All @@ -250,12 +246,13 @@ func optionalSingle(iterable Iterable, operatorFactory func() operator, forceSeq
}

return &OptionalSingleImpl{
parent: ctx,
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
mergedOptions := append(opts, propagatedOptions...)
option = parseOptions(mergedOptions...)

next := option.buildChannel()
ctx := option.buildContext()
ctx := option.buildContext(parent)
if forceSeq || !parallel {
runSequential(ctx, next, iterable, operatorFactory, option, mergedOptions...)
} else {
Expand Down Expand Up @@ -423,11 +420,11 @@ func runFirstItem(ctx context.Context, f func(interface{}) int, notif chan Item,
}()
}

func (o *ObservableImpl) serialize(fromCh chan Item, identifier func(interface{}) int, opts ...Option) Observable {
func (o *ObservableImpl) serialize(parent context.Context, fromCh chan Item, identifier func(interface{}) int, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()

ctx := option.buildContext()
ctx := option.buildContext(parent)
minHeap := binaryheap.NewWith(func(a, b interface{}) int {
return a.(int) - b.(int)
})
Expand Down
Loading

0 comments on commit ad3b00e

Please sign in to comment.