Implementing a Copyless Redis Protocol in Rust with Parsing Combinators (2020-02-01)

Implementing a Copyless Redis Protocol in Rust with Parsing Combinators

Redis is a fantastic noSQL database with a beautifully simple design. One of the fundamental responsibilities of the redis server is to encode and decode RESP (Redis Serialization Protocol) messages. For example, when a client issues the command:

SET foo bar

That's encoded by the client and sent to the server as:

*3\r\n$3\r\nSET\r\n$3\r\nfoo$3\r\nbar\r\n

It's not necessary important to understand that RESP message right now, but the server will need to decode that back into something equivalent so it can perform the operation. This blog post will go through my efforts to implement a copyless RESP parser in redis-oxide.

Redis Serialization Protocol (RESP)

The redis folks were kind enough to simply document the v3 protocol on their website. The protocol is CLRF (\r\n) delimited, with each word carrying a type. The simplest types are Simple Strings and Errors, which look like:

+OK\r\n
-Error Msg\r\n

There's also bulkstrings, which are strings with a length:

$3\r\nFOO\r\n

We also have integers:

:1337\r\n

And finally we have arrays, which simply have a size indicating how many redis values follow.

*3\r\n$3\r\nSET\r\n$3\r\nfoo$3\r\nbar\r\n

We can read the array resp as:

*3\r\n        -- We have three elements in this array
$3\r\nSET\r\n -- First element is a bulk string of length 3 with value SET
$3\r\nfoo     -- Second element is also a bulk string of length 3 with value foo
$3\r\nbar\r\n -- Third element is also a bulk string of length 3 with value foo

Now that we're familiar with the protocol, lets get an idea on what parsing combinators actually are.

Parsing Combinators

The basic idea behind parsing combinators is that you build more complex parsers from simpler parsers. A simple parser could do something like fetch a word, and then we can use that later to parse sentences, as they're composed of words.

From the RESP examples above, we can see that it's a series of words delimited by CLRF, so it would be very handy to have a word parser. We can implement one pretty simply by collecting all bytes until we hit a CLRF. As we'll see later, almost everything in RESP will be parsed by varying the output of our word parser.

All parsing combinators need a way of representing position in the input. The strategy I'll be using is to have a cursor which I'll track the position of (starting at 0).

Copyless

Now that we understand RESP, and have an idea on what a parser combinator is, we'll need to understand how we can avoid copying in redis oxide. For context, I previously heap allocated almost all bytes in redis-oxide. Without digging too deep into the details, my fundamental types were defined as:

/// These types are used by state and ops to actually perform useful work.
pub type Value = Vec<u8>;
/// Key is the standard type to index our structures
pub type Key = Vec<u8>;

Which meant my parser needed to output Vec's, which are heap allocated. Value and Key are used almost everywhere in the application to represent almost all values in a redis command. So we need to change these types to be small, stack allocated items. No matter what direction we take, we need to play nice with tokio's codec scheme.

Now it should be understood that tokio's Decoder trait works as follows:

  1. redis-oxide uses the tokio framework which provides services to listen on sockets.
  2. One of the APIs provided is the Decoder trait, which you use to carefully read bytes off a socket to produce a type.
  3. Tokio maintains a large buffer and will copy bytes received off a socket into this buffer.
  4. It will pass this buffer to the parser until the parser signals that a type can be produced.
  5. The tokio managed buffer is smart, and allows for several byte slices to be made safely (Bytes type).
  6. The parser will cleave off enough bytes from the buffer when producing a type to allow tokio to safely copy later received bytes.
  7. We bypass lifetime issues as the Bytes type maintains reference counts, so we can pass slices up further up the application.
  8. Once redis-oxide is done with the produced types, we'll drop the slice references, and memory can be reclaimed.

So our parser will need to dance this careful dance. As described above, we can safely share byte slices of this underlying buffer using the Bytes type. So we'll redefine our fundamental types in terms of Bytes:

