Releases: tokio-rs/tokio
Tokio v0.2.13
Fixes a minor bug in the previous release that resulted in compilation errors when using the new pin! form.
Fixes
- macros: unresolved import in pin! (#2281).
Tokio v0.2.12
Polish, small additions, and fixes. The biggest additions in this release are StreamMap and Notify.
StreamMap
Similar to StreamExt::merge, StreamMap supports merging multiple source streams into a single stream, producing items as they become available in the source streams. However, StreamMap supports inserting and removing streams at run-time. This is useful for cases where a consumer wishes to subscribe to messages from multiple sources and dynamically manage those subscriptions.
As the name implies, StreamMap maps keys to streams. Streams are inserted or removed as needed and then the StreamMap is used as any other stream. Items are returned with their keys, enabling the caller to identify which source stream the item originated from.
Example
    use tokio::stream::{StreamExt, StreamMap};
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (mut tx1, rx1) = mpsc::channel(10);
        let (mut tx2, rx2) = mpsc::channel(10);

        // Use the `Sender` handles: send one value on each channel
        tokio::spawn(async move {
            tx1.send(1).await.unwrap();
        });
        tokio::spawn(async move {
            tx2.send(2).await.unwrap();
        });

        let mut map = StreamMap::new();

        // Insert both streams
        map.insert("one", rx1);
        map.insert("two", rx2);

        // Read twice
        for _ in 0..2 {
            let (key, val) = map.next().await.unwrap();

            println!("got {} from {}", val, key);

            // Remove the stream to prevent reading the next value
            map.remove(key);
        }
    }
Notify
Notify is the next step in providing async / await based synchronization primitives. It is similar to how thread::park() / unpark() work, but for asynchronous tasks. Consumers await notifications and producers notify consumers. Notify is intended to be used as a building block for higher level synchronization primitives, such as channels.
Examples
Basic usage.
    use tokio::sync::Notify;
    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        let notify = Arc::new(Notify::new());
        let notify2 = notify.clone();

        tokio::spawn(async move {
            notify2.notified().await;
            println!("received notification");
        });

        println!("sending notification");
        notify.notify();
    }
Here is how Notify can be used as a building block for an unbounded channel.
    use tokio::sync::Notify;
    use std::collections::VecDeque;
    use std::sync::Mutex;

    struct Channel<T> {
        values: Mutex<VecDeque<T>>,
        notify: Notify,
    }

    impl<T> Channel<T> {
        pub fn send(&self, value: T) {
            self.values.lock().unwrap()
                .push_back(value);

            // Notify the consumer a value is available
            self.notify.notify();
        }

        pub async fn recv(&self) -> T {
            loop {
                // Drain values
                if let Some(value) = self.values.lock().unwrap().pop_front() {
                    return value;
                }

                // Wait for values to be available
                self.notify.notified().await;
            }
        }
    }
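For comparison, the thread::park() / unpark() pattern that Notify is likened to above can be sketched with the standard library alone. This is a blocking, thread-based analogue and not Tokio code; the helper name `park_until_notified` and the shared flag are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper for this sketch: spawns a consumer thread that parks
/// until the producer sets a flag, then returns what the consumer observed.
fn park_until_notified() -> bool {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = Arc::clone(&ready);

    let consumer = thread::spawn(move || {
        // `park` may wake spuriously, so re-check the flag in a loop.
        while !ready2.load(Ordering::Acquire) {
            thread::park();
        }
        ready2.load(Ordering::Acquire)
    });

    // Producer side: publish the state change, then wake the consumer.
    // If `unpark` runs before the consumer parks, the wake-up is remembered
    // and the next `park` returns immediately.
    ready.store(true, Ordering::Release);
    consumer.thread().unpark();

    consumer.join().unwrap()
}

fn main() {
    assert!(park_until_notified());
    println!("consumer was notified");
}
```

The shape is the same as in the Notify examples: the consumer waits, the producer changes shared state and then signals. The difference is that parking blocks an OS thread, whereas awaiting a notification only suspends a task.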
Changes
Fixes
- net: UnixStream::poll_shutdown should call shutdown(Write) (#2245).
- process: Wake up read and write on EPOLLERR (#2218).
- rt: potential deadlock when using block_in_place and shutting down the runtime (#2119).
- rt: only detect number of CPUs if core_threads not specified (#2238).
- sync: reduce watch::Receiver struct size (#2191).
- time: succeed when setting delay of $MAX-1 (#2184).
- time: avoid having to poll DelayQueue after inserting new delay (#2217).
Added
- macros: pin! variant that assigns to identifier and pins (#2274).
- net: impl Stream for Listener types (#2275).
- rt: Runtime::shutdown_timeout waits for runtime to shutdown for specified duration (#2186).
- stream: StreamMap merges streams and can insert / remove streams at runtime (#2185).
- stream: StreamExt::skip() skips a fixed number of items (#2204).
- stream: StreamExt::skip_while() skips items based on a predicate (#2205).
- sync: Notify provides basic async / await task notification (#2210).
- sync: Mutex::into_inner retrieves guarded data (#2250).
- sync: mpsc::Sender::send_timeout sends, waiting for up to specified duration for channel capacity (#2227).
- time: impl Ord and Hash for Instant (#2239).
Tokio v0.2.11
Introduces select!, join!, and try_join! macros for waiting on multiple async operations concurrently from the same task. These macros are implemented primarily as declarative macros, which works around the recursion limit issue. The select! macro works with any type that implements Future and does not require special FusedFuture traits.
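To illustrate what "waiting on multiple async operations concurrently from the same task" means, here is a minimal standard-library sketch that polls two futures from one task until both finish. It is a conceptual model only, not how the Tokio macros are implemented; `block_on`, `join2`, `Countdown`, and `NoopWake` are hypothetical names introduced for this example. With Tokio itself you would write `tokio::join!(a, b)` instead.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: drives one future to completion by polling in a loop.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Toy future that returns `Pending` `remaining` times, then yields `value`.
/// A real future would arrange a wake-up before returning `Pending`; the
/// busy-polling executor above does not need one.
struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Polls both futures from the same task and completes once both are done.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut ra = None;
    let mut rb = None;

    poll_fn(move |cx| {
        // Only poll a future that has not completed yet.
        if ra.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                ra = Some(v);
            }
        }
        if rb.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                rb = Some(v);
            }
        }
        if ra.is_some() && rb.is_some() {
            Poll::Ready((ra.take().unwrap(), rb.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

fn main() {
    let slow = Countdown { remaining: 3, value: 1 };
    let fast = Countdown { remaining: 1, value: 2 };
    let (a, b) = block_on(join2(slow, fast));
    assert_eq!((a, b), (1, 2));
    println!("joined: {} and {}", a, b);
}
```

Both futures make progress on every poll of the combined future, so neither has to finish before the other starts. No extra task or thread is spawned.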
Fixes
- docs: misc fixes and tweaks (#2155, #2103, #2027, #2167, #2175).
- macros: handle generics in #[tokio::main] method (#2177).
- sync: broadcast potential lost notifications (#2135).
- rt: improve "no runtime" panic messages (#2145).
Added
- optional support for using parking_lot internally (#2164).
- fs: fs::copy, an async version of std::fs::copy (#2079).
- macros: select! waits for the first branch to complete (#2152).
- macros: join! waits for all branches to complete (#2158).
- macros: try_join! waits for all branches to complete or the first error (#2169).
- macros: pin! pins a value to the stack (#2163).
- net: ReadHalf::poll() and ReadHalf::poll_peek (#2151).
- stream: StreamExt::timeout() sets a per-item max duration (#2149).
- stream: StreamExt::fold() applies a function, producing a single value (#2122).
- sync: impl Eq, PartialEq for oneshot::RecvError (#2168).
- task: methods for inspecting the JoinError cause (#2051).
Tokio v0.2.10
Introduces a task-local storage solution that works with Rust's "task" concept and supports multiplexing futures on the same runtime task (#2126). A number of other incremental improvements are included.
This release includes a few fixes, including fixing a scenario in which undefined behavior could be introduced when the user provided a buggy AsyncRead implementation.
Fixes
- #[tokio::main] when rt-core feature flag is not enabled (#2139).
- remove AsyncBufRead from BufStream impl block (#2108).
- potential undefined behavior when implementing AsyncRead incorrectly (#2030).
Added
- BufStream::with_capacity (#2125).
- impl From and Default for RwLock (#2089).
- io::ReadHalf::is_pair_of checks if provided WriteHalf is for the same underlying object (#1762, #2144).
- runtime::Handle::try_current() returns a handle to the current runtime (#2118).
- stream::empty() returns an immediately ready empty stream (#2092).
- stream::once(val) returns a stream that yields a single value: val (#2094).
- stream::pending() returns a stream that never becomes ready (#2092).
- StreamExt::chain() sequences a second stream after the first completes (#2093).
- StreamExt::collect() transforms a stream into a collection (#2109).
- StreamExt::fuse ends the stream after the first None (#2085).
- StreamExt::merge combines two streams, yielding values as they become ready (#2091).
- Task-local storage (#2126).
Tokio v0.2.9
Tokio v0.2.8
A breaking change was accidentally introduced in tokio-macros. The breaking change was reverted and Tokio v0.2.8 depends on the correct version of tokio-macros.
Fixes
- depend on new version of tokio-macros.
Tokio v0.2.7
This release includes both bug fixes and incremental improvements across most of Tokio. The primary bug fixes are to Runtime configured with basic_scheduler and task::LocalSet.
Fixes
- potential deadlock when dropping basic_scheduler Runtime.
- calling spawn_blocking from within a spawn_blocking (#2006).
- storing a Runtime instance in a thread-local (#2011).
- miscellaneous documentation fixes.
- rt: fix Waker::will_wake to return true when tasks match (#2045).
- test-util: time::advance runs pending tasks before changing the time (#2059).
Added
- net::lookup_host maps a T: ToSocketAddrs to a stream of SocketAddrs (#1870).
- process::Child fields are made public to match std (#2014).
- impl Stream for sync::broadcast::Receiver (#2012).
- sync::RwLock provides an asynchronous read-write lock (#1699).
- runtime::Handle::current returns the handle for the current runtime (#2040).
- StreamExt::filter filters stream values according to a predicate (#2001).
- StreamExt::filter_map simultaneously filters and maps stream values (#2001).
- StreamExt::try_next convenience for streams of Result<T, E> (#2005).
- StreamExt::take limits a stream to a specified number of values (#2025).
- StreamExt::take_while limits a stream based on a predicate (#2029).
- StreamExt::all tests if every element of the stream matches a predicate (#2035).
- StreamExt::any tests if any element of the stream matches a predicate (#2034).
- task::LocalSet.await runs spawned tasks until the set is idle (#1971).
- time::DelayQueue::len returns the number of entries in the queue (#1755).
- expose runtime options from the #[tokio::main] and #[tokio::test] (#2022).
Tokio v0.2.6
This release fixes an API regression introduced as part of v0.2.5.
Fixes
- fs::File::seek API regression (#1991).
Tokio v0.2.5
Includes new APIs, utilities, and fixes. Some highlights:
tokio::sync::broadcast
A multi-producer, multi-consumer channel where each sent value is sent to all consumers (fan-out). The channel is bounded and when consumers lag, they will receive an error indicating they have lagged too far behind.
    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx1) = broadcast::channel(16);
        let mut rx2 = tx.subscribe();

        tokio::spawn(async move {
            assert_eq!(rx1.recv().await.unwrap(), 10);
            assert_eq!(rx1.recv().await.unwrap(), 20);
        });

        tokio::spawn(async move {
            assert_eq!(rx2.recv().await.unwrap(), 10);
            assert_eq!(rx2.recv().await.unwrap(), 20);
        });

        tx.send(10).unwrap();
        tx.send(20).unwrap();
    }
Senders never block. When the channel is full, the oldest value still held by the channel is overwritten. This tends to be the desired behavior in order to prevent slow consumers from blocking the entire system. However, you can use a Semaphore (also added in this release) to ensure that all consumers see all messages.
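The overwrite-and-lag behavior described above can be modeled with a small synchronous toy. This is only a sketch of the semantics, not Tokio's implementation: `ToyBroadcast`, `ToyRecvError`, and the explicit per-receiver cursor are all assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Error returned by the toy receiver.
#[derive(Debug, PartialEq)]
enum ToyRecvError {
    /// No value is available yet.
    Empty,
    /// The receiver fell behind and this many values were overwritten.
    Lagged(u64),
}

/// Bounded buffer that evicts its oldest value when full.
struct ToyBroadcast<T> {
    capacity: usize,
    buf: VecDeque<T>,
    /// Sequence number of the value at the front of `buf`.
    oldest: u64,
}

impl<T: Clone> ToyBroadcast<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        ToyBroadcast { capacity, buf: VecDeque::new(), oldest: 0 }
    }

    /// Never blocks: when the buffer is full, the oldest value is dropped.
    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.oldest += 1;
        }
        self.buf.push_back(value);
    }

    /// `cursor` is the receiver's position: the next sequence number to read.
    fn recv(&self, cursor: &mut u64) -> Result<T, ToyRecvError> {
        if *cursor < self.oldest {
            // Values were overwritten before this receiver read them.
            // Report how many, then resume from the oldest retained value.
            let missed = self.oldest - *cursor;
            *cursor = self.oldest;
            return Err(ToyRecvError::Lagged(missed));
        }
        let idx = (*cursor - self.oldest) as usize;
        match self.buf.get(idx) {
            Some(value) => {
                *cursor += 1;
                Ok(value.clone())
            }
            None => Err(ToyRecvError::Empty),
        }
    }
}

fn main() {
    let mut chan = ToyBroadcast::new(2);
    let mut fast = 0u64; // receiver that keeps up
    let mut slow = 0u64; // receiver that falls behind

    chan.send(10);
    assert_eq!(chan.recv(&mut fast), Ok(10));

    chan.send(20);
    chan.send(30); // buffer is full, so 10 is evicted

    assert_eq!(chan.recv(&mut fast), Ok(20));
    assert_eq!(chan.recv(&mut slow), Err(ToyRecvError::Lagged(1)));
    assert_eq!(chan.recv(&mut slow), Ok(20));
}
```

Every receiver sees every value it keeps up with, and a slow receiver learns how far it fell behind instead of stalling the sender.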
tokio::sync::Semaphore
A counting synchronization primitive. It is used to limit a critical section to 1 or more concurrent tasks. For example, to limit the number of in-flight database queries, we could do something like:
    struct MyDbClient {
        db: MyDbHandle,
        semaphore: tokio::sync::Semaphore,
    }

    async fn query(client: &MyDbClient, query: Query) -> QueryResult {
        // Hold a permit for the duration of the query
        let _permit = client.semaphore.acquire().await;
        client.db.query(query).await
    }
There may be any number of concurrent calls to query, but the semaphore will limit the number that are able to concurrently perform the query.
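The limiting effect can be checked with a blocking, thread-based stand-in built from the standard library. This is a sketch of the counting idea only, not Tokio's async Semaphore; `ToySemaphore`, `ToyPermit`, and `max_concurrency` are hypothetical names introduced here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Toy blocking counting semaphore (illustrative; not Tokio's type).
struct ToySemaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

/// Returns its permit to the semaphore when dropped.
struct ToyPermit<'a> {
    sem: &'a ToySemaphore,
}

impl ToySemaphore {
    fn new(permits: usize) -> Self {
        ToySemaphore {
            permits: Mutex::new(permits),
            available: Condvar::new(),
        }
    }

    /// Blocks the calling thread until a permit is free.
    fn acquire(&self) -> ToyPermit<'_> {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
        ToyPermit { sem: self }
    }
}

impl Drop for ToyPermit<'_> {
    fn drop(&mut self) {
        *self.sem.permits.lock().unwrap() += 1;
        self.sem.available.notify_one();
    }
}

/// Runs `tasks` threads through a section guarded by `permits` permits and
/// returns the highest number of threads seen inside the section at once.
fn max_concurrency(permits: usize, tasks: usize) -> usize {
    let sem = Arc::new(ToySemaphore::new(permits));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                let _permit = sem.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::yield_now(); // stand-in for the guarded work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                // `_permit` is dropped here, releasing the permit.
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}

fn main() {
    let peak = max_concurrency(2, 8);
    assert!(peak >= 1 && peak <= 2);
    println!("peak concurrency with 2 permits: {}", peak);
}
```

As in the query example, the permit is held by a guard value, so it is released automatically when the guarded section ends, including on early return.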
Added
- io::AsyncSeek trait (#1924).
- Mutex::try_lock (#1939).
- mpsc::Receiver::try_recv and mpsc::UnboundedReceiver::try_recv (#1939).
- writev support for TcpStream (#1956).
- time::throttle for throttling streams (#1949).
- implement Stream for time::DelayQueue (#1975).
- sync::broadcast provides a fan-out channel (#1943).
- sync::Semaphore provides an async semaphore (#1973).
- stream::StreamExt provides stream utilities (#1962).
Fixes
- deadlock risk while shutting down the runtime (#1972).
- panic while shutting down the runtime (#1978).
- sync::MutexGuard debug output (#1961).
- misc doc improvements (#1933, #1934, #1940, #1942).
Changes
- runtime threads are configured with runtime::Builder::core_threads and runtime::Builder::max_threads. runtime::Builder::num_threads is deprecated (#1977).
Tokio v0.2.4
A small release to fix a potential deadlock when using Mutex.
Fixes
- sync::Mutex deadlock when lock() future is dropped early (#1898).