IoBuf

My toy iobuf implementation is based on Redpanda's iobuf implementation.

An iobuf is pretty much just a linked list of TemporaryBuffers. Each TemporaryBuffer is wrapped in an IoFragment, which acts as a linked list node with next/prev pointers to other fragments.

API

Constructor

There are a few different ways to create an iobuf. The easiest way is to call IoBuf::new() as follows:

let mut iobuf = IoBuf::new();

Append

There are a few different ways to append data to an iobuf. Here are some of the method signatures:

fn append(&mut self, src: *const u8, len: usize);
fn append_temporary_buffer(&mut self, buffer: TemporaryBuffer);
fn append_fragment(&mut self, fragment: IoFragment);

Iterator

To create an iterator over the IoFragments that make up the iobuf, you can call the begin() method with the signature: fn begin<'a>(&'a self) -> IoFragmentIter<'a>.

I’ll go deeper into iterators on the next page, where I talk about IoIteratorConsumer.

Internals

pub struct IoBuf {
    frags: LinkedList<IoFragmentAdapter>,
    // number of bytes the IoBuf contains
    size: usize,
}

As we can see from the definition of IoBuf, it’s pretty much just a linked list of IoFragment nodes. The IoBuf itself doesn’t store any data. It just holds pointers to TemporaryBuffer instances, which actually hold the data.

Similar to Redpanda’s implementation, I also used an intrusive linked list, in my case from the intrusive_collections crate.

The main difference between an intrusive collection and a normal collection is that an intrusive collection doesn’t allocate memory for its nodes. Instead, the next and prev pointers live directly inside the elements being stored. This avoids a separate heap allocation per node, which helps keep memory pools from becoming fragmented.
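To make the difference concrete, here is a minimal hand-rolled sketch of an intrusive singly linked list that uses only the standard library. The names Fragment and IntrusiveList are hypothetical and are not the types from intrusive_collections. The only point is that the next pointer is a field of the element, so pushing an already boxed element performs no extra allocation.

```rust
use std::ptr;

// Hypothetical element type: the link (`next`) is a field of the element itself.
struct Fragment {
    data: Vec<u8>,
    next: *mut Fragment,
}

impl Fragment {
    fn new(data: Vec<u8>) -> Box<Fragment> {
        Box::new(Fragment { data, next: ptr::null_mut() })
    }
}

// Hypothetical intrusive list: it only stores pointers to elements it was handed.
struct IntrusiveList {
    head: *mut Fragment,
    tail: *mut Fragment,
}

impl IntrusiveList {
    fn new() -> Self {
        IntrusiveList { head: ptr::null_mut(), tail: ptr::null_mut() }
    }

    // Takes ownership of an already allocated element; no node is allocated here.
    fn push_back(&mut self, frag: Box<Fragment>) {
        let raw = Box::into_raw(frag);
        if self.tail.is_null() {
            self.head = raw;
        } else {
            // SAFETY: `tail` is non-null and points to a fragment owned by this list.
            unsafe { (*self.tail).next = raw };
        }
        self.tail = raw;
    }

    // Walks the links stored inside the elements and sums their payload sizes.
    fn total_len(&self) -> usize {
        let mut total = 0;
        let mut cur = self.head;
        while !cur.is_null() {
            // SAFETY: every non-null pointer in the chain came from `Box::into_raw`.
            unsafe {
                total += (*cur).data.len();
                cur = (*cur).next;
            }
        }
        total
    }
}

impl Drop for IntrusiveList {
    fn drop(&mut self) {
        let mut cur = self.head;
        while !cur.is_null() {
            // SAFETY: each element is turned back into a Box exactly once.
            let boxed = unsafe { Box::from_raw(cur) };
            cur = boxed.next;
        }
    }
}
```

By contrast, std::collections::LinkedList<T> allocates its own node around every T you push.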

To work with the intrusive linked list, I just needed to use the intrusive_adapter macro like this:

intrusive_adapter!(pub IoFragmentAdapter = Box<IoFragment>: IoFragment { link: LinkedListLink });

The macro generates an IoFragmentAdapter type, which tells the linked list where to find the link field (the next and prev pointers) inside each IoFragment. The IoFragment itself is the node that the iobuf’s linked list stores.

The actual definition of IoFragment is:

pub struct IoFragment {
    used_bytes: RefCell<usize>,
    buf: TemporaryBuffer,
    pub link: LinkedListLink,
}

Append

Let’s look at how we implement fn append(&mut self, src: *const u8, len: usize).

The diagram shows an iobuf with 2 fragments, where the second fragment has 10 available bytes. If we append 15 bytes to the iobuf, it first fills those 10 available bytes. It then creates a new IoFragment sized according to the next allocation size and writes the remaining 5 bytes into it.

Computing the next allocation size is based on the iobuf allocation table logic provided by Redpanda. In general, fixed sizing and size capping help reduce memory fragmentation.
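As a rough illustration of fixed sizing with a cap, the sketch below grows each new fragment by a factor of 1.5 up to a maximum. The function name next_alloc_size and the constants (a 512-byte minimum, a 128 KiB cap, 1.5x growth) are assumptions chosen for illustration. They are not taken from my implementation and aren't guaranteed to match Redpanda's allocation table.

```rust
// Illustrative constants (assumptions, not taken from the source).
const MIN_CHUNK_SIZE: usize = 512;
const MAX_CHUNK_SIZE: usize = 128 * 1024;

// Returns the capacity to use for the next fragment, given the capacity of
// the previous one (0 if the iobuf has no fragments yet).
fn next_alloc_size(prev_size: usize) -> usize {
    if prev_size == 0 {
        return MIN_CHUNK_SIZE;
    }
    // Grow by 1.5x, then clamp into [MIN_CHUNK_SIZE, MAX_CHUNK_SIZE].
    let grown = prev_size.saturating_add(prev_size / 2);
    grown.clamp(MIN_CHUNK_SIZE, MAX_CHUNK_SIZE)
}
```

Because every size comes from the same small set of values and never exceeds the cap, the allocator sees a predictable mix of request sizes.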

Here is the implementation of append:

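pub fn append(&mut self, src: *const u8, len: usize) {
    let mut remaining = len;
    let mut ptr = src;
    // First, fill whatever space is left in the last fragment.
    // (IoFragment::append copies at most the free bytes and returns the count.)
    if remaining > 0 && self.available_bytes() > 0 {
        let appended_size = self.get_last_fragment().append(ptr, remaining);
        // SAFETY: the caller guarantees `src` is valid for `len` bytes.
        ptr = unsafe { ptr.add(appended_size) };
        remaining -= appended_size;
        self.size += appended_size;
    }
    // Then allocate new fragments until everything has been copied.
    while remaining > 0 {
        self.append_new_fragment(remaining);
        let appended_size = self.get_last_fragment().append(ptr, remaining);
        ptr = unsafe { ptr.add(appended_size) };
        remaining -= appended_size;
        self.size += appended_size;
    }
}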
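The walkthrough above (10 free bytes, 15 bytes appended) can be reproduced with a small safe-Rust model. ToyIoBuf and its fixed frag_capacity are hypothetical simplifications: fragments are plain Vec<u8>s with a fixed capacity, whereas the real implementation uses TemporaryBuffers and the allocation table.

```rust
// Hypothetical simplified model: each fragment is a Vec<u8> that may hold
// at most `frag_capacity` bytes.
struct ToyIoBuf {
    frags: Vec<Vec<u8>>,
    frag_capacity: usize,
    size: usize,
}

impl ToyIoBuf {
    fn new(frag_capacity: usize) -> Self {
        assert!(frag_capacity > 0);
        ToyIoBuf { frags: Vec::new(), frag_capacity, size: 0 }
    }

    // Free bytes left in the last fragment (0 if there are no fragments).
    fn available_bytes(&self) -> usize {
        self.frags.last().map_or(0, |f| self.frag_capacity - f.len())
    }

    fn append(&mut self, mut src: &[u8]) {
        while !src.is_empty() {
            // Only allocate a new fragment once the last one is full.
            if self.available_bytes() == 0 {
                self.frags.push(Vec::with_capacity(self.frag_capacity));
            }
            // Copy as much as fits into the last fragment.
            let step = src.len().min(self.available_bytes());
            let (head, tail) = src.split_at(step);
            self.frags.last_mut().unwrap().extend_from_slice(head);
            self.size += step;
            src = tail;
        }
    }
}
```

With a capacity of 20, appending 30 bytes leaves the second fragment with 10 free bytes. Appending 15 more fills those 10 and puts the last 5 into a third fragment.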

Share

Sharing an iobuf given a pos and a len creates a new iobuf and walks the IoFragments, looking for the ones that intersect the share range. For each fragment that does, we use the temporary buffer's share method to get a reference to the temporary buffer.

pub fn share(&self, pos: usize, len: usize) -> IoBuf {
    let mut ret = IoBuf::new();
    let mut remaining = len;
    let mut pos = pos;
    for fragment in self.frags.iter() {
        if remaining == 0 {
            return ret;
        }
        // Skip fragments that end before the share range starts.
        if pos >= fragment.size() {
            pos -= fragment.size();
            continue;
        }

        // Assumes fragment.share(start, end) takes an inclusive end index,
        // as the `size() - 1` upper bound suggests.
        let right = std::cmp::min(pos + remaining - 1, fragment.size() - 1);
        let buffer = fragment.share(pos, right);
        ret.append_temporary_buffer(buffer);
        remaining -= right - pos + 1;
        // Every later fragment is shared from its first byte.
        pos = 0;
    }
    ret
}

In other words, share increments the reference count of each of the temporary buffers that it intersects. When the iobuf drops, all the reference counts of the temporary buffers drop. Each temporary buffer is responsible for deallocating the memory if the reference count reaches zero.
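The reference-counting behavior can be sketched with Rc from the standard library. SharedBuf below is a hypothetical stand-in for TemporaryBuffer, whose actual fields aren't shown on this page. Sharing clones the Rc, which bumps the count, and dropping a view decrements it.

```rust
use std::rc::Rc;

// Hypothetical stand-in for TemporaryBuffer: a view into shared bytes.
struct SharedBuf {
    data: Rc<Vec<u8>>,
    start: usize,
    len: usize,
}

impl SharedBuf {
    fn new(bytes: Vec<u8>) -> Self {
        let len = bytes.len();
        SharedBuf { data: Rc::new(bytes), start: 0, len }
    }

    // Returns a view of `len` bytes starting at `pos`; no bytes are copied.
    fn share(&self, pos: usize, len: usize) -> SharedBuf {
        assert!(pos + len <= self.len, "share range out of bounds");
        SharedBuf { data: Rc::clone(&self.data), start: self.start + pos, len }
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.start + self.len]
    }

    // Number of views currently keeping the underlying bytes alive.
    fn ref_count(&self) -> usize {
        Rc::strong_count(&self.data)
    }
}
```

The underlying Vec<u8> is freed only when the last SharedBuf pointing at it is dropped, which mirrors how each temporary buffer deallocates its memory once its reference count reaches zero.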

Check out the implementation of iobuf here.

IoIteratorConsumer

To get data out of an iobuf, the best way is to use a consumer iterator. My IoIteratorConsumer implementation is based on Redpanda’s implementation. Check out their source code here.

let mut iobuf = IoBuf::new();
let values1 = generate_random_u8_vec(1000);
iobuf.append(values1.as_ptr(), values1.len());

let values2 = generate_random_u8_vec(3000);
iobuf.append(values2.as_ptr(), values2.len());

let mut consumer = IoIteratorConsumer::new(iobuf.begin());
let arr = consumer.consume_to_arr(1000);
assert_eq!(values1, arr);

let arr2 = consumer.consume_to_arr(3000);
assert_eq!(values2, arr2);

This example showcases how the IoIteratorConsumer is used. You initialize it with an IoFragmentIter, obtained from iobuf.begin(). You then invoke consume_to_arr to copy the data into an array. Note that each call to consume_to_arr advances the iterator under the hood, so the second call picks up where the first one stopped.

The method consume_to_arr is actually powered by the consume method, which takes a callback with the start pointer and the size.

pub fn consume<T>(&mut self, n: usize, consumer: T)
where
    T: Fn(*const u8, usize),
{
    let mut consumed = 0;
    while self.current_frag.is_some() && consumed < n {
        let segment_bytes_left = self.segment_bytes_left();

        if segment_bytes_left == 0 {
            self.current_frag = self.frag_it.next();
            self.frag_index = self.current_frag.map(|frag| frag.get_start());
            continue;
        }
        let step = std::cmp::min(segment_bytes_left, n - consumed);
        let frag_index = self.frag_index.unwrap();
        consumer(frag_index, step);
        self.frag_index = Some(unsafe { frag_index.add(step) });
        consumed += step;
    }
}

The algorithm walks the fragments and, for each one, invokes the callback with a pointer to the first unconsumed byte and the number of bytes to consume from that fragment. Once n bytes have been consumed, or the fragments run out, it stops.
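The same stepping logic can be sketched in safe Rust over plain slices. ToyConsumer is a hypothetical simplification: fragments are Vec<u8>s, the callback receives a slice instead of a raw pointer and a length, and the bound is FnMut so that the closure can push into a local Vec.

```rust
// Hypothetical simplified consumer over a slice of fragments.
struct ToyConsumer<'a> {
    frags: &'a [Vec<u8>],
    frag_idx: usize, // index of the current fragment
    offset: usize,   // bytes already consumed from the current fragment
}

impl<'a> ToyConsumer<'a> {
    fn new(frags: &'a [Vec<u8>]) -> Self {
        ToyConsumer { frags, frag_idx: 0, offset: 0 }
    }

    // Hands up to `n` bytes to `consumer`, at most one fragment's worth per call.
    fn consume<F: FnMut(&[u8])>(&mut self, n: usize, mut consumer: F) {
        let frags = self.frags;
        let mut consumed = 0;
        while self.frag_idx < frags.len() && consumed < n {
            let frag = &frags[self.frag_idx];
            let bytes_left = frag.len() - self.offset;
            if bytes_left == 0 {
                // Current fragment exhausted: move to the next one.
                self.frag_idx += 1;
                self.offset = 0;
                continue;
            }
            let step = bytes_left.min(n - consumed);
            consumer(&frag[self.offset..self.offset + step]);
            self.offset += step;
            consumed += step;
        }
    }

    // Copies up to `n` bytes into a new Vec, advancing the consumer.
    fn consume_to_vec(&mut self, n: usize) -> Vec<u8> {
        let mut out = Vec::with_capacity(n);
        self.consume(n, |chunk| out.extend_from_slice(chunk));
        out
    }
}
```

A request that spans a fragment boundary results in more than one callback invocation, and asking for more bytes than remain simply returns what is left.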