/// These types are used by state and ops to actually perform useful work.
pub type Value = Bytes;
/// Key is the standard type to index our structures
pub type Key = Bytes;

Aside from a massive related refactoring job, we now need to just write the parser 😛.

Writing the Parser

Writing the parser will require us to solve a few problems:

  1. Data representation and type transformations.
  2. Error handling and type setup.
  3. Writing the fundamental parsers.
  4. Dealing with arrays.

Data Representation and Type Transformations

To better understand our requirements, let us first consider our output type:

/// RedisValueRef is the canonical type for values flowing
/// through the system. Inputs are converted into RedisValues,
/// and outputs are converted into RedisValues.
#[derive(PartialEq, Clone)]
pub enum RedisValueRef {
    String(Bytes),
    Error(Bytes),
    Int(i64),
    Array(Vec<RedisValueRef>),
    NullArray,
    NullBulkString,
    ErrorMsg(Vec<u8>), // This is not a RESP type. This is an redis-oxide internal error type.
}

This is the type that redix-oxide uses to later run commands, so our parser will eventually need to output this type. This means we'll need to transform the given RESP buffer into one of those enums above. Doing it directly however is expensive – recall that the Bytes type needs to fiddle with reference counts. So we'll use a simpler type:

/// Fundamental struct for viewing byte slices
///
/// Used for zero-copy redis values.
struct BufSplit(usize, usize);

/// BufSplit based equivalent to our output type RedisValueRef
enum RedisBufSplit {
    String(BufSplit),
    Error(BufSplit),
    Int(i64),
    Array(Vec<RedisBufSplit>),
    NullArray,
    NullBulkString,
}

So as we're parsing, we'll need to need to track the start and end of a given byte slice that represents one of RedisBufSplit. Later we'll use BufSplit and the true tokio buffer to transform RedisBufSplit → RedisValueRef.

So for example, if I have the following RESP fragment:

frag:  $3\r\nFOO\r\n
index: 012 3 4567 8
(\r,\n are single characters)

We'd have the following type:

RedisBufSplit::String(BufSplit(4,7))

Representing the string byte slice "FOO".

Now that we can represent our values, we'll need to consider error handling.

Error handling and Types

There's a lot of ways that parsing can fail. A client could send us straight garbage, or something more subtle like an off-by-one error. We'll list each error in an enum type:

#[derive(Debug)]
pub enum RESPError {
    UnexpectedEnd,
    UnknownStartingByte,
    IOError(std::io::Error),
    IntParseFailure,
    BadBulkStringSize(i64),
    BadArraySize(i64),
}

As we're writing rust, it's natural to use the Result<T, E> type. Our success type needs to track our current position as well as returning a sensible type. As well, we'll need to signal our parsing status to tokio. The Decoder trait has the following signature:

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;

This is a peculiar type, so let's work through the possible cases:

Case Meaning
Ok(Some(Self::Item)) We successfully parsed a value!
Ok(None) Looks fine but incomplete. We need the client to send more data.
Err(Self::Error) Parsing failed somehow.

So now we have all the information required. Our Item type needs to track position and the actual type, so we can use a tuple (usize, RedisBufSplit). Our fundamental parsing type is then:

type RedisResult = Result<Option<(usize, RedisBufSplit)>, RESPError>;

All subsequent parsers will eventually need to output RedisResult.

Writing the Fundamental Parser

Now that we understand our data representation and errors, lets write our first parser! As mentioned several times, RESP is a word based protocol. So lets write a word parser! The only thing we care about is finding the position (index) of the next CLRF.

As this is infallible, we don't necessary need to use the RedisResult type. So our function can have the following signature:

fn word(buf: &BytesMut, pos: usize) -> Option<(usize, BufSplit)>

So we'll take the tokio provided buffer buf, and our current position pos, and if we can, output Some((next_pos, BufSplit)). We'll use burntsushi's fantastic memchr crate to accelerate searching for CLRF (\r\n):

