Task

A task is the executor's internal representation for a unit of work submitted by the programmer.

A task is created when the programmer passes a future to spawn. For example:

let fut = async { 1 + 2 };
local_ex.spawn(fut);

When spawn is called, the executor takes the future and creates a task. The task stores these properties in addition to the future:

  • state
  • output
  • waker
  • references

State

The executor needs to keep track of a few pieces of state for each task:

  • SCHEDULED: set if the task is scheduled for running.
  • RUNNING: set while the future is being polled.
  • COMPLETED: a task is completed when polling the future returns Poll::Ready. This means that the output is stored inside the task.
  • CLOSED: if a task is closed, it’s either canceled or the output has been consumed by a JoinHandle. If a task is CLOSED, the task’s future will never be polled again so it can be dropped.
  • HANDLE: set if the JoinHandle still exists.

For a more thorough explanation of the invariants of the state, check out this code snippet.

The state of the task is stored as a u8. Each state is stored as a single bit. For example, SCHEDULED is 1 << 0 while HANDLE is 1 << 4.
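The bit layout can be sketched as follows. Only the positions of SCHEDULED and HANDLE are stated above; the positions of the other three flags are assumed from the order of the list, and is_set and start_running are hypothetical helpers added for illustration.

```rust
// Bit positions: SCHEDULED and HANDLE come from the text above.
// RUNNING, COMPLETED and CLOSED are assumed to fill the bits in between.
pub const SCHEDULED: u8 = 1 << 0;
pub const RUNNING: u8 = 1 << 1;
pub const COMPLETED: u8 = 1 << 2;
pub const CLOSED: u8 = 1 << 3;
pub const HANDLE: u8 = 1 << 4;

/// Returns true if every bit of `flag` is set in `state`.
pub fn is_set(state: u8, flag: u8) -> bool {
    (state & flag) == flag
}

/// Hypothetical transition: the executor picks the task up and starts
/// polling it, so SCHEDULED is cleared and RUNNING is set.
pub fn start_running(state: u8) -> u8 {
    (state & !SCHEDULED) | RUNNING
}
```

Because every flag is an independent bit, several of them can hold at once: a freshly spawned task can be SCHEDULED | HANDLE, and clearing one flag leaves the others untouched.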

Output

The task needs to store the output of the future so that the application can await it.

let handle = spawn_local(async { 1 + 2 });
let res = handle.await;

In this example, the Task needs to store the output (3) until it is consumed by the await on the handle.

Awaiter (Waker)

When the task is blocked (e.g. it's blocked by an I/O operation), we want the executor to switch to another task. But when should the task be scheduled to be run by the executor again?

This is what the Waker is for. The executor creates a Waker and passes it to the task each time it polls the task. The task stores the waker and invokes Waker::wake when it is unblocked. This will place the task back onto the task queue.

The task stores the Waker inside the awaiter property: pub(crate) awaiter: Option<Waker>
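To make the "wake puts the task back on the queue" idea concrete, here is a minimal sketch built on the standard library's Wake trait. The Queue alias, the Requeue type and waker_for are hypothetical names, and the queue holds plain task ids instead of real tasks. It also uses Arc and Mutex only because std's Wake trait requires Send + Sync; a single-threaded executor does not need them and would typically build its waker by hand.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical stand-in for the executor's task queue: it holds task ids.
pub type Queue = Arc<Mutex<VecDeque<usize>>>;

/// Hypothetical waker payload: remembers which task to re-queue and where.
pub struct Requeue {
    pub task_id: usize,
    pub queue: Queue,
}

impl Wake for Requeue {
    fn wake(self: Arc<Self>) {
        // Waking pushes the task back onto the queue so it is polled again.
        self.queue.lock().unwrap().push_back(self.task_id);
    }
}

/// Builds a standard `Waker` that re-queues `task_id` when woken.
pub fn waker_for(task_id: usize, queue: &Queue) -> Waker {
    Waker::from(Arc::new(Requeue {
        task_id,
        queue: Arc::clone(queue),
    }))
}
```

Whoever holds the Waker (an I/O source, a timer, another task) only needs to call wake; it does not need to know anything about the executor or its queue.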

References

The Task needs to be deallocated when there is no more need for it. The Task is no longer needed if it’s canceled or when it’s completed and the output is consumed. The task keeps a reference count, and it is deallocated once that count drops to 0.
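The counting itself can be sketched like this. RefCounted, acquire and release are hypothetical names, and the sketch ignores the state flags, which a real executor would also consult before deallocating.

```rust
use std::sync::atomic::{AtomicI16, Ordering};

/// Hypothetical, simplified header that only tracks the reference count.
pub struct RefCounted {
    pub references: AtomicI16,
}

impl RefCounted {
    pub fn new() -> Self {
        Self {
            references: AtomicI16::new(0),
        }
    }

    /// Registers one more owner of the task.
    pub fn acquire(&self) {
        self.references.fetch_add(1, Ordering::Relaxed);
    }

    /// Drops one owner. Returns true when the caller was the last owner,
    /// which is the moment the task's memory can be freed.
    pub fn release(&self) -> bool {
        // fetch_sub returns the previous value, so 1 means "now zero".
        self.references.fetch_sub(1, Ordering::AcqRel) == 1
    }
}
```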

Implementation

The Task itself is only a pointer to the raw task, which is allocated on the heap:

pub struct Task {
    // Pointer to the raw task (allocated on heap)
    pub raw_task: NonNull<()>,
}

Here is the implementation of RawTask. It uses raw pointers to the individual fields:

pub(crate) struct RawTask<F, R, S> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut R,
}

The Header contains the state, the references, and the awaiter.

pub(crate) struct Header {
    pub(crate) state: u8,

    pub(crate) executor_id: usize,

    /// Current reference count of the task.
    pub(crate) references: AtomicI16,

    /// The virtual table.
    pub(crate) vtable: &'static TaskVTable,

    /// The task that is blocked on the `JoinHandle`.
    ///
    /// This waker needs to be woken up once the task completes or is closed.
    pub(crate) awaiter: Option<Waker>,
}

Both the Glommio crate and the async_task crate use the virtual table to contain pointers to methods necessary for bookkeeping the task. My understanding is that this reduces the runtime overhead, but let me know if there are other reasons why!
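One thing a virtual table provides, whatever its effect on overhead, is type erasure: Task only holds a NonNull<()>, so something has to remember the concrete future type behind that pointer. The sketch below shows the general pattern with a boxed closure instead of a real task. MiniVTable, Erased and Typed are hypothetical names, not the types used by either crate.

```rust
use std::marker::PhantomData;

/// Hypothetical, trimmed-down vtable: one function pointer per operation.
pub struct MiniVTable {
    pub run: unsafe fn(*mut ()) -> i32,
    pub destroy: unsafe fn(*mut ()),
}

/// Type-erased handle: a data pointer plus the table that knows its real type.
pub struct Erased {
    pub data: *mut (),
    pub vtable: &'static MiniVTable,
}

/// Never instantiated; it only carries the concrete type `F` for the impl below.
pub struct Typed<F>(PhantomData<F>);

impl<F: FnMut() -> i32 + 'static> Typed<F> {
    // One table per concrete `F`, filled with functions that cast back to `F`.
    const VTABLE: MiniVTable = MiniVTable {
        run: Self::run,
        destroy: Self::destroy,
    };

    unsafe fn run(ptr: *mut ()) -> i32 {
        let f = unsafe { &mut *(ptr as *mut F) };
        f()
    }

    unsafe fn destroy(ptr: *mut ()) {
        drop(unsafe { Box::from_raw(ptr as *mut F) });
    }

    /// Moves `f` to the heap and forgets its type, keeping only the vtable.
    pub fn erase(f: F) -> Erased {
        Erased {
            data: Box::into_raw(Box::new(f)) as *mut (),
            vtable: &Self::VTABLE,
        }
    }
}
```

Code that holds an Erased value can call (erased.vtable.run)(erased.data) without being generic over F, which is what lets a single queue hold tasks with different future types.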

Creating a Task

Finally, to create a Task, you invoke the create_task method:

pub(crate) fn create_task<F, R, S>(
    executor_id: usize,
    future: F,
    schedule: S,
) -> (Task, JoinHandle<R>)
where
    F: Future<Output = R>,
    S: Fn(Task),
{
    let raw_task = RawTask::<_, R, S>::allocate(future, schedule, executor_id);

    let task = Task { raw_task };
    let handle = JoinHandle {
        raw_task,
        _marker: PhantomData,
    };
    (task, handle)
}

The create_task method takes a schedule function. Usually, the schedule function simply places the task onto the task queue.

let schedule = move |task| {
    let task_queue = tq.upgrade();
    task_queue.local_queue.push(task);
};
create_task(executor_id, future, schedule)

The core of this function is the allocate method, which allocates the Task on the heap:

pub(crate) fn allocate(future: F, schedule: S, executor_id: usize) -> NonNull<()> {
    let task_layout = Self::task_layout();
    unsafe {
        let raw_task = NonNull::new(alloc::alloc(task_layout.layout) as *mut ()).unwrap();
        let raw = Self::from_ptr(raw_task.as_ptr());
        // Write the header as the first field of the task.
        (raw.header as *mut Header).write(Header {
            state: SCHEDULED | HANDLE,
            executor_id,
            references: AtomicI16::new(0),
            vtable: &TaskVTable {
                schedule: Self::schedule,
                drop_future: Self::drop_future,
                get_output: Self::get_output,
                drop_task: Self::drop_task,
                destroy: Self::destroy,
                run: Self::run,
            },
            awaiter: None,
        });

        // Write the schedule function as the second field of the task.
        (raw.schedule as *mut S).write(schedule);

        // Write the future as the third field of the task.
        raw.future.write(future);
        raw_task
    }
}

Note that the initial state of a Task is SCHEDULED | HANDLE. It’s SCHEDULED because a task is considered to be scheduled whenever its Task reference exists. HANDLE is set because the JoinHandle hasn’t been dropped yet.
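The body of task_layout is not shown above, so the following is only a sketch of the underlying idea: compute one layout that holds the header followed by the future, allocate it once, and write each field at its offset. It assumes the standard Layout::extend is used for the offset calculation. MiniHeader, mini_layout, mini_allocate and mini_destroy are hypothetical names, and the schedule function and output are left out to keep it short.

```rust
use std::alloc::{self, Layout};

/// Hypothetical, minimal header: only the state byte.
pub struct MiniHeader {
    pub state: u8,
}

/// Computes the layout of one allocation holding a `MiniHeader` followed by
/// an `F`, plus the byte offset at which the `F` starts.
pub fn mini_layout<F>() -> (Layout, usize) {
    let header = Layout::new::<MiniHeader>();
    let (layout, offset) = header
        .extend(Layout::new::<F>())
        .expect("layout overflow");
    (layout.pad_to_align(), offset)
}

/// Allocates the block and writes both fields in place.
pub fn mini_allocate<F>(state: u8, future: F) -> *mut u8 {
    let (layout, offset) = mini_layout::<F>();
    unsafe {
        // The layout is never zero-sized because the header occupies a byte.
        let ptr = alloc::alloc(layout);
        if ptr.is_null() {
            alloc::handle_alloc_error(layout);
        }
        (ptr as *mut MiniHeader).write(MiniHeader { state });
        (ptr.add(offset) as *mut F).write(future);
        ptr
    }
}

/// Reads both fields back out and frees the block.
///
/// # Safety
/// `ptr` must come from `mini_allocate::<F>` and must not be used afterwards.
pub unsafe fn mini_destroy<F>(ptr: *mut u8) -> (u8, F) {
    let (layout, offset) = mini_layout::<F>();
    unsafe {
        let state = (ptr as *mut MiniHeader).read().state;
        let future = (ptr.add(offset) as *mut F).read();
        alloc::dealloc(ptr, layout);
        (state, future)
    }
}
```

Keeping the header and the future in one block means a task costs a single allocation, and the header can always be found at offset 0 of the pointer regardless of the future's type.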

API

The two most important APIs of a Task are schedule and run.

pub(crate) fn schedule(self)

This method schedules the task. It increments the reference count and calls the schedule function stored in the Task. In the context of an executor, the schedule function pushes the task onto the task queue that it was originally spawned into.
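A toy version of that flow, using safe Rust instead of raw pointers, might look like the sketch below. ToyTask, ToyQueue and spawn_toy are hypothetical names. The schedule closure holds only a weak reference to the queue, so a task scheduled after its queue is gone is silently dropped, which is an assumption of this sketch and not necessarily what a real executor does.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical single-threaded task queue.
pub type ToyQueue = Rc<RefCell<VecDeque<ToyTask>>>;

/// Hypothetical task: an id, a reference count, and its schedule function.
pub struct ToyTask {
    pub id: usize,
    pub references: Rc<Cell<i16>>,
    pub schedule_fn: Rc<dyn Fn(ToyTask)>,
}

impl ToyTask {
    /// Bumps the reference count, then hands the task to its schedule function.
    pub fn schedule(self) {
        self.references.set(self.references.get() + 1);
        let schedule_fn = Rc::clone(&self.schedule_fn);
        schedule_fn(self);
    }
}

/// Creates a task whose schedule function pushes it onto `queue`.
pub fn spawn_toy(id: usize, queue: &ToyQueue) -> ToyTask {
    // A weak reference avoids a cycle between the queue and its tasks.
    let weak = Rc::downgrade(queue);
    let schedule_fn: Rc<dyn Fn(ToyTask)> = Rc::new(move |task: ToyTask| {
        if let Some(queue) = weak.upgrade() {
            queue.borrow_mut().push_back(task);
        }
    });
    ToyTask {
        id,
        references: Rc::new(Cell::new(0)),
        schedule_fn,
    }
}
```

The task does not know about any particular queue; it only calls the function it was created with, which is why the same Task type works for whichever queue it was spawned into.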

pub(crate) fn run(self)

The run method is how the user-provided future gets polled. Since the run method is quite meaty, I will dedicate the entire next page to talk about how it works.

Code References

To see my toy implementation or Glommio’s implementation, check out:

My Toy Implementation

Glommio