Step 2 - Submitting a SQE
The second step to asynchronous I/O is to ask io_uring
to monitor a file descriptor on when it’s ready to perform I/O by submitting a SQE
entry.
API
Async
has two methods which will perform an I/O operation and wait until it is completed:
#![allow(unused)] fn main() { // Performs a read I/O operation and wait until it is readable pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> // Performs a write I/O operation and wait until it is writable pub async fn write_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> }
For example, here is how you can use the read_with
method to implement Async<TcpListener>
's accept
method:
#![allow(unused)] fn main() { impl Async<TcpListener> { ... pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> { let (stream, addr) = self.read_with(|io| io.accept()).await?; ... } } }
Implementation
Here is the implementation of read_with
:
#![allow(unused)] fn main() { impl<T> Async<T> { ... pub async fn read_with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> { let mut op = op; loop { match op(self.get_ref()) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => { } res => return res, } // this waits until the I/O operation is readable (completed) self.source.readable().await?; } } pub fn get_ref(&self) -> &T { self.io.as_ref().unwrap() } } }
It first performs the I/O operation via the call to op(self.get_ref())
. It then waits for the I/O operation is completed with self.source.readable().await
.
Source::readable
is an async
method that does a few things:
- It stores the
waker
of thePoller
by invokingself.add_waiter(cx.waker().clone())
. This way, when the executor detects that the I/O operation is completed, it can invokewake()
on the stored waker. The mechanism for waking up the unblocked task is explained in the next page. - It adds a
SQE
to theio_uring
instance in the Reactor by callingget_reactor().sys.interest(self, true, false)
.
Here is the implementation of Source::readable
:
#![allow(unused)] fn main() { impl Source { ... /// Waits until the I/O source is readable. pub(crate) async fn readable(&self) -> io::Result<()> { future::poll_fn(|cx| { if self.take_result().is_some() { return Poll::Ready(Ok(())); } self.add_waiter(cx.waker().clone()); get_reactor().sys.interest(self, true, false); Poll::Pending }) .await } pub(crate) fn take_result(&self) -> Option<io::Result<usize>> { self.inner.borrow_mut().wakers.result.take() } pub(crate) fn add_waiter(&self, waker: Waker) { self.inner.borrow_mut().wakers.waiters.push(waker); } } }
Here is the implementation of the Reactor::interest
method invoked. It first computes the PollFlags that will be used to construct the SQE
. It then calls queue_request_into_ring
to add a SQE
entry to the submission queue.
#![allow(unused)] fn main() { impl Reactor { ... pub(crate) fn interest(&self, source: &Source, read: bool, write: bool) { let mut flags = common_flags(); if read { flags |= read_flags(); } if write { flags |= write_flags(); } queue_request_into_ring( &mut *self.main_ring.borrow_mut(), source, UringOpDescriptor::PollAdd(flags), &mut self.source_map.clone(), ); } } /// Epoll flags for all possible readability events. fn read_flags() -> PollFlags { PollFlags::POLLIN | PollFlags::POLLPRI } /// Epoll flags for all possible writability events. fn write_flags() -> PollFlags { PollFlags::POLLOUT } }
queue_request_into_ring
This method simply adds a UringDescriptor
onto the SleepableRing
's queue. Note that queueing the request into ring doesn’t actually add a SQE
to the io_uring
's submission_queue. It just adds it to the submission_queue
property on the SleepableRing
.
#![allow(unused)] fn main() { fn queue_request_into_ring( ring: &mut (impl UringCommon + ?Sized), source: &Source, descriptor: UringOpDescriptor, source_map: &mut Rc<RefCell<SourceMap>>, ) { let q = ring.submission_queue(); let id = source_map.borrow_mut().add_source(source, Rc::clone(&q)); let mut queue = q.borrow_mut(); queue.submissions.push_back(UringDescriptor { args: descriptor, fd: source.raw(), user_data: id, }); } }
Each UringDescriptor
contains all the information required to fill a SQE
. For example, since invoking io_uring_prep_write
requires providing a buffer to write data from, its corresponding UringOpDescriptor::Write
requires providing a pointer and size for the buffer.
#![allow(unused)] fn main() { struct SleepableRing { ring: iou::IoUring, in_kernel: usize, submission_queue: ReactorQueue, name: &'static str, source_map: Rc<RefCell<SourceMap>>, } pub(crate) type ReactorQueue = Rc<RefCell<UringQueueState>>; pub(crate) struct UringQueueState { submissions: VecDeque<UringDescriptor>, cancellations: VecDeque<UringDescriptor>, } pub(crate) struct UringDescriptor { fd: RawFd, user_data: u64, args: UringOpDescriptor, } #[derive(Debug)] enum UringOpDescriptor { PollAdd(PollFlags), Write(*const u8, usize, u64), ... } }
Each UringDescriptor
has a unique user_data
field. This is the same user_data
field on each SQE
and is passed as-is from the SQE
to the CQE
. To generate a unique Id, the add_source
method returns a new unique Id by incrementing a counter each time add_source
is called:
#![allow(unused)] fn main() { impl SourceMap { ... fn add_source(&mut self, source: &Source, queue: ReactorQueue) -> u64 { let id = self.id; self.id += 1; self.map.insert(id, source.inner.clone()); id } }
Submitting the Events
Consuming the event is performed by the consume_submission_queue
method, which calls consume_sqe_queue
. It repeatedly calls prep_one_event
to add a SQE
entry on the io_uring
's submission queue by calling prepare_sqe
to allocate a new SQE
and calling fill_sqe
to fill in the necessary details.
If dispatch
is true, it then calls submit_sqes
which finally sends the SQE
s to the kernel.
#![allow(unused)] fn main() { impl UringCommon for SleepableRing { fn consume_submission_queue(&mut self) -> io::Result<usize> { let q = self.submission_queue(); let mut queue = q.borrow_mut(); self.consume_sqe_queue(&mut queue.submissions, true) } fn consume_sqe_queue( &mut self, queue: &mut VecDeque<UringDescriptor>, mut dispatch: bool, ) -> io::Result<usize> { loop { match self.prep_one_event(queue) { None => { dispatch = true; break; } Some(true) => {} Some(false) => break, } } if dispatch { self.submit_sqes() } else { Ok(0) } } fn prep_one_event(&mut self, queue: &mut VecDeque<UringDescriptor>) -> Option<bool> { if queue.is_empty() { return Some(false); } if let Some(mut sqe) = self.ring.sq().prepare_sqe() { let op = queue.pop_front().unwrap(); // TODO: Allocator fill_sqe(&mut sqe, &op); Some(true) } else { None } } fn submit_sqes(&mut self) -> io::Result<usize> { let x = self.ring.submit_sqes()? as usize; self.in_kernel += x; Ok(x) } } fn fill_sqe(sqe: &mut iou::SQE<'_>, op: &UringDescriptor) { let mut user_data = op.user_data; unsafe { match op.args { UringOpDescriptor::PollAdd(flags) => { sqe.prep_poll_add(op.fd, flags); } ... } sqe.set_user_data(user_data); } } }