/// Get a word from `buf` starting at `pos`
#[inline]
fn word(buf: &BytesMut, pos: usize) -> Option<(usize, BufSplit)> {
    // We're at the edge of `buf`, so we can't find a word.
    if buf.len() <= pos {
        return None;
    }
    // Find the position of the b'\r'
    memchr(b'\r', &buf[pos..]).and_then(|end| {
        if end + 1 < buf.len() {
            // pos + end == first index of b'\r' after `pos`
            // pos + end + 2 == ..word\r\n<HERE> -- skip to after CLRF
            Some((pos + end + 2, BufSplit(pos, pos + end)))
        } else {
            // Edge case: We received just enough bytes from the client
            // to get the \r but not the \n
            None
        }
    })
}

Great! We can now efficiently grab individual words from our input buffer. Even better, simple strings and errors are simple type transformations of this:

fn simple_string(buf: &BytesMut, pos: usize) -> RedisResult {
    Ok(word(buf, pos).map(|(pos, word)| (pos, RedisBufSplit::String(word))))
}

fn error(buf: &BytesMut, pos: usize) -> RedisResult {
    Ok(word(buf, pos).map(|(pos, word)| (pos, RedisBufSplit::Error(word))))
}

If that syntax isn't super familiar, both of the above are equivalent to:

fn simple_string(buf: &BytesMut, pos: usize) -> RedisResult {
    match word(buf, pos) {
        Some((pos, word)) => Ok(Some((pos, RedisBufSplit::String(word)))),
        None => Ok(None),
    }
}

So all we're doing is wrapping the BufSplit returned by word in the appropriate RedisBufSplit type.

Nice! So our easy types are out of the way. We now need to parse ints, bulk strings, and finally arrays.

Parsing Ints

Ints are the first non-trivial type to parse. RESP represents signed 64 bit integers as a base 10 string, so we'll need to:

  1. Grab a word (BufSplit, can turn into byte slice with BufSplit::as_slice)
  2. Convert byte slice to a str
  3. Convert the str to an i64

This process can fail on steps 2 and 3. Rust requires that strings are uft-8 encoded, so converting to a str can fail. Then someone could pass "abc" as the int, so converting to i64 can fail. Keeping those in mind, we can now write the int function:

fn int(buf: &BytesMut, pos: usize) -> Result<Option<(usize, i64)>, RESPError> {
    match word(buf, pos) {
        Some((pos, word)) => {
            // word.as_slice(buf) is the method call BufSplit::as_slice(&self, &BytesMut) to access the byte slice.
            let s = str::from_utf8(word.as_slice(buf)).map_err(|_| RESPError::IntParseFailure)?;
            // Convert the string to an i64. Note the `?` for early returns.
            let i = s.parse().map_err(|_| RESPError::IntParseFailure)?;
            Ok(Some((pos, i)))
        }
        None => Ok(None),
    }
}

Nice, so we can grab ints from the input. We only need a trivial function to get the desired RedisResult type:

fn resp_int(buf: &BytesMut, pos: usize) -> RedisResult {
    Ok(int(buf, pos)?.map(|(pos, int)| (pos, RedisBufSplit::Int(int))))
}

Bulk Strings

So bulk strings in RESP start with a length (i64), and then the string content (delimited by CLRF of course). So we can use our previous int function, and then work through the possible cases (see second code block for comments).

Here's the code without comments:

fn bulk_string(buf: &BytesMut, pos: usize) -> RedisResult {
    match int(buf, pos)? {
        Some((pos, -1)) => Ok(Some((pos, RedisBufSplit::NullBulkString))),
        Some((pos, size)) if size >= 0 => {
            let total_size = pos + size as usize;
            if buf.len() < total_size + 2 {
                Ok(None)
            } else {
                let bb = RedisBufSplit::String(BufSplit(pos, total_size));
                Ok(Some((total_size + 2, bb)))
            }
        }
        Some((_pos, bad_size)) => Err(RESPError::BadBulkStringSize(bad_size)),
        None => Ok(None),
    }
}

And here's the same code with comments explaining what's going on:

