Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix starmap_indexed not passing index #690

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3228,16 +3228,16 @@ def starmap_indexed(
invoking the indexed mapper function with unpacked elements
of the source.
"""
from ._map import map_
from ._map import map_indexed_

if mapper is None:
return compose(identity)

def starred(values: Tuple[Any, ...]) -> Any:
def starred(values: Tuple[Any, ...], i: int) -> Any:
assert mapper # mypy is paranoid
return mapper(*values)
return mapper(*values, i)

return compose(map_(starred))
return compose(map_indexed_(starred))


def start_with(*args: _T) -> Callable[[Observable[_T]], Observable[_T]]:
Expand Down
2 changes: 1 addition & 1 deletion reactivex/operators/_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _identity(value: _T1, _: int) -> _T2:

return compose(
ops.zip_with_iterable(infinite()),
ops.starmap_indexed(_mapper_indexed),
ops.starmap(_mapper_indexed),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This depended on the incorrect behavior of starmap_indexed. If it actually were to pass the index, it would pass it twice: once from zip_with_iterable(infinite()) and once from starmap_indexed

)


Expand Down
205 changes: 205 additions & 0 deletions tests/test_observable/test_starmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,211 @@ def mapper(x, y):
assert xs.subscriptions == [subscribe(200, 290)]
assert invoked[0] == 3

def test_starmap_with_index_throws(self):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests derived from

def test_map_with_index_throws(self):
and adapted for starmap

with self.assertRaises(RxException):
mapper = ops.starmap_indexed(lambda x, y, index: x)

return return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex"))

with self.assertRaises(RxException):
return (
throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex))
)

with self.assertRaises(RxException):
return (
empty()
.pipe(mapper)
.subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex"))
)

with self.assertRaises(RxException):
return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe()

def test_starmap_with_index_dispose_inside_mapper(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(100, (4, 40)), on_next(200, (3, 30)), on_next(500, (2, 20)), on_next(600, (1, 10))
)
invoked = [0]
results = scheduler.create_observer()
d = SerialDisposable()

def projection(x, y, index):
invoked[0] += 1
if scheduler.clock > 400:
d.dispose()

return x + y + index * 100

d.disposable = xs.pipe(ops.starmap_indexed(projection)).subscribe(results)

def action(scheduler, state):
return d.dispose()

scheduler.schedule_absolute(disposed, action)
scheduler.start()
assert results.messages == [on_next(100, 44), on_next(200, 133)]
assert xs.subscriptions == [subscribe(0, 500)]
assert invoked[0] == 3

def test_starmap_with_index_completed(self):
scheduler = TestScheduler()
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)
assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_next(290, 233),
on_next(350, 322),
on_completed(400),
]
assert xs.subscriptions == [subscribe(200, 400)]
assert invoked[0] == 4

def test_starmap_with_index_default_mapper(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
return xs.pipe(ops.starmap_indexed())

results = scheduler.start(factory)
assert results.messages == [
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
]

assert xs.subscriptions == [subscribe(200, 400)]

def test_starmap_with_index_not_completed(self):
scheduler = TestScheduler()
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)
assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_next(290, 233),
on_next(350, 322),
]
assert xs.subscriptions == [subscribe(200, 1000)]
assert invoked[0] == 4

def test_starmap_with_index_error(self):
scheduler = TestScheduler()
ex = "ex"
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_error(400, ex),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)

assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_next(290, 233),
on_next(350, 322),
on_error(400, ex),
]
assert xs.subscriptions == [subscribe(200, 400)]

def test_starmap_with_index_mapper_throws(self):
scheduler = TestScheduler()
invoked = [0]
ex = "ex"
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
if invoked[0] == 3:
raise Exception(ex)
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)
assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_error(290, ex),
]
assert xs.subscriptions == [subscribe(200, 290)]
assert invoked[0] == 3


if __name__ == "__main__":
unittest.main()
Loading