
Understanding the code inside Tornado, the asynchronous web server

My goal here is to walk through the lower layers of the Tornado asynchronous web server. I take a bottom-up approach, starting with the polling loop and going up to the application layer, pointing out the interesting things I see along the way.

So if you plan on reading the source code of the Tornado web framework, or you are just curious to see how an asynchronous web server works, I would love to be your guide.

After reading this, you will:

  • be able to write the server part of Comet applications, even if you have to do it from scratch
  • have a better understanding of the Tornado web framework, if you plan to develop on it
  • have a more informed opinion in the Tornado vs. Twisted debate

Intro

I’ll start with a few words of introduction to the Tornado project, in case you have no idea what it is and why you might be interested in it. If you’re already interested in it, just jump to the next section.

Tornado is an asynchronous HTTP server and web framework written in Python. It is the framework that powers the FriendFeed website, recently acquired by Facebook. FriendFeed has quite a few users and many real-time features, so performance and scalability must be high priorities there. Since it is now open source (kudos to Facebook), we can all have a look inside to see how it works.

I also feel obliged to say a few words about nonblocking IO, or asynchronous IO (AIO). If you already know what it is, goto next_section. I’ll try to demonstrate it with a simple example.

Let’s suppose you’re writing an application that has to query another server for some data (the database for example, or some sort of remote API) and it’s known that this query can take a long time. Let’s say, 5 seconds. In many web frameworks the handler would look something like:

def handler_request(self, request):
    answ = self.remote_server.query(request) # this takes 5 seconds
    request.write_response(answ)

If you do this in a single thread, you can serve one client every 5 seconds. During those five seconds all the other clients have to wait, so you’re serving clients at a whopping rate of 0.2 requests per second. Awesome!

Of course, nobody is that naive, so most will use a multi-threaded server to support more clients at once. Let’s say you have 20 threads. You’ve improved performance 20 times, so the rate is now 4 requests per second. Still way too low. You can keep throwing threads at the problem, but threads are expensive in terms of memory usage and scheduling. I doubt you’ll ever reach hundreds of requests per second this way.

With AIO, however, thousands of such requests per second are a breeze. The handler has to be changed to look something like this:

def handler_request(self, request):
    self.remote_server.query_async(request, self.response_received)     

def response_received(self, request, answ):    # this is called 5 seconds later
    request.write(answ)

The idea is that we don’t block while waiting for the answer to arrive. Instead, we give the framework a callback function to call when the answer has arrived. In the meantime, we’re free to serve other clients.

This is also the downside of AIO: the code will be a bit… well, twisted. Also, if you’re in a single threaded AIO server like Tornado, you have to be careful never to block, because all the pending requests will be delayed by that.

A great resource to learn more (than this oversimplified intro) about asynchronous IO is The C10K problem page.

Source code

The project is hosted at github. You can get it, although you don’t need it for reading this article, with:

git clone git://github.com/facebook/tornado.git

The tornado subdirectory contains a .py file for each of the modules so you can easily identify them if you have a checkout of the repository. In each source file, you will find at least one large doc string explaining the module, and giving an example or two on how to use it.

IOLoop

Let’s go directly into the core of the server and look at the ioloop.py file. This module is the heart of the asynchronous mechanism. It keeps a list of the open file descriptors and a handler for each. Its job is to select the descriptors that are ready for reading or writing and to call the associated handlers.

To add a socket to the IOLoop, the application calls the add_handler() method:

def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)

The _handlers dictionary maps each file descriptor to the function to be called when that file descriptor is ready (the handler, in Tornado terminology). Then the descriptor is registered with epoll. Tornado cares about three types of events: READ, WRITE and ERROR. As you can see, ERROR is automatically added for you.

self._impl is the chosen polling implementation: an instance of select.epoll(), or a wrapper emulating it on top of select.select(). We’ll see how it is chosen a bit later.
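To make this concrete, here is a minimal sketch of my own (not taken from the Tornado sources) of an application registering a listening socket directly with the IOLoop; the accept logic is simplified to a single accept per event:

import socket
from tornado import ioloop

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.bind(("", 8888))
sock.listen(128)

def connection_ready(fd, events):
    # called by the IOLoop when the listening socket is readable,
    # i.e. when a connection is waiting to be accepted
    connection, address = sock.accept()
    # ... wrap `connection` in an IOStream, or read/write on it directly

io_loop = ioloop.IOLoop.instance()
io_loop.add_handler(sock.fileno(), connection_ready, io_loop.READ)
io_loop.start()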

Now let’s see the actual main loop, somewhat surprisingly placed in the start() method:

def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True:

    [ ... ]

        if not self._running:
            break

        [ ... ]

        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)

The poll() function returns a dictionary of (fd: events) pairs, stored in the event_pairs variable. The "Interrupted system call" special-case exception is needed because the C library poll() function can fail with EINTR (which has the numerical value of 4) when a signal arrives before any event occurs. See man poll for details.

The inner while loop takes the pairs from the event_pairs dictionary one by one and calls the associated handlers. The pipe error exception is silenced here. To preserve the generality of this class it would perhaps have been better to catch it in the HTTP handlers, but it was probably easier this way.

The comment explains why the dictionary has to be consumed using popitem() rather than the more obvious:

for fd, events in self._events.items():

In a nutshell, the dictionary can be modified during the loop, inside the handlers. See, for example, the remove_handler() method below. It removes the fd from the _events dictionary, so that the handler is not called even if the fd was selected by the current poll iteration.

def remove_handler(self, fd):
    """Stop listening for events on fd."""
    self._handlers.pop(fd, None)
    self._events.pop(fd, None)
    try:
        self._impl.unregister(fd)
    except OSError:
        logging.debug("Error deleting fd from IOLoop", exc_info=True)

The (pointless) loop termination trick

A nice trick is how the loop is stopped. The self._running variable is used to break out of it, and it can be set to False from the handlers via the stop() method. Normally that would be the end of it, but stop() might also be called from a signal handler. If 1) the loop is in poll(), 2) no requests are coming to the server, and 3) the signal is not delivered to the right thread by the OS, you would have to wait for the poll to time out. Considering how unlikely this is and that poll_timeout is 0.2 seconds by default, that’s hardly a tragedy, really.

But anyway, to do it, they use an anonymous pipe with one end in the set of polled file descriptors. When stopping, something is written to the other end, effectively waking up the loop from poll(). Here is the relevant code:

def __init__(self, impl=None):

    [...]

    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    r, w = os.pipe()
    self._set_nonblocking(r)
    self._set_nonblocking(w)
    self._waker_reader = os.fdopen(r, "r", 0)
    self._waker_writer = os.fdopen(w, "w", 0)
    self.add_handler(r, self._read_waker, self.WRITE)


def _wake(self):
    try:
        self._waker_writer.write("x")
    except IOError:
        pass

In fact, there seems to be a bug in the above code: the read file descriptor r, although opened for reading, is registered for the WRITE event, which cannot occur on it. As I said earlier, it hardly makes a difference, so I’m not surprised they didn’t notice it isn’t working. I’ve pinged the mailing list about this, but have gotten no answer so far.

Timers

Another nice feature of the IOLoop module is the simple timers implementation. A list of timers is maintained sorted by expiration time, by using python’s bisect module:

def add_timeout(self, deadline, callback):
    """Calls the given callback at the time deadline from the I/O loop."""
    timeout = _Timeout(deadline, callback)
    bisect.insort(self._timeouts, timeout)
    return timeout
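As a usage sketch of my own: the deadline is an absolute timestamp (compared against time.time() in the loop below), so a callback can reschedule itself to run again a few seconds later.

import time

def remind_me():
    print "are we there yet?"
    io_loop.add_timeout(time.time() + 5, remind_me)   # assuming io_loop is the IOLoop instance

io_loop.add_timeout(time.time() + 5, remind_me)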

Inside the main loop, the callbacks of all the expired timers are simply executed, in order, until the current time is reached. The poll timeout is adjusted so that the next timer is not delayed even if no new requests arrive.

self._running = True
while True:
    poll_timeout = 0.2

    [ ... ]
    if self._timeouts:
        now = time.time()
        while self._timeouts and self._timeouts[0].deadline <= now:
            timeout = self._timeouts.pop(0)
            self._run_callback(timeout.callback)
        if self._timeouts:
            milliseconds = self._timeouts[0].deadline - now
            poll_timeout = min(milliseconds, poll_timeout)

[ ... poll ]

Selecting the select method

Let's now have a quick look at the code that selects the poll/select implementation. Python 2.6 has epoll support in the standard library, which is detected with hasattr() on the select module. On Python versions before 2.6, Tornado will try to use its own C epoll module; you can find its sources in the tornado/epoll.c file. Finally, if that fails (epoll is specific to Linux), it falls back to select(). The _Select and _EPoll classes are wrappers emulating the select.epoll API. Before doing your benchmarks, make sure you are using epoll, because select has poor performance with large sets of file descriptors.

# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
if hasattr(select, "epoll"):
    # Python 2.6+ on Linux
    _poll = select.epoll
else:
    try:
        # Linux systems with our C module installed
        import epoll
        _poll = _EPoll
    except:
        # All other systems
        import sys
        if "linux" in sys.platform:
            logging.warning("epoll module not found; using select()")
        _poll = _Select

With this, we've covered most of the IOLoop module. As advertised, it is indeed a nice and simple piece of code.

From sockets to streams

Let's now look at the IOStream module. Its purpose is to provide a thin layer of abstraction over nonblocking sockets, offering three functions:

  • read_until(), which reads from the socket until it finds a given string. This is convenient for reading the HTTP headers until the empty line delimiter.
  • read_bytes(), which reads a given number of bytes from the socket. This is convenient for reading the body of the HTTP message.
  • write() which writes a given buffer to the socket and keeps retrying until the whole buffer is sent.

All of them can call a callback when they are done, in asynchronous style.
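Here is a small sketch of my own showing the style of use, assuming stream is an IOStream wrapping an already accepted, nonblocking socket:

def on_body(data):
    stream.write("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")

def on_headers(data):
    # data ends with the blank line that terminates the headers; a real parser
    # would extract Content-Length here (hard-coded for the sketch)
    stream.read_bytes(42, on_body)

stream.read_until("\r\n\r\n", on_headers)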

The write() implementation buffers the data provided by the caller and writes it whenever IOLoop calls its handler, because the socket is ready for writing:

def write(self, data, callback=None):
    """Write the given data to this stream.

    If callback is given, we call it when all of the buffered write
    data has been successfully written to the stream. If there was
    previously buffered write data and an old write callback, that
    callback is simply overwritten with this new callback.
    """
    self._check_closed()
    self._write_buffer += data
    self._add_io_state(self.io_loop.WRITE)
    self._write_callback = callback

The function that handles the WRITE event simply does socket.send() until EWOULDBLOCK is hit or the buffer is finished:

def _handle_write(self):
    while self._write_buffer:
        try:
            num_bytes = self.socket.send(self._write_buffer)
            self._write_buffer = self._write_buffer[num_bytes:]
        except socket.error, e:
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                break
            else:
                logging.warning("Write error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
    if not self._write_buffer and self._write_callback:
        callback = self._write_callback
        self._write_callback = None
        callback()

Reading is the reverse process. The read event handler keeps reading until enough data has gathered in the read buffer: either the buffer has the required length (for read_bytes()) or it contains the requested delimiter (for read_until()):

def _handle_read(self):
    try:
        chunk = self.socket.recv(self.read_chunk_size)
    except socket.error, e:
        if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            return
        else:
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            return
    if not chunk:
        self.close()
        return
    self._read_buffer += chunk
    if len(self._read_buffer) >= self.max_buffer_size:
        logging.error("Reached maximum read buffer size")
        self.close()
        return
    if self._read_bytes:
        if len(self._read_buffer) >= self._read_bytes:
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._read_bytes = None
            callback(self._consume(num_bytes))
    elif self._read_delimiter:
        loc = self._read_buffer.find(self._read_delimiter)
        if loc != -1:
            callback = self._read_callback
            delimiter_len = len(self._read_delimiter)
            self._read_callback = None
            self._read_delimiter = None
            callback(self._consume(loc + delimiter_len))

The _consume() function used above makes sure that no more than what was requested is taken out of the stream, so that subsequent reads get the bytes immediately following:

def _consume(self, loc):
    result = self._read_buffer[:loc]
    self._read_buffer = self._read_buffer[loc:]
    return result

Also worth noting in the _handle_read() function above is the capping of the read buffer at self.max_buffer_size. The default value is 100 MB, which seems a bit large to me. For example, if an attacker opens just 100 connections to the server and keeps pushing headers without ever sending the end-of-headers delimiter, Tornado will need 10 GB of RAM to serve those requests. Even if the RAM is not a problem, the copy operations on a buffer of this size (as in the _consume() method above) will likely overload the server. Note also how _handle_read() searches for the delimiter in the whole buffer on each iteration, so if the attacker sends the huge data in small chunks, the server has to do a lot of searching. Bottom line: you might want to tune this parameter down, unless you really expect requests that big and have the hardware for them.
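If I read the constructor right, both limits can be passed in where the stream is created; a hedged sketch (the 1 MB figure is just an example of a more defensive value, not a recommendation from the Tornado authors):

stream = iostream.IOStream(connection, io_loop=self.io_loop,
                           max_buffer_size=1024 * 1024,  # 1 MB instead of 100 MB
                           read_chunk_size=4096)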

The HTTP server

Armed with the IOLoop and IOStream modules, writing an asynchronous HTTP server is just one step away, and that step is done in httpserver.py.

The HTTPServer class itself only accepts the new connections and adds their sockets to the IOLoop. The listening socket itself is also registered with the IOLoop, as seen in the listen() method:

def listen(self, port, address=""):
    assert not self._socket
    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
    flags |= fcntl.FD_CLOEXEC
    fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)
    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self._socket.setblocking(0)
    self._socket.bind((address, port))
    self._socket.listen(128)
    self.io_loop.add_handler(self._socket.fileno(), self._handle_events,
                             self.io_loop.READ)

In addition to binding to the given address and port, the code above sets the "close on exec" and "reuse address" flags. The former is useful when the application spawns new processes: we don't want them to keep the listening socket open. The latter avoids the "Address already in use" error when restarting the server.

As you can see, the connection backlog is set to 128. This means that if 128 connections are already waiting to be accepted, new connections will be rejected until the server has had time to accept some of them. I suggest increasing this value when doing benchmarks, as it directly affects when new connections start being dropped.

The _handle_events() handler, registered above, accepts the new connection, creates the IOStream associated with its socket and creates an HTTPConnection instance, which is responsible for the rest of the interaction with the client:

def _handle_events(self, fd, events):
    while True:
        try:
            connection, address = self._socket.accept()
        except socket.error, e:
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            raise
        try:
            stream = iostream.IOStream(connection, io_loop=self.io_loop)
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders)
        except:
            logging.error("Error in connection callback", exc_info=True)

Note that this method accepts all the pending connections in a single call: it stays in the while True loop until EWOULDBLOCK is raised, meaning there are no more connections waiting to be accepted.

The HTTPConnection class starts parsing the HTTP headers right in its __init__() method:

def __init__(self, stream, address, request_callback, no_keep_alive=False,
             xheaders=False):
    self.stream = stream
    self.address = address
    self.request_callback = request_callback
    self.no_keep_alive = no_keep_alive
    self.xheaders = xheaders
    self._request = None
    self._request_finished = False
    self.stream.read_until("\r\n\r\n", self._on_headers)

If you're wondering what the xheaders parameter means, see this comment:

If xheaders is True, we support the X-Real-Ip and X-Scheme headers,
which override the remote IP and HTTP scheme for all requests. These
headers are useful when running Tornado behind a reverse proxy or
load balancer.

The _on_headers() callback parses the headers and uses read_bytes() to read the content of the request, if present. The _on_request_body() callback parses the POST arguments and then calls the application callback:

def _on_headers(self, data):
    eol = data.find("\r\n")
    start_line = data[:eol]
    method, uri, version = start_line.split(" ")
    if not version.startswith("HTTP/"):
        raise Exception("Malformed HTTP version in HTTP Request-Line")
    headers = HTTPHeaders.parse(data[eol:])
    self._request = HTTPRequest(
        connection=self, method=method, uri=uri, version=version,
        headers=headers, remote_ip=self.address[0])

    content_length = headers.get("Content-Length")
    if content_length:
        content_length = int(content_length)
        if content_length > self.stream.max_buffer_size:
            raise Exception("Content-Length too long")
        if headers.get("Expect") == "100-continue":
            self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")
        self.stream.read_bytes(content_length, self._on_request_body)
        return

    self.request_callback(self._request)

def _on_request_body(self, data):
    self._request.body = data
    content_type = self._request.headers.get("Content-Type", "")
    if self._request.method == "POST":
        if content_type.startswith("application/x-www-form-urlencoded"):
            arguments = cgi.parse_qs(self._request.body)
            for name, values in arguments.iteritems():
                values = [v for v in values if v]
                if values:
                    self._request.arguments.setdefault(name, []).extend(
                        values)
        elif content_type.startswith("multipart/form-data"):
            boundary = content_type[30:]
            if boundary: self._parse_mime_body(boundary, data)
    self.request_callback(self._request)

Writing the response to the request is handled through the HTTPRequest class, which you can see instantiated in the _on_headers() method above. It simply proxies the writes to the stream object.

def write(self, chunk):
    assert self._request, "Request closed"
    self.stream.write(chunk, self._on_write_complete)

To be continued?

With this, we've covered the whole path from the bare sockets to the application layer. This should give you a pretty clear picture of how Tornado works inside. All in all, I would say it was a pleasant code hike, which I hope you enjoyed as well.

There are still large parts of the framework left unexplored, like web.py, which is what your application actually interacts with, or the template engine. If there is enough interest, I'll cover those parts as well. Encourage me by subscribing to my RSS feed.

Reference counting tutorial

If you’re writing multi-threaded programs in C or C++, you really should get a good handle on reference counting. The Linux kernel coding style document says it [btw, if you haven't read that, do it now. Go on, I'll wait.]:

Remember: if another thread can find your data structure, and you don’t have a reference count on it, you almost certainly have a bug.

The solution I will present is, if not a design pattern (sounds too buzzy), at least good practice. After discovering it, I changed a lot of my code to use it consistently, and found that it makes the code simpler and less prone to synchronization bugs, memory leaks and sluggishness. Get it right, and you won’t miss the GC too much.

I said a C or C++ application because languages with built-in garbage collectors solve this transparently for you. Not for free, though, but at the cost of (many) extra CPU cycles.

Sample Problem

Let’s take something simple:

In a C/C++ application, you have a global list [or collection, hashtable, tree] of objects that can be accessed by multiple threads. Each object has a unique identifier. Each thread can add a new object to the list, or get an object in order to modify or delete it.

Sounds basic, doesn’t it? Well, you can really shoot yourself in the foot with it. Special care must be taken when deleting objects, because other threads might currently be working with that object or iterating through the global list.

Disclaimers

The method is Not Invented Here ™. I saw it in the Linux kernel code, to name one place, and it is popular among experienced programmers. I haven’t seen many tutorials about it, though. BTW, this article might be useful for you if you’re trying to understand how reference counting is used in the Linux kernel.

The code snippets are in C with the pthread library, but you’d better consider them pseudo-code. They’re meant as starting points, not as ready for copy'n'paste. I have never tried them, so I would be surprised if they even compiled cleanly.

Solution

Over time, I found this minimal API to be convenient:

obj_t* obj_lookup(int id);
obj_t* obj_lookup_or_create(int id, int *isnew);
void obj_kill(obj_t *obj);

The obj_lookup() function needs no introduction: it simply searches for an object by id in the global list and returns NULL if it is not found.

There is no explicit create function because we cannot have two objects with the same id, and the API reflects this. obj_lookup_or_create() searches for an object with the specified id and, if none is found, creates one and returns it. It returns NULL only if creation fails. The isnew output parameter lets the caller know whether the object was created by this call or not.

The obj_kill() function removes the object from the global list and releases it.
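A hypothetical call site, showing how the isnew output parameter is meant to be used (initialize_object() is a made-up helper, not part of the API above):

void example(void)
{
    int isnew;
    obj_t *obj;

    obj = obj_lookup_or_create(42, &isnew);
    if (!obj)
        return;                     /* creation failed */
    if (isnew)
        initialize_object(obj);     /* one-time setup for freshly created objects */
    /* ... use obj ... */
}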

Since I mentioned multi-threaded, we need to think about locking. Having a single Big Lock (BFL for short ;-) ) that protects both the global list and all the objects is simple. It also doesn’t leave room for many bugs, but all operations are serialized and performance drops. I don’t consider this an option. [In fact, there's more than a performance issue: I think it's a pity that many GUI applications choose this method, and we get unresponsive applications when one thread blocks for a long time with the BFL taken.]

It’s better to lock each object individually and have another lock for accessing the global list. Operations on different objects can then run in parallel, so we take advantage of all the cores in modern machines.

The buggy way

I would like to start with a naive implementation, to better identify the issue:

static list_t *_list;
static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;


typedef struct obj {
    int id;
    void *value;

    pthread_mutex_t mutex;
} obj_t;


obj_t* lookup_or_create(int id, int *isnew)
{
    obj_t *obj;

    pthread_mutex_lock(&_mutex);
    list_for_each(obj, _list) {
        if (obj->id == id) {
            pthread_mutex_unlock(&_mutex);
            *isnew = 0;
            return obj;
        }
    }

    /* not found, create */
    obj = obj_new(id);
    list_add(_list, obj);
    pthread_mutex_unlock(&_mutex);

    *isnew = 1;
    return obj;
}

void obj_kill(obj_t *obj)
{
    pthread_mutex_lock(&_mutex);
    list_del(_list, obj);
    pthread_mutex_unlock(&_mutex);
}

obj_lookup() is too similar to obj_lookup_or_create() to be worth showing. A possible usage (again, naive) would be:

int process_obj_with_id(int id)
{
    obj_t *obj;
    int isnew;

    obj = lookup_or_create(id, &isnew);
    if (!obj)
        return -1;

    pthread_mutex_lock(&obj->mutex);
    process_object(obj);
    if (should_be_dead(obj))
        obj_kill(obj);
    pthread_mutex_unlock(&obj->mutex);

    if (obj_is_dead(obj))
        free(obj);

    return 0;
}

This function can be called by multiple threads concurrently; that was the whole point. The process_object() function does the actual job (whatever that is). should_be_dead() is introduced only to suggest that the user can decide to delete the object at any point.

Can you spot the bug in the above code?

It’s easy; consider the following situation: two threads call process_obj_with_id() at the same time with the same id. An object with that id already exists. Both of them execute the lookup and get pointers to the object. One of the threads takes the lock, the other waits. So far, so good. But now the first thread decides to delete the object and frees its memory. Oops. The second thread is left with an invalid pointer to work with and will probably segfault.

You can try to work around it by using properties specific to the application or by locking tricks, but I wouldn’t recommend it. All you’ll get is buggy code. If you recognize this pattern, you should use reference counting.

The good way

The good way is to use reference counting. Whenever you get a pointer to an object, increment its reference count. Whenever the pointer goes out of scope or is removed, decrement it. When the reference count of an object reaches zero, free it. Reference counting can be efficiently implemented with atomic operations:

typedef struct obj {
    int id;
    void *value;

    pthread_mutex_t mutex;
    atomic_t ref;
    bool_t killed;
} obj_t;


inline void obj_refinc(obj_t *obj)
{
    assert(atomic_read(&obj->ref) > 0);
    atomic_inc(&obj->ref);
}

inline int obj_refdec(obj_t *obj)
{
    if (atomic_dec_and_test(&obj->ref)) {
        assert(obj->killed);
        free(obj);
        return TRUE;
    }
    return FALSE;
}

Things to note so far:

  • The assert in the obj_refinc() function is a nice bug trap: you can’t increment the reference count of an object unless you already hold a valid pointer to it, so the reference count must already be greater than zero.
  • The killed boolean in the obj structure is not always needed. It does, however, do a good job of guarding against double deletion (see the obj_kill() function below), so I usually add it.
  • obj_refdec() returns TRUE if the object was freed by this call. This is also not strictly needed, but it is useful for bug trapping: if obj_refdec() returns TRUE, you know that any further access to the object will cause problems.

The rest of the interesting functions can be implemented like this:

obj_t* lookup_or_create(int id, int *isnew)
{
    obj_t *obj;

    pthread_mutex_lock(&_mutex);
    list_for_each(obj, _list) {
        if (!obj->killed && obj->id == id) {
            obj_refinc(obj);
            pthread_mutex_unlock(&_mutex);

            *isnew = FALSE;
            return obj;
        }
    }

    /* not found, create */
    obj = obj_new(id); /* sets reference count to 1 */

    list_add(_list, obj);
    obj_refinc(obj);

    pthread_mutex_unlock(&_mutex);

    *isnew = TRUE;
    return obj;
}

void obj_kill(obj_t *obj)
{
    if (!obj->killed) {
        obj->killed = TRUE;

        pthread_mutex_lock(&_mutex);
        list_del(_list, obj);
        pthread_mutex_unlock(&_mutex);

        if (obj_refdec(obj)) {
            assert(0); /* BUG: the ref count reached zero too soon */
        }
    }
}

int process_obj_with_id(int id)
{
    obj_t *obj;
    int isnew;

    obj = lookup_or_create(id, &isnew);
    if (!obj)
        return -1;

    pthread_mutex_lock(&obj->mutex);
    if (!obj->killed) {
        process_object(obj);
        if (should_be_dead(obj))
            obj_kill(obj);
    }
    pthread_mutex_unlock(&obj->mutex);
    obj_refdec(obj);
    return 0;
}

Things to note:

  • When an object is created (inside obj_new()), its reference count is set to 1. This reflects the initial pointer, as returned by malloc, that we hold on the object.
  • When the object is added to the global list, its reference count is incremented (the obj_refinc() call right after list_add()).
  • Every time a thread gets a pointer to the object from the global list, it increments the reference count, and every time that pointer goes out of scope, the count must be decremented (the obj_refdec() call at the end of process_obj_with_id()).
  • obj_kill() checks the return code of obj_refdec(). It is impossible for the reference count to reach zero in that call, because there is at least one more pointer to the object (the obj held by process_obj_with_id(), in this case).
  • Since, after the call to lookup_or_create(), the reference count of obj is greater than one, the object will not be freed by another thread until we decrement it.
  • The killed flag, however, needs to be checked after taking the object's lock, because another thread might have set it before we got the lock.
  • After we are done with the object (usually when the pointer goes out of scope) we need to call obj_refdec().

Type safe hooks in C

I sometimes envy the C++ guys for having simple APIs for the signal-slot mechanism. While signals and slots were designed by the people building GUI toolkits (with the main credit going to the Qt project), they can be useful as a general inter-module communication mechanism in any software project.

Signals provide loose module coupling, which is crucial for good design. All you need to declare are the arguments of the signal, and then any module is free to generate them and any other module is free to receive them. If the module sending signals is freed, no problem, the receiver will simply not receive any more signals. If the receiver module is freed, again no problem, the signals will be simply ignored. It’s that simple.

Another property of signals is that they are type safe, meaning that if the type of a signal's arguments changes and you fail to update all receivers, you get compile-time errors instead of weird, hard-to-find run-time bugs. The type unsafety of callbacks is too often ignored by C programmers, and it generates a good share of bugs.

The reason I said I envy C++ programmers is that the mechanism can’t be elegantly implemented in C, because C lacks generics and function overloading. But fear not: if we know what we aim for, we can get pretty close to anything in C. Here is how I do it: a hook is a list of pointers to callback functions. The receivers register their callbacks by adding them to the list. When the hook is called, i.e. the signal is generated, it calls all the callbacks from its list. Pretty simple so far, but how do you provide type safety, you ask? With wrappers and macros.

Let’s suppose the hook structure looks something like:

typedef struct hook_cb {
    void (*f)(void);
    struct hook_cb *next;
} hook_cb_t;

typedef struct hook {
    hook_cb_t *hooks_cb;
    pthread_rwlock_t lock;
} hook_t;

It’s basically the list of callbacks I was talking about, protected by a read-write lock. We then need functions for registering and unregistering the callbacks. Here are the prototypes; you have the pleasure of doing the singly-linked list insertion and removal on your own (or cheating with whatever library).

/**
 * Adds f to the hook->hooks_cb list.
 */
int hook_register_cb(hook_t *hook, void(*f)(void));

/**
 * Removes f from the hook->hooks_cb.
 */
int hook_unregister_cb(hook_t *hook, void(*f)(void));

So far everything is type unsafe; the callback type above was just arbitrarily chosen to look like a generic function pointer. But now the macro comes into play:

#define hook_call(hook, type, ...) do {                                 \
        hook_cb_t *__cb;                                                \
        pthread_rwlock_rdlock(&(hook)->lock);                           \
        for (__cb = (hook)->hooks_cb; __cb; __cb = __cb->next)          \
                ((type*)__cb->f)(__VA_ARGS__);                          \
        pthread_rwlock_unlock(&(hook)->lock);                           \
} while(0)

The hook_call macro receives as arguments a pointer to the hook structure, the type of the callback, and a variable number of arguments. It iterates through the callback list, casts each function pointer to the provided type and calls it with the remaining arguments, using a variadic macro for that. If the arguments don’t match the given function type, the compiler reports an error, which is all we ever wanted, actually.

For example, let’s suppose we have a hook with two integer arguments. The callback type is:

typedef void(my_callback_t)(int, int);

Then, the hook can be called with something like:

hook_call(hook, my_callback_t, 3, 4);

As the careful reader will notice, this only solves half of the problem: if a receiver registers a callback of the wrong type, it will still be called and cause trouble. This is why, when declaring hooks, I also create wrappers for the register routines. Extending the above example:

inline static int my_hook_register( hook_t *hook, my_callback_t *f ) {
        return hook_register_cb( hook, (void (*)(void))f );
}

inline static int my_hook_unregister( hook_t *hook, my_callback_t *f ) {
        return hook_unregister_cb( hook, (void (*)(void))f );
}

These 7 lines are the actual cost of type safety. A fair price, if you ask me. Especially since you can have callbacks with any number of arguments of any type. No need for casts in the callback, no need for packing structures, no need to document the types of the arguments. Things get simpler because the glue is centralized in the hook declaration.

Finally, a wrapper for the calling macro might also come in handy, to simplify the signal generators.

inline static void my_hook_call(hook_t *hook, int a, int b) {
        hook_call(hook, my_callback_t, a, b );
}
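Putting it together, here is a hypothetical receiver and sender pair (some_hook and my_handler are made-up names for this sketch):

/* hypothetical receiver module */
static void my_handler(int a, int b)
{
        printf("received %d, %d\n", a, b);      /* needs <stdio.h> */
}

void receiver_init(hook_t *some_hook)
{
        my_hook_register(some_hook, my_handler);
}

/* hypothetical sender module */
void sender_fire(hook_t *some_hook)
{
        my_hook_call(some_hook, 3, 4);
}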

Useful? Found an error? Something not clear? Leave a comment.

Atomic Operations

“Where did atomic.h go?!?”

...was my surprised reaction when I compiled one of my applications on Debian Etch for the first time. It compiled with no problems on Sarge and on Gentoo, but the atomic.h header file could not be found on Etch. A bit confused, I asked my friend, who didn’t seem to know either at first, so after I figured it out, I wrote this post.

First, to understand why atomic.h was removed, you should know the following:

  • The /usr/include/asm/atomic.h found on Debian Sarge is a kernel header, somewhat cleaned up to compile in user-space, but still a kernel header.
  • Including kernel headers in user-space is generally a bad idea, unless you are using the kernel API (e.g. for an ioctl).
  • The atomic.h header, in particular, was never meant to be included in user-space. For example, on an SMP machine, if you don’t compile with CONFIG_SMP the operations lose their atomicity. Even worse, on some architectures atomic.h is completely broken in user-space because it works by disabling interrupts. Here is a LKML thread on the subject.
  • It’s Linux specific; other Unixes might not have an equivalent.

Despite all this, many applications (e.g. MySQL) used atomic.h because of the lack of alternatives. There is no equivalent in glibc. Some framework libraries, like GLib or APR, have their own implementations of atomic operations, but linking against them just for that doesn’t always make sense. Simulating them with pthread spin locks is not much of an option either, as much of the performance is wasted. Finally, maintaining assembly versions in each application is out of the question.

The good news is that now there is a good and portable solution: the gcc atomic builtins. Since they are provided by the compiler, which is our specialist in generating machine code, they are sure to be correct on all supported architectures and operating systems. In fact, it makes so much sense to have the atomic operations as a language extension that I’m surprised we had to wait until version 4.1 of gcc to see them implemented. The downsides are that (1) some older processors cannot implement them efficiently and (2) the API is a little cumbersome.

To get you going, here is an in-place replacement for the atomic.h header:

#ifndef _ATOMIC_H
#define _ATOMIC_H

/**
 * Atomic type.
 */

typedef struct {
    volatile int counter;
} atomic_t;

#define ATOMIC_INIT(i)  { (i) }

/**
 * Read atomic variable
 * @param v pointer of type atomic_t
 *
 * Atomically reads the value of @v.
 */
#define atomic_read(v) ((v)->counter)

/**
 * Set atomic variable
 * @param v pointer of type atomic_t
 * @param i required value
 */
#define atomic_set(v,i) (((v)->counter) = (i))

/**
 * Add to the atomic variable
 * @param i integer value to add
 * @param v pointer of type atomic_t
 */
static inline void atomic_add( int i, atomic_t *v )
{
         (void)__sync_add_and_fetch(&v->counter, i);
}

/**
 * Subtract the atomic variable
 * @param i integer value to subtract
 * @param v pointer of type atomic_t
 *
 * Atomically subtracts @i from @v.
 */
static inline void atomic_sub( int i, atomic_t *v )
{
        (void)__sync_sub_and_fetch(&v->counter, i);
}

/**
 * Subtract value from variable and test result
 * @param i integer value to subtract
 * @param v pointer of type atomic_t
 *
 * Atomically subtracts @i from @v and returns
 * true if the result is zero, or false for all
 * other cases.
 */
static inline int atomic_sub_and_test( int i, atomic_t *v )
{
        return !(__sync_sub_and_fetch(&v->counter, i));
}

/**
 * Increment atomic variable
 * @param v pointer of type atomic_t
 *
 * Atomically increments @v by 1.
 */
static inline void atomic_inc( atomic_t *v )
{
       (void)__sync_fetch_and_add(&v->counter, 1);
}

/**
 * @brief decrement atomic variable
 * @param v: pointer of type atomic_t
 *
 * Atomically decrements @v by 1.  Note that the guaranteed
 * useful range of an atomic_t is only 24 bits.
 */
static inline void atomic_dec( atomic_t *v )
{
       (void)__sync_fetch_and_sub(&v->counter, 1);
}

/**
 * @brief Decrement and test
 * @param v pointer of type atomic_t
 *
 * Atomically decrements @v by 1 and
 * returns true if the result is 0, or false for all other
 * cases.
 */
static inline int atomic_dec_and_test( atomic_t *v )
{
       return !(__sync_sub_and_fetch(&v->counter, 1));
}

/**
 * @brief Increment and test
 * @param v pointer of type atomic_t
 *
 * Atomically increments @v by 1
 * and returns true if the result is zero, or false for all
 * other cases.
 */
static inline int atomic_inc_and_test( atomic_t *v )
{
      return !(__sync_add_and_fetch(&v->counter, 1));
}

/**
 * @brief add and test if negative
 * @param v pointer of type atomic_t
 * @param i integer value to add
 *
 * Atomically adds @i to @v and returns true
 * if the result is negative, or false when
 * result is greater than or equal to zero.
 */
static inline int atomic_add_negative( int i, atomic_t *v )
{
       return (__sync_add_and_fetch(&v->counter, i) < 0);
}

#endif

Pretty straightforward, isn't it? It could be even more powerful and simpler if you don't need strict compatibility with atomic.h. For example, atomic_add() could easily return the resulting value:

static inline int atomic_add( int i, atomic_t *v )
{
         return __sync_add_and_fetch(&v->counter, i);
}

As a second example, consider a compare-and-swap operation, frequently used in lock-free algorithms. Once again, it's trivial:

/**
 * @brief compare and swap
 * @param v pointer of type atomic_t
 *
 * If the current value of @b v is @b oldval,
 * then write @b newval into @b v. Returns #TRUE if
 * the comparison is successful and @b newval was
 * written.
 */
static inline int atomic_cas( atomic_t *v, int oldval, int newval )
{
        return __sync_bool_compare_and_swap(&v->counter, oldval, newval);
}
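To show the builtin at work, here is a hedged sketch of a typical lock-free pattern built on atomic_cas(): increment a counter only while it is below a limit, retrying if another thread changed it in between.

static inline int bounded_inc( atomic_t *v, int limit )
{
        int old;

        do {
                old = atomic_read(v);
                if (old >= limit)
                        return 0;       /* limit reached, nothing changed */
        } while (!atomic_cas(v, old, old + 1));

        return 1;                       /* incremented successfully */
}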

Found this useful? Leave a comment.