fn bulk_string(buf: &BytesMut, pos: usize) -> RedisResult {
    // recall that the `pos` returned by `int` is the first index of the string content.
    match int(buf, pos)? {
        // special case: redis defines a NullBulkString type, with length of -1.
        Some((pos, -1)) => Ok(Some((pos, RedisBufSplit::NullBulkString))),
        // We have a size >= 0
        Some((pos, size)) if size >= 0 => {
            // We trust the client here, and directly calculate the end index of string (absolute w.r.t pos)
            let total_size = pos + size as usize;
            // The client hasn't sent us enough bytes
            if buf.len() < total_size + 2 {
                Ok(None)
            } else {
                // We have enough bytes, so we can generate the correct type.
                let bb = RedisBufSplit::String(BufSplit(pos, total_size));
                // total_size + 2 == ...bulkstring\r\n<HERE> -- after CLRF
                Ok(Some((total_size + 2, bb)))
            }
        }
        // We recieved a garbage size (size < -1), so error out
        Some((_pos, bad_size)) => Err(RESPError::BadBulkStringSize(bad_size)),
        // Not enough bytes to parse an int (i.e. no CLRF to delimit the int)
        None => Ok(None),
    }
}

Now we have only one type left: Arrays.

Arrays: An Issue

Arrays are fundamentally more complex than other types as they are a sequence of redis values. We'll have to be more clever. They are defined as a size (i64) and then a size number of redis values. This is naturally recursive, as we can have arrays inside arrays.

The issue is that we need a function which will parse redis values, as fn array(..) is only responsible for redis arrays. But that generic parse function will also need to call the array parser!

Thankfully we can use some first year CS.

Mutual Recursion: Top Level Parse Function and Arrays

Lets first define our top level parse function. It's responsible for taking a buffer and returning a RedisResult, agnostic to particular RESP types. RESP tags every element with a type byte, so our function is short:

fn parse(buf: &BytesMut, pos: usize) -> RedisResult {
    if buf.is_empty() {
        return Ok(None);
    }

    match buf[pos] {
        b'+' => simple_string(buf, pos + 1),
        b'-' => error(buf, pos + 1),
        b'$' => bulk_string(buf, pos + 1),
        b':' => resp_int(buf, pos + 1),
        b'*' => array(buf, pos + 1),
        _ => Err(RESPError::UnknownStartingByte),
    }
}

So parse(..) will check the byte at pos (initially 0), and use that to delegate to the correct function. Now this is very useful, and will allow us to write the array parser.

Here's the code without comments:

fn array(buf: &BytesMut, pos: usize) -> RedisResult {
    match int(buf, pos)? {
        None => Ok(None),
        Some((pos, -1)) => Ok(Some((pos, RedisBufSplit::NullArray))),
        Some((pos, num_elements)) if num_elements >= 0 => {
            let mut values = Vec::with_capacity(num_elements as usize);
            let mut curr_pos = pos;
            for _ in 0..num_elements {
                match parse(buf, curr_pos)? {
                    Some((new_pos, value)) => {
                        curr_pos = new_pos;
                        values.push(value);
                    }
                    None => return Ok(None),
                }
            }
            Ok(Some((curr_pos, RedisBufSplit::Array(values))))
        }
        Some((_pos, bad_num_elements)) => Err(RESPError::BadArraySize(bad_num_elements)),
    }
}

And the same code with comments:

