Under the Hood
The Future Trait
A `Future` represents an asynchronous computation that can produce a value. Its `poll` function is at the heart of how it works: `poll` drives the future as far towards completion as possible.
A simplified version might look like this:
```rust
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T), // Returned when the SimpleFuture has completed
    Pending,  // Returned while the SimpleFuture is still in progress
}
```
If `poll` returns `Pending`, it arranges for the `wake` function to be called when the `Future` is ready to make more progress. When `wake` is called, the executor driving the `Future` will call `poll` again so that the `Future` can make more progress.
The purpose of the `wake` callback is to tell the executor when a future can make progress. Without it, the executor would have to keep polling every future just to find out which ones are ready.
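To make the `SimpleFuture` contract concrete, here is a toy sketch that is not part of the original notes: a hypothetical `Countdown` future and a tiny driver, `run`, that only re-polls after `wake` has been called. Because `wake` is a bare `fn()`, the only thing it can do is flip a global flag, which also shows why it can't say *which* future became ready.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Simplified trait and enum from above (not the real std::future types).
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

/// Hypothetical future that needs `remaining` more polls before it completes.
struct Countdown {
    remaining: u32,
}

impl SimpleFuture for Countdown {
    type Output = &'static str;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Pretend the event we were waiting for has already happened,
        // so ask to be polled again right away.
        wake();
        Poll::Pending
    }
}

// A plain `fn()` carries no data, so all `wake` can do is set a global flag:
// it has no way to say which future is ready.
static WOKEN: AtomicBool = AtomicBool::new(false);

fn wake() {
    WOKEN.store(true, Ordering::SeqCst);
}

/// Toy driver: poll once, then poll again only after `wake` was called.
/// Returns the output and the number of polls it took.
fn run<F: SimpleFuture>(mut future: F) -> (F::Output, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        match future.poll(wake) {
            Poll::Ready(value) => return (value, polls),
            Poll::Pending => {
                // A real executor would sleep until woken; this toy just
                // checks (and clears) the flag.
                assert!(WOKEN.swap(false, Ordering::SeqCst), "pending but never woken");
            }
        }
    }
}

fn main() {
    let (value, polls) = run(Countdown { remaining: 3 });
    println!("{value} after {polls} polls");
}
```

With `remaining: 3`, the future returns `Pending` three times and `Ready` on the fourth poll.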
The real `Future` trait is slightly different:
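```rust
use std::pin::Pin;
use std::task::{Context, Poll};

trait Future {
    type Output;
    fn poll(
        // The future is pinned: it cannot be moved out of this memory location
        self: Pin<&mut Self>,
        // Gives access to the `Waker` of the task polling this future
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
```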
There are two key differences:

- The future is pinned: `self` is now `Pin<&mut Self>`. A pinned object is guaranteed not to move in memory, so it can safely store pointers to its own fields.
- The `wake` function pointer is now a `&mut Context<'_>`. With just a function pointer, as before, an executor couldn't tell which `Future` called `wake`. `Context` fixes that by providing access to a `Waker`, which wakes a specific task.
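A minimal sketch of the real trait in action, assuming Rust 1.51+ so that `std::task::Wake` can turn an `Arc` into a `Waker`. `YieldOnce`, `CountingWaker`, and `poll_twice` are hypothetical names: the future asks its own task to be woken through `cx.waker()` and returns `Pending` once, then completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: `Pending` on the first poll, `Ready` on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("finished")
        } else {
            self.yielded = true;
            // Ask for the specific task behind this Waker to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that only counts how often its task was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Poll a `YieldOnce` twice by hand, recording the wakeups in between.
fn poll_twice() -> (Poll<&'static str>, usize, Poll<&'static str>) {
    let task = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(task.clone());
    let mut cx = Context::from_waker(&waker);

    let mut future = YieldOnce { yielded: false };
    // `YieldOnce` is `Unpin`, so `Pin::new` is enough to pin it.
    let mut future = Pin::new(&mut future);

    let first = future.as_mut().poll(&mut cx);
    let wakeups = task.0.load(Ordering::SeqCst);
    let second = future.as_mut().poll(&mut cx);
    (first, wakeups, second)
}

fn main() {
    println!("{:?}", poll_twice());
}
```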
Task Wakeups with Waker
It's the role of a `Waker` to tell an executor that its future is ready to make more progress, via the `wake` function that it provides.
When `wake` is called, the task executor knows to poll the future again at the next available opportunity.

`Waker`s implement `Clone` and can be copied around and stored.
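A quick sketch of the clone-and-store point, assuming Rust 1.51+ for `std::task::Wake`. `CountingTask` and `wake_from_threads` are hypothetical: one `Waker` is cloned into several threads, and every clone wakes the same task.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread;

/// Hypothetical task handle whose waker just counts wakeups.
struct CountingTask {
    wakeups: AtomicUsize,
}

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.wakeups.fetch_add(1, Ordering::SeqCst);
    }
}

/// Clone one `Waker` into `n` threads; every clone wakes the same task.
fn wake_from_threads(n: usize) -> usize {
    let task = Arc::new(CountingTask { wakeups: AtomicUsize::new(0) });
    let waker = Waker::from(task.clone());

    let handles: Vec<_> = (0..n)
        .map(|_| {
            // `Waker` is `Clone + Send + Sync`, so a copy can be stored
            // and moved to another thread.
            let stored = waker.clone();
            thread::spawn(move || stored.wake())
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    task.wakeups.load(Ordering::SeqCst)
}

fn main() {
    println!("wakeups: {}", wake_from_threads(4));
}
```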
Build a Timer
To get started, we'll need these imports:
```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
```
We start by defining the future type. The timer thread needs a way to tell the future that the timer has elapsed and that it should complete; we'll use a shared `Arc<Mutex<..>>` for that.
```rust
pub struct TimerFuture {
    // The Arc + Mutex lets the timer thread and the future communicate
    shared_state: Arc<Mutex<SharedState>>,
}

/// State shared between the waiting thread and the future
struct SharedState {
    /// Whether the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up and move forward.
    waker: Option<Waker>,
}
```
Now the implementation:
```rust
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check the shared state to see if the timer has already completed
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set the waker so that the thread can wake up the current task when
            // the timer has completed, ensuring that the future is polled again
            // and sees that `completed = true`.
            //
            // The waker has to be replaced on every poll because `TimerFuture`
            // can move between tasks on the executor; a waker saved from an
            // earlier poll could belong to a task that no longer owns this future.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
```
And finally the API for constructing a `TimerFuture` and starting the thread:
```rust
impl TimerFuture {
    /// Create a new `TimerFuture` that completes after `duration`
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the timer thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has finished and wake up the last task
            // on which the future was polled, if there is one.
            // Remember, `shared_state.waker` is set inside `TimerFuture::poll`.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake();
            }
        });

        TimerFuture { shared_state }
    }
}
```
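`TimerFuture` can be tried out with a much smaller driver than a full executor. This is a hedged, std-only sketch (Rust 1.51+ for `std::task::Wake`): the hypothetical `block_on` polls one future on the current thread and parks between polls, using a `Waker` that unparks that thread. `TimerFuture` is repeated so the block compiles on its own.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

// --- TimerFuture, repeated so this block is self-contained ---
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake();
            }
        });
        TimerFuture { shared_state }
    }
}

// --- Hypothetical single-future driver (not part of the original) ---

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Poll `future` on the current thread, parking between polls.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // `park` may also return spuriously; the loop then just polls again.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let start = Instant::now();
    block_on(TimerFuture::new(Duration::from_millis(200)));
    println!("timer finished after {:?}", start.elapsed());
}
```

If the timer thread calls `wake` before the driver reaches `park`, the unpark token is already set and `park` returns immediately, so no wakeup is lost.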
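Why replace the waker on every poll? A `TimerFuture` isn't permanently tied to one task: between two polls it can be moved into a different task, which has its own `Waker`. If `poll` only saved the waker from its first call, the timer thread would later wake the old task (a stale waker pointing to the wrong task), and the task that now owns the future would never be polled again.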
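The stale-waker problem can be simulated without an executor. In this hypothetical sketch, two `TaskWaker`s stand in for two tasks, and `update_waker` (also hypothetical) stores the waker from each poll. It uses the real `Waker::will_wake` method as a best-effort check to skip the clone when the stored waker would already wake the same task.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical stand-in for a task: counts how often it was woken.
struct TaskWaker {
    wakeups: AtomicUsize,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.wakeups.fetch_add(1, Ordering::SeqCst);
    }
}

/// Store the waker from the current poll, cloning only when the stored
/// one would not wake the same task.
fn update_waker(slot: &mut Option<Waker>, cx: &Context<'_>) {
    let up_to_date = slot.as_ref().map_or(false, |w| w.will_wake(cx.waker()));
    if !up_to_date {
        *slot = Some(cx.waker().clone());
    }
}

/// Returns (wakeups of task A, wakeups of task B).
fn demo() -> (usize, usize) {
    let task_a = Arc::new(TaskWaker { wakeups: AtomicUsize::new(0) });
    let task_b = Arc::new(TaskWaker { wakeups: AtomicUsize::new(0) });
    let waker_a = Waker::from(task_a.clone());
    let waker_b = Waker::from(task_b.clone());

    let mut slot = None;
    // First poll happens while the future belongs to task A...
    update_waker(&mut slot, &Context::from_waker(&waker_a));
    // ...then the future is moved into task B and polled there.
    update_waker(&mut slot, &Context::from_waker(&waker_b));

    // The event fires and wakes whatever waker is stored.
    if let Some(waker) = slot.take() {
        waker.wake();
    }
    (
        task_a.wakeups.load(Ordering::SeqCst),
        task_b.wakeups.load(Ordering::SeqCst),
    )
}

fn main() {
    println!("{:?}", demo());
}
```

Because the slot is refreshed on the second poll, task B (the current owner) is woken. If only the first poll's waker were kept, task A would be woken instead and task B would never run again.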
Applied: Build an Executor
Concept: A future executor takes a set of top-level `Future`s and runs them to completion by calling their `poll` functions whenever the future is able to make progress.
Term: A task is just a future that can reschedule itself, usually paired with a sender so that it can requeue itself in the executor.
The process looks a bit like this:

- Tasks that need to be run are sent to the executor over a channel.
- The executor polls each future once to get things started.
- When a task is ready to make more progress, its `wake()` is called, which schedules the task to be polled again by putting it back onto the channel.
- The executor picks the woken-up task off the queue and calls `poll` again.
In this process, the executor itself only needs the receiving end of the task channel. The user of the executor gets a sending end so that new futures can be spawned.
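The channel plumbing can be tried on its own, without any futures. In this hypothetical sketch, a `Job` stands in for a task: it keeps a `SyncSender` so it can requeue itself, `run` holds only the `Receiver`, and the loop ends once the spawner and every unfinished job have dropped their senders.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for a task: it needs `steps` turns to finish and,
/// like a real task, keeps a sender so it can put itself back on the queue.
struct Job {
    name: &'static str,
    steps: u32,
    requeue: SyncSender<Job>,
}

/// The "executor" only owns the receiving end of the channel.
fn run(queue: Receiver<Job>) -> Vec<String> {
    let mut log = Vec::new();
    // `recv` returns Err once every sender (the spawner's and each
    // unfinished job's) has been dropped and the queue is empty.
    while let Ok(mut job) = queue.recv() {
        job.steps -= 1;
        log.push(format!("{} ({} left)", job.name, job.steps));
        if job.steps > 0 {
            // Not done yet: the job schedules itself to run again.
            let sender = job.requeue.clone();
            sender.send(job).expect("receiver was dropped");
        }
        // A finished job is dropped here, together with its sender.
    }
    log
}

fn demo() -> Vec<String> {
    let (spawner, queue) = sync_channel::<Job>(10);
    for (name, steps) in [("a", 2), ("b", 1)] {
        let job = Job { name, steps, requeue: spawner.clone() };
        spawner.send(job).expect("receiver was dropped");
    }
    // Drop the spawner so the run loop can end once all jobs finish.
    drop(spawner);
    run(queue)
}

fn main() {
    println!("{:?}", demo());
}
```

Job `a` runs, requeues itself behind `b`, and finishes on its second turn, so the log reads `a`, `b`, `a`.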
Let's create an executor for our timer. We'll use the `ArcWake` trait from the `futures` crate, which provides an easy way to construct a `Waker`. These are the imports we'll need, in addition to those from the timer future section:
```rust
use futures::{
    future::BoxFuture,
    task::{waker_ref, ArcWake},
};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
```
The executor will work by sending tasks to run over a channel. It'll pull tasks off of the channel and poll them.
```rust
/// Task executor that receives tasks from a channel and runs them
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// Spawns new futures onto the task channel
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`
struct Task {
    // Contains an in-progress future that needs to be pushed to completion.
    // The `Mutex` is here to prove to Rust that this is thread-safe.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    // Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks that can be queued in the channel at once
    let (task_sender, ready_queue) = sync_channel(10_000);
    (Executor { ready_queue }, Spawner { task_sender })
}
```
Let's also add a method to `Spawner` that makes it easy to spawn new futures.
```rust
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(future))),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender
            .send(task)
            .expect("Too many tasks are queued!");
    }
}
```
Now we need to implement a `Waker` (using `ArcWake`) for our `Task`, which will be responsible for scheduling the task to be polled again after `wake` is called.

Remember: a `Waker` has to specify which task has become ready.
```rust
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it'll be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("Too many tasks are queued!");
    }
}
```
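For comparison, the standard library has a similar trait, `std::task::Wake` (stable since Rust 1.51), and `Waker::from(Arc<T>)` turns an `Arc` of an implementor into a `Waker`. A hedged sketch of the same requeue-on-wake behavior, with a cut-down `Task` (no future field) and a hypothetical `requeued_ids` helper:

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Cut-down task: only an id and the sender half, no future.
struct Task {
    id: u32,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    // `Wake` hands us an owned `Arc<Self>`, so we can send it directly.
    // (`wake_by_ref` has a default that clones the Arc and calls this.)
    fn wake(self: Arc<Self>) {
        let sender = self.task_sender.clone();
        sender.send(self).expect("ready queue was dropped");
    }
}

/// Wake one task twice and return the ids found on the queue.
fn requeued_ids() -> Vec<u32> {
    let (task_sender, ready_queue) = sync_channel(10);
    let task = Arc::new(Task { id: 7, task_sender });
    let waker = Waker::from(task.clone());

    waker.wake_by_ref(); // sends a clone of the Arc
    waker.wake(); // consumes the waker and sends its Arc
    drop(task);

    // `try_iter` drains whatever is queued without blocking.
    ready_queue.try_iter().map(|t| t.id).collect()
}

fn main() {
    println!("{:?}", requeued_ids());
}
```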
So now, when we create a waker from an `Arc<Task>`, calling `wake` on it will send a copy of the `Arc` into the task channel.
The last step is to tell our `Executor` how to pick up a task and poll it.
```rust
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not completed yet (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `Waker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                if let Poll::Pending = future.as_mut().poll(context) {
                    // This future isn't done yet, so put it back in its task
                    // to be run again later
                    *future_slot = Some(future);
                }
            }
        }
    }
}
```
FINALLY, we can run it:
```rust
fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer
    spawner.spawn(async {
        println!("Wait for itttt....");
        // Wait for our timer future to complete after two seconds
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("NOW!");
    });

    // Drop the spawner so that our executor knows it is finished
    // and won't receive any more tasks to run
    drop(spawner);

    // Run the executor until the task queue is empty
    executor.run();
}
```
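This walkthrough relies on the `futures` crate for `ArcWake`, `waker_ref`, and `BoxFuture`. As a sketch under the assumption that you want only the standard library (Rust 1.51+ for `std::task::Wake`), the same executor design can be written as below. To keep the output deterministic, a hypothetical `YieldNow` future that wakes itself once stands in for the timer, and `spawn`, `run`, and `demo` are illustrative free functions rather than the `Spawner` and `Executor` methods.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as the `futures` crate's `BoxFuture<'static, ()>`.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

struct Task {
    future: Mutex<Option<BoxFuture>>,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    // Waking a task sends it back onto the ready queue.
    fn wake(self: Arc<Self>) {
        let sender = self.task_sender.clone();
        sender.send(self).expect("executor was dropped");
    }
}

/// Hypothetical future: wakes itself and returns `Pending` once, then `Ready`.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn spawn(sender: &SyncSender<Arc<Task>>, future: impl Future<Output = ()> + Send + 'static) {
    let future: BoxFuture = Box::pin(future);
    let task = Arc::new(Task {
        future: Mutex::new(Some(future)),
        task_sender: sender.clone(),
    });
    sender.send(task).expect("executor was dropped");
}

fn run(ready_queue: Receiver<Arc<Task>>) {
    while let Ok(task) = ready_queue.recv() {
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            if future.as_mut().poll(&mut cx).is_pending() {
                // Not finished: keep the future so the next wakeup resumes it.
                *slot = Some(future);
            }
        }
    }
}

/// Spawn two tasks that each yield once, and record the order they run in.
fn demo() -> Vec<String> {
    let (sender, ready_queue) = sync_channel(10_000);
    let log = Arc::new(Mutex::new(Vec::new()));
    for name in ["a", "b"] {
        let log = Arc::clone(&log);
        spawn(&sender, async move {
            log.lock().unwrap().push(format!("{name} start"));
            YieldNow(false).await;
            log.lock().unwrap().push(format!("{name} end"));
        });
    }
    // Drop the spawning side so `run` returns once both tasks finish.
    drop(sender);
    run(ready_queue);
    let lines = log.lock().unwrap().clone();
    lines
}

fn main() {
    println!("{:?}", demo());
}
```

Each task yields after its first line and is requeued behind the other, so the two tasks interleave: `a start`, `b start`, `a end`, `b end`.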