Join Handle

When a task is spawned, the user needs a way to consume its output or cancel it. This is the JoinHandle’s job: it lets the user either consume the output of the task or cancel the task.

After a task is spawned, the way to consume the output is to await the handle. For example:

let handle = spawn_local(async { 1 + 3 });
let res: i32 = handle.await;

Awaiting is also a control flow mechanism: it lets the user control the execution order of two tasks. For example, in the following snippet, the second task won’t be spawned until the first task has completed.

let handle = spawn_local(...);
// Suspends the current task until the first spawned task completes.
handle.await;
// Only reached after the first task has completed.
spawn_local(...);

Since the JoinHandle can be awaited, it must implement the Future trait. So what does the poll method of the JoinHandle do?

Poll

Polling a JoinHandle doesn’t actually poll the user-provided future to make progress on it. The only way the user-provided future gets polled is through the RawTask::run method, which is invoked by the LocalExecutor’s run method.

Before we look into what poll does, let’s first look at the different ways a JoinHandle is used.

There are two different ways a JoinHandle gets created:

  • LocalExecutor::run
  • spawn_local / spawn_local_into

LocalExecutor::run

Here is a code snippet for the run method:

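LOCAL_EX.set(self, || {
    // Nobody needs to be woken: the loop below re-checks on every iteration.
    let waker = dummy_waker();
    let cx = &mut Context::from_waker(&waker);
    // Spawn the user's future as a task and keep its JoinHandle.
    let join_handle = self.spawn(async move { future.await });
    pin!(join_handle);
    loop {
        // Poll::Ready carries an Option<T>, hence the unwrap.
        if let Poll::Ready(t) = join_handle.as_mut().poll(cx) {
            return t.unwrap();
        }
        // Not done yet: run the queued tasks, then check again.
        self.run_task_queues();
    }
})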

Here, join_handle is used only to check whether the user-provided future has completed. The loop polls it again after every call to run_task_queues, whether or not anything was woken, so a real waker isn’t needed and a dummy_waker is used instead. A dummy_waker is a Waker that doesn’t do anything when wake() is invoked.
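A minimal sketch of what a dummy_waker could look like, built on std’s RawWaker and RawWakerVTable. The helper names (dummy_clone, dummy_no_op, YieldOnce, block_on_busy) are hypothetical, and the runtime’s own dummy_waker may be written differently. block_on_busy mimics the shape of the run loop above: since nothing is ever woken, progress comes purely from polling again on each iteration.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical dummy waker: every vtable entry ignores the data pointer.
unsafe fn dummy_clone(_: *const ()) -> RawWaker {
    dummy_raw_waker()
}

// Used for wake, wake_by_ref and drop: all of them do nothing.
unsafe fn dummy_no_op(_: *const ()) {}

static DUMMY_VTABLE: RawWakerVTable =
    RawWakerVTable::new(dummy_clone, dummy_no_op, dummy_no_op, dummy_no_op);

fn dummy_raw_waker() -> RawWaker {
    RawWaker::new(ptr::null(), &DUMMY_VTABLE)
}

fn dummy_waker() -> Waker {
    // SAFETY: the vtable functions never dereference the (null) data pointer.
    unsafe { Waker::from_raw(dummy_raw_waker()) }
}

// Hypothetical future that returns Pending once without waking anyone.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

// Same shape as the run loop: poll, and if not ready, simply poll again.
// No wake-up is needed because the loop re-checks on every iteration.
fn block_on_busy<F: Future>(fut: F) -> F::Output {
    let waker = dummy_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Newer Rust releases also provide Waker::noop() in the standard library for exactly this purpose.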

spawn_local / spawn_local_into

Earlier, we talked about how the compiler converts the body of an async function into a state machine, where each .await marks a new state. We also learned that when the state machine is polled and returns Poll::Pending, the executor shouldn’t poll it again until the task it is blocked on has completed. Therefore, the blocking task needs to store the waker of the parent task and use it to signal when the parent task can be polled again.

This is what the JoinHandle created by spawn_local and spawn_local_into needs to do: it stores the waker passed to its poll method and uses it to notify the executor that the parent task can be polled again.
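A minimal single-threaded sketch of that idea, assuming the spawned task and its handle share an Rc<RefCell<...>> slot. Shared, SimpleJoinHandle, new_pair, complete and CountingWaker are hypothetical names; the real JoinHandle works with raw task pointers and state bits instead of a RefCell.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical slot shared by a spawned task and its handle.
struct Shared<R> {
    output: Option<R>,
    awaiter: Option<Waker>, // waker of the parent task awaiting the handle
}

struct SimpleJoinHandle<R> {
    shared: Rc<RefCell<Shared<R>>>,
}

fn new_pair<R>() -> (Rc<RefCell<Shared<R>>>, SimpleJoinHandle<R>) {
    let shared = Rc::new(RefCell::new(Shared { output: None, awaiter: None }));
    (shared.clone(), SimpleJoinHandle { shared })
}

impl<R> Future for SimpleJoinHandle<R> {
    type Output = R;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        let mut shared = self.shared.borrow_mut();
        match shared.output.take() {
            Some(out) => Poll::Ready(out),
            None => {
                // Not finished: store the parent's waker, then yield.
                shared.awaiter = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Called when the spawned task finishes: store the output, wake the parent.
fn complete<R>(shared: &Rc<RefCell<Shared<R>>>, value: R) {
    let awaiter = {
        let mut s = shared.borrow_mut();
        s.output = Some(value);
        s.awaiter.take()
    }; // the RefCell borrow ends here, before any waker code runs
    if let Some(w) = awaiter {
        w.wake();
    }
}

// Helper waker that only counts how often it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling the handle before complete runs returns Poll::Pending and records the waker; complete then wakes it exactly once, and the next poll returns the value.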

let local_ex = LocalExecutor::default();
local_ex.run(async {
    let join_handle = spawn_local(async_write_file());
    join_handle.await;
});

In the example above, the run method would spawn the Future created from the async block as follows:

let join_handle = self.spawn(async move { future.await });

Let’s call this Task A. When Task A gets polled, it executes the following two lines of code:

let join_handle = spawn_local(async_write_file());
join_handle.await;

Let’s call the task associated with async_write_file Task B. When Task A polls the join handle for Task B, Task B is most likely not complete yet. Therefore, Task B needs to store the Waker passed to the poll method. When .wake() is invoked on that Waker, it schedules Task A back onto the executor.
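To make the Task A / Task B hand-off concrete, here is a sketch with a hypothetical run queue: each waker, when woken, pushes its task’s name onto the queue, standing in for "schedule the task back onto the executor". RunQueue, TaskWaker, waker_for and TaskB are illustrative names, not the runtime’s types, and the Arc<Mutex<...>> is there only because std’s Wake trait requires Send + Sync.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

// Hypothetical run queue: a woken task's name is pushed here.
type RunQueue = Arc<Mutex<VecDeque<&'static str>>>;

struct TaskWaker {
    task: &'static str,
    queue: RunQueue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        // "Schedule the task back onto the executor."
        self.queue.lock().unwrap().push_back(self.task);
    }
}

fn waker_for(task: &'static str, queue: &RunQueue) -> Waker {
    Waker::from(Arc::new(TaskWaker { task, queue: queue.clone() }))
}

// Hypothetical view of Task B's state as seen through its JoinHandle.
struct TaskB {
    completed: bool,
    awaiter: Option<Waker>,
}

impl TaskB {
    // Task A polls B's handle: if B is still running, B keeps A's waker.
    fn poll_handle(&mut self, waker: &Waker) -> bool {
        if !self.completed {
            self.awaiter = Some(waker.clone());
        }
        self.completed
    }

    // B finishes and wakes whoever is awaiting it.
    fn finish(&mut self) {
        self.completed = true;
        if let Some(w) = self.awaiter.take() {
            w.wake();
        }
    }
}
```

Task A is not re-queued until Task B calls finish, which is exactly when polling A again can make progress.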

Deep Dive into Poll

Here is the rough structure of the JoinHandle’s poll method. Notice that the Output type is Option<R> instead of R: poll returns Poll::Ready(None) if the task is CLOSED. In general, there are three scenarios to cover:

  • the task is CLOSED
  • the task is not COMPLETED
  • the task is COMPLETED but not CLOSED

impl<R> Future for JoinHandle<R> {
    type Output = Option<R>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let ptr = self.raw_task.as_ptr();
        let header = ptr as *mut Header;

        unsafe {
            let state = (*header).state;

            if state & CLOSED != 0 {
                ...
            }

            if state & COMPLETED == 0 {
                ...
            }

            ...
        }
    }
}
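The branching can be modeled as a pure function over the state bits, including the CLOSED-but-still-SCHEDULED-or-RUNNING case from the code that follows. The bit values and the PollOutcome enum are hypothetical; only the order of the checks follows the structure above.

```rust
// Hypothetical bit values; the runtime's actual constants may differ.
const SCHEDULED: u8 = 1 << 0;
const RUNNING: u8 = 1 << 1;
const COMPLETED: u8 = 1 << 2;
const CLOSED: u8 = 1 << 3;

#[derive(Debug, PartialEq)]
enum PollOutcome {
    WaitForDrop,       // CLOSED, future not dropped yet: register, Pending
    ReadyNone,         // CLOSED and future gone: notify, Ready(None)
    WaitForCompletion, // not COMPLETED: register, Pending
    ReadySome,         // COMPLETED, not CLOSED: set CLOSED, Ready(Some(output))
}

fn classify(state: u8) -> PollOutcome {
    // CLOSED is checked first, so COMPLETED | CLOSED (output already taken)
    // is reported as ReadyNone rather than handing the output out again.
    if state & CLOSED != 0 {
        if state & (SCHEDULED | RUNNING) != 0 {
            return PollOutcome::WaitForDrop;
        }
        return PollOutcome::ReadyNone;
    }
    if state & COMPLETED == 0 {
        return PollOutcome::WaitForCompletion;
    }
    PollOutcome::ReadySome
}
```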

Let’s first look at what happens if the task is CLOSED.

if state & CLOSED != 0 {
    // If the task is scheduled or running, we need to wait until its future is
    // dropped.
    if state & (SCHEDULED | RUNNING) != 0 {
        // Replace the waker with one associated with the current task.
        (*header).register(cx.waker());
        return Poll::Pending;
    }

    // Even though the awaiter is most likely the current task, it could also be
    // another task.
    (*header).notify(Some(cx.waker()));
    return Poll::Ready(None);
}

If the task is CLOSED, we notify the awaiter and return Poll::Ready(None). However, if the task is CLOSED but still SCHEDULED or RUNNING, its future hasn’t been dropped yet, so we register the waker and return Poll::Pending instead. My understanding is that this is playing it safe, but let me know if there’s another reason to return Poll::Pending until the future has been dropped.
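When the future is already gone, poll calls notify with the current waker before returning. One plausible reading of the comment "it could also be another task" is sketched below, as an assumption modeled on the async-task crate’s Header::notify: the stored awaiter is taken and woken unless Waker::will_wake reports that it is the same waker as the one currently polling, since that task is about to receive Poll::Ready anyway. Header and CountingWaker here are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Hypothetical header holding only the awaiter slot.
struct Header {
    awaiter: Option<Waker>,
}

impl Header {
    // Assumed semantics: take the awaiter and wake it, unless it is the
    // waker of the task that is polling right now.
    fn notify(&mut self, current: Option<&Waker>) {
        if let Some(w) = self.awaiter.take() {
            match current {
                Some(c) if w.will_wake(c) => {} // the poller gets Ready anyway
                _ => w.wake(),
            }
        }
    }
}

// Helper waker that only counts how often it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```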

Next, if the state is not COMPLETED, then we simply register the waker as the awaiter and return Poll::Pending.

if state & COMPLETED == 0 {
    // Replace the waker with one associated with the current task.
    (*header).register(cx.waker());

    return Poll::Pending;
}
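A sketch of what register could do, assuming the header keeps a single awaiter slot (Header is hypothetical here). Replacing rather than accumulating wakers fits the Future::poll contract, under which only the waker from the most recent poll needs to be woken. The will_wake check is an optional optimization that skips the clone when the same task polls again.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Hypothetical header with a single awaiter slot.
struct Header {
    awaiter: Option<Waker>,
}

impl Header {
    // Replace the stored waker with the current task's waker.
    fn register(&mut self, waker: &Waker) {
        // Skip the clone if the stored waker already wakes the same task.
        let same = matches!(&self.awaiter, Some(w) if w.will_wake(waker));
        if !same {
            self.awaiter = Some(waker.clone());
        }
    }
}

// Helper waker that only counts how often it was woken.
#[derive(Default)]
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```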

Finally, if the task is COMPLETED but not CLOSED, we mark it as CLOSED, since its output is about to be consumed, notify the awaiter, and return Poll::Ready(Some(output)).

(*header).state |= CLOSED;

// Notify the awaiter. Even though the awaiter is most likely the current
// task, it could also be another task.
(*header).notify(Some(cx.waker()));

// Take the output from the task.
let output = ((*header).vtable.get_output)(ptr) as *mut R;
Poll::Ready(Some(output.read()))
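Setting CLOSED before taking the output is what guarantees the output is handed out at most once. In the real code, output.read() copies the value out through a raw pointer, so reading it a second time would duplicate ownership. The pointer-free sketch below keeps just that invariant; TaskCell and the bit values are hypothetical, and waker handling and the SCHEDULED | RUNNING check are left out.

```rust
use std::task::Poll;

// Hypothetical bit values.
const COMPLETED: u8 = 1 << 2;
const CLOSED: u8 = 1 << 3;

// Hypothetical stand-in for the task header plus its output slot.
struct TaskCell<R> {
    state: u8,
    output: Option<R>,
}

impl<R> TaskCell<R> {
    // Mirrors the three branches of JoinHandle::poll, minus wakers.
    fn poll_output(&mut self) -> Poll<Option<R>> {
        if self.state & CLOSED != 0 {
            return Poll::Ready(None); // output already consumed (or canceled)
        }
        if self.state & COMPLETED == 0 {
            return Poll::Pending;
        }
        // COMPLETED and not CLOSED: close first, then hand out the output.
        self.state |= CLOSED;
        Poll::Ready(self.output.take())
    }
}
```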

Cancel

JoinHandle is also the user’s handle for canceling a task. I won’t go into much detail about how cancel works, but the general idea is that canceling a task means its future will not be polled again. If the task is already COMPLETED, canceling its JoinHandle does nothing.
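At the level of state bits, the rule above can be sketched as follows. The bit values, cancel and should_poll are hypothetical, and a real cancel also has to deal with tasks that are currently scheduled or running, which this sketch leaves out.

```rust
// Hypothetical bit values.
const COMPLETED: u8 = 1 << 2;
const CLOSED: u8 = 1 << 3;

// Returns true if this call actually canceled the task.
fn cancel(state: &mut u8) -> bool {
    if *state & (COMPLETED | CLOSED) != 0 {
        return false; // already finished or closed: nothing to do
    }
    *state |= CLOSED; // from now on the future must not be polled again
    true
}

// What an executor would check before polling a task's future.
fn should_poll(state: u8) -> bool {
    state & (COMPLETED | CLOSED) == 0
}
```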

Code References

To see my toy implementation or Glommio’s implementation, check out:

Mini Async Runtime

Glommio