Question about ConcurrencyLimit Service and Service trait in general #689

Tony-X opened this issue Sep 2, 2022 · 5 comments

Tony-X commented Sep 2, 2022

As I understand it, the Service trait's responsibility is to produce a Future that encapsulates all the computation needed to serve a request, and it is common to expect a Service to be reused by many requests concurrently.

I noticed that ConcurrencyLimit<T> mutates itself in both poll_ready() and call() to update the permit, which means concurrent accesses would cause race conditions. Why is this okay?

pub struct ConcurrencyLimit<T> {
    inner: T,
    semaphore: PollSemaphore,
    /// The currently acquired semaphore permit, if there is sufficient
    /// concurrency to send a new request.
    ///
    /// The permit is acquired in `poll_ready`, and taken in `call` when sending
    /// a new request.
    permit: Option<OwnedSemaphorePermit>,
}

And in general, why does the Service trait define poll_ready() and call() to take &mut self instead of &self?

@davidpdrsn (Member) commented:

I noticed that ConcurrencyLimit allows mutations to itself in both poll_ready() and call() to update the permit which means concurrent accesses would cause race conditions

Can you elaborate on this? Why might it cause race conditions?

Tony-X (Author) commented Sep 2, 2022

Can you elaborate on this? Why might it cause race conditions?

Here are the relevant functions.

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // If we haven't already acquired a permit from the semaphore, try to
        // acquire one first.
        if self.permit.is_none() {
            self.permit = ready!(self.semaphore.poll_acquire(cx));  /*** MUTATION ***/
            debug_assert!(
                self.permit.is_some(),
                "ConcurrencyLimit semaphore is never closed, so `poll_acquire` \
                 should never fail",
            );
        }

        // Once we've acquired a permit (or if we already had one), poll the
        // inner service.
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Take the permit
        let permit = self
            .permit
            .take()    /*** MUTATION ***/
            .expect("max requests in-flight; poll_ready must be called first");

        // Call the inner service
        let future = self.inner.call(request);

        ResponseFuture::new(future, permit)
    }

Suppose we have let service: ConcurrencyLimit<T> = ...; that only allows two requests, and imagine two async tasks that each run the following --

{
   service.poll_ready().await?                      
   runtime.spawn(service.call(req));            
}

Suppose the scheduling decides to let both poll_ready() calls finish, so both tasks are allowed to invoke call(). Then one task enters call(), consumes the permit, and leaves self.permit as None. Then the other task enters call() and expects the permit to be present, which I think will panic. Maybe I missed something....

hawkw (Member) commented Sep 2, 2022

Suppose we have let service: ConcurrencyLimit<T> = ...; that only allows two requests, and imagine two async tasks that each run the following --

{
   service.poll_ready().await?                      
   runtime.spawn(service.call(req));            
}

Suppose the scheduling decides to let both poll_ready() calls finish, so both tasks are allowed to invoke call(). Then one task enters call(), consumes the permit, and leaves self.permit as None. Then the other task enters call() and expects the permit to be present, which I think will panic. Maybe I missed something....

This is not possible in safe Rust. In order to call poll_ready and call, both tasks would need to either own or mutably borrow the service. In most async runtimes, spawning a task that runs concurrently requires the spawned Future to be valid for the 'static lifetime, so the spawned future may not take ownership of an &mut ConcurrencyLimit<...>. Even if a runtime permitted spawning non-'static tasks to run concurrently,[1] both tasks would need to move an &mut borrow of the service into the task's future, which the Rust compiler would not permit.[2] As a demonstration of this, I would suggest trying to actually write code that reproduces the hypothetical race you've described — I think you'll find that it's not actually possible to write such code.

Instead, the common pattern is for each spawned task to move a clone of the service by value. Typically, if only a single instance of an inner Service should exist, or if it is expensive to clone, it is wrapped in a tower::buffer middleware service, which allows multiple lightweight clones that send requests to the wrapped service through message passing.
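
For illustration, here is a minimal sketch of that pattern, assuming tokio and tower 0.4 with the "buffer", "limit", and "util" features enabled; the inner service, the buffer bound of 32, and the concurrency limit of 2 are all made up for the example. Each spawned task moves its own clone of the service, and Buffer makes the single inner service cheaply cloneable by routing requests over a channel to one worker task:

use tower::{buffer::Buffer, limit::ConcurrencyLimit, service_fn, Service, ServiceExt};

#[tokio::main]
async fn main() {
    // Hypothetical inner service; imagine it were expensive (or impossible) to clone.
    let inner = service_fn(|req: String| async move {
        Ok::<_, std::convert::Infallible>(format!("handled: {req}"))
    });

    // Buffer hands out lightweight clones that talk to one worker over a channel;
    // ConcurrencyLimit then bounds how many requests are in flight at once.
    let svc = ConcurrencyLimit::new(Buffer::new(inner, 32), 2);

    let mut handles = Vec::new();
    for i in 0..4 {
        // Each task gets its *own* clone, so no task ever shares an `&mut` borrow.
        let mut svc = svc.clone();
        handles.push(tokio::spawn(async move {
            let resp = svc
                .ready()
                .await
                .expect("buffer worker is alive")
                .call(format!("request {i}"))
                .await
                .expect("buffer worker is alive");
            println!("{resp}");
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}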

Note that the ConcurrencyLimit service, and similar middleware services that acquire a resource in poll_ready that's consumed in call, are typically implemented so that each clone has its own version of the acquired resource, rather than cloning a resource previously acquired by the instance it was cloned from:

impl<T: Clone> Clone for ConcurrencyLimit<T> {
    fn clone(&self) -> Self {
        // Since we hold an `OwnedSemaphorePermit`, we can't derive `Clone`.
        // Instead, when cloning the service, create a new service with the
        // same semaphore, but with the permit in the un-acquired state.
        Self {
            inner: self.inner.clone(),
            semaphore: self.semaphore.clone(),
            permit: None,
        }
    }
}

The shared state of such middlewares (in this case, the Semaphore that's used to track the concurrency limit), on the other hand, is typically reference counted, so that each clone of the middleware references the same instance of the shared state. In this case, all clones of a ConcurrencyLimit middleware are responsible for acquiring their own independent permits from the semaphore, but share the same Semaphore instance that those permits are acquired from. Thus, each task that attempts to use its own clone of the ConcurrencyLimit service has its own state that's only mutated by that task, but each clone's permit acquisition counts against a limit that applies to all clones of that service.
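
As a small, hypothetical usage sketch (again assuming tokio and tower with the "limit" and "util" features; the echo service and the limit of 1 are made up for the example), this is what the shared semaphore means in practice: a permit held through one clone counts against every other clone:

use tower::{limit::ConcurrencyLimit, service_fn, Service, ServiceExt};

#[tokio::main]
async fn main() {
    // A trivial inner service that just echoes the request back.
    let inner = service_fn(|req: &'static str| async move {
        Ok::<_, std::convert::Infallible>(req)
    });

    // A limit of 1, tracked by a single semaphore shared by every clone.
    let svc = ConcurrencyLimit::new(inner, 1);
    let mut a = svc.clone();
    let mut b = svc.clone();

    // Clone `a` acquires the one available permit in poll_ready and holds it
    // until its response future completes...
    let fut = a.ready().await.unwrap().call("hello");

    // ...so while that future is pending, clone `b` cannot become ready, even
    // though `b` keeps its own independent `permit: Option<_>` field.
    assert_eq!(fut.await.unwrap(), "hello"); // the permit is released here

    // Now `b` can acquire the shared permit for itself.
    let resp = b.ready().await.unwrap().call("world").await.unwrap();
    assert_eq!(resp, "world");
}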

Incidentally, I think this also answers your other question about why the poll_ready method takes an &mut self rather than &self: it's intended to allow mutating the state of an individual clone of a Service in this manner, while any state that's shared between tasks is reference counted or similarly shared between clones of a Service.
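
To make that concrete, here's a toy middleware sketch (purely illustrative, not something that exists in tower): &mut self lets each clone update its own fields with no locking, while anything genuinely shared between clones sits behind an Arc and brings its own synchronization:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;

struct CountRequests<S> {
    inner: S,
    // Per-clone state: only the single task that owns this clone mutates it,
    // so `&mut self` is all the synchronization it needs.
    sent_by_this_clone: u64,
    // Shared state: every clone points at the same counter.
    sent_total: Arc<AtomicU64>,
}

impl<S: Clone> Clone for CountRequests<S> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            // Per-clone state starts fresh in each clone...
            sent_by_this_clone: 0,
            // ...while the shared counter is reference-counted.
            sent_total: Arc::clone(&self.sent_total),
        }
    }
}

impl<S, Request> Service<Request> for CountRequests<S>
where
    S: Service<Request>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // `&mut self` makes this possible without a Mutex or RefCell...
        self.sent_by_this_clone += 1;
        // ...whereas the cross-clone total needs interior mutability (an atomic).
        self.sent_total.fetch_add(1, Ordering::Relaxed);
        self.inner.call(request)
    }
}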

I hope this is a helpful answer, and I'm happy to expand on anything I said if it doesn't make sense yet!

Footnotes

[1] Such as some form of "scoped tasks" API...

[2] This is why I said "not possible in safe Rust" earlier — it would be possible to write unsafe code that creates two &mut references to the same instance of a ConcurrencyLimit. However, this would be unsound to do anyway, so any unsafe code that does this is already a bug, and one that may have significantly worse consequences (e.g. memory corruption) than the potential race you're describing here.

Tony-X (Author) commented Sep 3, 2022

That was an insightful answer with great details. Thank you @hawkw!

Indeed, the custom Clone impl was the missing piece in my understanding. Now I see the usage pattern given how Service is designed. I have to say, though, that the Clone impl requires the inner service to be clonable, which might be limiting.

I also started a discussion on Discord, and I'm quoting a comment from @seanmonstar:

Whether Service should be mutable or not is often asked about, and there are reasons in favor of both. If nothing else, it would probably be a good thing to put in an FAQ.

Could you also please share some insight about the design tradeoffs?

GlenDC (Contributor) commented Dec 5, 2023

@hawkw / @davidpdrsn: Is further action required here? E.g. more answers, doc changes, etc.?
Or can it be closed as is? It has been open without action for more than a year now...
