Skip to content

Adds support for handle_continue/2 to gen_stage #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

Adds support for handle_continue/2 to gen_stage #264

wants to merge 1 commit into from

Conversation

hazardfn
Copy link

@hazardfn hazardfn commented Oct 8, 2020

  • {:continue, _term} instructions can now be returned as one would expect from gen_server.
  • :hibernate is now supported on init similar to gen_server.

Attempts to revive #227

@hazardfn
Copy link
Author

hazardfn commented Oct 8, 2020

The :continue tests will only work OTP 21+, not sure what the best strategy is for limiting these tests to those versions...

EDIT: Added a compile time conditional that checks the OTP major version, if there's a better way let me know! 😃

@hazardfn
Copy link
Author

hazardfn commented Oct 8, 2020

The failing test seems to be inherently flakey - I can reproduce on master. Not sure what's causing it but running mix test on a loop it occasionally shows up, can investigate separately:

Switched to branch 'master'
Your branch is up to date with 'origin/master'.

$ for i in 1 2 3 4 5; mix test; end
                                                                                                                                                              
Compiling 3 files (.ex)
................................................................................................................................................................................................

Finished in 6.0 seconds
192 tests, 0 failures

Randomized with seed 355674
................................................................................................................................................................................................

Finished in 6.0 seconds
192 tests, 0 failures

Randomized with seed 77758
.....................................................................................................

  1) test producer_consumer callbacks producer handle_cancel/3 on consumer down (GenStageTest)
     test/gen_stage_test.exs:1553
     Assertion failed, no matching message after 500ms
     The following variables were pinned:
       ref = #Reference<0.3941081809.3725066241.213827>
       consumer = #PID<0.589.0>
     Showing 2 of 2 messages in the mailbox
     code: assert_receive {:producer_consumer_cancelled, {^consumer, ^ref}, {:down, :killed}}
     mailbox:
       pattern: {:producer_consumer_cancelled, {^consumer, ^ref}, {:down, :killed}}
       value:   {:consumer_subscribed, {#PID<0.588.0>, #Reference<0.3941081809.3725066241.213827>}}

       pattern: {:producer_consumer_cancelled, {^consumer, ^ref}, {:down, :killed}}
       value:   {:producer_consumer_cancelled, {#PID<0.589.0>, #Reference<0.3941081809.3725066241.213827>}, {:down, :noproc}}
     stacktrace:
       test/gen_stage_test.exs:1560: (test)

..........................................................................................

Finished in 6.7 seconds
192 tests, 1 failure

Randomized with seed 791161
................................................................................................................................................................................................

Finished in 6.0 seconds
192 tests, 0 failures

Randomized with seed 199756
................................................................................................................................................................................................

Finished in 6.0 seconds
192 tests, 0 failures

Randomized with seed 937796

  * {:continue, _term} instructions can now be returned as one
    would expect from gen_server.

  * :hibernate is now supported on init similar to gen_server.
@hazardfn
Copy link
Author

Rebased for #265

@hazardfn
Copy link
Author

@josevalim is there something I need to do to move this forward?

@josevalim
Copy link
Member

Hi @hazardfn, apparently this fell under the cracks, sorry about that. I will try to save some time to review it this week but it may take a while because there are quite some factors to consider here. In any case, thanks for the PR and the ping!

@acco
Copy link

acco commented Jun 7, 2021

Would love this feature. Anything I can do to help test/QA/review?

when new_state: term, event: term

@doc """
Invoked to handle `continue` instructions.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Invoked to handle `continue` instructions.
Invoked to handle `:continue` instructions.

Comment on lines +1126 to +1127
It is useful for performing work after initialization or for splitting the work
in a callback in multiple steps, updating the process state along the way.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be a bit misleading. I think the biggest use of this is being able to emit events and then do something right after, being sure that no messages, calls, or anything else will be "injected" in between. Maybe we can rephrase this?

My suggestion:

Suggested change
It is useful for performing work after initialization or for splitting the work
in a callback in multiple steps, updating the process state along the way.
This callback can be used to perform work right after emitting events from
other callbacks. The "continue mechanism" makes sure that no messages,
calls, casts, or anything else will be handled between a callback emitting
a `:continue` tuple and the `c:handle_continue/2` callback being invoked.

Return values are the same as `c:handle_cast/2`.

This callback is optional. If one is not implemented, the server will fail
if a continue instruction is used.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if a continue instruction is used.
if a `:continue` instruction is used.

Comment on lines +1762 to +1765
{:producer, state, {:continue, _term} = continue} ->
init_producer(mod, [], state, continue)

{:producer, state, :hibernate} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would force these return values to be {:producer, state, [], {:continue, term} | :hibernate} to simplify the API. Thoughts? Same for the cases below with producer_consumer and consumer.

Copy link
Author

@hazardfn hazardfn Jan 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this incredibly long case statement is less than ideal but if I understand your suggestion correctly wouldn't this result in the GenStage not being initialized (validation/state setup) properly at all in the :hibernate case and in the {:continue, term} case initialization of the GenStage state would be forced upon the user...?

EDIT: Actually I think in the continue case it wouldn't work either because if I remember correctly user code only has access to the "inner" user state of the GenStage not the internals...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're OK with keeping the current logic as it is it might be cleaner to pull all this out into a pattern matched group of functions instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now you have a few extra cases because of the fact that you are allowing the third element of the tuple to be either the options, :hibernate, or {:continue, term}. What I am suggesting is that you don't allow it to be the options. The spec of the return would become something like {:producer, state :: term, options :: keyword, :hibernate | {:continue, term}}.

This way, you shave off a few case clauses by forcing users to use an empty list of options ([]) if they have no options to pass but still want to use :hibernate or {:continue, term}. Does it make more sense?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right I see, thanks for the clarification! I'll look into these changes tonight 👍

@hazardfn
Copy link
Author

Just checking in to say I won't have as much time as I thought tonight, will post here when I'm able to actually take a bite out of this.

@isaacsanders
Copy link

This would be incredible to have access to.

@hazardfn
Copy link
Author

Closed in favour of #300.

@hazardfn hazardfn closed this May 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

5 participants