LocalExecutor
Now that we know how the task and the task queue are implemented, we can take a deep dive into how the Executor::run and Executor::spawn methods are implemented.
As a refresher, here’s an example of an executor running a task and spawning a task onto the executor.
let local_ex = LocalExecutor::default();
let res = local_ex.run(async {
    let handle = local_ex.spawn(async_write_file());
    handle.await;
});
Single Threaded
It’s important to understand that the LocalExecutor is single-threaded, meaning the executor can only be run on the thread that created it. LocalExecutor doesn’t implement the Send or Sync traits, so you cannot move a LocalExecutor across threads. This makes it easier to reason about the methods on LocalExecutor, since it’s safe to assume that only one function invocation can be executing at any time. In other words, there won’t be two invocations of run on the same executor at once.
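As a rough usage sketch (assuming the LocalExecutor described in this post), the single-threaded constraint usually shows up as a "one executor per thread" pattern: create the executor on the thread that will drive it, and never hand it to another thread.

use std::thread;

// Sketch: each thread owns and drives its own LocalExecutor. The executor
// never crosses a thread boundary, which is exactly what !Send / !Sync enforce.
fn main() {
    let handle = thread::spawn(|| {
        // Created on this thread, run on this thread.
        let local_ex = LocalExecutor::default();
        local_ex.run(async { 40 + 2 })
    });
    assert_eq!(handle.join().unwrap(), 42);
}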
Internals
Let’s look at the internals of an Executor:
pub(crate) struct LocalExecutor {
    pub(crate) id: usize,
    pub(crate) queues: Rc<RefCell<QueueManager>>,
}
A LocalExecutor contains a QueueManager. As explained earlier, a QueueManager contains all the Task Queues.
pub(crate) struct QueueManager {
    pub active_queues: BinaryHeap<Rc<RefCell<TaskQueue>>>,
    pub active_executing: Option<Rc<RefCell<TaskQueue>>>,
    pub available_queues: AHashMap<usize, Rc<RefCell<TaskQueue>>>,
}
At any time, a QueueManager is actively working on at most one TaskQueue. The active_queues property stores the TaskQueues that are not empty. Any TaskQueue inside active_queues is also inside available_queues. A TaskQueue is removed from active_queues whenever it’s empty.
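As a toy illustration of those invariants (this is not the real TaskQueue, and it uses a Vec where the real code uses a BinaryHeap), sharing each queue through Rc<RefCell<...>> is what lets the same queue live in both collections at once:

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

#[derive(Default)]
struct ToyQueue {
    tasks: Vec<&'static str>,
}

fn main() {
    // `available` always holds every queue; `active` only holds non-empty ones.
    let mut available: HashMap<usize, Rc<RefCell<ToyQueue>>> = HashMap::new();
    let mut active: Vec<Rc<RefCell<ToyQueue>>> = Vec::new();

    let q = Rc::new(RefCell::new(ToyQueue::default()));
    available.insert(0, q.clone());

    // The queue becomes active once it has a task...
    q.borrow_mut().tasks.push("write_file");
    active.push(q.clone());

    // ...and is removed from `active` (but not `available`) once it drains.
    q.borrow_mut().tasks.pop();
    active.retain(|tq| !tq.borrow().tasks.is_empty());
    assert!(active.is_empty());
    assert_eq!(available.len(), 1);
}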
Now, we can finally look at run, the core method of a LocalExecutor.
Deep Dive into Run
The run method runs the executor until the provided future completes. Here is its implementation:
pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
    assert!(
        !LOCAL_EX.is_set(),
        "There is already a LocalExecutor running on this thread"
    );
    LOCAL_EX.set(self, || {
        let join_handle = self.spawn(async move { future.await });
        let waker = dummy_waker();
        let cx = &mut Context::from_waker(&waker);
        pin!(join_handle);
        loop {
            if let Poll::Ready(t) = join_handle.as_mut().poll(cx) {
                // The task can't be canceled, and the join handle yields None only
                // upon cancellation or panic, so in case of panic this just propagates.
                return t.unwrap();
            }
            // TODO: I/O work
            self.run_task_queues();
        }
    })
}
Let’s break down run line by line. First, run makes sure that no other executor is running on the same thread. LOCAL_EX is a thread-local storage key defined as:
scoped_tls::scoped_thread_local!(static LOCAL_EX: LocalExecutor);
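If you haven’t used scoped_tls before, here’s a small standalone example of the pattern (the CURRENT key below is just an illustration, unrelated to the executor): set installs a reference for the duration of a closure, and with borrows it from anywhere on the same thread.

// Standalone scoped_tls example.
scoped_tls::scoped_thread_local!(static CURRENT: String);

fn current_len() -> usize {
    // Panics if called while `CURRENT` is not set on this thread.
    CURRENT.with(|s| s.len())
}

fn main() {
    let name = String::from("executor-0");
    assert!(!CURRENT.is_set());
    CURRENT.set(&name, || {
        assert!(CURRENT.is_set());
        assert_eq!(current_len(), 10);
    });
}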
Next, it calls spawn to create and schedule the task onto the TaskQueue.
It then loops until the future is completed. It’s super important to understand that the poll call here doesn’t actually poll the user-provided future. It simply polls the JoinHandle, which checks whether the COMPLETED flag on the task’s state is set.
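Conceptually, the handle and the task share some state, and polling the handle only inspects that state. Here’s a heavily simplified toy sketch of that idea (the type, field, and flag names are illustrative, not Glommio’s actual task layout, and waker registration is omitted):

use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

const COMPLETED: u8 = 1;

// Toy shared state between a task and its join handle.
struct Shared<T> {
    state: Cell<u8>,
    output: RefCell<Option<T>>,
}

struct ToyJoinHandle<T> {
    shared: Rc<Shared<T>>,
}

impl<T> Future for ToyJoinHandle<T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The user future is never polled here; we only read the shared flag.
        if self.shared.state.get() & COMPLETED != 0 {
            Poll::Ready(self.shared.output.borrow_mut().take())
        } else {
            Poll::Pending
        }
    }
}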
Since the executor is single-threaded, looping alone won’t actually progress the underlying future. Therefore, in each iteration of the loop, the executor calls the run_task_queues method.
run_task_queues simply loops and calls run_one_task_queue until there are no more tasks left in any of the TaskQueues.
fn run_task_queues(&self) -> bool {
    let mut ran = false;
    loop {
        // TODO: Check if we need to preempt. Until that check (and its `break`)
        // is added, this loop only exits via `return false`, so the trailing
        // `ran` below is never reached.
        if !self.run_one_task_queue() {
            return false;
        } else {
            ran = true;
        }
    }
    ran
}
run_one_task_queue sets the active_executing queue to one of the active_queues. It then loops until there are no more tasks in that TaskQueue.
In each iteration of the loop, it calls get_task, which pops a task from the TaskQueue.
fn run_one_task_queue(&self) -> bool {
    let mut q_manager = self.queues.borrow_mut();
    let size = q_manager.active_queues.len();
    let tq = q_manager.active_queues.pop();
    match tq {
        Some(tq) => {
            q_manager.active_executing = Some(tq.clone());
            drop(q_manager);
            loop {
                // TODO: Break if preempted or yielded
                let tq = tq.borrow_mut();
                if let Some(task) = tq.get_task() {
                    drop(tq);
                    task.run();
                } else {
                    break;
                }
            }
            true
        }
        None => false,
    }
}
To summarize, run spawns a task onto one of the task queues. The executor then runs one task queue at a time to completion until the spawned task is COMPLETED. The most important concept to remember here is that none of the tasks block. Whenever a task is about to be run, it is popped from the TaskQueue. It won’t be scheduled back onto the TaskQueue until its waker is invoked, which happens when whatever it was waiting on is no longer blocking. In other words, the executor moves from one task to another without waiting on any blocking code.
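Here’s a tiny, self-contained illustration of that hand-off (task ids standing in for real tasks; this is not the actual waker machinery): running pops the task, and only the wake callback ever pushes it back.

use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

type TaskId = usize;

fn main() {
    let queue: Rc<RefCell<VecDeque<TaskId>>> =
        Rc::new(RefCell::new(VecDeque::from([7])));

    // The executor pops the task in order to run it...
    let task = queue.borrow_mut().pop_front().unwrap();
    assert!(queue.borrow().is_empty());

    // ...and hands a "waker" to whatever the task is waiting on.
    let wake = {
        let queue = queue.clone();
        move || queue.borrow_mut().push_back(task)
    };

    // Later, when the I/O (or timer, etc.) completes, waking reschedules the task.
    wake();
    assert_eq!(queue.borrow().front(), Some(&7));
}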
spawn
The spawn method is how a user can spawn a task onto the executor. spawn allows the developer to create two tasks that run concurrently instead of sequentially:
let res = local_ex.run(async {
    let handle1 = local_ex.spawn(async_write_file());
    let handle2 = local_ex.spawn(async_write_file());
    handle1.await;
    handle2.await;
});
spawn_local simply finds the LocalExecutor on the current thread and calls LocalExecutor::spawn. Here is the implementation of LocalExecutor::spawn:
pub(crate) fn spawn<T>(&self, future: impl Future<Output = T>) -> JoinHandle<T> {
    let active_executing = self.queues.borrow().active_executing.clone();
    let tq = active_executing
        .clone() // this clone is cheap because we clone an `Option<Rc<_>>`
        .or_else(|| self.get_default_queue())
        .unwrap();
    let tq_executor = tq.borrow().ex.clone();
    tq_executor.spawn_and_schedule(self.id, tq, future)
}

pub(crate) fn spawn_and_schedule<T>(
    &self,
    executor_id: usize,
    tq: Rc<RefCell<TaskQueue>>,
    future: impl Future<Output = T>,
) -> JoinHandle<T> {
    let (task, handle) = self.create_task(executor_id, tq, future);
    task.schedule();
    handle
}
spawn gets the actively executing TaskQueue, creates a task, and schedules the Task onto that TaskQueue.
To summarize, spawn_local simply schedules a Task onto the LocalExecutor’s actively executing TaskQueue.
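For reference, a free spawn_local function could be a thin wrapper around the LOCAL_EX thread local shown earlier. This is a minimal sketch of that idea, not Glommio’s exact code:

pub fn spawn_local<T: 'static>(
    future: impl Future<Output = T> + 'static,
) -> JoinHandle<T> {
    assert!(
        LOCAL_EX.is_set(),
        "`spawn_local` must be called from inside `LocalExecutor::run`"
    );
    // Borrow the executor installed by `run` and forward to LocalExecutor::spawn.
    LOCAL_EX.with(|local_ex| local_ex.spawn(future))
}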
Code References
To see my toy implementation or Glommio’s implementation, check out:
My Toy Implementation
Glommio