async-task is one of the most complicated crates in the smol ecosystem. But, fundamentally, it’s just a future on the heap.
I pride myself on smol packages being very easy to parse for anyone with a beginner’s level of experience in Rust. By that I mean: if you want to know how smol works, it should be very easy to pick up the source code, read through it, and understand how each individual part works.
Dependency Dog: Wait, do people normally read source code for fun?
notgull: No, I think that’s just a “me” thing.
There are a few crates that are a little harder to take as bathroom reading. There’s polling, which does a lot of low-level system interaction to make asynchronous I/O work. I’ve done my best to make it interesting, but there’s not a whole lot to say about a crate that’s basically following the OS’s instruction manual.
Then there’s async-task. async-task’s philosophy runs counter to the rest of smol. When it comes to optimization, smol generally tries to go for safety and reasonability over crazy optimizations with diminishing returns. For async-task, however, we take the gloves off. We go all out to make sure tasks are as small and use as few resources as possible.
notgull: This is actually because async-task predates smol! It was originally used as the task implementation for async-std.
I’d like to provide this series of blog posts as a reference for how async-task works, how you might arrive at an implementation like async-task organically, and how it was optimized into its current state.
notgull: As a heads up: most posts for this blog assume an intermediate knowledge of Rust. However, this post is intended for readers who may not already be familiar with concepts like executors or dynamic dispatch.
Of course, it may be a good idea to review the basics, even if you’re an expert.
Background Basics
Let’s say you have two Futures: blocks of asynchronous code that can be run concurrently. You want to run both of them at once.
// Future #1
let foo = async {
    let x = my_function().await;
    do_something(x).await;
};

// Future #2
let bar = async {
    for _ in 0..50 {
        respond_to_user().await;
    }
};
Dependency Dog: Wait… Future? async? await? What’s that?
notgull: They’re Rust’s user-space concurrency building blocks! If you need a refresher on what these mean, it may be worth it to read the async book.
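For reference, everything in this series ultimately revolves around the standard library’s Future trait, which looks like this:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    // The value produced when the future finishes.
    type Output;

    // Attempt to make progress. Returns Poll::Ready(value) when done, or
    // Poll::Pending after arranging for cx.waker() to be called once
    // progress is possible again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

An async block compiles down to an anonymous type implementing this trait, and .await is, roughly speaking, a loop that polls until it gets Poll::Ready.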
Running two futures at a time can be done very easily. First, we bring in the futures-lite crate:
$ cargo add futures-lite
Updating crates.io index
Adding futures-lite v2.3.0 to dependencies
Features:
+ alloc
+ fastrand
+ futures-io
+ parking
+ race
+ std
- memchr
Updating crates.io index
Then, we can use the zip combinator to run both Futures at the same time. Finally, we can use block_on to poll the resulting Future until it completes. It looks like this:
use futures_lite::future;
// Run the two futures concurrently.
let combined = future::zip(foo, bar);
// Block on the combined future until it completes.
future::block_on(combined);
How the zip combinator works is as follows:
- It tries to poll the first Future. If it is ready, it takes the result and saves it in memory. It remembers not to poll the first Future again.
- It does the same thing for the second Future. It polls it if it hasn’t finished. If it has, it saves the result.
- Once both Futures are finished, it returns a tuple of the results.
Using this strategy, we can poll two Futures at the same time. The following diagram shows what this looks like in practice:
Note that, even though only one thread of execution is used, it appears as though the futures are run at the same time.
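To make those three steps concrete, here’s a hypothetical, stripped-down version of such a combinator. (This is only a sketch: the names are made up, and futures-lite’s real zip uses pin projection instead of these Unpin bounds.)

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Zip<A: Future, B: Future> {
    a: A,
    b: B,
    // Completed results are stashed here, so a finished future is
    // never polled again.
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

impl<A, B> Future for Zip<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Everything here is Unpin, so we can reach the fields directly.
        let this = self.get_mut();

        // Step 1: poll the first future if it hasn't finished yet.
        if this.a_out.is_none() {
            if let Poll::Ready(out) = Pin::new(&mut this.a).poll(cx) {
                this.a_out = Some(out);
            }
        }

        // Step 2: do the same for the second future.
        if this.b_out.is_none() {
            if let Poll::Ready(out) = Pin::new(&mut this.b).poll(cx) {
                this.b_out = Some(out);
            }
        }

        // Step 3: once both have finished, return a tuple of the results.
        match (this.a_out.take(), this.b_out.take()) {
            (Some(a), Some(b)) => Poll::Ready((a, b)),
            (a, b) => {
                // Not done yet; put back whatever we took.
                this.a_out = a;
                this.b_out = b;
                Poll::Pending
            }
        }
    }
}

The Option fields are the important part: they remember each result so that a finished future is never polled a second time.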
Scalability Solutions
The zip combinator works for very simple cases of concurrency, but falls apart for higher-level scenarios. Let’s say you want to poll four futures at once. (The horror!)
let baz = async { /* ... */ };
let cap = async { /* ... */ };
Then, you would need to call zip three times!
let combined = future::zip(
    future::zip(foo, bar),
    future::zip(baz, cap)
);
You run into some other problems too, like:
- You can only run a fixed number of futures at once. If you might run a variable number of futures, you’re out of luck.
- What if you want to cancel one of the futures halfway through?
- Each future is polled every time zip is woken up. This means polling zip is an O(n) operation. This is sometimes known as the “thundering herd” problem.
Let’s try to solve these problems. Without any prior art, I mean. We can solve the “fixed number of futures” problem pretty easily. Consider the slab crate, which lets us set up an indexed list of objects. It’s similar to an arena. We can fit our futures in there.
$ cargo add slab
Updating crates.io index
Adding slab v0.4.9 to dependencies
Features:
+ std
- serde
Updating crates.io index
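If you haven’t used slab before, it behaves like a Vec whose insert method hands back the index of the stored value, and whose slots can be reused after removal:

use slab::Slab;

let mut slab = Slab::new();

// insert() returns a usize key for the stored value.
let key = slab.insert("hello");
assert_eq!(slab.get(key), Some(&"hello"));

// remove() frees the slot so a later insert can reuse it.
slab.remove(key);
assert!(slab.get(key).is_none());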
Let’s also box the Futures, so we can use multiple different implementors of Future in our same zip.
use slab::Slab;
use std::future::Future;
use std::pin::Pin;

struct GiantZip {
    // Completed futures are represented by `None`.
    futures: Slab<Option<Pin<Box<dyn Future<Output = ()>>>>>
}

impl GiantZip {
    fn new() -> Self {
        Self {
            futures: Slab::new()
        }
    }
}
Let’s have an insert method that can be used to add a new Future to this new zip-equivalent. It will return a key that can be used to look up the future in our list.
impl GiantZip {
    fn insert<F: Future<Output = ()> + 'static>(&mut self, future: F) -> usize {
        self.futures.insert(Some(Box::pin(future)))
    }
}
Dependency Dog: The Future needs to be 'static because it’s being boxed and pinned on the heap without a lifetime. It’s possible to work around this by adding a lifetime to GiantZip here, but let’s keep it simple for now.
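For the curious, that workaround would look something like this. (A hypothetical sketch; BorrowingZip is a made-up name, and we won’t carry the lifetime through the rest of the post.)

use slab::Slab;
use std::future::Future;
use std::pin::Pin;

// Futures only need to outlive 'a instead of being 'static.
struct BorrowingZip<'a> {
    futures: Slab<Option<Pin<Box<dyn Future<Output = ()> + 'a>>>>,
}

impl<'a> BorrowingZip<'a> {
    fn insert<F: Future<Output = ()> + 'a>(&mut self, future: F) -> usize {
        self.futures.insert(Some(Box::pin(future)))
    }
}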
Next, let’s make it so polling the GiantZip tries to resolve every one of the futures contained within.
use std::task::{Context, Poll};

impl Future for GiantZip {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut unfinished = false;

        for (_, future_slot) in self.futures.iter_mut() {
            if let Some(future) = future_slot.as_mut() {
                // Try to poll this future.
                match future.as_mut().poll(cx) {
                    Poll::Ready(()) => {
                        // Set the future to `None`.
                        *future_slot = None;
                    },
                    Poll::Pending => {
                        // At least one future is unfinished; we'll
                        // return Pending below.
                        unfinished = true;
                    }
                }
            }
        }

        if unfinished {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
Finally, we can test this out on futures that actually do something.
$ cargo add async-channel
Updating crates.io index
Adding async-channel v2.2.0 to dependencies
Features:
+ std
Updating crates.io index
// Create a channel with a capacity of 1.
let (sender, recv) = async_channel::bounded(1);

// This is basically an `async fn` that sends a number over the channel.
let our_future = |i: i32| {
    let sender = sender.clone();
    async move { sender.send(i).await.ok(); }
};

// Create a future that reads from the channel.
let reader = async move {
    for _ in 0..3 {
        println!("{}", recv.recv().await.unwrap());
    }
};

// Use the GiantZip to poll all of these at once.
let mut zipper = GiantZip::new();
zipper.insert(our_future(1));
zipper.insert(our_future(2));
zipper.insert(our_future(3));
zipper.insert(reader);

// Wait for them to finish.
future::block_on(zipper);
When we run it, we see this:
$ time cargo run -q
1
2
3
cargo run -q 0.03s user 0.03s system 93% cpu 0.064 total
That’s pretty fast, but that’s only because we have a low number of futures. If we have 10,000,000 futures (not an unrealistic number for a web server!), it will run much slower.
$ time cargo run -q
0
1
2
cargo run -q 9.16s user 0.70s system 100% cpu 9.803 total
notgull: It’s hard to express in the textual format, but there was a few seconds’ delay between each of those lines. So it’s taking a while to get to the future that actually prints the line.
In addition to being inefficient, we’ve also stumbled upon another issue: GiantZip is unfair. The reader Future is at the very end of the futures list, which means it’s processed last when polling. Since a lot of the futures end up being blocked on reader, polling the GiantZip takes a lot longer than it normally should.
Thankfully, we can solve the O(n) problem and also (kind of) solve the fairness problem in one fell swoop. Instead of polling every future every time we poll, we should only poll the ones whose Wakers have been woken up, since we know those are the ones that are ready to make progress.
Let’s add a queue structure to the GiantZip that contains the indices of the futures that have been woken and are ready to be polled. I’m wrapping it in an Arc and a Mutex for reasons that will become obvious later.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

struct GiantZip {
    // Completed futures are represented by `None`.
    futures: Slab<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // NEW: Queue of futures that have been woken and are ready to be polled.
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl GiantZip {
    fn new() -> Self {
        Self {
            futures: Slab::new(),
            queue: Arc::new(Mutex::new(VecDeque::new()))
        }
    }
}
Now, when we first insert the Future into the GiantZip, we have to mark it as ready. This is done by just pushing the index of the future onto the back of the queue.
impl GiantZip {
    fn insert<F: Future<Output = ()> + 'static>(&mut self, future: F) -> usize {
        // NEW: Save the index and push it to the back of the queue before returning.
        let index = self.futures.insert(Some(Box::pin(future)));
        self.queue.lock().unwrap().push_back(index);
        index
    }
}
We also need a way for the Future to mark itself as ready. A Future arranges for its Waker to be called when it is ready to make progress, so we can create a Waker that wraps around the top-level Waker, but also marks the current future as ready.
We’ll bring in waker-fn to make this easier. A Waker is just a glorified callback, so we can easily represent it as one.
$ cargo add waker-fn
Updating crates.io index
Adding waker-fn v1.1.1 to dependencies
Updating crates.io index
Let’s make creating the Waker a helper function on GiantZip, to keep things clean.
use std::task::Waker;
use waker_fn::waker_fn;

impl GiantZip {
    /// Create a waker that wakes the future in the provided slot.
    fn waker_for_slot(&self, index: usize, toplevel: &Waker) -> Waker {
        // Clone shared resources.
        // *This* is why we made `queue` wrapped in an `Arc`, by the way.
        let queue = self.queue.clone();
        let toplevel = toplevel.clone();

        // Create a waker.
        waker_fn(move || {
            // Mark the future as ready.
            queue.lock().unwrap().push_back(index);

            // Wake the toplevel `block_on` waker, so the GiantZip poll()
            // implementation is run again.
            toplevel.wake_by_ref();
        })
    }
}
Finally, we can adjust the poll() implementation for GiantZip so that it pops from the queue instead of polling each and every future in the list.
impl GiantZip {
    /// Get the next index in the list.
    fn next_index(&self) -> Option<usize> {
        self.queue.lock().unwrap().pop_front()
    }
}

impl Future for GiantZip {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Get around Rust's pinning rules.
        let this = self.get_mut();

        // NEW: We drain the "queue" instead of iterating over every future.
        // Make sure not to hold the lock while polling; if a future is woken
        // by another future, it would deadlock otherwise.
        while let Some(index) = this.next_index() {
            // NEW: Create a waker to poll this future with.
            let waker = this.waker_for_slot(index, cx.waker());
            let mut slot_context = Context::from_waker(&waker);

            let future_slot = match this.futures.get_mut(index) {
                Some(slot) => slot,
                None => continue
            };

            if let Some(future) = future_slot.as_mut() {
                // Try to poll this future.
                match future.as_mut().poll(&mut slot_context) {
                    Poll::Ready(()) => {
                        // Set the future to `None`.
                        *future_slot = None;
                    },
                    Poll::Pending => {}
                }
            }
        }

        if this.futures.iter().any(|(_, fut)| fut.is_some()) {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
When we run the program, we see the following output:
$ time cargo run -q
0
1
2
cargo run -q 5.03s user 0.62s system 101% cpu 5.583 total
There is still quite a bit of contention caused by the initial and final bursts of futures. But there is no more delay between the printing of the numbers, which indicates that the runtime is being used more efficiently.
This also solves our cancellation problem; we can just add a remove method to the GiantZip to remove a keyed future from the list.
impl GiantZip {
    fn remove(&mut self, index: usize) {
        // Note: Slab::remove panics if the slot is already vacant, so this
        // should only be called once per key.
        self.futures.remove(index);
    }
}

let key = zipper.insert(async { panic!() });

// Actually, might not be the best idea to run that task.
zipper.remove(key);
Now we’ve solved all of our problems… and introduced a million new ones.
Persistent Problems
I’ve deliberately made some mistakes in the above example, in order to illustrate how fixing those mistakes can lead to a very important data pattern. So let’s discuss those mistakes.
Dependency Dog: “Deliberately”, you say?
notgull: Hey, I’ll have you know, I’m just following the natural evolution of the async/await pattern.
Dependency Dog: Did the “natural evolution” force you to re-allocate a new Waker every time you polled a future?
The first issue is that all of this is very inefficient. Ignoring our suboptimal queueing structure, we have three main allocations here:
- We have a Vec (inside the Slab) to hold our futures.
- Each individual Future requires its own Box.
- Every time the GiantZip is polled, we have to create a Waker for each woken future. The waker-fn crate allocates each of these inside of an Arc.
Specifically, we should be concerned about the Waker allocation, since it occurs on the hot path. We should try our best to make sure that we can create a Waker without allocating.
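As a teaser: the standard library does hand us the raw ingredients for this. A Waker can be assembled by hand from a RawWaker and a RawWakerVTable with no allocation at all. Here’s a minimal (and deliberately useless) no-op waker built that way; async-task’s real wakers are far more involved:

use std::ptr;
use std::task::{RawWaker, RawWakerVTable, Waker};

// The four vtable entries are: clone, wake, wake_by_ref, and drop.
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
    |_| RawWaker::new(ptr::null(), &NOOP_VTABLE), // clone
    |_| {},                                       // wake (by value)
    |_| {},                                       // wake_by_ref
    |_| {},                                       // drop
);

fn noop_waker() -> Waker {
    // SAFETY: every vtable function is a no-op, so the Waker contract
    // is trivially upheld. No allocation happens anywhere here.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_VTABLE)) }
}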
There are some other persistent problems, like:
- It’s very easy to misuse the API. usize isn’t a great type to index by, for a collection of hotly-polled futures.
- You can’t get the result of a Future after it completes… or use a Future that returns anything other than (), for that matter.
- It is very difficult (albeit not impossible) to remove a Future from the list while it is running.
We’ll begin to address these problems in the next blog post, when we start to build a real task abstraction.