Skip to content

Commit e39bfb3

Browse files
committed
Update Running.scala
1 parent 54c2302 commit e39bfb3

File tree

1 file changed

+5
-6
lines changed
  • persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal

1 file changed

+5
-6
lines changed

persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala

+5-6
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,12 @@ private[pekko] object Running {
120120
/**
121121
* A marker to indicate to upstream code that an unstash is needed.
122122
*/
123-
final class UnstashNeeded(val next: Behavior[InternalProtocol], context: ActorContext[InternalProtocol])
123+
final class UnstashNeeded(val next: Behavior[InternalProtocol],
124+
unstashFunction: Behavior[InternalProtocol] => Behavior[InternalProtocol],
125+
context: ActorContext[InternalProtocol])
124126
extends AbstractBehavior[InternalProtocol](context) {
125127

126-
/**
127-
* @throws NotImplementedError as this behavior should not receive any messages
128-
*/
129-
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = ???
128+
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = unstashFunction(next)
130129
}
131130

132131
def startReplicationStream[C, E, S](
@@ -283,7 +282,7 @@ private[pekko] object Running {
283282
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
284283
val effect = setup.commandHandler(state.state, cmd)
285284
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
286-
if (doUnstash) new Running.UnstashNeeded(next, setup.context)
285+
if (doUnstash) new Running.UnstashNeeded(next, tryUnstashOne, setup.context)
287286
else next
288287
}
289288

0 commit comments

Comments
 (0)