API
Our goal here is to implement a set of internal APIs to make it easy to convert synchronous I/O operations into asynchronous ones.
Here are the rough steps to convert a blocking I/O operation into an asynchronous one:
- we set the file descriptor to non-blocking
- we perform the non-blocking operation
- we tell io_uring to monitor the file descriptor by submitting an SQE
- we store the poller’s waker and invoke wake() when the I/O operation is complete. We know an I/O operation is complete when the corresponding CQE is posted to io_uring's completion queue.
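The steps above can be sketched with the standard library alone. This is only an illustration under stated assumptions: FlagTask, try_accept and demo are hypothetical names, and the io_uring parts (submitting the SQE, observing the CQE) are simulated by hand rather than performed through a real ring.

```rust
use std::io::{self, ErrorKind};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a suspended task: it only records that it was woken.
struct FlagTask {
    woken: AtomicBool,
}

impl Wake for FlagTask {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// Step 2: attempt the operation once. `Ok(None)` means the call would have blocked.
fn try_accept(listener: &TcpListener) -> io::Result<Option<TcpStream>> {
    match listener.accept() {
        Ok((stream, _addr)) => Ok(Some(stream)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Walks through the four steps and returns
/// (first attempt was pending, task was woken, retry succeeded).
fn demo() -> io::Result<(bool, bool, bool)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;

    // Step 1: put the file descriptor in non-blocking mode.
    listener.set_nonblocking(true)?;

    // Step 2: no client has connected yet, so accept reports WouldBlock.
    let first_pending = try_accept(&listener)?.is_none();

    // Step 3 (simulated): real code would submit an SQE asking io_uring to
    // watch the descriptor. Here we only keep the task's waker around.
    let task = Arc::new(FlagTask { woken: AtomicBool::new(false) });
    let waker = Waker::from(task.clone());

    // A client connects; with io_uring this is what would lead to a CQE.
    let _client = TcpStream::connect(listener.local_addr()?)?;

    // Step 4 (simulated): on seeing the completion, invoke wake().
    waker.wake();
    let woken = task.woken.load(Ordering::SeqCst);

    // The woken task retries the operation. Retry briefly in case the
    // connection has not reached the accept queue yet.
    let mut accepted = false;
    for _ in 0..100 {
        if try_accept(&listener)?.is_some() {
            accepted = true;
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }

    Ok((first_pending, woken, accepted))
}
```

Calling demo() exercises each step once: the first accept is pending, the waker fires, and the retry picks up the connection.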
To make it easier to implement new asynchronous operations, we introduce Async, an adapter for I/O types inspired by the async_io crate. Async abstracts away the steps listed above so that developers who build on top of Async don’t have to worry about things like io_uring, Waker, O_NONBLOCK, etc.
Here is how you use the Async adapter to implement an asynchronous TcpListener with an asynchronous accept method:
    impl Async<TcpListener> {
        // Bind a standard listener, then wrap it in the Async adapter.
        pub fn bind<A: Into<SocketAddr>>(addr: A) -> io::Result<Async<TcpListener>> {
            let addr = addr.into();
            let listener = TcpListener::bind(addr)?;
            Ok(Async::new(listener)?)
        }

        // Accept a connection without blocking the thread; the accepted
        // stream is wrapped in Async as well.
        pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
            let (stream, addr) = self.read_with(|io| io.accept()).await?;
            Ok((Async::new(stream)?, addr))
        }
    }
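To make the role of read_with in the snippet above more concrete, here is a rough, standard-library-only sketch of the contract it appears to follow: run the closure, and if it reports WouldBlock, suspend and try again later. Everything here is an assumption made for illustration. This Async is a simplified stand-in, not the adapter itself. get_ref and block_on are hypothetical helpers. Where a real implementation would register the file descriptor with io_uring and store the waker, this sketch just asks to be polled again immediately.

```rust
use std::future::{poll_fn, Future};
use std::io::{self, ErrorKind};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Simplified, hypothetical stand-in for the adapter described in the text.
pub struct Async<T> {
    io: T,
}

impl<T> Async<T> {
    /// Hypothetical accessor for the wrapped I/O object.
    pub fn get_ref(&self) -> &T {
        &self.io
    }

    /// Runs `op` repeatedly until it stops reporting `WouldBlock`.
    pub async fn read_with<R>(
        &self,
        mut op: impl FnMut(&T) -> io::Result<R>,
    ) -> io::Result<R> {
        poll_fn(|cx| match op(&self.io) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                // Assumption: a real reactor would submit an SQE and keep the
                // waker until the CQE arrives. This sketch busy-polls instead.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            result => Poll::Ready(result),
        })
        .await
    }
}

impl Async<TcpListener> {
    /// Wraps a listener after switching it to non-blocking mode.
    pub fn new(io: TcpListener) -> io::Result<Self> {
        io.set_nonblocking(true)?;
        Ok(Async { io })
    }

    /// Accepts one connection; returns the plain stream to keep the sketch short.
    pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
        self.read_with(|io| io.accept()).await
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: polls one future to completion on this thread.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}
```

With these definitions, block_on(listener.accept()) returns once a client connects. The busy re-poll keeps the sketch short but burns CPU while waiting, which is exactly the cost that handing the descriptor to io_uring avoids.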
Here is how you can use Async<TcpListener> inside an executor to perform asynchronous I/O:
    let local_ex = LocalExecutor::default();
    let res = local_ex.run(async {
        let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8080)).unwrap();
        let (stream, _) = listener.accept().await.unwrap();
        handle_connection(stream);
    });
Next, let's look at what the Async adapter actually does.