TaskQueue

An executor needs to store a list of scheduled Tasks. This is what the TaskQueue is for, it holds a collection of managed tasks.

Here is the implemnetation for the TaskQueue:

#![allow(unused)]
fn main() {
pub(crate) struct TaskQueue {
    // contains the actual queue of Tasks
    pub(crate) ex: Rc<TaskQueueExecutor>,
    // The invariant around active is that when it's true,
    // it needs to be inside the active_executors
    pub(crate) active: bool,
}

pub(crate) struct TaskQueueExecutor {
    local_queue: LocalQueue,
    name: String,
}

struct LocalQueue {
    queue: RefCell<VecDeque<Task>>,
}
}

The TaskQueue contains a TaskQueueExecutor which contains the actual LocalQueue which holds a VecDeque of Tasks.

The two most important methods on a TaskQueueExecutor are:

  • create_task
  • spawn_and_schedule

create_task

Create task allocates the Task and creates the corresponding JoinHandle. Note that creating a Task requires providing a schedule method. The provided schedule method is a closure that simply pushes the task onto the local_queue.

#![allow(unused)]
fn main() {
// Creates a Task with the Future and push it onto the queue by scheduling
fn create_task<T>(
    &self,
    executor_id: usize,
    tq: Rc<RefCell<TaskQueue>>,
    future: impl Future<Output = T>,
) -> (Task, JoinHandle<T>) {
    let tq = Rc::downgrade(&tq);
    let schedule = move |task| {
        let tq = tq.upgrade();

        if let Some(tq) = tq {
            {
                tq.borrow().ex.as_ref().local_queue.push(task);
            }
            {
                LOCAL_EX.with(|local_ex| {
                    let mut queues = local_ex.queues.as_ref().borrow_mut();
                    queues.maybe_activate_queue(tq);
                });
            }
        }
    };
    create_task(executor_id, future, schedule)
}
}

spawn_and_schedule

#![allow(unused)]
fn main() {
pub(crate) fn spawn_and_schedule<T>(
    &self,
    executor_id: usize,
    tq: Rc<RefCell<TaskQueue>>,
    future: impl Future<Output = T>,
) -> JoinHandle<T> {
    let (task, handle) = self.create_task(executor_id, tq, future);
    task.schedule();
    handle
}
}

spawn_and_schedule simply creates the task and invokes the schedule method which pushes the task onto the LocalQueue of the TaskQueueExecutor.

Code References

To check out my toy implementation or Glommio’s implementation, check out:

My Toy Implementation

Glommio