ServerStream.stream() should release underlying resources when calling Stream.close() method #3358

Open
rossignolloic opened this issue Nov 8, 2024 · 0 comments
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@rossignolloic

Is your feature request related to a problem? Please describe.
When I use ServerStream.stream() and call a short-circuiting Stream terminal operation that does not consume every element, such as findAny() or anyMatch(Predicate<?>), the underlying resources of the ServerStream are not released.

ServerStream<X> serverStream = ...;
try (Stream<X> stream = serverStream.stream()) {
    // partially consume the stream, e.g. stream.findAny()
}
// At this point the underlying resources of serverStream are not released
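
A minimal sketch of why try-with-resources alone does not help here: in plain java.util.stream, Stream.close() only runs handlers that were registered through onClose(Runnable). The class and method names below are hypothetical, and an in-memory iterator stands in for the ServerStream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class CloseHandlerSketch {

    /** Returns how many times a close handler ran after a partial read. */
    static int closeCalls(boolean registerHandler) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for the iterator behind a ServerStream (assumption).
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
        Stream<Integer> base = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED), false);
        Stream<Integer> stream =
                registerHandler ? base.onClose(calls::incrementAndGet) : base;
        try (Stream<Integer> s = stream) {
            s.findAny(); // short-circuits after the first element
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without onClose: " + closeCalls(false));
        System.out.println("with onClose:    " + closeCalls(true));
    }
}
```

Without a registered handler, closing the stream has nothing to run, so the source is never told that the caller stopped reading.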

Describe the solution you'd like
The derived stream should release all underlying resources of the originating ServerStream when it is closed.
To do that, the stream() method should register an onClose(Runnable) handler on the stream before returning it.
This Runnable should cancel the ServerStream if the stream has not been fully consumed.
With this change, callers can wrap the stream in a try-with-resources statement to ensure that all underlying resources are released in every case.

ServerStream<X> serverStream = ...;
try (Stream<X> stream = serverStream.stream()) {
    // fully or partially consume the stream
}
// At this point serverStream is either fully consumed or canceled

Proposed implementation steps

In ServerStreamIterator, add a new method that reports whether the iterator has been fully consumed.

boolean isFullyConsumed() {
    return last == QueuingResponseObserver.EOF_MARKER;
}

In ServerStream, add a new method that cancels the stream only when the iterator has not been fully consumed.

void cancelIfNecessary() {
    // Skip the cancel when every element has already been delivered.
    if (!iterator.isFullyConsumed()) {
        cancel();
    }
}

In ServerStream, update the stream() method to register an onClose(Runnable) handler that calls the method above.

public Stream<V> stream() {
    return StreamSupport.stream(this.spliterator(), false).onClose(this::cancelIfNecessary);
}
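
The three steps above can be sketched end to end as follows. This is only an illustration under stated assumptions: FakeServerStream is a hypothetical in-memory stand-in, not the real gax ServerStream, and its fullyConsumed flag is set when hasNext() finds no more elements rather than by comparing against QueuingResponseObserver.EOF_MARKER.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class CancelOnCloseSketch {

    /** Hypothetical stand-in for ServerStream; not the real gax class. */
    static final class FakeServerStream<V> implements Iterable<V> {
        private final Iterator<V> source;
        private boolean fullyConsumed; // true once the source reports no more elements
        private boolean cancelled;

        FakeServerStream(List<V> elements) {
            this.source = elements.iterator();
        }

        @Override
        public Iterator<V> iterator() {
            return new Iterator<V>() {
                @Override
                public boolean hasNext() {
                    boolean more = source.hasNext();
                    if (!more) {
                        fullyConsumed = true;
                    }
                    return more;
                }

                @Override
                public V next() {
                    return source.next();
                }
            };
        }

        void cancel() {
            cancelled = true;
        }

        // Cancel only when the caller stopped before the end of the stream.
        void cancelIfNecessary() {
            if (!fullyConsumed) {
                cancel();
            }
        }

        boolean isCancelled() {
            return cancelled;
        }

        Stream<V> stream() {
            return StreamSupport.stream(
                            Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED),
                            false)
                    .onClose(this::cancelIfNecessary);
        }
    }

    static boolean cancelledAfterFindFirst() {
        FakeServerStream<String> serverStream = new FakeServerStream<>(Arrays.asList("a", "b", "c"));
        try (Stream<String> stream = serverStream.stream()) {
            stream.findFirst(); // reads one element, then stops
        }
        return serverStream.isCancelled();
    }

    static boolean cancelledAfterFullConsumption() {
        FakeServerStream<String> serverStream = new FakeServerStream<>(Arrays.asList("a", "b", "c"));
        try (Stream<String> stream = serverStream.stream()) {
            stream.forEach(item -> { }); // drains every element
        }
        return serverStream.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("cancelled after findFirst: " + cancelledAfterFindFirst());
        System.out.println("cancelled after forEach:   " + cancelledAfterFullConsumption());
    }
}
```

In this sketch the partial read ends with a cancel on close, while the fully drained stream is left alone, which is the behavior the proposal asks for.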

Describe alternatives you've considered
I have tried two different workarounds.

1. Register an onClose(Runnable) that cancels the ServerStream:

ServerStream<X> serverStream= ...;
return serverStream.stream().onClose(serverStream::cancel);

But with this approach, the ServerStream is canceled even if all its items have been successfully consumed. I don't think that is good practice.

2. Do not use the stream() method. Instead, create the stream from iterator() and register an onClose(Runnable) on it that checks the result of iterator.hasNext() to decide whether to cancel the ServerStream:

ServerStream<X> serverStream= ...;
Iterator<X> iterator = serverStream.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false).onClose(() -> {
    if (iterator.hasNext()) {
        serverStream.cancel();
    }
});

But iterator.hasNext() can block the calling thread until the next element is received, which is wasted time when the stream is about to be canceled anyway.
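
The blocking concern can be illustrated with a small sketch. It assumes, hypothetically, an iterator backed by a blocking queue that has to wait for the next message before it can answer hasNext(); QueueIterator and its EOF sentinel are invented for the example and are not the real ServerStreamIterator.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingHasNextSketch {

    /** Hypothetical queue-backed iterator; hasNext() must wait for the next message. */
    static final class QueueIterator implements Iterator<String> {
        static final String EOF = new String("EOF"); // sentinel, compared by identity
        private final BlockingQueue<String> queue;
        private String peeked;

        QueueIterator(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (peeked == null) {
                try {
                    peeked = queue.take(); // blocks until the producer delivers something
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return peeked != EOF;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String value = peeked;
            peeked = null;
            return value;
        }
    }

    /** Returns true if a hasNext() call is still waiting while the producer is silent. */
    static boolean hasNextBlocksWhileServerIsSilent() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        QueueIterator iterator = new QueueIterator(queue);
        iterator.next(); // consume the only buffered element

        // Plays the role of the onClose handler that asks hasNext() before cancelling.
        Thread closer = new Thread(() -> {
            try {
                iterator.hasNext();
            } catch (IllegalStateException expectedAfterInterrupt) {
                // the sketch interrupts the wait below
            }
        });
        try {
            closer.start();
            closer.join(200); // give the handler time to finish if it could
            boolean stillWaiting = closer.isAlive();
            closer.interrupt(); // unblock the thread so the sketch terminates
            closer.join();
            return stillWaiting;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("hasNext() still waiting: " + hasNextBlocksWhileServerIsSilent());
    }
}
```

A flag such as the proposed isFullyConsumed() avoids this wait because it only inspects state the iterator has already recorded.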

@burkedavison burkedavison added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Nov 11, 2024