Skip to content

Commit b5dfe4c

Browse files
Add mutable.Signal.imap
1 parent 6a3acfc commit b5dfe4c

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

core/src/main/scala/fs2/async/immutable/Signal.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ trait Signal[F[_], A] { self =>
5858

5959
object Signal {
6060

61-
implicit class ImmutableSignalSyntax[F[_] : Async,A] (val self: Signal[F,A]) {
61+
implicit class ImmutableSignalSyntax[F[_] : Async, A] (val self: Signal[F, A]) {
6262
/**
6363
* Converts this signal to signal of `B` by applying `f`
6464
*/
65-
def map[B](f: A => B):Signal[F,B] = new Signal[F,B] {
65+
def map[B](f: A => B): Signal[F,B] = new Signal[F, B] {
6666
def continuous: Stream[F, B] = self.continuous.map(f)
6767
def changes: Stream[F, Unit] = self.discrete.through(pipe.changes(_ == _)).map(_ => ())
6868
def discrete: Stream[F, B] = self.discrete.map(f)

core/src/main/scala/fs2/async/mutable/Queue.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import fs2.async.immutable
1010
* a queue may have a bound on its size, in which case enqueuing may
1111
* block until there is an offsetting dequeue.
1212
*/
13-
trait Queue[F[_],A] { self =>
13+
trait Queue[F[_], A] { self =>
1414

1515
/**
1616
* Enqueues one element in this `Queue`.

core/src/main/scala/fs2/async/mutable/Signal.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import fs2.Async.Change
66
import fs2._
77
import fs2.Stream._
88
import fs2.async.immutable
9-
import fs2.util.Monad
9+
import fs2.util.{Monad, Functor}
1010

1111
/**
1212
* A signal whose value may be set asynchronously. Provides continuous
1313
* and discrete streams for responding to changes to it's value.
1414
*/
15-
trait Signal[F[_],A] extends immutable.Signal[F,A] {
15+
trait Signal[F[_], A] extends immutable.Signal[F, A] { self =>
1616

1717
/** Sets the value of this `Signal`. */
1818
def set(a: A): F[Unit]
@@ -24,13 +24,29 @@ trait Signal[F[_],A] extends immutable.Signal[F,A] {
2424
*
2525
* `F` returns the result of applying `op` to current value.
2626
*/
27-
def modify(f: A => A): F[Change[A]]
27+
def modify(f: A => A): F[Change[A]]
2828

2929
/**
3030
* Asynchronously refreshes the value of the signal,
3131
* keep the value of this `Signal` the same, but notify any listeners.
3232
*/
3333
def refresh: F[Unit]
34+
35+
/**
36+
* Returns an alternate view of this `Signal` where its elements are of type [[B]],
37+
* given a function from `A` to `B`.
38+
*/
39+
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Signal[F, B] =
40+
new Signal[F, B] {
41+
def discrete: Stream[F, B] = self.discrete.map(f)
42+
def continuous: Stream[F, B] = self.continuous.map(f)
43+
def changes: Stream[F, Unit] = self.changes
44+
def get: F[B] = F.map(self.get)(f)
45+
def set(b: B): F[Unit] = self.set(g(b))
46+
def refresh: F[Unit] = self.refresh
47+
def modify(bb: B => B): F[Change[B]] =
48+
F.map(self.modify(a => g(bb(f(a))))) { case Change(prev, now) => Change(f(prev), f(now)) }
49+
}
3450
}
3551

3652
object Signal {

0 commit comments

Comments
 (0)