Skip to content

Commit

Permalink
Move draft from wg-net
Browse files Browse the repository at this point in the history
  • Loading branch information
aturon committed Dec 12, 2018
1 parent 29c28cc commit d598899
Show file tree
Hide file tree
Showing 15 changed files with 1,481 additions and 1 deletion.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2018 The Rust Programming Language
Copyright (c) 2018 Aaron Turon

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
34 changes: 34 additions & 0 deletions src/TOC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Table of Contents

- [Getting Started](getting_started/chapter.md)
- [What This Book Covers](getting_started/chapter.md#what-this-book-covers)
- [Why Async?](getting_started/why_async.md)
- [The State of Asynchronous Rust](getting_started/state_of_async_rust.md)
- [`async`/`await!` Primer](getting_started/async_await_primer.md)
- [Applied: HTTP Server](getting_started/http_server_example.md)
- [Under the Hood: Executing `Future`s and Tasks](execution/chapter.md)
- [The `Future` Trait](execution/future.md)
- [Task Wakeups with `LocalWaker` and `Waker`](execution/wakeups.md)
- [Applied: Build a Timer](execution/wakeups.md)
- [Applied: Build an Executor](execution/executor.md)
- [Executors and System IO](execution/io.md)
- [`async`/`await`]()
- [What and Why]()
- [`async` Blocks, Closures, and Functions]()
- [Applied: XXX]()
- [Pinning](pinning/chapter.md)
- [Practical Usage](pinning/chapter.md#how-to-use-pinning)
- [Streams](streams/chapter.md)
- [Patterns: Iteration and Concurrency]()
- [Executing Multiple Futures at a Time]()
- [`select!` and `join!`]()
- [Spawning]()
- [Cancellation and Timeouts]()
- [`FuturesUnordered`]()
- [I/O]()
- [`AsyncRead` and `AsyncWrite`]()
- [Asynchronous Design Patterns: Solutions and Suggestions]()
- [Modeling Servers and the Request/Response Pattern]()
- [Managing Shared State]()
- [The Ecosystem: Tokio and More]()
- Lots, lots more?...
154 changes: 154 additions & 0 deletions src/async_await/chapter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# `async`/`await!`

In [the first chapter], we took a brief look at `async`/`await!` and used
it to build a simple server. This chapter will discuss `async`/`await!` in
greater detail, explaining how it works and how `async` code differs from
traditional Rust programs.

`async`/`await!` are special pieces of Rust syntax that make it possible to
yield control of the current thread rather than blocking, allowing other
code to make progress while waiting on an operation to complete.

There are three main ways to use `async`: `async fn`, `async` blocks, and
`async` closures. Each returns a value that implements the `Future` trait:

```rust
// `foo()` returns a type that implements `Future<Output = u8>`.
// `await!(foo())` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
// This `async` block results in a type that implements
// `Future<Output = u8>`.
async {
let x: u8 = await!(foo());
x + 5
}
}

fn baz() -> impl Future<Output = u8> {
// This `async` closure, when called, returns a type that
// implements `Future<Output = u8>`
let closure = async |x: u8| {
await!(bar()) + x
};
closure(5)
}
```

As we saw in the first chapter, `async` bodies and other futures are lazy:
they do nothing until they are run. The most common way to run a `Future`
is to `await!` it. When `await!` is called on a `Future`, it will attempt
to run it to completion. If the `Future` is blocked, it will yield control
of the current thread. When more progress can be made, the `Future` will be picked
up by the executor and will resume running, allowing the `await!` to resolve.

## `async` Lifetimes

Unlike traditional functions, `async fn`s which take references or other
non-`'static` arguments return a `Future` which is bounded by the lifetime of
the arguments:

```rust
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent ot this function:
fn foo<'a>(x: &'a u8) -> impl Future<Output = ()> + 'a {
async { *x }
}
```

This means that the future returned from an `async fn` must be `await!`ed
while its non-`'static` arguments are still valid. In the common
case of `await!`ing the future immediately after calling the function
(like `await!(foo(&x))`) this is not an issue. However, if storing the future
or sending it over to another task or thread, this may be an issue.

One common workaround for turning an `async fn` with references-as-arguments
into a `'static` future is to bundle the arguments with the call to the
`async fn` inside an `async` block:

```rust
async fn foo(x: &u8) -> u8 { *x }

fn bad() -> impl Future<Output = ()> {
let x = 5;
foo(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = ()> {
async {
let x = 5;
await!(foo(&x))
}
}
```

By moving the argument into the `async` block, we extend its lifetime to match
that of the `Future` returned from the call to `foo`.

## `async move`

`async` blocks and closures allow the `move` keyword, much like normal
closures. An `async move` block will take ownership of the variables it
references, allowing it to outlive the current scope, but giving up the ability
to share those variables with other code:

```rust
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope.
async fn foo() {
let my_string = "foo".to_string();

let future_one = async {
...
println!("{}", my_string);
};

let future_two = async {
...
println!("{}", my_string);
};

// Run both futures to completion, printing "foo" twice
let ((), ()) = join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async` block can access captured variables, since they are
/// moved into the `Future` generated by the `async` block. However,
/// this allows the `Future` to outlive the original scope of the variable:
fn foo() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
...
println!("{}", my_string);
}
}
```

## `await!`ing on a Multithreaded Executor

Note that, when using a multithreaded `Future` executor, a `Future` may move
between threads, so any variables used in `async` bodies must be able to travel
between threads, as any `await!` can potentially result in a switch to a new
thread.

This means that it is not safe to use `Rc`, `&RefCell` or any other types
that don't implement the `Send` trait, including references to types that don't
implement the `Sync` trait.

(Caveat: it is possible to use these types so long as they aren't in scope
during a call to `await!`.)

Similarly, it isn't a good idea to hold a traditional non-futures-aware lock
across an `await!`, as it can cause the threadpool to lock up: one task could
take out a lock, `await!` and yield to the executor, allowing another task to
attempt to take the lock and cause a deadlock. To avoid this, use the `Mutex`
in `futures::lock` rather than the one from `std::sync`.

[the first chapter]: TODO ../getting_started/async_await_primer.md
13 changes: 13 additions & 0 deletions src/execution/chapter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Under the Hood: Executing `Future`s and Tasks

In this section, we'll cover the underlying structure of how `Future`s and
asynchronous tasks are scheduled. If you're only interested in learning
how to write higher-level code that uses existing `Future` types and aren't
interested in the details of how `Future` types work, you can skip ahead to
the `async`/`await` chapter. However, several of the topics discussed in this
chapter are useful for understanding how `async`/`await` code works,
understanding the runtime and performance properties of `async`/`await` code,
and building new asynchronous primitives. If you decide to skip this section
now, you may want to bookmark it to revisit in the future.

Now, with that out of the, way, let's talk about the `Future` trait.
183 changes: 183 additions & 0 deletions src/execution/executor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Applied: Build an Executor

`Future`s are lazy and must be actively driven to completion in order to do
anything. A common way to drive a future to completion is to `await!` it inside
an `async` function, but that just pushes the problem one level up: who will
run the futures returned from the top-level `async` functions? The answer is
that we need a `Future` executor.

`Future` executors take a set of top-level `Future`s and run them to completion
by calling `poll` whenever the `Future` can make progress. Typically, an
executor will `poll` a future once to start off. When `Future`s indicate that
they are ready to make progress by calling `wake()`, they are placed back
onto a queue and `poll` is called again, repeating until the `Future` has
completed.

In this section, we'll write our own simple executor capable of running a large
number of top-level futures to completion concurrently.

For this one, we're going to have to include the `futures` crate in order to
get the `FutureObj` type, which is a dynamically-dispatched `Future`, similar
to `Box<dyn Future<Output = T>>`. `Cargo.toml` should look something like this:

```toml
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.9"
```

Next, we need the following imports at the top of `src/main.rs`:

```rust
#![feature(arbitrary_self_types, async_await, await_macro, futures_api, pin)]

