Explaining the internals of async-task from the ground up

John Nunley · March 30, 2024

async-task is one of the most complicated crates in the smol ecosystem. But, fundamentally, it’s just a future on the heap.

I pride myself on smol packages being very easy to parse for anyone with a beginner’s level of experience in Rust. By that I mean, if you want to know how smol works, it should be very easy to pick up the source code, read through it, and understand how each individual part works.

Dependency Dog: Wait, do people normally read source code for fun?

notgull: No, I think that’s just a “me” thing.

There are a few crates that are a little harder to take as bathroom reading. There’s polling, which does a lot of low-level system interaction to make asynchronous I/O work. I’ve done my best to make it interesting, but there’s not a whole lot to say about a crate that’s basically following the OS’s instruction manual.

Then there’s async-task. async-task’s philosophy runs counter to the rest of smol. When it comes to optimization, smol generally favors safety and reasonability over crazy optimizations with diminishing returns. For async-task, however, we take the gloves off. We go all out to make sure tasks are as small as possible and use as few resources as possible.

notgull: This is actually because async-task predates smol! It was originally used as the task implementation for async-std.

I’d like to provide this series of blog posts as a reference for how async-task works, how you might arrive at an implementation like async-task organically, and how it was optimized into its current state.

notgull: As a heads up: most posts for this blog assume an intermediate knowledge of Rust. However, this post is intended for readers who may not already be familiar with concepts like executors or dynamic dispatch.

Of course, it may be a good idea to review the basics, even if you’re an expert.

Background Basics

Let’s say you have two Futures: blocks of asynchronous code that can be run concurrently. You want to run both of them at once.

// Future #1
let foo = async {
    let x = my_function().await;
    do_something(x).await;
};

// Future #2
let bar = async {
    for _ in 0..50 {
        respond_to_user().await;
    }
};

Dependency Dog: Wait… Future? async? await? What’s that?

notgull: They’re Rust’s user-space concurrency building blocks! If you need a refresher on what these mean, it may be worth reading the async book.

Running two futures at a time can be done very easily. First, we bring in the futures-lite crate:

$ cargo add futures-lite
    Updating crates.io index
      Adding futures-lite v2.3.0 to dependencies
             Features:
             + alloc
             + fastrand
             + futures-io
             + parking
             + race
             + std
             - memchr
    Updating crates.io index

Then, we can use the zip combinator to run both Futures at the same time. Finally, we can use block_on to poll the resulting Future until it completes. It looks like this:

use futures_lite::future;

// Run the two futures concurrently (interleaved on one thread, not in parallel).
let combined = future::zip(foo, bar);

// Block on the combined future until it completes.
future::block_on(combined);
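As an aside, block_on needs very little machinery. The following is a minimal sketch using only the standard library, not the futures-lite implementation. The names ThreadWaker and this local block_on are hypothetical: the waker unparks the blocked thread, and the loop polls again each time it wakes up.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Simplified stand-in for `future::block_on` (an assumption, not the real code).
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the stack so that it can be polled.
    let mut future = pin!(future);

    // Build a `Waker` that wakes up the current thread.
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until the waker unparks us. `park` may also return
            // spuriously, which only costs one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { 6 * 7 });
    println!("{answer}"); // prints 42
}
```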

The zip combinator works as follows:

  • It tries to poll the first Future. If it is ready, it takes the result and saves it in memory. It remembers not to poll the first Future again.
  • It does the same thing for the second Future. It polls it if it hasn’t finished. If it has, it saves the result.
  • Once both Futures are finished, it returns a tuple of the results.
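The steps above can be sketched with the standard library alone. This is an illustration, not the futures-lite source: Zip, zip, NoopWake and run are hypothetical names, and both futures are boxed to sidestep pin projection, which a real combinator would avoid.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the combinator returned by `future::zip`.
struct Zip<A: Future, B: Future> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

// Both futures live in boxes and the outputs are never pinned,
// so it is fine to move a `Zip` around.
impl<A: Future, B: Future> Unpin for Zip<A, B> {}

fn zip<A: Future, B: Future>(a: A, b: B) -> Zip<A, B> {
    Zip { a: Box::pin(a), b: Box::pin(b), a_out: None, b_out: None }
}

impl<A: Future, B: Future> Future for Zip<A, B> {
    type Output = (A::Output, B::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Poll the first future only if its result has not been saved yet.
        if this.a_out.is_none() {
            if let Poll::Ready(out) = this.a.as_mut().poll(cx) {
                this.a_out = Some(out);
            }
        }

        // Same for the second future.
        if this.b_out.is_none() {
            if let Poll::Ready(out) = this.b.as_mut().poll(cx) {
                this.b_out = Some(out);
            }
        }

        // Once both results are saved, return them as a tuple.
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

/// Waker that does nothing; good enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future in a busy loop until it completes (demo helper only).
fn run<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = future.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let (x, y) = run(zip(async { 1 }, async { "two" }));
    println!("{x} {y}"); // prints "1 two"
}
```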

Using this strategy, we can poll two Futures at the same time. The following diagram shows what this looks like in practice:

zip diagram

Note that, even though only one thread of execution is used, it appears as though the futures are run at the same time.

Scalability Solutions

The zip combinator works for very simple cases of concurrency, but falls apart for higher-level scenarios. Let’s say you want to poll four futures at once. (The horror!)

let baz = async { /* ... */ };
let cap = async { /* ... */ };

Then, you would need to call zip three times!

let combined = future::zip(
    future::zip(foo, bar),
    future::zip(baz, cap)
);

You run into some other problems too, like:

  • You can only run a fixed number of futures at once. If you might run a variable number of futures, you’re out of luck.
  • What if you want to cancel one of the futures halfway through?
  • Each future is polled every time zip is woken up. This means polling zip is an O(n) operation. This is sometimes known as the “thundering herd” problem.
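To make the last point concrete, here is a small sketch that counts child polls. CountPolls, NoopWake and polls_per_wakeup are hypothetical helpers, and a plain loop over a Vec stands in for a tree of nested zip calls. A single wake-up of the combined future costs one poll per child, even if only one child could make progress.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that never finishes and counts how often it is polled.
struct CountPolls(Rc<Cell<usize>>);

impl Future for CountPolls {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.0.set(self.0.get() + 1);
        Poll::Pending
    }
}

/// Waker that does nothing; we only care about counting polls here.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Simulate one wake-up of a zip over `n` children and return the
/// number of child polls it caused.
fn polls_per_wakeup(n: usize) -> usize {
    let counter = Rc::new(Cell::new(0));
    let mut children: Vec<CountPolls> =
        (0..n).map(|_| CountPolls(counter.clone())).collect();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // One poll of the combined future polls every unfinished child.
    for child in children.iter_mut() {
        let _ = Pin::new(child).poll(&mut cx);
    }

    counter.get()
}

fn main() {
    println!("{}", polls_per_wakeup(4)); // prints 4
}
```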

Let’s try to solve these problems. Without any prior art, I mean. We can solve the “fixed number of futures” problem pretty easily. Consider the slab crate, which lets us set up an indexed list of objects. It’s similar to an arena. We can fit our futures in there.

$ cargo add slab
    Updating crates.io index
      Adding slab v0.4.9 to dependencies
             Features:
             + std
             - serde
    Updating crates.io index
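If you have not used a slab before, the idea can be sketched in a few lines of standard-library Rust. MiniSlab below is a hypothetical, simplified stand-in and not the slab crate’s implementation. It keeps a separate free list, and its remove returns an Option instead of panicking on a missing key.

```rust
/// Hypothetical, simplified stand-in for `slab::Slab`.
struct MiniSlab<T> {
    /// `None` marks a vacant slot.
    entries: Vec<Option<T>>,
    /// Indexes of vacant slots that can be reused.
    free: Vec<usize>,
}

impl<T> MiniSlab<T> {
    fn new() -> Self {
        Self { entries: Vec::new(), free: Vec::new() }
    }

    /// Store a value and return the key it can be looked up with.
    fn insert(&mut self, value: T) -> usize {
        match self.free.pop() {
            // Reuse a vacant slot if there is one.
            Some(key) => {
                self.entries[key] = Some(value);
                key
            }
            // Otherwise, grow the list.
            None => {
                self.entries.push(Some(value));
                self.entries.len() - 1
            }
        }
    }

    /// Take a value out, leaving its slot vacant for later reuse.
    fn remove(&mut self, key: usize) -> Option<T> {
        let value = self.entries.get_mut(key)?.take();
        if value.is_some() {
            self.free.push(key);
        }
        value
    }

    fn get(&self, key: usize) -> Option<&T> {
        self.entries.get(key)?.as_ref()
    }
}

fn main() {
    let mut slab = MiniSlab::new();
    let a = slab.insert("foo");
    let b = slab.insert("bar");
    slab.remove(a);

    // The vacated slot is reused, while "bar" keeps its key.
    let c = slab.insert("baz");
    println!("{a} {b} {c}"); // prints "0 1 0"
    println!("{:?}", slab.get(b)); // prints Some("bar")
}
```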

Let’s also box the Futures, so we can use multiple different implementors of Future in the same zip.

use slab::Slab;
use std::future::Future;
use std::pin::Pin;

struct GiantZip {
    // Completed futures are represented by `None`.
    futures: Slab<Option<Pin<Box<dyn Future<Output = ()>>>>>
}

impl GiantZip {
    fn new() -> Self {
        Self {
            futures: Slab::new()
        }
    }
}

Let’s have an insert method that can be used to add a new Future to this new zip-equivalent. It will return a key that can be used to look up the future in our list.

impl GiantZip {
    fn insert<F: Future<Output = ()> + 'static>(&mut self, future: F) -> usize {
        self.futures.insert(Some(Box::pin(future)))
    }
}

Dependency Dog: The Future needs to be 'static because it’s being boxed and pinned on the heap without a lifetime. It’s possible to work around this by adding a lifetime to GiantZip here, but let’s keep it simple for now.
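For the curious, the lifetime workaround might look roughly like the sketch below. ScopedZip, poll_all and NoopWake are hypothetical names, and a Vec stands in for the Slab so that the example needs no extra crates. The rest of this post sticks with the 'static version.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical variant of `GiantZip` that carries a lifetime.
struct ScopedZip<'a> {
    futures: Vec<Option<Pin<Box<dyn Future<Output = ()> + 'a>>>>,
}

impl<'a> ScopedZip<'a> {
    fn new() -> Self {
        Self { futures: Vec::new() }
    }

    // `'a` replaces `'static`: the futures may borrow data, as long as
    // that data outlives the zip.
    fn insert<F: Future<Output = ()> + 'a>(&mut self, future: F) -> usize {
        self.futures.push(Some(Box::pin(future)));
        self.futures.len() - 1
    }

    /// Poll every unfinished future once; returns `true` once all are done.
    fn poll_all(&mut self, cx: &mut Context<'_>) -> bool {
        for slot in self.futures.iter_mut() {
            if let Some(future) = slot.as_mut() {
                if future.as_mut().poll(cx).is_ready() {
                    *slot = None;
                }
            }
        }
        self.futures.iter().all(|slot| slot.is_none())
    }
}

/// Waker that does nothing; the futures below never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Run two futures that borrow a local counter and return its final value.
fn count_with_borrowed_futures() -> u32 {
    let counter = Cell::new(0);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    {
        let mut zipper = ScopedZip::new();
        // Neither future is `'static`: both borrow `counter`.
        zipper.insert(async { counter.set(counter.get() + 1) });
        zipper.insert(async { counter.set(counter.get() + 10) });
        while !zipper.poll_all(&mut cx) {}
    }

    counter.get()
}

fn main() {
    println!("{}", count_with_borrowed_futures()); // prints 11
}
```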

Finally, let’s make it so polling the GiantZip tries to resolve every one of the futures contained within.

use std::task::{Context, Poll};

impl Future for GiantZip {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut unfinished = false;

        for (_, future_slot) in self.futures.iter_mut() {
            if let Some(future) = future_slot.as_mut() {
                // Try to poll this future.
                match future.as_mut().poll(cx) {
                    Poll::Ready(()) => {
                        // Set the future to `None`.
                        *future_slot = None;
                    },

                    Poll::Pending => {
                        // We are unfinished; return Pending.
                        unfinished = true; 
                    }
                }
            }
        }

        if unfinished {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

Now we can test this out on futures that actually do something.

$ cargo add async-channel
    Updating crates.io index
      Adding async-channel v2.2.0 to dependencies
             Features:
             + std
    Updating crates.io index

// Create a channel with a capacity of 1.
let (sender, recv) = async_channel::bounded(1);

// This is basically an `async fn` that sends a number over the channel.
let our_future = |i: i32| {
    let sender = sender.clone();
    async move { sender.send(i).await.ok(); }
};

// Create a future that reads from the channel.
let reader = async move {
    for _ in 0..3 {
        println!("{}", recv.recv().await.unwrap());
    }
};

// Use the GiantZip to poll all of these at once.
let mut zipper = GiantZip::new();
zipper.insert(our_future(1));
zipper.insert(our_future(2));
zipper.insert(our_future(3));
zipper.insert(reader);

// Wait for them to finish.
future::block_on(zipper);

When we run it, we see this:

$ time cargo run -q
1
2
3
cargo run -q  0.03s user 0.03s system 93% cpu 0.064 total

That’s pretty fast, but that’s only because we have a low number of futures. If we have 10,000,000 futures (not an unrealistic number for a web server!), it will run much slower.

$ time cargo run -q
0
1
2
cargo run -q  9.16s user 0.70s system 100% cpu 9.803 total

notgull: It’s hard to express in text, but there was a delay of a few seconds between each line. So it’s taking a while to get to the future that actually prints the line.

In addition to being inefficient, we’ve also stumbled upon another issue: GiantZip is unfair. The reader Future is at the very end of the futures list, which means it’s processed last when polling. Since a lot of the futures end up being blocked on reader, it means polling the GiantZip takes a lot longer than it normally should.

Thankfully, we can solve the O(n) problem and also (kind of) solve the fairness problem in one fell swoop. Instead of polling every future every time we poll, we should only poll the ones whose Wakers have been woken up, since those are the only ones that might be able to make progress.

Let’s add a queue structure to the GiantZip that contains the indexes of the futures that have been woken and are ready to be polled. I’m wrapping it in an Arc and a Mutex for reasons that will become obvious later.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

struct GiantZip {
    // Completed futures are represented by `None`.
    futures: Slab<Option<Pin<Box<dyn Future<Output = ()>>>>>,

    // NEW: Queue of indexes of futures that have been woken and should be polled.
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl GiantZip {
    fn new() -> Self {
        Self {
            futures: Slab::new(),
            queue: Arc::new(Mutex::new(VecDeque::new()))
        }
    }
}

Now, when we first insert the Future into the GiantZip, we have to mark it as ready. This is done by just pushing the index of the future into the queue.

impl GiantZip {
    fn insert<F: Future<Output = ()> + 'static>(&mut self, future: F) -> usize {
        // NEW: Save the index and push it to the back of the queue before returning.
        let index = self.futures.insert(Some(Box::pin(future)));
        self.queue.lock().unwrap().push_back(index);
        index
    }
}

We also need to have a way for the Future to mark itself as ready. The Future calls the Waker when it is ready to be woken up, so we can just create a Waker that wraps around the top-level Waker, but also marks the current future as ready.

We’ll bring in waker-fn to make this easier. A Waker is just a glorified callback, so we can easily represent it as one.

$ cargo add waker-fn
    Updating crates.io index
      Adding waker-fn v1.1.1 to dependencies
    Updating crates.io index
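To see why a Waker is “just a glorified callback”, here is a rough sketch of how a closure can be turned into one using std::task::Wake. CallbackWaker and callback_waker are hypothetical names, and this is not presented as the waker-fn source. Note the Arc::new: every Waker created this way costs one heap allocation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Wraps a closure so that it can be used as a `Waker`.
struct CallbackWaker<F>(F);

impl<F: Fn() + Send + Sync + 'static> Wake for CallbackWaker<F> {
    fn wake(self: Arc<Self>) {
        (self.0)();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        (self.0)();
    }
}

/// Hypothetical stand-in for `waker_fn::waker_fn`.
fn callback_waker<F: Fn() + Send + Sync + 'static>(f: F) -> Waker {
    // The closure moves into a fresh `Arc`: one allocation per call.
    Waker::from(Arc::new(CallbackWaker(f)))
}

/// Wake a callback-backed waker twice and report how often the callback ran.
fn count_wakes() -> usize {
    let hits = Arc::new(AtomicUsize::new(0));

    let waker = callback_waker({
        let hits = hits.clone();
        move || {
            hits.fetch_add(1, Ordering::SeqCst);
        }
    });

    waker.wake_by_ref();
    waker.wake();

    hits.load(Ordering::SeqCst)
}

fn main() {
    println!("{}", count_wakes()); // prints 2
}
```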

Let’s make creating the Waker a helper function on GiantZip, to keep things clean.

use waker_fn::waker_fn;
use std::task::Waker;

impl GiantZip {
    /// Create a waker that wakes the future in the provided slot.
    fn waker_for_slot(&self, index: usize, toplevel: &Waker) -> Waker {
        // Clone shared resources.
        // *This* is why we wrapped `queue` in an `Arc`, by the way.
        let queue = self.queue.clone();
        let toplevel = toplevel.clone();

        // Create a waker.
        waker_fn(move || {
            // Mark the future as ready.
            queue.lock().unwrap().push_back(index);

            // Wake the toplevel `block_on` waker, so the GiantZip poll()
            // implementation is run again.
            toplevel.wake_by_ref();
        })
    }
}

Finally, we can adjust the poll() implementation for GiantZip such that it pops from the queue instead of polling each and every future in the list.

impl GiantZip {
    /// Get the next index in the list.
    fn next_index(&self) -> Option<usize> {
        self.queue.lock().unwrap().pop_front()
    }
}

impl Future for GiantZip {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // `GiantZip` is `Unpin`, so we can safely get a plain `&mut Self` out of the `Pin`.
        let this = self.get_mut();

        // NEW: We drain the "queue" instead of iterating over every future.
        // Make sure not to hold the lock while polling; if a future is woken by another future,
        // it would deadlock otherwise.
        while let Some(index) = this.next_index() {
            // NEW: Create a waker to poll this future with.
            let waker = this.waker_for_slot(index, cx.waker());
            let mut slot_context = Context::from_waker(&waker);

            let future_slot = match this.futures.get_mut(index) {
                Some(slot) => slot,
                None => continue
            };
            if let Some(future) = future_slot.as_mut() {
                // Try to poll this future.
                match future.as_mut().poll(&mut slot_context) {
                    Poll::Ready(()) => {
                        // Set the future to `None`.
                        *future_slot = None;
                    },

                    Poll::Pending => {}
                }
            }
        }

        if this.futures.iter().any(|(_, fut)| fut.is_some()) {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

When we run the program, we see the following output:

$ time cargo run -q
0
1
2
cargo run -q  5.03s user 0.62s system 101% cpu 5.583 total

There is still quite a bit of contention caused by the initial burst of futures as well as the last burst of futures. But, there is no more delay between the printing of the numbers. This indicates that the runtime is being used more efficiently.

This also solves our cancellation problem; we can just add a remove method to the GiantZip to remove a keyed future from the list.

impl GiantZip {
    fn remove(&mut self, index: usize) {
        self.futures.remove(index);
    }
}

let key = zipper.insert(async { panic!() });
// Actually, might not be the best idea to run that task.
zipper.remove(key);

Now we’ve solved all of our problems… and introduced a million new ones.

Persistent Problems

I’ve deliberately made some mistakes in the above example, in order to illustrate how fixing those mistakes can lead to a very important data pattern. So let’s discuss those mistakes.

Dependency Dog: “Deliberately”, you say?

notgull: Hey, I’ll have you know, I’m just following the natural evolution of the async/await pattern.

Dependency Dog: Did the “natural evolution” force you to re-allocate a new Waker every time you polled a future?

The first issue is that all of this is very inefficient. Ignoring our suboptimal queueing structure, we have three main allocations here:

  • We have a Vec (inside the Slab) to hold our futures.
  • Each individual Future requires its own Box.
  • Every time a future in the GiantZip is polled, we have to create a Waker to poll it with. The waker-fn crate allocates this inside of an Arc.

Specifically, we should be concerned about the Waker allocation, since it occurs on the hot path. We should try our best to make sure that we can create a Waker without allocating.
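As a taste of what that could look like, here is one possible direction, sketched under assumptions and not necessarily the route async-task takes: allocate the waker state once per slot, then hand out Wakers by bumping a reference count. SlotWaker and waker_for are hypothetical names, and waking the top-level waker is left out to keep the sketch short.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical per-slot waker state, allocated once when a future is inserted.
struct SlotWaker {
    index: usize,
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl Wake for SlotWaker {
    fn wake(self: Arc<Self>) {
        // Mark the slot as ready to be polled. (A full version would also
        // wake the top-level waker; that is omitted here.)
        self.queue.lock().unwrap().push_back(self.index);
    }
}

/// Build a `Waker` for a slot. This clones the `Arc`, which only bumps
/// a reference count and does not allocate.
fn waker_for(slot: &Arc<SlotWaker>) -> Waker {
    Waker::from(Arc::clone(slot))
}

/// Create three wakers from one allocation and wake each of them.
/// Returns the queued indexes and the final reference count of the slot.
fn demo() -> (Vec<usize>, usize) {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let slot = Arc::new(SlotWaker { index: 7, queue: queue.clone() });

    // Three "polls", three wakers, but still only the allocation made above.
    for _ in 0..3 {
        waker_for(&slot).wake();
    }

    let woken = queue.lock().unwrap().iter().copied().collect();
    (woken, Arc::strong_count(&slot))
}

fn main() {
    let (woken, refs) = demo();
    println!("{woken:?} {refs}"); // prints "[7, 7, 7] 1"
}
```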

There are some other persistent problems, like:

  • It’s very easy to misuse the API. usize isn’t a great type to index by, for a collection of hotly-polled futures.
  • You can’t get the result of a Future after it completes… or use a Future that returns anything other than (), for that matter.
  • It is very difficult (albeit not impossible) to remove a Future from the list while it is running.

We’ll begin to address these problems in the next blog post, when we start to build a real task abstraction.

