`futures` comes with many additional combinators that `smol` doesn’t have out of the box. We can rebuild them, better.
Whew, it’s almost been a month since my last blogpost here. This was because I was spending time doing research and testing, and not because I lost the PGP key that allows me to upload to this site. No sir, how could anyone be that irresponsible?
…or maybe I was just using the PGP key as an excuse not to write? It’s not like I’m being paid to psychoanalyze myself in front of you people.
It doesn’t matter, we’re back! Let’s talk about `smol`.
The Problem with `futures`
`futures` was originally released in 2016 to provide an implementation of asynchronous programming for Rust. In the time since, it’s accumulated a lot of baggage. Many of its combinators have been superseded by the `async`/`await` syntax, meaning that a large amount of its API surface isn’t relevant anymore.
For instance, take the `Map` combinator. It takes the output of some `Future` and maps that value through a closure.
```rust
use futures::prelude::*;

let fut = async { 1 };
let mapped_fut = fut.map(|x| x * 2);
```
In the pre-2018-edition days, these combinators were the only way to manipulate the value of a `Future`; they were completely necessary for working with asynchronous values. Nowadays, we can just wrap the original future in an `async` block and treat it more like a normal expression.
```rust
let fut = async { 1 };
let mapped_fut = async { fut.await * 2 };
```
Therefore, in this brave new post-`async` world, many of these combinators became unnecessary. `futures-lite`, one of the core components of `smol`, was created to address this new reality.
`futures-lite` explicitly omits all combinators that can be implemented using `async`/`await`, as well as features that have already been moved into the standard library. This leaves behind a small, clean subset of the API that builds fast.
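For example, `futures-lite` does keep the small primitives that genuinely can’t be written as a plain `async` block, like `zip` and `race`. A quick sketch of those:

```rust
use smol::future;

smol::block_on(async {
    // zip: run two futures concurrently and wait for both results.
    let (a, b) = future::zip(async { 1 }, async { 2 }).await;
    assert_eq!((a, b), (1, 2));

    // race: return the output of whichever future finishes first.
    let winner = future::race(async { "fast" }, future::pending::<&str>()).await;
    assert_eq!(winner, "fast");
});
```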
A semi-frequently asked question I see goes along the lines of: “I was porting my application over to `smol`, but I noticed that it doesn’t have `for_each_concurrent` or `buffered`. Is this API excluded purposely?”
This is a reasonable question. The short answer is “yes”, and the medium answer is “these concurrent functions have small but frustrating problems that `futures-lite` avoids by not implementing them”. This article is the longer answer.
Concurrency Conundrum
I would argue that the concurrent `Stream` combinators mentioned above are a code smell. Well-formed, production-ready code should not use `for_each_concurrent` or `buffered`. If I knew how compilers worked, I would suggest a Clippy lint that flags these functions as a warning.
The `for_each_concurrent` function is called like this:
```rust
let my_stream = /* ... */;

my_stream.for_each_concurrent(
    None, // Limiter parameter; not important for now.
    |x| async move { do_something(x).await }, // Closure to run for each element.
).await;
```
To massively oversimplify, `for_each_concurrent` does this:
```rust
use futures::prelude::*;
use futures::stream::FuturesUnordered;

let mut my_stream = /* ... */;

// Create a list to store all of the futures.
let mut futures_list = FuturesUnordered::new();

// Get all of the values from our stream.
while let Some(x) = my_stream.next().await {
    // Push the future to the list.
    futures_list.push(async move { x + 5 });
}

// Wait for all of the futures to complete. `FuturesUnordered` polls its
// futures and yields their results as they complete.
futures_list.for_each(|_| async {}).await;
```
`FuturesUnordered` is sort of an unholy combination of an executor and a `Stream`. It collects a bunch of `Future`s into a list, then maintains a queue of which `Future`s are ready to be polled. Once a `Future` returns `Ready`, the stream yields that `Future`’s value.
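To see that executor-plus-`Stream` behavior concretely, here’s a minimal sketch; the `sleepy` helper is hypothetical, standing in for real work:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

// Hypothetical helper: sleep for `ms` milliseconds, then return `ms`.
async fn sleepy(ms: u64) -> u64 {
    smol::Timer::after(Duration::from_millis(ms)).await;
    ms
}

smol::block_on(async {
    let mut set = FuturesUnordered::new();
    set.push(sleepy(30));
    set.push(sleepy(10));
    set.push(sleepy(20));

    // Values arrive in completion order, not insertion order: 10, 20, 30.
    while let Some(ms) = set.next().await {
        println!("finished after {} ms", ms);
    }
});
```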
This means that, every time you call `for_each_concurrent`, it creates an entirely new executor, runs the `Stream` to completion on it, then discards that executor. `buffered` is implemented in a similar way.
This is bad for a couple of reasons. Most `async` runtimes already provide an executor: `tokio` provides one out of the box, and `smol` encourages you to create and optimize your own. By using `for_each_concurrent` or `buffered`, you are essentially ignoring your existing executor in order to spawn a short-lived temporary one.
In addition to the resources wasted on the new executor, it’s often less efficient than the `async` runtime’s own executor. `tokio` and `smol` support options that let you offload tasks onto other threads or handle contention more efficiently. In contrast, `FuturesUnordered` is a relatively naive executor that is completely unaware of its surrounding runtime.
Not to mention, `FuturesUnordered` comes with a few footguns that make it impractical for common use cases.
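The most commonly cited one: the futures inside a `FuturesUnordered` only make progress while the collection itself is being polled. A sketch of how that bites, with hypothetical `fetch` and `save_to_database` functions standing in for real work:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical stand-ins for real work.
async fn fetch(x: u32) -> u32 { x }
async fn save_to_database(_x: u32) { /* ... */ }

smol::block_on(async {
    let mut futures_list: FuturesUnordered<_> = (0..10).map(fetch).collect();

    while let Some(x) = futures_list.next().await {
        // Footgun: while this `await` is pending, `futures_list` is not
        // being polled, so every future still inside it is stalled.
        save_to_database(x).await;
    }
});
```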
Make your own, better combinator
In `smol`, you can emulate these use cases somewhat easily. First, you need to create an `Executor` and run your futures in its context.
```rust
let ex = smol::Executor::new();

ex.run(async {
    // The code written below will take place in this context.
}).await;
```
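A nice property of `run` is that it only drives tasks while it is being `.await`ed, and it can be driven from several threads at once. Here’s a sketch of that multi-threaded pattern, assuming Rust 1.63+ for `std::thread::scope`, with a channel used purely as a shutdown signal:

```rust
let ex = smol::Executor::new();
let (signal, shutdown) = smol::channel::unbounded::<()>();

std::thread::scope(|scope| {
    // Drive the executor on two background threads until `signal` is dropped.
    for _ in 0..2 {
        let (ex, shutdown) = (&ex, shutdown.clone());
        scope.spawn(move || {
            smol::block_on(ex.run(async {
                let _ = shutdown.recv().await;
            }))
        });
    }

    // Run the “main” future on this thread.
    smol::block_on(ex.run(async {
        let task = ex.spawn(async { 1 + 1 });
        println!("1 + 1 = {}", task.await);
    }));

    // Dropping the sender wakes the background threads so they can exit.
    drop(signal);
});
```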
You can emulate `for_each_concurrent` by turning every future in the stream into a task, then `await`ing all of those tasks. Here’s how it looks if you don’t have a task limit:
```rust
use smol::prelude::*;

let my_stream = smol::stream::iter(vec![1u32, 2, 3]);

// Spawn the set of futures on an executor.
let handles: Vec<smol::Task<()>> = my_stream
    .map(|item| {
        // Spawn the future on the executor.
        ex.spawn(do_something(item))
    })
    .collect()
    .await;

// Wait for all of the handles to complete.
for handle in handles {
    handle.await;
}
```
Here, we spawn every future involved onto the executor, then collect all of the task handles. Since we are running inside of the executor, all of these tasks will be run in parallel. Finally, we just `.await` each handle to wait for all of the tasks to complete.
The best part is that the allocation, the `Vec<smol::Task<()>>`, isn’t even strictly necessary: it could be a single long-lived allocation that is just extended to hold each batch of tasks, as sketched below.
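A sketch of that approach, reusing the executor and the hypothetical `do_something` from above:

```rust
let mut my_stream = smol::stream::iter(vec![1u32, 2, 3]);

// One long-lived buffer, reused across batches instead of reallocated.
let mut tasks: Vec<smol::Task<()>> = Vec::new();

// Refill the same buffer, keeping its allocation.
while let Some(item) = my_stream.next().await {
    tasks.push(ex.spawn(do_something(item)));
}

// Drain the handles so the buffer stays allocated for the next batch.
for task in tasks.drain(..) {
    task.await;
}
```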
Generally, it doesn’t matter how many tasks are spawned onto the global executor. In contrast to the mini-executor that `for_each_concurrent` spawns, the global executor is designed to handle large numbers of tasks. However, if you still want to impose a resource limit, you can use a `Semaphore`.
```rust
use smol::prelude::*;
use std::sync::Arc;

let my_stream = smol::stream::iter(vec![1u32, 2, 3]);
let my_limit = 5;

// Semaphore for limiting the number of tasks.
let semaphore = Arc::new(smol::lock::Semaphore::new(my_limit));

// Spawn the set of futures on an executor.
let handles: Vec<smol::Task<()>> = my_stream
    // We use `then` now, since we need to `.await` for the
    // semaphore to have a permit available.
    .then(|item| {
        // Borrow the semaphore and executor.
        let (ex, semaphore) = (&ex, &semaphore);

        async move {
            // Wait for a semaphore permit.
            let permit = semaphore.acquire_arc().await;

            // Spawn the future on the executor.
            ex.spawn(async move {
                // Run our future.
                do_something(item).await;

                // Drop the permit to let another task run.
                drop(permit);
            })
        }
    })
    .collect()
    .await;

// Wait for the remaining handles to complete.
for handle in handles {
    handle.await;
}
```
This works by having each task hold a `Semaphore` permit. The semaphore is sort of like a `Mutex` that can be locked by multiple parties at once, up to a certain limit. Once it runs out of permits, this code doesn’t spawn another task until one of the running tasks completes. The permit is moved into the task and is dropped once the computation completes.
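A minimal sketch of the semaphore’s behavior in isolation, using `try_acquire` to show the moment the permits run out:

```rust
use smol::lock::Semaphore;
use std::sync::Arc;

smol::block_on(async {
    // At most two permits may be held at once.
    let semaphore = Arc::new(Semaphore::new(2));

    let a = semaphore.acquire_arc().await;
    let b = semaphore.acquire_arc().await;

    // No permits left: a third `acquire` would have to wait.
    assert!(semaphore.try_acquire().is_none());

    // Dropping a permit frees a slot for the next acquirer.
    drop(a);
    assert!(semaphore.try_acquire().is_some());
    drop(b);
});
```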
The `then`-stream above is also practically the equivalent of `buffered`. It yields tasks that can then be `await`ed to get their results.
```rust
// snip: semaphore and stream setup

// This time, we do something else that maps the value to another.
async fn do_something_else(x: u32) -> u32 { x + 1 }

// Get a `Stream` of tasks that can be `await`ed to get their values.
let buffered_stream = my_stream
    .then(|item| {
        // Borrow the semaphore and executor.
        let (ex, semaphore) = (&ex, &semaphore);

        async move {
            // Wait for a semaphore permit.
            let permit = semaphore.acquire_arc().await;

            // Spawn the future on the executor.
            ex.spawn(async move {
                // Run our future.
                // NEW: This now returns a value.
                let result = do_something_else(item).await;

                // Drop the permit to let another task run.
                drop(permit);

                // NEW: Return the result of the inner future.
                result
            })
        }
    });

// NEW: Because the stream holds an unpinned future, we have to pin it
// before iterating over it.
smol::pin!(buffered_stream);

// NEW: We can now wait on this stream for its values.
while let Some(task) = buffered_stream.next().await {
    println!("Value: {}", task.await);
}
```
This is all practically more efficient than `buffered`, while giving you much greater control over how it runs.
Parting Shots
Unfortunately, there’s not much documentation for the fact that `for_each_concurrent` and `buffered` spawn their own separate executors. Raising awareness of proper `async` code is powerful, in my opinion, as it unlocks a whole new world of computations for intermediately experienced Rustaceans. I hope this makes it clearer what should be happening in well-formed code.