Under the Hood

The Future Trait

A Future represents an asynchronous computation that can produce a value. At the heart of it is the poll function, which drives the future as far towards completion as it can.

A simplified version might look like this:


#![allow(unused)]
fn main() {
trait SimpleFuture {
  type Output;
  fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
  Ready(T), // Returned when the SimpleFuture has completed
  Pending, // Otherwise this
}
}

If poll returns Pending, the future arranges for the wake function to be called when it is ready to make more progress. When wake is called, the executor driving the Future will call poll again so that the Future can make more progress.

The purpose of the wake callback is to tell the executor when a future can make progress. Without it, the executor would have to be constantly polling.
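
To make the poll/wake contract concrete, here is a small sketch. CountDown, count_wake and drive are made-up names for illustration, and drive is only a stand-in for an executor (it polls again right after a wake-up), not how a real one is built.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Same shape as the simplified trait above.
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

// Hypothetical future: needs `remaining` more polls before it completes.
struct CountDown {
    remaining: u32,
}

impl SimpleFuture for CountDown {
    type Output = &'static str;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // A real future would hand `wake` to whatever it is waiting on.
            // Here progress is always possible, so ask to be polled again now.
            wake();
            Poll::Pending
        }
    }
}

// Counts wake-ups. A plain `fn()` cannot capture state, so this is a static.
static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);

fn count_wake() {
    WAKE_COUNT.fetch_add(1, Ordering::SeqCst);
}

// Stand-in for an executor: poll, and poll again only after a wake-up.
// Returns the output and the number of times `poll` was called.
fn drive(mut future: CountDown) -> (&'static str, usize) {
    let mut polls = 0;
    loop {
        let wakes_before = WAKE_COUNT.load(Ordering::SeqCst);
        polls += 1;
        match future.poll(count_wake) {
            Poll::Ready(output) => return (output, polls),
            Poll::Pending => {
                // Pending without a wake-up would leave us waiting forever.
                assert!(WAKE_COUNT.load(Ordering::SeqCst) > wakes_before);
            }
        }
    }
}
```

Note that count_wake has no idea which future called it, which is exactly the limitation the real trait addresses.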

The real Future trait is slightly different:


#![allow(unused)]
fn main() {
trait Future {
  type Output;
  fn poll(
    self: Pin<&mut Self>, // Stuck here forever
    cx: &mut Context<'_>,
  ) -> Poll<Self::Output>;
}
}

There are two key differences:

  • The future is Pin'd.
  • The wake function pointer is now a Context. With just a function pointer, as before, we couldn't tell an executor which Future called wake. Context fixes that by providing access to a Waker, which wakes a specific task.

Pinning guarantees that the future will not be moved in memory, which is what lets a pinned object safely store pointers to its own fields.
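
As a rough sketch of what calling the real poll by hand involves: YieldOnce, NoopWake and poll_twice are hypothetical names, and the waker here is built with std::task::Wake and does nothing, which is only good enough for a demonstration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future: Pending on the first poll, Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready(7)
        } else {
            self.yielded = true;
            // The waker comes from the Context and wakes the task polling us.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing, just enough to build a Context by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the future twice by hand and reports what each poll returned.
fn poll_twice() -> (Poll<u32>, Poll<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut future = YieldOnce { yielded: false };
    // YieldOnce holds no self-references, so it is Unpin and Pin::new works.
    let mut pinned = Pin::new(&mut future);

    let first = pinned.as_mut().poll(&mut cx);
    let second = pinned.as_mut().poll(&mut cx);
    (first, second)
}
```

The first poll comes back Pending and the second Ready(7). A future that does hold pointers into itself would not be Unpin, and Pin::new would refuse it.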

Task Wakeups with Waker

It's the role of a Waker to tell an executor that its future is ready to make more progress, via the wake function that it provides.

When wake is called, the task executor knows to poll the future again at the next available opportunity.

Wakers implement Clone and can be copied around and stored.
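
A small sketch of cloning and storing a waker, using std::task::Wake to build it. CountingTask and wake_from_two_places are made-up names, and the "task" here only counts wake-ups instead of rescheduling anything.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread;

// Hypothetical stand-in for a task: it only counts how often it was woken.
struct CountingTask {
    wakes: AtomicUsize,
}

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

// Clones a waker, hands the clone to another thread, and wakes through both.
// Returns how many wake-ups the task saw.
fn wake_from_two_places() -> usize {
    let task = Arc::new(CountingTask { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(task.clone());

    // Wakers are Clone + Send, so a copy can be stored on another thread.
    let stored = waker.clone();
    let handle = thread::spawn(move || stored.wake());
    handle.join().unwrap();

    // `wake_by_ref` wakes without consuming the waker.
    waker.wake_by_ref();

    task.wakes.load(Ordering::SeqCst)
}
```

Both wake-ups reach the same task, no matter which copy of the waker was used or which thread it was called from.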

Build a Timer

To get started, we'll need these imports:


#![allow(unused)]
fn main() {
use std::{
  future::Future,
  pin::Pin,
  sync::{Arc, Mutex},
  task::{Context, Poll, Waker},
  thread,
  time::Duration,
};
}

We start by defining the future type. The timer sleeps on a separate thread, so the future needs a way for that thread to communicate that the timer has elapsed and the future should complete. We'll use a shared Arc<Mutex<..>> for that.


#![allow(unused)]
fn main() {
pub struct TimerFuture {
  shared_state: Arc<Mutex<SharedState>>,
  // ^ the arc + mutex enables communication between thread and future
}

// This is the state shared by the waiting thread and future
struct SharedState {
  // Whether the sleep time has elapsed
  completed: bool,

  // This is the waker for the task that `TimerFuture` is running on.
  // The thread can use this after setting `completed = true` to tell
  // `TimerFuture`'s task to wake up and move forward.
  waker: Option<Waker>,
}
}

Now the implementation:


#![allow(unused)]
fn main() {
impl Future for TimerFuture {
  type Output = ();

  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    // Check the state to see if we've already completed
    let mut shared_state = self.shared_state.lock().unwrap();

    if shared_state.completed {
      Poll::Ready(())
    } else {
      // Set waker so that the thread can wake up the current task when the
      // timer has completed, ensuring that the future is polled again and sees
      // that completed is true.
      //
      // We have to set the waker of shared_state on each poll because TimerFuture
      // can move between tasks on the executor, and a waker saved by an earlier
      // poll could then be stale and point at the wrong task.
      shared_state.waker = Some(cx.waker().clone());
      Poll::Pending
    }
  }
}
}

And finally the API for constructing a TimerFuture and starting the thread:


#![allow(unused)]
fn main() {
// Now we actually implement the timer thread
impl TimerFuture {
  pub fn new(duration: Duration) -> Self {
    let shared_state = Arc::new(Mutex::new(SharedState {
      completed: false,
      waker: None,
    }));

    // Spawn the timer thread
    let thread_shared_state = shared_state.clone();
    thread::spawn(move || {
      thread::sleep(duration);
      let mut shared_state = thread_shared_state.lock().unwrap();
      // Signal that the timer has finished and wake up the last task
      // on which the future was polled, if there is one.
      // Remember, the `shared_state.waker` is being set inside the `poll`
      // function of `TimerFuture`.
      shared_state.completed = true;
      if let Some(waker) = shared_state.waker.take() {
        waker.wake()
      }
    });

    TimerFuture { shared_state }
  }
}
}

Figure out what "However, the TimerFuture can move between tasks on the executor, which could cause a stale waker pointing to the wrong task" actually means.
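
One way to read that sentence, as far as I can tell: if the future is first polled on one task and later polled on another, a waker saved only on the first poll would wake the old task, and the task that now owns the future would never hear about it. The sketch below fakes the two tasks with counting wakers (CountingTask and wakeups_after_move are made-up names) and carries a condensed copy of the timer so it compiles on its own. It relies on sleeps for timing, so treat it as an illustration rather than a rigorous test.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

// Condensed copy of the timer above so that this sketch stands alone.
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl TimerFuture {
    fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake()
            }
        });
        TimerFuture { shared_state }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Overwrites whatever waker an earlier poll stored.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Hypothetical task stand-in that counts its wake-ups.
struct CountingTask(AtomicUsize);

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Polls one timer from "task A", then from "task B", and returns how often
// each of them was woken once the timer has fired.
fn wakeups_after_move() -> (usize, usize) {
    let task_a = Arc::new(CountingTask(AtomicUsize::new(0)));
    let task_b = Arc::new(CountingTask(AtomicUsize::new(0)));
    let waker_a = Waker::from(task_a.clone());
    let waker_b = Waker::from(task_b.clone());

    let mut timer = TimerFuture::new(Duration::from_millis(200));
    let mut pinned = Pin::new(&mut timer);

    // The first poll happens on task A, the second on task B.
    assert!(pinned.as_mut().poll(&mut Context::from_waker(&waker_a)).is_pending());
    assert!(pinned.as_mut().poll(&mut Context::from_waker(&waker_b)).is_pending());

    // Give the timer thread time to fire (a generous margin, not a guarantee).
    thread::sleep(Duration::from_millis(600));

    (task_a.0.load(Ordering::SeqCst), task_b.0.load(Ordering::SeqCst))
}
```

Because poll replaces the stored waker every time, only task B (the most recent poller) is woken. Had the waker been stored once and never updated, task A would have been woken instead and task B would sit there forever.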

Applied: Build an Executor

Concept: A future executor takes a set of top-level Futures and runs them to completion by calling their poll functions whenever the future is able to make progress.

Term: A task is just a future that can reschedule itself, usually paired with a sender so that it can requeue itself in the executor.

The process looks a bit like this:

  • Executor sends tasks that need to be run over a channel
  • An executor will poll its futures once to get things started
  • A task will then call wake(), which schedules itself to be polled again by putting itself back onto the channel
  • The executor puts the woken-up future onto a queue, and poll is called again

In this process, the executor itself only needs the receiving end of the task channel. The user of the executor will get a sending end so that new futures can be spawned.
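
The split between the two channel ends can be sketched with plain numbers standing in for tasks. drain_queue is a made-up name, and nothing is polled here; the point is only who holds which end and what ends the receive loop.

```rust
use std::sync::mpsc::sync_channel;

// Queues three stand-in "tasks" (plain numbers here) and drains them the way
// an executor loop would. Returns the tasks in the order they were run.
fn drain_queue() -> Vec<u32> {
    let (task_sender, ready_queue) = sync_channel::<u32>(16);

    // The spawning side only needs a sender, and senders can be cloned freely.
    let spawner = task_sender.clone();
    for task in [1, 2, 3] {
        spawner.send(task).unwrap();
    }

    // Once every sender is gone, `recv` returns Err and the loop below ends.
    drop(spawner);
    drop(task_sender);

    let mut ran = Vec::new();
    while let Ok(task) = ready_queue.recv() {
        ran.push(task);
    }
    ran
}
```

If either sender were still alive, the recv loop would block forever after the third task, waiting for work that might still arrive.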

Let's create an executor for our timer. We'll need the ArcWake trait from the futures crate, which provides an easy way to construct a Waker. These are the imports we'll need, in addition to those from the timer section:


#![allow(unused)]
fn main() {
use {
  futures::{
    future::{BoxFuture},
    task::{waker_ref, ArcWake},
  },
  std::sync::mpsc::{sync_channel, Receiver, SyncSender},
};
}

The executor will work by sending tasks to run over a channel. It'll pull events off of the channel and run them.


#![allow(unused)]
fn main() {
/// Task executor that receives tasks from a channel and runs them
struct Executor {
  ready_queue: Receiver<Arc<Task>>,
}

/// This spawns new futures onto the task channel
#[derive(Clone)]
struct Spawner {
  task_sender: SyncSender<Arc<Task>>,
}

/// A Task is a future that can reschedule itself to be polled by an Executor
struct Task {
  // Contains an in-progress future that needs to be pushed to completion.
  // The `Mutex` is here to prove to Rust that this is thread-safe.
  future: Mutex<Option<BoxFuture<'static, ()>>>,

  // Handle to place the task itself back onto the task queue.
  task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
  let (task_sender, ready_queue) = sync_channel(10_000);
  (Executor { ready_queue }, Spawner { task_sender })
}
}

Let's also add a method to Spawner that makes it easy to spawn new futures.


#![allow(unused)]
fn main() {
impl Spawner {
  fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
    let task = Arc::new(Task {
      future: Mutex::new(Some(Box::pin(future))),
      task_sender: self.task_sender.clone(),
    });

    // try_send returns an error instead of blocking when the queue is full
    self
      .task_sender
      .try_send(task)
      .expect("Too many tasks are queued!");
  }
}
}

Now we need to implement a Waker (using ArcWake) for our Task, which will be responsible for scheduling a task to be polled again after wake is called.

Remember: Wakers have to specify which task has become ready.


#![allow(unused)]
fn main() {
impl ArcWake for Task {
  fn wake_by_ref(arc_self: &Arc<Self>) {
    // Implement `wake` by sending this task back onto the task channel
    // so that it'll be polled again by the executor.
    let cloned = arc_self.clone();
    // try_send returns an error instead of blocking when the queue is full
    arc_self
      .task_sender
      .try_send(cloned)
      .expect("Too many tasks are queued!");
  }
}
}

So now, when we create a waker from a Task, calling wake on it sends a clone of the Arc onto the task channel.
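
For comparison, the standard library has a similar trait, std::task::Wake, which can be used without the futures crate. The sketch below swaps it in for ArcWake. The Task here is stripped down to a hypothetical id plus the requeue handle (the future slot is left out), and wake_twice is a made-up helper.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Stripped-down task: only an id (hypothetical) and the requeue handle.
// The future slot from the real Task is left out to keep this short.
struct Task {
    id: u32,
    task_sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Requeue this task. try_send never blocks; a full queue is an error.
        let sender = self.task_sender.clone();
        sender.try_send(self).expect("too many tasks queued");
    }
}

// Wakes a task twice through its waker and returns the ids that show up
// on the ready queue.
fn wake_twice() -> Vec<u32> {
    let (task_sender, ready_queue): (SyncSender<Arc<Task>>, Receiver<Arc<Task>>) =
        sync_channel(8);
    let task = Arc::new(Task { id: 7, task_sender });
    let waker = Waker::from(task);

    // The default wake_by_ref clones the Arc and calls `wake` on the clone.
    waker.wake_by_ref();
    waker.wake_by_ref();

    // try_iter yields what is queued right now without blocking.
    ready_queue.try_iter().map(|task| task.id).collect()
}
```

Each wake-up puts the same task onto the queue again, which is how the waker identifies which task has become ready.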

The last step is to tell our Executor how to pick up the task and poll it.


#![allow(unused)]
fn main() {
impl Executor {
  fn run(&self) {
    while let Ok(task) = self.ready_queue.recv() {
      // Take the future, and if it has not completed yet (is still Some),
      // poll it in an attempt to complete it.
      let mut future_slot = task.future.lock().unwrap();
      if let Some(mut future) = future_slot.take() {
        // Create a waker (a `WakerRef`) from the task itself
        let waker = waker_ref(&task);
        let context = &mut Context::from_waker(&*waker);
        if let Poll::Pending = future.as_mut().poll(context) {
          // This future isn't done yet, so put it back in its task to be run again later
          *future_slot = Some(future);
        }
      }
    }
  }
}
}

FINALLY, we can run it:

fn main() {
  let (executor, spawner) = new_executor_and_spawner();
  // Spawn a task to print before and after waiting on a timer
  spawner.spawn(async {
    println!("Wait for itttt....");
    TimerFuture::new(Duration::new(2, 0)).await;
    println!("NOW!");
  });

  // Drop the spawner so that our executor knows it is finished
  // and won't receive any more tasks to run.
  drop(spawner);

  // Run the executor until the task queue is empty
  executor.run();
}