
Building a zero copy parser

While implementing an RTSP parser I thought about using the excellent http_parser from Joyent. Although this is a very well programmed library, I wanted to have an RTSP reader without any dependencies. Also, the project for which I'm developing the RTSP reader is completely managed by myself, so I can make sure that I'm not sending any silly bytes to the parser, which means I don't have to build in extra safety checks.

Normally when I implement a simple parser, e.g. for parsing HTTP headers, I use C++ std::string and find, split, and trim like features. This time, though, I wanted to implement a parser which tries not to make any memory allocations.

So what does that mean? Well, basically while parsing you only keep track of the start and end bytes of the information you want to extract. This means that the strings you parse won't be null-terminated and can't be printed with printf() without using the %.*s format.
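
To make the %.*s remark concrete, here is a minimal sketch. The helper names format_slice and print_slice are made up for this illustration and are not part of the RTSP reader; the slice uses inclusive start and end indices, as in the rest of this post.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Hypothetical helper: writes the bytes data[start..end] (inclusive) into
// `out` as a null-terminated string. The precision given to %.*s limits how
// many bytes are read from the (not null-terminated) slice.
// Returns the number of bytes in the slice.
int format_slice(char* out, size_t out_size, const char* data, size_t start, size_t end) {
  assert(start <= end);
  int len = static_cast<int>(1 + end - start);
  std::snprintf(out, out_size, "%.*s", len, data + start);
  return len;
}

// Hypothetical helper: prints a slice directly, without a temporary copy.
void print_slice(const char* data, size_t start, size_t end) {
  assert(start <= end);
  std::printf("%.*s\n", static_cast<int>(1 + end - start), data + start);
}
```

Note that the precision argument of %.*s must be an int, hence the cast from size_t.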

While implementing this I realised that implementing a zero copy parser is a lot simpler than I expected. In my implementation I take the following approach:

  • I parse byte per byte
  • I create separate loops for data extraction
  • I keep generic state

Parsing byte per byte

The first point seems pretty clear. I iterate over the buffer and check byte per byte if I need to stop parsing, change state, etc. Basically this means I have one main while loop which does the following, where dx is the current byte index (initially 0) and nbytes is the number of bytes in the buffer.

while (dx < nbytes) {
  // Parse...
}

Separate loops for data extraction

The second point from the above list is the most interesting one to explain in more detail. For each piece of data I want to extract, I create a separate loop. When you create a loop while parsing, you always need a condition which makes sure that you're not reading outside the buffer. This can simply be done with for (; dx < nbytes; ++dx). But you don't only want to check if you're reading outside the buffer; you also want to check for the character which marks the end of the data you want to extract.

For example, consider the following lines:

Content-Type: application/sdp\r\n
CSeq: 2\r\n
User-Agent: Lavf56.11.0\r\n
Content-Length: 307\r\n

Okay, let's start with the first line, Content-Type: application/sdp\r\n. To extract the header name, we scan until we find the ':' character. Now we basically have two end conditions: running out of bytes, or finding the ':'. Let's create the loop; dx is the current read position.

size_t field_start = dx;
size_t field_end = dx;
 
for (; dx < nbytes; ++dx) {
   if (data[dx] == ':') {
     dx += 1; // we jump over the ":" character.
     break;
   }
   field_end = dx;
}

The above loop is a simple and readable approach to parsing sections from a buffer, byte per byte. We read as long as there are bytes available (dx < nbytes). If the current character is our end character, we jump over it and break out of the loop. If not, we update the end position.

The same loop in more semantic pseudo code:

size_t field_start = read_index;
size_t field_end = read_index;
 
while "there are bytes available" do 
 
   if "current byte is the end byte" 
      read_index = read_index + 1;
      break;
   endif
 
   field_end = read_index;
   read_index = read_index + 1;
 
end while
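
The loop can also be wrapped into a small reusable helper. The sketch below is only an illustration: the names Span and read_until are hypothetical and do not appear in the RTSP reader. It also deviates slightly from the loops above by storing a start and a length instead of an inclusive end index, which makes an empty field (length 0) representable.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical type: a slice of the input buffer, [start, start + len).
struct Span {
  size_t start;
  size_t len;
  bool found; // true when the end character was seen
};

// Scans data[dx..nbytes) until `end_char` is found.
// When found, dx ends up just past the end character; otherwise dx == nbytes.
Span read_until(const char* data, size_t nbytes, size_t& dx, char end_char) {
  assert(data != NULL);
  Span s = { dx, 0, false };
  for (; dx < nbytes; ++dx) {
    if (data[dx] == end_char) {
      s.found = true;
      dx += 1; // jump over the end character
      break;
    }
    s.len += 1;
  }
  return s;
}
```

Calling read_until(data, nbytes, dx, ':') on the Content-Type line would give a span of 12 bytes starting at 0 and leave dx at 13, right after the ':'.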

What if we want to check for multiple end characters?

In the case of parsing HTTP headers we need to check for the \r\n character sequence which marks the end of a header line. To parse this we need to change the above loop a little bit. We peek not only at the current character, but also at the next one. Therefore our end condition needs to be dx < (nbytes - 1). If we need to check two bytes ahead, we use dx < (nbytes - 2), etc. Note that nbytes is an unsigned size_t, so nbytes - 1 wraps around to a huge value when nbytes is 0; make sure the buffer isn't empty before entering such a loop.

field_start = dx;
 
for (; dx < nbytes - 1; ++dx) {
  if (data[dx] == '\r' && data[dx + 1] == '\n')  {
    dx += 2; // jump over the \r\n
    break;
  }
  field_end = dx;
}

This approach can be used for any 'data' that you want to extract from a byte stream. In the above text I did not mention that you also want to keep track of the position where your data begins; see field_start.
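
Putting the two loops together, a complete header line can be split into a name and a value without copying anything. This is a minimal sketch under a few assumptions: HeaderLine and parse_header_line are hypothetical names, the input is trusted (a name is assumed not to contain \r\n), and the bound is written as i + 1 < nbytes, which is the same as i < nbytes - 1 but cannot wrap around for an empty buffer.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical result type: offsets into the caller's buffer, nothing is copied.
struct HeaderLine {
  size_t name_start, name_len;
  size_t value_start, value_len;
};

// Tries to parse one "Name: value\r\n" line starting at dx.
// Returns true and moves dx past the \r\n when a complete line was found.
// Returns false and leaves dx untouched when more bytes are needed.
bool parse_header_line(const char* data, size_t nbytes, size_t& dx, HeaderLine& out) {
  assert(data != NULL);
  size_t i = dx;

  // Header name: everything up to ':'.
  out.name_start = i;
  while (i < nbytes && data[i] != ':') { ++i; }
  if (i == nbytes) { return false; }
  out.name_len = i - out.name_start;
  ++i; // jump over ':'

  // Skip spaces between ':' and the value.
  while (i < nbytes && data[i] == ' ') { ++i; }

  // Header value: everything up to "\r\n".
  out.value_start = i;
  for (; i + 1 < nbytes; ++i) {
    if (data[i] == '\r' && data[i + 1] == '\n') {
      out.value_len = i - out.value_start;
      dx = i + 2; // jump over "\r\n"
      return true;
    }
  }
  return false;
}
```

Because dx is only updated when a full line was seen, the caller can simply retry from the same position once more bytes have arrived.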

Keeping state and number of bytes parsed

The last part of my approach is how I keep state while parsing. Let me first explain why I keep state before jumping into the details. When parsing, you want to keep state so that you can handle buffers which do not contain complete chunks of parseable data. This may happen when you're e.g. reading from the network, or reading chunks from a file. For example, when you're parsing an HTTP request it's possible that you first receive the method line, then some headers, and then some body data.

Because you don't want to parse the whole buffer from the start every time you receive data, you keep track of which parts you've already parsed. In my RTSP reader, I keep track of whether I parsed the method line, the headers and the body. This does mean that when I need 5 reads before the end-of-headers is detected, the parser stays in the header-parsing state during those reads.

But this doesn't mean that I parse the same data over and over again. To make sure that you don't keep parsing the same bytes while you're in a specific state, you also keep track of the number of bytes that you parsed. Every time you're done parsing a complete piece of data, you update the number of bytes parsed and let the user know how many bytes they can flush from the buffer. This is why my parsing functions look like:

int parse(const char* data, size_t nbytes, size_t& nparsed);
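
On the calling side, nparsed tells you how much of the buffer can be dropped. The following sketch shows one way this could look. It does not use the RTSP reader itself: LineCounter is a made-up stand-in with the same parse() signature that just counts complete \r\n terminated lines, and feed is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in for a real parser with the same signature: it consumes every
// complete "\r\n" terminated line and reports how many bytes that covered.
struct LineCounter {
  int lines;
  LineCounter() : lines(0) {}

  int parse(const char* data, size_t nbytes, size_t& nparsed) {
    nparsed = 0;
    for (size_t dx = 0; dx + 1 < nbytes; ++dx) {
      if (data[dx] == '\r' && data[dx + 1] == '\n') {
        lines += 1;
        dx += 1;          // together with ++dx this jumps over "\r\n"
        nparsed = dx + 1; // everything up to and including '\n' is done
      }
    }
    return 0;
  }
};

// Appends a freshly received chunk, parses, and flushes what was consumed.
// Incomplete trailing data stays in `buffer` until the next chunk arrives.
int feed(LineCounter& parser, std::vector<char>& buffer, const std::string& chunk) {
  buffer.insert(buffer.end(), chunk.begin(), chunk.end());
  if (buffer.empty()) { return 0; }
  size_t nparsed = 0;
  int r = parser.parse(&buffer[0], buffer.size(), nparsed);
  assert(nparsed <= buffer.size());
  buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(nparsed));
  return r;
}
```

Erasing from the front of a std::vector moves the remaining bytes, which is fine for a sketch; a ring buffer or a read offset would avoid that.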

The code below shows how I'm keeping track of state and how I'm updating the number of bytes parsed. This code parses basic RTSP headers and is created only to explain the methods I described above.

RtspReader.h

class RtspReader {
  public:
    RtspReader();
    int init(RtspReaderListener* lis);
    int parse(const char* data, size_t nbytes, size_t& nparsed);
 
  public:
    RtspReaderListener* listener;
    int state;
    int method;
    uint64_t cseq; /* @todo what's the max? */
};

RtspReader.cpp

#include <sstream>
#include <stdio.h>
#include <algorithm>
#include <RtspReader.h>
#include <poly/Log.h>
 
#if !defined(_WIN32)
#  include <strings.h> /* strncasecmp() on POSIX systems */
#  define strnicmp(a, b, n) strncasecmp(a, b, n)
#endif
 
namespace poly {
 
  RtspReader::RtspReader()
    :listener(NULL)
    ,state(RTSP_RSTATE_NONE)
    ,method(RTSP_METHOD_NONE)
    ,cseq(0)
  {
  }
 
  int RtspReader::init(RtspReaderListener* lis) {
 
    if (NULL == lis) {
      SX_ERROR("Cannot initialize the rtsp reader because the given listener is NULL.");
      return -1;
    }
 
    listener = lis;
 
    return 0;
  }
 
  int RtspReader::parse(const char* data, size_t nbytes, size_t& nparsed) {
 
    size_t j = 0;
    size_t field_start = 0;
    size_t field_end = 0;
    size_t val_start = 0;
    size_t val_end = 0;
    size_t method_start = 0;
    size_t method_end = 0;
    size_t len = 0;
    size_t uri_start = 0;
    size_t uri_end = 0;
    size_t version_start = 0;
    size_t version_end = 0;
    size_t dx = 0;
 
    nparsed = 0;
 
    if (NULL == listener) {
      SX_ERROR("Cannot parse the given data because the listener is not set.");
      return -1;
    }
 
    if (NULL == data) {
      SX_ERROR("Cannot parse RTSP because given data is NULL.");
      return -1;
    }
 
    if (0 == nbytes) {
      SX_ERROR("Cannot parse RTSP because given data length is 0.");
      return -2;
    }
 
    while (dx < nbytes) {
 
      if (RTSP_RSTATE_NONE == state) {
 
        /* Reset */
        method = RTSP_METHOD_NONE;
        cseq = 0;
 
        /* REQUEST-LINE */
        {
          /* method */
          method_start = dx;
          for (; dx < nbytes; ++dx) {
            if (data[dx] == ' ') {
              dx = dx + 1;
              break;
            }
            method_end = dx;
          }
 
          /* URI */
          uri_start = dx;
          for (; dx < nbytes - 1; ++dx) {
            if (data[dx] == ' ') {
              dx = dx + 1;
              break;
            }
            uri_end = dx;
          }
 
          /* RTSP/1.0 */
          version_start = dx;
 
          for (; dx < nbytes - 1; ++dx) {
 
            if (data[dx] == '\r' && data[dx + 1] == '\n') {
 
              version_end = dx - 1; /* last byte of the version, inclusive */
              dx = dx + 2;
              nparsed = dx;
              state = RTSP_RSTATE_METHOD;
              len = 1 + method_end - method_start;
 
              /* The length must match too, otherwise "P" or "PLAYER" would match "PLAY". */
              if (7 == len && 0 == strnicmp(data + method_start, "OPTIONS", 7)) {
                method = RTSP_METHOD_OPTIONS;
              }
              else if (8 == len && 0 == strnicmp(data + method_start, "DESCRIBE", 8)) {
                method = RTSP_METHOD_DESCRIBE;
              }
              else if (8 == len && 0 == strnicmp(data + method_start, "ANNOUNCE", 8)) {
                method = RTSP_METHOD_ANNOUNCE;
              }
              else if (5 == len && 0 == strnicmp(data + method_start, "SETUP", 5)) {
                method = RTSP_METHOD_SETUP;
              }
              else if (4 == len && 0 == strnicmp(data + method_start, "PLAY", 4)) {
                method = RTSP_METHOD_PLAY;
              }
              else if (5 == len && 0 == strnicmp(data + method_start, "PAUSE", 5)) {
                method = RTSP_METHOD_PAUSE;
              }
              else if (8 == len && 0 == strnicmp(data + method_start, "TEARDOWN", 8)) {
                method = RTSP_METHOD_TEARDOWN;
              }
              else if (13 == len && 0 == strnicmp(data + method_start, "GET_PARAMETER", 13)) {
                method = RTSP_METHOD_GET_PARAMETER;
              }
              else if (13 == len && 0 == strnicmp(data + method_start, "SET_PARAMETER", 13)) {
                method = RTSP_METHOD_SET_PARAMETER;
              }
              else if (8 == len && 0 == strnicmp(data + method_start, "REDIRECT", 8)) {
                method = RTSP_METHOD_REDIRECT;
              }
              else if (6 == len && 0 == strnicmp(data + method_start, "RECORD", 6)) {
                method = RTSP_METHOD_RECORD;
              }
              else {
                method = RTSP_METHOD_NONE;
              }
 
              listener->onRtspReaderMethod(method);
 
              break;
            }
          }
        }
      }
      else if (RTSP_RSTATE_METHOD == state) {
 
         /* HEADERS */
        field_start = dx;
        for(; dx < nbytes; ++dx) {
          if (data[dx] == ':') {
            dx = dx + 1;
            break;
          }
          field_end = dx;
        }
 
        /* remove any spaces between : and value. */
        while (dx < nbytes && data[dx] == ' ') {
          dx++;
        }
        val_start = dx;
 
        /* Field value. */
        for(; dx < nbytes - 1; ++dx) {
          if (data[dx] == '\r' && data[dx + 1] == '\n') {
 
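            dx = dx + 2;
            nparsed = dx; /* dx already points past the \r\n */
 
            /* Use the length of the header name, not the stale method length. */
            len = 1 + (field_end - field_start);
            if (4 == len && 0 == strnicmp(data + field_start, "CSEQ", 4)) {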
              std::stringstream ss(std::string(data + val_start, 1 + (val_end - val_start)));
              ss >> cseq;
            }
 
            listener->onRtspReaderHeader(data + field_start,  1 + (field_end - field_start), data + val_start, 1 + (val_end - val_start));
            break;
          }
          val_end = dx;
        }
 
         /* last line? */
        if (dx < (nbytes - 1)
            && data[dx + 0] == '\r'
            && data[dx + 1] == '\n')
          {
            dx = dx + 1;
            nparsed = dx + 1;
            state = RTSP_RSTATE_HEADERS;
            listener->onRtspReaderHeadersComplete();
            break;
          }
      }
      else {
        break;
      }
    } /* while */
 
    return 0;
  }
 
} /* namespace poly */
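
The listener is not shown in this post. Judging only from the calls made in RtspReader::parse(), it could look roughly like the sketch below. Treat everything here as an assumption: the void return types, the parameter names and the HeaderCollector class are invented for illustration. The important point is that the name and value pointers refer to the parser's input buffer, so a listener that wants to keep them has to copy.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Assumed interface, inferred from the calls in RtspReader::parse().
class RtspReaderListener {
public:
  virtual ~RtspReaderListener() {}
  virtual void onRtspReaderMethod(int method) = 0;
  virtual void onRtspReaderHeader(const char* name, size_t nname,
                                  const char* value, size_t nvalue) = 0;
  virtual void onRtspReaderHeadersComplete() = 0;
};

// Hypothetical listener that stores the headers it receives.
class HeaderCollector : public RtspReaderListener {
public:
  HeaderCollector() : method(0), complete(false) {}

  void onRtspReaderMethod(int m) { method = m; }

  void onRtspReaderHeader(const char* name, size_t nname,
                          const char* value, size_t nvalue) {
    assert(name != NULL && value != NULL);
    // The slices are not null-terminated and only valid during this call,
    // so copy them (this is where the allocations happen, if you need them).
    headers[std::string(name, nname)] = std::string(value, nvalue);
  }

  void onRtspReaderHeadersComplete() { complete = true; }

  int method;
  bool complete;
  std::map<std::string, std::string> headers;
};
```

A listener that only needs to inspect a value, for example to compare a header name, can work on the slices directly and skip the copy.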