Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce reduce with inout accumulator parameter #2611

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions RxSwift/Observables/Reduce.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,40 @@


extension ObservableType {

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.

For aggregation behavior with incremental intermediate results, see `scan`.

- seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html)

- parameter seed: The initial accumulator value.
- parameter accumulator: A accumulator function to be invoked on each element.
- parameter mapResult: A function to transform the final accumulator value into the result value.
- returns: An observable sequence containing a single element with the final accumulator value.
*/
public func reduce<A, Result>(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void, mapResult: @escaping (A) throws -> Result)
-> Observable<Result> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult)
}

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.

For aggregation behavior with incremental intermediate results, see `scan`.

- seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html)

- parameter seed: The initial accumulator value.
- parameter accumulator: A accumulator function to be invoked on each element.
- returns: An observable sequence containing a single element with the final accumulator value.
*/
public func reduce<A>(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void)
-> Observable<A> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 })
}

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.

Expand All @@ -22,7 +56,15 @@ extension ObservableType {
*/
public func reduce<A, Result>(_ seed: A, accumulator: @escaping (A, Element) throws -> A, mapResult: @escaping (A) throws -> Result)
-> Observable<Result> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult)
Reduce(
source: self.asObservable(),
seed: seed,
accumulator: { acc, element in
let currentAcc = acc
acc = try accumulator(currentAcc, element)
},
mapResult: mapResult
)
}

/**
Expand All @@ -38,7 +80,15 @@ extension ObservableType {
*/
public func reduce<A>(_ seed: A, accumulator: @escaping (A, Element) throws -> A)
-> Observable<A> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 })
Reduce(
source: self.asObservable(),
seed: seed,
accumulator: { acc, element in
let currenctAcc = acc
acc = try accumulator(currenctAcc, element)
},
mapResult: { $0 }
)
}
}

Expand All @@ -60,7 +110,7 @@ final private class ReduceSink<SourceType, AccumulateType, Observer: ObserverTyp
switch event {
case .next(let value):
do {
self.accumulation = try self.parent.accumulator(self.accumulation, value)
try self.parent.accumulator(&self.accumulation, value)
}
catch let e {
self.forwardOn(.error(e))
Expand All @@ -85,7 +135,7 @@ final private class ReduceSink<SourceType, AccumulateType, Observer: ObserverTyp
}

final private class Reduce<SourceType, AccumulateType, ResultType>: Producer<ResultType> {
typealias AccumulatorType = (AccumulateType, SourceType) throws -> AccumulateType
typealias AccumulatorType = (inout AccumulateType, SourceType) throws -> Void
typealias ResultSelectorType = (AccumulateType) throws -> ResultType

private let source: Observable<SourceType>
Expand Down
17 changes: 8 additions & 9 deletions Tests/RxSwiftTests/Observable+ReduceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ extension ObservableReduceTest {
.completed(250)
])


let res = scheduler.start { xs.reduce(42, accumulator: +) }

let correctMessages = Recorded.events(
Expand All @@ -47,7 +46,7 @@ extension ObservableReduceTest {
.completed(250)
])

let res = scheduler.start { xs.reduce(42, accumulator: +) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) }

let correctMessages = Recorded.events(
.next(250, 42 + 24),
Expand Down Expand Up @@ -91,7 +90,7 @@ extension ObservableReduceTest {
.next(150, 1),
])

let res = scheduler.start { xs.reduce(42, accumulator: +) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) }

let correctMessages: [Recorded<Event<Int>>] = [
]
Expand Down Expand Up @@ -146,9 +145,9 @@ extension ObservableReduceTest {
])

let res = scheduler.start {
xs.reduce(42) { (a: Int, x: Int) throws -> Int in
xs.reduce(into: 42) { (a: inout Int, x: Int) throws -> Void in
if x < 3 {
return a + x
a += x
}
else {
throw testError
Expand Down Expand Up @@ -200,7 +199,7 @@ extension ObservableReduceTest {
.completed(250)
])

let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) }

let correctMessages = Recorded.events(
.next(250, (42 + 24) * 5),
Expand Down Expand Up @@ -244,7 +243,7 @@ extension ObservableReduceTest {
.next(150, 1),
])

let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) }

let correctMessages: [Recorded<Event<Int>>] = [
]
Expand Down Expand Up @@ -298,7 +297,7 @@ extension ObservableReduceTest {
.completed(260)
])

let res = scheduler.start { xs.reduce(42, accumulator: { a, x in if x < 3 { return a + x } else { throw testError } }, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: { a, x in if x < 3 { a += x } else { throw testError } }, mapResult: { $0 * 5 }) }

let correctMessages = [
Recorded.error(240, testError, Int.self)
Expand Down Expand Up @@ -345,7 +344,7 @@ extension ObservableReduceTest {
}

func testReduceReleasesResourcesOnError() {
_ = Observable<Int>.just(1).reduce(0, accumulator: +).subscribe()
_ = Observable<Int>.just(1).reduce(into: 0, accumulator: +=).subscribe()
}
#endif
}
Loading