Motivation
In this blog, we will reimplement Redpanda's iobuf library as introduced in their blog post.
The full source code is available here.
To understand iobuf, we first need to understand Redpanda's threading and memory model. Redpanda uses a thread-per-core (TpC) architecture. TpC is a programming model that addresses two shortcomings of threaded programming:
- Threads executing on the same data require synchronization mechanisms like locks, which are expensive.
- Context switching is required when a thread suspends itself and lets another thread run. Context switching is expensive.
In TpC, each application thread is pinned to a CPU. Since the OS cannot move the threads around, there are no context switches. Furthermore, under TpC each thread relies on message passing instead of shared memory to share data. This eliminates synchronization overheads from locks. To learn more about TpC, check out this article by Seastar or this blog by Glommio.
Under Seastar, Redpanda's TpC framework, the full memory of the machine is split evenly across the cores at system bootup. As stated in Redpanda's blog, "memory allocated on core-0 [core-N], must be deallocated on core-0 [core-N]. However, there is no way to guarantee that a Java or Go client connecting to Redpanda will communicate with the exact core that owns the data". Therefore, Redpanda created iobuf, "a ref-counted, fragmented-buffer-chain with deferred deletes that allows Redpanda to simply share a view of a remote core’s parsed messages as the fragments come in, without incurring a copy overhead". In other words, iobuf is a way for Redpanda to share data across cores cheaply and to deallocate that data on the core that owns it once no other core needs it.
Now, let's look at how it works under the hood by examining my toy implementation!
Temporary Buffer
Before we implement the iobuf, we first need to implement its building block, the TemporaryBuffer.
A TemporaryBuffer is a memory management abstraction for a contiguous region of memory. What makes temporary buffers different from buffers like Vec is that the underlying memory can be shared with another TemporaryBuffer. In other words, it is a buffer with an automatic reference count, kind of like an Arc in Rust.
My implementation is largely inspired by Seastar’s implementation.
API
Constructor
To create a TemporaryBuffer, you use the new(size: usize) -> TemporaryBuffer API.
```rust
let buffer = TemporaryBuffer::new(12);
```
Write
To write into a TemporaryBuffer, you call get_write(&self) -> Option<*mut u8>. Returning a mutable raw pointer is dangerous if there is another reference to the TemporaryBuffer, as the other buffer may not expect the data to change. Therefore, get_write returns None if there is more than one reference to the buffer.
```rust
let buffer = TemporaryBuffer::new(12);
let ptr = buffer.get_write().unwrap();
let data: Vec<u8> = vec![1, 2, 3];
unsafe {
    std::ptr::copy_nonoverlapping(data.as_ptr(), ptr, 3);
}
```
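Conversely, once a second reference exists (using the share method introduced in the next section), get_write refuses to hand out a pointer. A small sketch building on the API above:

```rust
let buffer = TemporaryBuffer::new(12);
let second = buffer.share();

// With two live references, handing out a mutable pointer would be
// unsound, so get_write returns None.
assert!(buffer.get_write().is_none());
```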
Share
To share a buffer, you can either use share(&self) -> TemporaryBuffer to share the entire buffer or share_slice(&self, pos: usize, len: usize) -> TemporaryBuffer to get a sliced reference to the temporary buffer.
Note that in either scenario, the underlying data will not be destroyed until all references to the buffer are destroyed. This means that holding even a tiny slice of a temporary buffer keeps the entire allocation of the original temporary buffer alive in memory.
```rust
let buffer = TemporaryBuffer::new(12);
{
    let second = buffer.share();
    assert_eq!(buffer.get_ref_count(), 2);
    {
        let slice = buffer.share_slice(0, 3);
        assert_eq!(buffer.get_ref_count(), 3);
    }
}
assert_eq!(buffer.get_ref_count(), 1);
```
In the example above, we created multiple references to the buffer in different scopes. In the deepest scope, the reference count for the buffer is 3. But in the outermost scope, since the scoped references have been dropped, the ref count is back to 1.
Internals
Conceptually, a TemporaryBuffer is quite similar to an Arc. Instead of Arc::clone, you call share or share_slice to increment the ref count. When the reference count reaches 0, the underlying data is deallocated.
```rust
pub struct TemporaryBuffer {
    deleter: NonNull<BufferInternal>,
    size: usize,
    buffer: *mut u8,
}

struct BufferInternal {
    ref_count: AtomicUsize,
    size: usize,
    buffer: *mut u8,
}
```
This is what the TemporaryBuffer looks like under the hood. Each TemporaryBuffer has a NonNull raw pointer to a BufferInternal, which keeps track of the reference count and deallocates the memory when the count reaches 0.
This is the implementation of the constructor:
```rust
pub fn new(size: usize) -> Self {
    let layout = Layout::array::<u8>(size).unwrap();
    let buffer = unsafe { alloc(layout) };
    TemporaryBuffer {
        deleter: NonNull::from(Box::leak(Box::new(BufferInternal {
            ref_count: AtomicUsize::new(1),
            size,
            buffer,
        }))),
        size,
        buffer,
    }
}
```
Here, we use Box::new to create a new heap allocation and Box::leak to give up ownership of the value without deallocating the memory associated with it. Then we use NonNull to hold the pointer to that memory.
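This pattern is easy to demonstrate in isolation. A minimal, self-contained sketch of the leak-then-reclaim round trip (standard library only, not from the iobuf code):

```rust
use std::ptr::NonNull;

fn main() {
    // Leak a Box to get a stable pointer that outlives the Box itself.
    let leaked: &'static mut i32 = Box::leak(Box::new(42));
    let ptr: NonNull<i32> = NonNull::from(leaked);
    unsafe {
        assert_eq!(*ptr.as_ptr(), 42);
        // Reclaim ownership so the allocation is freed exactly once.
        drop(Box::from_raw(ptr.as_ptr()));
    }
}
```

The Drop implementation below performs the same Box::from_raw step to free the BufferInternal when the last reference goes away.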
This is the implementation of share_slice. Each time it’s called, the deleter's ref_count is incremented by one. We can use Relaxed memory ordering here because incrementing a reference count requires no synchronization: a new reference can only be created from an existing one, and whatever mechanism passes that existing reference to another thread must already synchronize the two threads. (This is the same argument Rust's Arc makes for clone.)
```rust
pub fn share_slice(&self, pos: usize, len: usize) -> TemporaryBuffer {
    // Guard against ref-count overflow, like Arc's clone does.
    if self.get_deleter().ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
        std::process::abort();
    }
    TemporaryBuffer {
        deleter: self.deleter,
        size: len,
        buffer: unsafe { self.buffer.add(pos) },
    }
}
```
Just as we increment the ref count when sharing the buffer, we decrement the ref count (with fetch_sub) when the buffer gets dropped (i.e., goes out of scope).
```rust
impl Drop for TemporaryBuffer {
    fn drop(&mut self) {
        if self.get_deleter().ref_count.fetch_sub(1, Release) == 1 {
            // We are the last reference: synchronize with every other
            // drop before freeing the memory.
            fence(Acquire);
            unsafe {
                let layout = Layout::array::<u8>(self.deleter.as_ref().size).unwrap();
                dealloc(self.deleter.as_ref().buffer, layout);
                drop(Box::from_raw(self.deleter.as_ptr()));
            }
        }
    }
}
```
If fetch_sub returns 1, we need to ensure that no other references still have access to the underlying memory before we free it. This isn’t trivial, as the buffer itself is not part of the atomic ref_count. To guarantee that all non-atomic loads and stores to the buffer occur before the final fetch_sub, we need to establish a happens-before relationship between the final fetch_sub and all the other fetch_subs.
The figure above illustrates how we establish the happens-before relationship. Note that all atomic operations (even relaxed ones) on a given atomic variable share a single total modification order: every thread observes the modifications of that variable in the same order. This means that if fetch_sub returns 1, it is the last fetch_sub.
We use an acquire fence to guarantee that everything after the fence happens after every event that precedes any of the other fetch_subs. Since loads of and stores to the buffer count as such events, we guarantee that after the fence, no other thread can access the underlying memory.
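To make the argument concrete, here is a sketch of the two-thread case (the thread names and interleaving are illustrative, not from the original post):

```rust
// Two TemporaryBuffers, owned by threads A and B, share one allocation
// (ref_count starts at 2).
//
//   thread A: writes into buffer .................. (1)
//   thread A: fetch_sub(1, Release) returns 2 ..... (2)  A's drop
//   thread B: fetch_sub(1, Release) returns 1 ..... (3)  the final drop
//   thread B: fence(Acquire) ...................... (4)
//   thread B: dealloc(buffer) ..................... (5)
//
// The Release in (2) pairs with the Acquire fence in (4), so (1)
// happens-before (5): the deallocation cannot race with thread A's
// last use of the memory.
```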
Check out the implementation on github here.
IoBuf
My toy iobuf implementation is based on Redpanda's iobuf implementation.
An iobuf is pretty much just a linked list of TemporaryBuffers. Each TemporaryBuffer is wrapped in an IoFragment, which is a linked-list node with next/prev pointers to other fragments.
API
Constructor
There are a few different ways to create an iobuf. The easiest is to use new() -> IoBuf as follows:
```rust
let mut iobuf = IoBuf::new();
```
Append
There are a few different ways to append data to an iobuf. Here are some of the method signatures:
```rust
fn append(&mut self, src: *const u8, len: usize);
fn append_temporary_buffer(&mut self, buffer: TemporaryBuffer);
fn append_fragment(&mut self, fragment: IoFragment);
```
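For example, appending raw bytes and whole buffers might look like this (a minimal usage sketch based on the signatures above):

```rust
let mut iobuf = IoBuf::new();

// append copies `len` bytes starting at `src` into the buffer chain.
let data: Vec<u8> = vec![1, 2, 3, 4];
iobuf.append(data.as_ptr(), data.len());

// Hand over an existing TemporaryBuffer without copying its contents.
iobuf.append_temporary_buffer(TemporaryBuffer::new(16));
```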
Iterator
To create an iterator over the IoFragments that make up the iobuf, you can call the begin() method, which has the signature fn begin<'a>(&'a self) -> IoFragmentIter<'a>.
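Assuming IoFragmentIter implements the standard Iterator trait (an assumption on my part; see the repo for the real definition), walking the fragments might look like:

```rust
// Hypothetical: sum up the sizes of all fragments in the iobuf.
let mut total = 0;
for fragment in iobuf.begin() {
    total += fragment.size();
}
assert_eq!(total, iobuf.size()); // size() accessor on IoBuf is assumed
```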
I’ll go deeper into iterators on the next page, where I talk about IoIteratorConsumer.
Internals
```rust
pub struct IoBuf {
    frags: LinkedList<IoFragmentAdapter>,
    // number of bytes the IoBuf contains
    size: usize,
}
```
As we can see from the definition of IoBuf, it’s pretty much just a linked list of IoFragment nodes. The IoBuf itself doesn’t store any data; it just holds pointers to TemporaryBuffer instances, which actually hold the data.
Similar to Redpanda’s implementation, I also used an intrusive linked list, from the intrusive_collections crate.
The main difference between an intrusive collection and a normal collection is that intrusive collections don’t allocate memory themselves: the next and prev pointers live directly inside the nodes. This eliminates the collection's own dynamic memory allocations, which could otherwise fragment the memory pools.
To work with the intrusive linked list, I just needed to use the intrusive_adapter macro like this:
```rust
intrusive_adapter!(pub IoFragmentAdapter = Box<IoFragment>: IoFragment { link: LinkedListLink });
```
The macro generates an IoFragmentAdapter, which tells the linked list how to reach the LinkedListLink (the next/prev pointers) embedded in each IoFragment. IoFragmentAdapter is what the iobuf's LinkedList is parameterized over.
The actual definition of IoFragment is:
```rust
pub struct IoFragment {
    used_bytes: RefCell<usize>,
    buf: TemporaryBuffer,
    pub link: LinkedListLink,
}
```
Append
Let’s look at how we implement fn append(&mut self, src: *const u8, len: usize).
The diagram shows an iobuf with 2 fragments. The second fragment has 10 available bytes. If we insert 15 bytes into the iobuf, it will first fill in the available bytes. Then it will create a new io_fragment with the next allocation size and fill the remaining 5 bytes into the new fragment.
Computing the next allocation size follows the iobuf allocation table logic provided by Redpanda. In general, fixed size classes and a cap on the maximum fragment size help reduce memory fragmentation.
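As a rough illustration, the sizing logic can be sketched as follows; the helper name and the constants are my own assumptions, not Redpanda's actual table:

```rust
// Hypothetical sizing helper: grow fragment sizes geometrically and cap
// them, so allocations fall into a small set of fixed size classes.
fn next_allocation_size(current_size: usize) -> usize {
    const MIN_CHUNK: usize = 512;        // assumed floor
    const MAX_CHUNK: usize = 128 * 1024; // assumed cap
    (current_size.max(MIN_CHUNK) * 2).min(MAX_CHUNK)
}
```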
Here is the implementation of append:
```rust
pub fn append(&mut self, src: *const u8, len: usize) {
    let mut remaining = len;
    let mut ptr = src;
    // First fill whatever space is left in the current last fragment.
    if self.available_bytes() > 0 {
        let appended = self.get_last_fragment().append(ptr, remaining);
        ptr = unsafe { ptr.add(appended) };
        remaining -= appended;
        self.size += appended;
    }
    // Keep allocating new fragments (sized via the allocation table)
    // until every remaining byte has been copied.
    while remaining > 0 {
        self.append_new_fragment(remaining);
        let appended = self.get_last_fragment().append(ptr, remaining);
        ptr = unsafe { ptr.add(appended) };
        remaining -= appended;
        self.size += appended;
    }
}
```
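The per-fragment append this relies on isn't shown above. Here is a sketch of how it might look; the size() accessor on TemporaryBuffer is an assumption on my part:

```rust
impl IoFragment {
    // Copy up to the fragment's remaining capacity, record the new fill
    // level in used_bytes, and return how many bytes were written.
    pub fn append(&self, src: *const u8, len: usize) -> usize {
        let used = *self.used_bytes.borrow();
        let step = std::cmp::min(len, self.buf.size() - used);
        unsafe {
            let dst = self.buf.get_write().unwrap().add(used);
            std::ptr::copy_nonoverlapping(src, dst, step);
        }
        *self.used_bytes.borrow_mut() += step;
        step
    }
}
```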
Share
Sharing an iobuf given a pos and a len simply creates a new iobuf and finds the io_fragments that intersect with the requested range. For each intersecting fragment, we use the temporary_buffer's share method to get a reference to that fragment's buffer.
```rust
pub fn share(&self, pos: usize, len: usize) -> IoBuf {
    let mut ret = IoBuf::new();
    let mut remaining = len;
    let mut pos = pos;
    for fragment in self.frags.iter() {
        if remaining == 0 {
            break;
        }
        // Skip fragments that end before the requested range starts.
        if pos >= fragment.size() {
            pos -= fragment.size();
            continue;
        }
        // Share the overlap between [pos, pos + remaining) and this fragment.
        let share_len = std::cmp::min(remaining, fragment.size() - pos);
        let buffer = fragment.share(pos, share_len);
        ret.append_temporary_buffer(buffer);
        remaining -= share_len;
        pos = 0; // subsequent fragments are consumed from their start
    }
    ret
}
```
In other words, share increments the reference count of each temporary buffer it intersects. When the iobuf is dropped, the reference counts of all its temporary buffers are decremented. Each temporary buffer is responsible for deallocating the memory once its reference count reaches zero.
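A minimal usage sketch (the size() accessor on IoBuf is my own naming, and generate_random_u8_vec is the test helper used later in this post):

```rust
let mut iobuf = IoBuf::new();
let values = generate_random_u8_vec(100);
iobuf.append(values.as_ptr(), values.len());

// Share bytes [10, 40): no bytes are copied, only ref counts are bumped.
let view = iobuf.share(10, 30);
assert_eq!(view.size(), 30); // size() accessor is assumed
```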
Check out the implementation of iobuf here.
IoIteratorConsumer
To get data out of an iobuf, the best way is to use a consumer iterator. My IoIteratorConsumer implementation is based on Redpanda’s implementation. Check out their source code here.
```rust
let mut iobuf = IoBuf::new();

let values1 = generate_random_u8_vec(1000);
iobuf.append(values1.as_ptr(), values1.len());
let values2 = generate_random_u8_vec(3000);
iobuf.append(values2.as_ptr(), values2.len());

let mut consumer = IoIteratorConsumer::new(iobuf.begin());
let arr = consumer.consume_to_arr(1000);
assert_eq!(values1, arr);
let arr2 = consumer.consume_to_arr(3000);
assert_eq!(values2, arr2);
```
This example showcases how the IoIteratorConsumer is used. You initialize it with an IoFragmentIter. You then invoke consume_to_arr to copy the data into an array. Note that each call to consume_to_arr advances the iterator under the hood.
The method consume_to_arr is actually powered by the consume method, which takes a callback that receives a start pointer and a size.
```rust
pub fn consume<T>(&mut self, n: usize, consumer: T)
where
    T: Fn(*const u8, usize),
{
    let mut consumed = 0;
    while self.current_frag.is_some() && consumed < n {
        let segment_bytes_left = self.segment_bytes_left();
        // The current fragment is exhausted: advance to the next one.
        if segment_bytes_left == 0 {
            self.current_frag = self.frag_it.next();
            self.frag_index = self.current_frag.map(|frag| frag.get_start());
            continue;
        }
        // Hand the caller as many bytes as this fragment can provide.
        let step = std::cmp::min(segment_bytes_left, n - consumed);
        let frag_index = self.frag_index.unwrap();
        consumer(frag_index, step);
        self.frag_index = Some(unsafe { frag_index.add(step) });
        consumed += step;
    }
}
```
The algorithm iterates over the fragments, invoking the callback with the start pointer and the number of bytes to consume from each fragment. Once the number of consumed bytes reaches n, the iteration stops.
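For completeness, here is a sketch of how consume_to_arr could be built on top of consume; this is my own reconstruction, not the repo's exact code, and the 'a lifetime parameter is assumed:

```rust
use std::cell::RefCell;

impl<'a> IoIteratorConsumer<'a> {
    // Copy the next n bytes out of the iobuf by driving consume().
    pub fn consume_to_arr(&mut self, n: usize) -> Vec<u8> {
        // RefCell lets the Fn callback append without requiring FnMut.
        let out = RefCell::new(Vec::with_capacity(n));
        self.consume(n, |ptr, len| {
            let chunk = unsafe { std::slice::from_raw_parts(ptr, len) };
            out.borrow_mut().extend_from_slice(chunk);
        });
        out.into_inner()
    }
}
```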