fn array(buf: &BytesMut, pos: usize) -> RedisResult {
    match int(buf, pos)? {
        // Not enough bytes to determine the array size
        None => Ok(None),
        // special value: NullArray. Has size -1.
        Some((pos, -1)) => Ok(Some((pos, RedisBufSplit::NullArray))),
        // Happy path. We have a valid size (num_elements > 0)
        Some((pos, num_elements)) if num_elements >= 0 => {
            // As we're recieving a dynamic number of elements, we need to heap allocate our BufSplits.
            let mut values = Vec::with_capacity(num_elements as usize);
            // We're going to forward iterate on `curr_pos`
            let mut curr_pos = pos;
            for _ in 0..num_elements {
                // Mutual Recursion! We need to parse the value at `curr_pos`
                match parse(buf, curr_pos)? {
                    // We got a value, so add it to the `values` vector and
                    // update `curr_pos`.
                    Some((new_pos, value)) => {
                        curr_pos = new_pos;
                        values.push(value);
                    }
                    // Not enough bytes. Abandon parsing and free vec.
                    None => return Ok(None),
                }
            }
            // We had enough bytes to fully parse the array! Return it.
            Ok(Some((curr_pos, RedisBufSplit::Array(values))))
        }
        // Client sent us a garbage size (num_elements < -1)
        Some((_pos, bad_num_elements)) => Err(RESPError::BadArraySize(bad_num_elements)),
    }
}

So we can now parse arrays, and can now put everything together.

Putting everything together

We're so close! We just need a few conversion functions before we can implement Decoder. Once we're done parsing, we're guaranteed to have a contiguous slice of memory that corresponds to the RedisBufSplit types we've generated until this moment. So we just need two functions:

  1. Take the large Bytes buffer and a BufSplit(start,end) slice into it to make a byte slice (also Bytes type)
  2. Take the RedisBufSplit and the large Bytes buffer and produce RedisValueRef types.

The conversion function is actually pretty mechanical:

// First, we need a convenient way to convert our index pairs into byte slices.
impl BufSplit {
    /// Get a lifetime appropriate slice of the underlying buffer.
    ///
    /// Constant time.
    #[inline]
    fn as_slice<'a>(&self, buf: &'a BytesMut) -> &'a [u8] {
        &buf[self.0..self.1]
    }

    /// Get a Bytes object representing the appropriate slice
    /// of bytes.
    ///
    /// Constant time.
    #[inline]
    fn as_bytes(&self, buf: &Bytes) -> Bytes {
        buf.slice(self.0..self.1)
    }
}
// Second, we'll need to convert a RedisBufSplit -> RedisValueRef given a Bytes buffer.
impl RedisBufSplit {
    fn redis_value(self, buf: &Bytes) -> RedisValueRef {
        match self {
            // bfs is BufSplit(start, end), which has the as_bytes method defined above
            RedisBufSplit::String(bfs) => RedisValueRef::String(bfs.as_bytes(buf)),
            RedisBufSplit::Error(bfs) => RedisValueRef::Error(bfs.as_bytes(buf)),
            RedisBufSplit::Array(arr) => {
                RedisValueRef::Array(arr.into_iter().map(|bfs| bfs.redis_value(buf)).collect())
            }
            RedisBufSplit::NullArray => RedisValueRef::NullArray,
            RedisBufSplit::NullBulkString => RedisValueRef::NullBulkString,
            RedisBufSplit::Int(i) => RedisValueRef::Int(i),
        }
    }
}

We can now implement the Decoder trait so our parser fits in with the tokio machinery:

/// The struct we're using. We don't need to store anything in the struct.
/// Later on we can expand this struct for optimization purposes.
#[derive(Default)]
pub struct RespParser;

impl Decoder for RespParser {
    type Item = RedisValueRef;
    type Error = RESPError;
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }

        match parse(buf, 0)? {
            Some((pos, value)) => {
                // We parsed a value! Shave off the bytes so tokio can continue filling the buffer.
                let our_data = buf.split_to(pos);
                // Use `redis_value` defined above to get the correct type
                Ok(Some(value.redis_value(&our_data.freeze())))
            }
            None => Ok(None),
        }
    }
}

We did it! We can now decode RedisValueRef's from bytes off a socket! A complete parser includes encoding RedisValueRef's, but the code is pretty simple so you can read it here. You can view the tests here and how it's actually used in redis-oxide here.

Conclusion

Overall this 500+ line journey has netted us an efficient, zero copy RESP parser using parsing combinators. It was a lot of work to get the project refactored, but I am proud to have a solution I wrote myself and actually understand (no offense to the combine people).