Step 3 - Processing the CQE

After adding a SQE to the io_uring's submission queue, the executor needs a way to detect when the I/O operation is completed and resume the task that is blocked.

Detecting when the I/O operation is completed is done by checking if there are new CQE entries on the io_uring instance’s completion queue. Resuming the task that is blocked is performed by calling wake() on the stored Waker in the Source.

API

Each Reactor has a wait API that the executor can use to check for new CQE entries and process the completed event. Here is its API:

#![allow(unused)]
fn main() {
pub(crate) fn wait(&self) -> ()
}

Implementation

The Reactor::wait API first calls consume_completion_queue to check if there are any new CQE entries. It then calls consume_submission_queue to submit SQE entries to the kernel as covered in the last page.

#![allow(unused)]
fn main() {
impl Reactor {
		...
		
		pub(crate) fn wait(&self) {
        let mut main_ring = self.main_ring.borrow_mut();
        main_ring.consume_completion_queue();
        main_ring.consume_submission_queue().unwrap();
    }
}
}

Here is the implementation of consume_completion_queue. It simply calls consume_one_event repeatedly until there are no more new CQE events. Consume_one_event simply invokes process_one_event.

#![allow(unused)]
fn main() {
pub(crate) trait UringCommon {
		...

		fn consume_completion_queue(&mut self) -> usize {
        let mut completed: usize = 0;
        loop {
            if self.consume_one_event().is_none() {
                break;
            } else {
            }
            completed += 1;
        }
        completed
    }
}

impl UringCommon for SleepableRing {
	fn consume_one_event(&mut self) -> Option<bool> {
      let source_map = self.source_map.clone();
      process_one_event(self.ring.peek_for_cqe(), source_map).map(|x| {
          self.in_kernel -= 1;
          x
      })
  }
}
}

Here is the implementation for process_one_event:

#![allow(unused)]
fn main() {
fn process_one_event(cqe: Option<iou::CQE>, source_map: Rc<RefCell<SourceMap>>) -> Option<bool> {
    if let Some(value) = cqe {
        // No user data is `POLL_REMOVE` or `CANCEL`, we won't process.
        if value.user_data() == 0 {
            return Some(false);
        }

        let src = source_map.borrow_mut().consume_source(value.user_data());

        let result = value.result();

        let mut woke = false;

        let mut inner_source = src.borrow_mut();
        inner_source.wakers.result = Some(result.map(|v| v as usize));
        woke = inner_source.wakers.wake_waiters();

        return Some(woke);
    }
    None
}
}

The method first retrieves the Source with the user_data on the CQE. Next, it wakes up the waiters stored on the Source. This resumes the tasks blocked by scheduling them back onto the executor.

The executor calls Reactor::wait on each iteration in the loop inside the run method via the poll_io method as shown below:

#![allow(unused)]
fn main() {
/// Runs the executor until the given future completes.
pub fn run<T>(&self, future: impl Future<Output = T>) -> T {
		...
    LOCAL_EX.set(self, || {
        ...
        loop {
            if let Poll::Ready(t) = join_handle.as_mut().poll(cx) {
                ...
            }
						// This would call Reactor::wait()
            self.parker
                .poll_io()
                .expect("Failed to poll io! This is actually pretty bad!");
            ...
        }
    })
}