ChannelFlow instances do not prefer conflation over buffering when fused #4352

Closed
kevincianfarini opened this issue Feb 12, 2025 · 8 comments
Labels: docs (KDoc and API reference), flow-fusing

@kevincianfarini (Contributor) commented Feb 12, 2025

Describe the bug

The Flow.conflate() documentation states that:

Adjacent applications of conflate/buffer, channelFlow, flowOn and produceIn are always fused so that only one properly configured channel is used for execution. Conflation takes precedence over buffer() calls with any other capacity.

This is not the behavior I've observed when using a callbackFlow within my library.

What happened? What should have happened instead?

While working on a library I publish, I wanted to ensure that a flow returned from my public API is always conflated. It does not make sense for emissions to be buffered because, even though I was using a callbackFlow, the items in that flow describe state, not events. I thought I had achieved this by applying the Flow.conflate operator to the flow I return; however, some unit tests showed that this does not unconditionally configure the underlying channel as conflated. In my experience, adjacent applications of the Flow.buffer operator would reconfigure the underlying channel's capacity but not its buffer overflow strategy.

Instead, I would expect calls to Flow.conflate to always take precedence over other fused operators as the documentation leads me to believe.

I am hoping that this isn't just treated as unclear documentation. From my perspective, I don't see much utility in allowing Flow consumers to buffer a flow which has previously been conflated.

Provide a Reproducer

// The flow that's returned from my public API. 
val original = channelFlow { awaitCancellation() }.conflate()

// The flow that's altered by the API consumer. 
val altered = original.buffer(3, onBufferOverflow = BufferOverflow.SUSPEND)

The above snippet configures the underlying channel with a capacity of three and a buffer overflow strategy of DROP_OLDEST. Partial retention of the conflation policy evidently succeeds, since DROP_OLDEST is retained, but the final capacity is inconsistent with what the documentation states.
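
Here's a small sketch (not from my original test suite; exact output may vary with scheduling) that makes the fused configuration observable at runtime:

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val values = channelFlow {
        // The "callback" fires 10 times synchronously, before the
        // collector ever gets dispatched.
        repeat(10) { trySend(it) }
    }
        .conflate()
        .buffer(3, onBufferOverflow = BufferOverflow.SUSPEND)
        .toList()

    // Likely prints [0, 7, 8, 9]: the first value is handed straight to
    // the already-suspended collector, and of the rest only the 3 newest
    // survive, because the fused channel kept DROP_OLDEST. If SUSPEND had
    // won the fusion, trySend would have failed once the buffer filled,
    // and the oldest values would remain instead.
    println(values)
}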

@dkhalanskyjb (Collaborator)

A simplified answer: buffer(3) is simply a very fast consumer of your elements. You can't stop a consumer from consuming elements as quickly as you produce them, even if you think the older elements are not worth consuming. It's not exactly what happens when fusing operators, but it's close.

So, yes, this is not a bug but unclear documentation.

To elaborate, the mental model of operator chaining is that of a pipeline through which data moves.

.conflate() is a small basket with enough space for only one piece of data, and this basket is operated by a person that observes the incoming data, and if something arrives, throws away the contents of the basket (if there are any) and puts the new piece of data in its place.

.buffer(3, BufferOverflow.SUSPEND) is a larger basket, with a strong, strict person operating it. The person observes the incoming data; if there is enough space in the basket for the new data, the person puts the data into the basket, but if not, the person yells: "STOP", and the incoming data stops.

What happens when the large basket is put right behind the small basket? If there is space in the large basket, the strict person moves the data from the small basket; if not, the strict person forbids new data from entering the small basket, so the operator of the small basket throws away the incoming data when both baskets are full. So, what we observe is: the oldest element gets dropped, and 4 elements fit into the two baskets.

What fusion does is allow throwing away all baskets but the last one while effectively preserving the basket operators: if you are given a flow with a basket of size 64, you can demand that it be replaced by a small basket.

If your flow pipeline ends with a basket, a consumer of your flow can replace your basket with a larger or a smaller one, but you get to decide what the operator of the basket does to your flow.
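
To make that concrete, here's a sketch (my illustration; which values survive depends on scheduling) of a consumer demanding the small basket:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val fromLibrary = channelFlow {
        repeat(100) { trySend(it) } // fill the pipeline without suspending
    }.buffer(64) // the library's "large basket"

    // The consumer swaps in the "small basket": after fusion, a single
    // conflated channel backs the whole chain.
    val values = fromLibrary.conflate().toList()

    // Only a couple of the 100 values survive (likely [0, 99] here):
    // the conflated channel threw the rest away.
    println(values)
}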

dkhalanskyjb added the docs (KDoc and API reference) label and removed the bug label on Feb 13, 2025
@kevincianfarini (Contributor, Author)

I think that line of reasoning makes some sense on a philosophical level; however, what are some real-world use cases for this behavior?

There's at least one scenario I can think of that the following statement doesn't apply to:

buffer(3) is simply a very fast consumer of your elements. You can't stop a consumer from consuming elements as quickly as you produce them, even if you think the older elements are not worth consuming. It's not exactly what happens when fusing operators, but it's close.

I'm currently using a callbackFlow, which I conflate in my library, to deal with elements emitted synchronously from callbacks via trySend. In this scenario it's possible that my callback could be synchronously invoked multiple times before coroutine dispatch occurs. In such a case, even if I had a downstream consumer working as quickly as possible, these elements would still be conflated because they are sent to the channel before dispatch can occur. Within the above line of reasoning, they'd be buffered.
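
A sketch of what I mean, with the callback simulated by synchronous trySend calls:

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val states = callbackFlow {
        // Simulate the callback firing three times synchronously,
        // before any coroutine dispatch can happen.
        repeat(3) { trySend(it) }
        close() // the source is done
        awaitClose { /* this is where I'd unregister the callback */ }
    }.conflate()

    // Likely prints [0, 2]: the values sent before the collector could
    // run were conflated down to the newest one, no matter how fast the
    // downstream consumer is.
    println(states.toList())
}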

I'm not trying to be a jerk and nitpick your reasoning, either. This is a real scenario I ran into when writing some unit tests for my library which then uncovered this behavior in the first place.

@dkhalanskyjb (Collaborator)

In such a case, even if I had a downstream consumer working as quickly as possible, these elements would still be conflated

Only if you're in a single-threaded scenario.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*

fun main() {
    runBlocking {
        channelFlow {
            repeat(50) {
                trySend(it)
                Thread.sleep(50) // not a suspension, just making the producer slow
            }
        }.conflate().flowOn(Dispatchers.IO).collect {
            println(it)
        }
    }
}

prints every number.

So, semantically, buffer is an extremely fast consumer running in parallel.

@kevincianfarini (Contributor, Author)

I'm sorry, I don't really see how your example emulates a producer quickly (and synchronously) sending many values to the underlying channel. If I remove the artificial slowdown of the producer, which wasn't part of the scenario I was describing, I end up with the following code, which does exhibit conflation.

@Test
fun foo() = runTest {
    // Multithreaded, fast as possible consumer.
    val list = channelFlow<Int> {
        repeat(50) { trySend(it) }
    }.conflate().flowOn(Dispatchers.Default).toList()

    println(list) // [31, 33, 34, 36, 37, 49]

    // Single threaded buffer.
    val list2 = channelFlow<Int> {
        repeat(50) { trySend(it) }
    }.conflate().buffer(50).toList()

    println(list2) // [0, ..., 49]
}

This example really emphasizes my core point: buffer is not a great model for a fast consumer in this scenario, because coroutines must be dispatched, and during that dispatch cycle values sent synchronously by a producer would still conflate.

@dkhalanskyjb (Collaborator)

I'm sorry, I don't really see how your example emulates a producer quickly (and synchronously) sending many values to the underlying channel.

It's not what the example emulates. It emulates the consumer being much quicker than the producer. buffer is an extremely fast consumer, even faster than the producer in your example. flowOn is not nearly as fast, so I artificially slowed down the producer to emphasize the effect.

@kevincianfarini (Contributor, Author)

Okay, so to summarize:

  • Flow.buffer acts semantically like a really fast consumer. Faster than coroutine dispatch, even.
  • Other flows that don't get underlying channel fusion don't benefit from this really fast consumer, because they'd have to undergo coroutine dispatch; these benefits are not evenly distributed unless you have multiple adjacent ChannelFlow operations. For example, the following flow would still conflate entries if the callback were synchronously invoked in quick succession.
val original = flow {
  val channel = Channel<Int>(Channel.CONFLATED)
  // ... register some callback that calls channel.trySend(...)
  try {
    for (item in channel) emit(item)
  } finally {
    // ... unregister the callback
  }
}

val buffered = original.buffer(3)

What are some example use cases of this behavior? More specifically, when would it be useful for a flow which was previously conflated to be buffered later?

@dkhalanskyjb (Collaborator)

Faster than coroutine dispatch

they'd have to undergo coroutine dispatch

A coroutine doesn't always need to go through a dispatch to consume an element. It may just keep spinning if, after being done with one element, it notices that another one is available already.

For example, the following flow would still conflate entries

Most of the time, sure, probably. You can't rely on that, though, as it's subject to race conditions if the callback gets executed in another thread.
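
A sketch of the kind of race I mean, with the callback played by a plain thread:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

fun main() = runBlocking {
    val original = flow {
        val channel = Channel<Int>(Channel.CONFLATED)
        // The callback fires rapidly from another thread.
        thread {
            repeat(50) { channel.trySend(it) }
            channel.close()
        }
        for (item in channel) emit(item)
    }

    // buffer(3) collects `original` in a separate coroutine. How many of
    // the 50 values it manages to grab before they are conflated depends
    // on how the two threads race; anywhere from 1 to 50 is possible.
    println(original.buffer(3).toList().size)
}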

More specifically, when would it be useful for a flow which was previously conflated to be buffered later?

For example, we use .conflate() in our javafx.beans.value.ObservableValue.asFlow() implementation (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-javafx/kotlinx.coroutines.javafx/as-flow.html). This makes sense in general, because an ObservableValue represents state, and users are typically only interested in the latest state. However, this may not hold in specific use cases: some code can expose an ObservableValue where every value in the stream is important. Then you'd override the default conflating buffering strategy with one more suitable for you.
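
Concretely, a consumer in that position could write something like this (a sketch; allUpdates is just an illustrative name):

import javafx.beans.value.ObservableValue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.javafx.asFlow

// asFlow() conflates by default, which is the right call for state. A
// consumer that cares about every individual value can fuse a wider
// buffer onto the same channel instead of being stuck with conflation.
fun <T> ObservableValue<T>.allUpdates(): Flow<T> =
    asFlow().buffer(Channel.UNLIMITED)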

Another example: diagnostics, to see how many values fly through the system.

@kevincianfarini (Contributor, Author)

Okay, I think you've convinced me.