use {
futures::future::FutureObj,
std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
sync::mpsc::{sync_channel, SyncSender, Receiver},
task::{
local_waker_from_nonlocal,
Poll, Wake,
},
},
};
```

Our executor will work by sending tasks to run over a channel. The executor
will pull events off of the channel and run them. When a task is ready to
do more work (is awoken), it can schedule itself to be polled again by
putting itself back onto the channel.

In this design, the executor itself just needs the receiving end of the task
channel. The user will get a sending end so that they can spawn new futures.
Tasks themselves are just futures that can reschedule themselves, so we'll
store them as a future paired with a sender that the task can use to requeue
itself.

```rust
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled using a channel.
struct Task {
// In-progress future that should be pushed to completion
//
// The `Mutex` is not necessary for correctness, since we only have
// one thread executing tasks at once. However, `rustc` isn't smart
// enough to know that `future` is only mutated from one thread,
// so we use it in order to provide safety. A production executor would
// not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<FutureObj<'static, ()>>>,

// Handle to spawn tasks onto the task queue
task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender})
}
```

Let's also add a method to spawner to make it easy to spawn new futures.
This method will take a future type, box it and put it in a FutureObj,
and create a new `Arc<Task>` with it inside which can be enqueued onto the
executor.

```rust
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future_obj = FutureObj::new(Box::new(future));
let task = Arc::new(Task {
future: Mutex::new(Some(future_obj)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
```

In order poll futures, we'll also need to create a `LocalWaker` to provide to
poll. As discussed in the [task wakeups section], `LocalWaker`s are responsible
for scheduling a task to be polled again once `wake` is called. Remember that
`LocalWaker`s tell the executor exactly which task has become ready, allowing
them to poll just the futures that are ready to make progress. The easiest way
to create a new `LocalWaker` is by implementing the `Wake` trait and then using
the `local_waker_from_nonlocal` or `local_waker` functions to turn a `Arc<T: Wake>`
into a `LocalWaker`. Let's implement `Wake` for our tasks to allow them to be
turned into `LocalWaker`s and awoken:

```rust
impl Wake for Task {
fn wake(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
}
```

When a `LocalWaker` is created from an `Arc<Task>`, calling `wake()` on it will
cause a copy of the `Arc` to be sent onto the task channel. Our executor then
needs to pick up the task and poll it. Let's implement that:

```rust
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let lw = local_waker_from_nonlocal(task.clone());
if let Poll::Pending = Pin::new(&mut future).poll(&lw) {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
```

Congratulations! We now have a working futures executor. We can even use it
to run `async/await!` code and custom futures, such as the `TimerFuture` we
wrote earlier:

```rust
fn main() {
let (executor, spawner) = new_executor_and_spawner();
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
await!(TimerFuture::new(Duration::new(2, 0)));
println!("done!");
});
executor.run();
}
```

[task wakeups section]: TODO
Loading

0 comments on commit d598899

Please sign in to comment.