Skip to content

Commit

Permalink
Merge pull request #64 from RxSwiftCommunity/not-replay-execution
Browse files Browse the repository at this point in the history
Not replay executionObservables for execute()
  • Loading branch information
ashfurrow authored Dec 29, 2016
2 parents 0abe2a7 + f15eebb commit 6c13748
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 27 deletions.
30 changes: 18 additions & 12 deletions Sources/Action/Action.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public final class Action<Input, Element> {
return Observable.empty()
}
}
.shareReplay(1)
.share()

elements = executionObservables
.flatMap { $0.catchError { _ in Observable.empty() } }
Expand Down Expand Up @@ -120,20 +120,26 @@ public final class Action<Input, Element> {

@discardableResult
public func execute(_ value: Input) -> Observable<Element> {
let subject = ReplaySubject<Element>.createUnbounded()

executionObservables
defer {
inputs.onNext(value)
}

let execution = executionObservables
.take(1)
.flatMap { $0.catchError { _ in Observable.never() } }
.bindTo(subject)
.addDisposableTo(disposeBag)
.flatMap { $0 }
.catchError { throw ActionError.underlyingError($0) }

errors
.map { throw $0 }
.bindTo(subject)
.addDisposableTo(disposeBag)
let notEnabledError = inputs
.takeUntil(executionObservables)
.withLatestFrom(enabled)
.flatMap { $0 ? Observable<Element>.empty() : Observable.error(ActionError.notEnabled) }

inputs.onNext(value)
let subject = ReplaySubject<Element>.createUnbounded()
Observable
.of(execution, notEnabledError)
.merge()
.subscribe(subject)
.addDisposableTo(disposeBag)

return subject.asObservable()
}
Expand Down
94 changes: 79 additions & 15 deletions Tests/ActionTests/ActionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class ActionTests: QuickSpec {
executionObservables = scheduler.createObserver(Observable<String>.self)
}

func bindAndExecute(action: Action<String, String>) {
func bindAndExecuteTwice(action: Action<String, String>) {
action.executionObservables
.bindTo(executionObservables)
.addDisposableTo(disposeBag)
Expand All @@ -385,80 +385,144 @@ class ActionTests: QuickSpec {
.addDisposableTo(disposeBag)
}

scheduler.scheduleAt(20) {
action.execute("b")
.bindTo(element)
.addDisposableTo(disposeBag)
}

scheduler.start()
}

context("single element action") {
beforeEach {
action = Action { Observable.just($0) }
bindAndExecute(action: action)
bindAndExecuteTwice(action: action)
}

it("element receives single value") {
it("element receives single value for each execution") {
XCTAssertEqual(element.events, [
next(10, "a"),
completed(10),
next(20, "b"),
completed(20),
])
}

it("executes once") {
expect(executionObservables.events.count) == 1
it("executes twice") {
expect(executionObservables.events.count) == 2
}
}

context("multiple element action") {
beforeEach {
action = Action { Observable.of($0, $0, $0) }
bindAndExecute(action: action)
bindAndExecuteTwice(action: action)
}

it("element receives mutiple values") {
it("element receives 3 values for each execution") {
XCTAssertEqual(element.events, [
next(10, "a"),
next(10, "a"),
next(10, "a"),
completed(10),
next(20, "b"),
next(20, "b"),
next(20, "b"),
completed(20),
])
}

it("executes once") {
expect(executionObservables.events.count) == 1
it("executes twice") {
expect(executionObservables.events.count) == 2
}
}

context("error action") {
beforeEach {
action = Action { _ in Observable.error(TestError) }
bindAndExecute(action: action)
bindAndExecuteTwice(action: action)
}

it("element fails with underlyingError") {
XCTAssertEqual(element.events, [
error(10, ActionError.underlyingError(TestError))
error(10, ActionError.underlyingError(TestError)),
error(20, ActionError.underlyingError(TestError)),
])
}

it("executes once") {
expect(executionObservables.events.count) == 1
it("executes twice") {
expect(executionObservables.events.count) == 2
}
}

context("disabled") {
beforeEach {
action = Action(enabledIf: Observable.just(false)) { Observable.just($0) }
bindAndExecute(action: action)
bindAndExecuteTwice(action: action)
}

it("element fails with notEnabled") {
XCTAssertEqual(element.events, [
error(10, ActionError.notEnabled)
error(10, ActionError.notEnabled),
error(20, ActionError.notEnabled),
])
}

it("never executes") {
expect(executionObservables.events).to(beEmpty())
}
}

context("execute while executing") {
var secondElement: TestableObserver<String>!
var trigger: PublishSubject<Void>!

beforeEach {
secondElement = scheduler.createObserver(String.self)
trigger = PublishSubject<Void>()
action = Action { Observable.just($0).sample(trigger) }

action.executionObservables
.bindTo(executionObservables)
.addDisposableTo(disposeBag)

scheduler.scheduleAt(10) {
action.execute("a")
.bindTo(element)
.addDisposableTo(disposeBag)
}

scheduler.scheduleAt(20) {
action.execute("b")
.bindTo(secondElement)
.addDisposableTo(disposeBag)
}

scheduler.scheduleAt(30) {
trigger.onNext()
}

scheduler.start()
}

it("first element receives single value") {
XCTAssertEqual(element.events, [
next(30, "a"),
completed(30),
])
}

it("second element fails with notEnabled error") {
XCTAssertEqual(secondElement.events, [
error(20, ActionError.notEnabled)
])
}

it("executes once") {
expect(executionObservables.events.count) == 1
}
}
}
}
}
Expand Down

0 comments on commit 6c13748

Please sign in to comment.