-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory memory management for flatMap #376
Open
thepont
wants to merge
2
commits into
ReactiveX:master
Choose a base branch
from
thepont:fix/flatmap-memory
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for calling this out.
This operator is technically working as designed, i.e. we are intentionally pulling and buffering values from the outer sequence as quickly as they're emitted, but only flattening
concurrent
number of inner sequences at a time (similar to the Observable implementation).But I can see how the internal buffering is problematic, and I agree that we shouldn't need to buffer the outer sequence values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @trxcllnt, I manged to get the tests running locally, thanks for your help identifying the problems with my code.
When I started looking at the actual code I figured it must have been by design.
My intent was to fan out to do multiple API requests ( to AWS SQS) concurrently based on the result of of a dynamodb scan and the speed that I could processes the items is reduced significantly when I removed the concurrency and tried a
concatMap
( although this technically works, and solves my memory issues, although a little slower for my application).My initial motivation for moving from RX to IX was to fix this issue, figuring it would be difficult to prevent my memory issues unless I had a way of throttling the producer, I assume another approach that would work as the library currently stands is to use a
throttle
before theflatMap
and attempt to tweak that time so the consumer can keep up, and the buffer never explodes.On the tests, I managed to run the unit tests although the contribution guide mentions performance tests but I seem unable to locate these?