
Reference counting tutorial

If you’re writing multi-threaded programs in C or C++, you really should get a good handle on reference counting. The Linux kernel coding style says it best [btw, if you haven't read that, do it now. Go on, I'll wait.]:

Remember: if another thread can find your data structure, and you don’t have a reference count on it, you almost certainly have a bug.

The solution I will present is, if not a design pattern (that sounds too buzzy), at least good practice. After discovering it, I changed a lot of my code to use it consistently, and found that it makes the code simpler and less prone to synchronization bugs, memory leaks and sluggishness. Get it right, and you won’t miss the GC too much.

I said C or C++ because languages with an embedded garbage collector solve this transparently for you. Not for free, though, but for (many) extra CPU cycles.

Sample Problem

Let’s take something simple:

In a C/C++ application, you have a global list [| collection | hashtable | tree] of objects that can be accessed by multiple threads. Each object has a unique identifier. Each thread can add a new object to the list, or get an object to modify or delete it.

Sounds basic, doesn’t it? Well, you can really shoot yourself in the foot with it. Special care must be taken when deleting objects, because other threads might currently be working with that object or iterating through the global list.

Disclaimers

The method is Not Invented Here ™. I saw it in the Linux kernel code, to name one place, and it is popular among experienced programmers. I haven’t seen many tutorials about it, though. BTW, this article might be useful if you’re trying to understand how reference counting is used in the Linux kernel.

The code snippets are in C with the pthread library, but you should consider them as pseudo-code. They’re meant as starting points, not as code ready to copy and paste. I have never tried them, so I would be surprised if they even compile cleanly.

Solution

Over time, I found this minimal API to be convenient:

obj_t* obj_lookup(int id);
obj_t* obj_lookup_or_create(int id, int *isnew);
void obj_kill(obj_t *obj);

The obj_lookup() function needs no introduction: it simply searches for an object by id in the global list. If the object is not found, it returns NULL.

There is no explicit create function because we cannot have two objects with the same id, and the API reflects this. The obj_lookup_or_create() function searches for an object with the specified id and, if it is not found, creates one and returns it. It returns NULL only if creation fails. The isnew output parameter lets the caller know whether the object was created by this call.

The obj_kill() function removes the object from the global list and releases it.

Since I mentioned multi-threaded, we need to think about locking. Having a single Big Fat Lock (BFL ;-) ) that protects both the global list and all the objects is simple. It also doesn’t leave room for many bugs, but all operations are serialized and performance drops, so I don’t consider it an option. [In fact, there's more than a performance issue: I think it's a pity that many GUI applications choose this method, and we get unresponsive applications when one thread blocks for a long time with the BFL taken.]

It’s better to lock each object individually and have a separate lock for accessing the global list. Operations on different objects can then run in parallel, so we take advantage of all those cores in modern machines.

The buggy way

I would like to start with a naive implementation, to better identify the issue:

static list_t *_list;
static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;


typedef struct obj {
    int id;
    void *value;

    pthread_mutex_t mutex;
} obj_t;


obj_t* lookup_or_create(int id, int *isnew)
{
    obj_t *obj;

    pthread_mutex_lock(&_mutex);
    list_for_each(obj, _list) {
        if (obj->id == id) {
            pthread_mutex_unlock(&_mutex);
            *isnew = 0;
            return obj;
        }
    }

    /* not found, create */
    obj = obj_new(id);
    list_add(_list, obj);
    pthread_mutex_unlock(&_mutex);

    *isnew = 1;
    return obj;
}

void obj_kill(obj_t *obj)
{
    pthread_mutex_lock(&_mutex);
    list_del(_list, obj);
    pthread_mutex_unlock(&_mutex);
}

The obj_lookup() is too similar to obj_lookup_or_create() to be worth showing. A possible usage (again, naive) would be:

int process_obj_with_id(int id)
{
    obj_t *obj;
    int isnew;

    obj = lookup_or_create(id, &isnew);
    if (!obj)
        return -1;

    pthread_mutex_lock(&obj->mutex);
    process_object(obj);
    if (should_be_dead(obj))
        obj_kill(obj);
    pthread_mutex_unlock(&obj->mutex);

    if (obj_is_dead(obj))
        free(obj);

    return 0;
}

This function can be called by multiple threads concurrently; that was the whole point. The process_object() function does the actual job (whatever that is). The should_be_dead() check is introduced only to suggest that the user can decide to delete the object at any point.

Can you spot the bug in the above code?

It’s easy; consider the following situation: two threads call process_obj_with_id() at the same time with the same id. An object with that id already exists. Both threads execute the lookup and get pointers to the object. One of the threads takes the lock, the other waits. So far, so good. But now the first thread decides to delete the object and frees its memory. Oops. The second thread is left holding an invalid pointer and will probably segfault.

You can try to work around it by using properties specific to your application or by locking tricks, but I wouldn’t recommend it. All you’ll get is buggy code. If you recognize this pattern, there is a better way.

The good way

The good way is to use reference counting. Whenever you get a pointer to an object, increment its reference count. Whenever the pointer gets out of scope or it’s removed, decrement it. When the reference count of an object gets to zero, free it. Reference counting can be efficiently implemented with atomic operations:

typedef struct obj {
    int id;
    void *value;

    pthread_mutex_t mutex;
    atomic_t ref;
    bool_t killed;
} obj_t;


inline void obj_refinc(obj_t *obj)
{
    assert(atomic_read(&obj->ref) > 0);
    atomic_inc(&obj->ref);
}

inline int obj_refdec(obj_t *obj)
{
    assert(atomic_read(&obj->ref) > 0);
    if (atomic_dec_and_test(&obj->ref)) {
        assert(obj->killed); /* only killed objects may reach zero */
        free(obj);
        return TRUE;
    }
    return FALSE;
}

Things to note so far:

  • The assert from the obj_refinc() function is a nice bug trap: you can’t increment the reference count of an object unless you already have a pointer to it, so the reference count must already be greater than zero.
  • The ‘killed’ boolean in the obj structure is not always needed. It does, however, do a good job of guarding against double deletion (see the obj_kill() function below), so I usually add it.
  • obj_refdec() returns TRUE if the object was freed by this operation. This is also not strictly needed, but it’s useful for bug trapping: if obj_refdec() returns TRUE, you know that accessing the object from now on will cause problems.

The rest of the interesting functions can be implemented like this:

obj_t* lookup_or_create(int id, int *isnew)
{
    obj_t *obj;

    pthread_mutex_lock(&_mutex);
    list_for_each(obj, _list) {
        if (!obj->killed && obj->id == id) {
            obj_refinc(obj);
            pthread_mutex_unlock(&_mutex);

            *isnew = 0;
            return obj;
        }
    }

    /* not found, create */
    obj = obj_new(id); /* sets reference count to 1 */

    list_add(_list, obj);
    obj_refinc(obj); /* one more reference for the list */

    pthread_mutex_unlock(&_mutex);

    *isnew = 1;
    return obj;
}

void obj_kill(obj_t *obj)
{
    if (!obj->killed) {
        obj->killed = TRUE;

        pthread_mutex_lock(&_mutex);
        list_del(_list, obj);
        pthread_mutex_unlock(&_mutex);

        if (obj_refdec(obj)) {
            assert(0); /* BUG: the ref count got to zero too soon */
        }
    }
}

int process_obj_with_id(int id)
{
    obj_t *obj;
    int isnew;

    obj = lookup_or_create(id, &isnew);
    if (!obj)
        return -1;

    pthread_mutex_lock(&obj->mutex);
    if (!obj->killed) {
        process_object(obj);
        if (should_be_dead(obj))
            obj_kill(obj);
    }
    pthread_mutex_unlock(&obj->mutex);
    obj_refdec(obj);
    return 0;
}

Note that:

  • When creating an object, the reference count is set to 1. This reflects the initial pointer, as returned by malloc, that we have on the object.
  • When the object is added to the global list, its reference count is incremented.
  • Every time a thread gets a pointer to the object from the global list, it increments the reference count, and every time the pointer goes out of scope, the reference count must be decremented.
  • obj_kill() checks the return code of obj_refdec(). It is impossible to have a reference count of zero after this call, because there is at least one more pointer to the object (the obj from process_obj_with_id(), in this case).
  • Since after the call to lookup_or_create() the reference count of obj is greater than one, the object will not be freed by another thread until we decrement it.
  • The ‘killed’ flag, however, needs to be checked after taking the object lock, because another thread might have set it before we got the lock.
  • After we are done with the object (usually when the pointer goes out of scope) we need to call obj_refdec().
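For completeness, here is what obj_lookup() could look like under the same scheme. This is a self-contained sketch, not the exact code from above: C11 stdatomic stands in for the atomic_t wrappers, a bare singly-linked list stands in for list_for_each(), and the obj_insert() helper is invented just so the snippet can be exercised on its own.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct obj {
    int id;
    pthread_mutex_t mutex;
    atomic_int ref;
    int killed;
    struct obj *next;
} obj_t;

static obj_t *_list;
static pthread_mutex_t _list_mutex = PTHREAD_MUTEX_INITIALIZER;

static void obj_refinc(obj_t *obj)
{
    /* bug trap: the caller must already hold a reference */
    assert(atomic_load(&obj->ref) > 0);
    atomic_fetch_add(&obj->ref, 1);
}

static int obj_refdec(obj_t *obj)
{
    /* fetch_sub returns the previous value: 1 means we dropped the last ref */
    if (atomic_fetch_sub(&obj->ref, 1) == 1) {
        free(obj);
        return 1;
    }
    return 0;
}

/* Search the global list; on a hit, take a reference *before*
 * dropping the list lock, so the object cannot vanish under us. */
obj_t *obj_lookup(int id)
{
    obj_t *obj;

    pthread_mutex_lock(&_list_mutex);
    for (obj = _list; obj; obj = obj->next) {
        if (!obj->killed && obj->id == id) {
            obj_refinc(obj);
            pthread_mutex_unlock(&_list_mutex);
            return obj;
        }
    }
    pthread_mutex_unlock(&_list_mutex);
    return NULL;
}

/* Illustration-only helper: create an object with ref count 2
 * (one for the creator, one for the list) and push it to the list. */
obj_t *obj_insert(int id)
{
    obj_t *obj = calloc(1, sizeof(*obj));
    obj->id = id;
    pthread_mutex_init(&obj->mutex, NULL);
    atomic_store(&obj->ref, 2);
    pthread_mutex_lock(&_list_mutex);
    obj->next = _list;
    _list = obj;
    pthread_mutex_unlock(&_list_mutex);
    return obj;
}
```

Note how the refinc happens while the list lock is still held: this is the window the buggy version left open.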

Type safe hooks in C

I sometimes envy the C++ guys for having simple APIs for the signal-slot mechanism. While signals and slots were designed by the people doing GUI toolkits — with the main credit going to the Qt project — they can be useful as a general inter-module communication mechanism in any software project.

Signals provide loose module coupling, which is crucial for good design. All you need to declare are the arguments of the signal; then any module is free to generate it and any other module is free to receive it. If the module sending signals is freed, no problem: the receiver will simply not receive any more signals. If the receiver module is freed, again no problem: the signals will simply be ignored. It’s that simple.

Another property of signals is that they are type safe, meaning that if the type of a signal’s arguments changes and you fail to update all receivers, you get compile-time errors instead of weird hard-to-find run-time bugs. The type unsafety of callbacks is too often ignored by C programmers, and it generates a good share of bugs.

The reason I said I envy C++ programmers is that the mechanism can’t be elegantly implemented in C, because C lacks generics and function overloading. But fear not: if we know what we aim for, we can get pretty close to anything in C. Here is how I do it: a hook is a list of pointers to callback functions. The receivers register their callbacks by adding them to the list. When the hook is called, i.e. the signal is generated, it calls all the callbacks from its list. Pretty simple so far, but how do you provide type safety, you ask? With wrappers and macros.

Lets suppose the hook structure looks something like:

typedef struct hook_cb {
    void (*f)(void);
    struct hook_cb *next;
} hook_cb_t;

typedef struct hook {
    hook_cb_t *hooks_cb;
    pthread_rwlock_t lock;
} hook_t;

It’s basically the list of callbacks I was talking about, protected by a read-write lock. We then need functions for registering and unregistering the callbacks. Here are the prototypes; you have the pleasure of doing the singly-linked list insertion and removal on your own (or cheat with whatever library).

/**
 * Adds f to the hook->hooks_cb list.
 */
int hook_register_cb(hook_t *hook, void(*f)(void));

/**
 * Removes f from the hook->hooks_cb list.
 */
int hook_unregister_cb(hook_t *hook, void(*f)(void));

So far, everything is type unsafe. The type of the callback was just chosen to look like a generic function. But then the macro comes into play:

#define hook_call(hook, type, ...) do {                         \
        hook_cb_t *__cb;                                        \
        pthread_rwlock_rdlock(&(hook)->lock);                   \
        for (__cb = (hook)->hooks_cb; __cb; __cb = __cb->next)  \
                ((type*)__cb->f)(__VA_ARGS__);                  \
        pthread_rwlock_unlock(&(hook)->lock);                   \
} while (0)

The hook_call macro receives as arguments a pointer to the hook structure, the type of the callback, and a variable number of arguments. It iterates through the callback list, casts each function pointer to the provided type and calls it with the macro arguments, using a variadic macro for that. If the arguments don’t match the given function type, the compiler reports an error, which is all we ever wanted, actually.

For example, lets suppose we have a hook with two integer arguments. The callback type is:

typedef void(my_callback_t)(int, int);

Then, the hook can be called with something like:

hook_call(hook, my_callback_t, 3, 4);

As the careful reader will notice, this only solves half of the problem. If a receiver registers a callback of the wrong type, it will be called anyway and cause trouble. This is why, when declaring hooks, I also create wrappers for the register routines. Extending the above example:

inline static int my_hook_register(hook_t *hook, my_callback_t *f)
{
        return hook_register_cb(hook, (void (*)(void))f);
}

inline static int my_hook_unregister(hook_t *hook, my_callback_t *f)
{
        return hook_unregister_cb(hook, (void (*)(void))f);
}

These few lines are the actual cost of type safety. A fair price, if you ask me, especially since you can have callbacks with any number of arguments of any type. No need for casts in the callback, no need for packing structures, no need for documenting the types of the arguments. Things get simpler because the glue is centralized in the hook declaration.

Finally, a wrapper for the calling macro might also come in handy to simplify the signal generators.

inline static void my_hook_call(hook_t *hook, int a, int b)
{
        hook_call(hook, my_callback_t, a, b);
}

Useful? Found an error? Something not clear? Leave a comment.

Atomic Operations

“Where did atomic.h go?!?”

..was my surprised reaction when I compiled one of my applications on Debian Etch for the first time. It compiled with no problems on Sarge and on Gentoo, but the atomic.h header file couldn’t be found on Etch. A bit confused, I asked my friend, who didn’t seem to know either, so after I figured it out, I wrote this post.

First, to understand why atomic.h was removed, you should know the following:

  • The /usr/include/asm/atomic.h, as found on Debian Sarge, is a kernel header, somewhat cleaned up to compile well in user-space, but still a kernel header.
  • Including kernel headers in user-space is generally a bad idea, unless you are using the kernel API (e.g. for an ioctl).
  • The atomic.h header, in particular, was never meant to be included in user-space. For example, on an SMP machine, if you don’t compile with CONFIG_SMP, the operations lose their atomicity. Even worse, on some architectures atomic.h is completely broken in user-space because it works by disabling interrupts. Here is a LKML thread on the subject.
  • It’s Linux specific; other Unixes might not have an equivalent.

Despite all this, many applications (e.g. MySQL) used atomic.h because of the lack of alternatives. There is no equivalent in glibc. Some framework libraries, like GLib or APR, have their own implementations of atomic operations, but linking against them just for that doesn’t always make sense. Simulating them with pthread spin locks is not much of an option either, as much of the performance is wasted. Finally, maintaining assembly versions in each application is out of the question.

The good news is that there is now a good and portable solution: the gcc atomic builtins. Since they are provided by the compiler, which is our specialist in generating machine code, they are sure to be correct on all supported architectures and operating systems. In fact, it makes so much sense to have atomic operations as a language extension that I’m surprised we had to wait until version 4.1 of gcc to see them implemented. The downsides are that (1) some older processors will not run them efficiently and (2) the API is a little cumbersome.

To get you going, here is a drop-in replacement for the atomic.h header:

#ifndef _ATOMIC_H
#define _ATOMIC_H

/**
 * Atomic type.
 */

typedef struct {
    volatile int counter;
} atomic_t;

#define ATOMIC_INIT(i)  { (i) }

/**
 * Read atomic variable
 * @param v pointer of type atomic_t
 *
 * Atomically reads the value of @v.
 */
#define atomic_read(v) ((v)->counter)

/**
 * Set atomic variable
 * @param v pointer of type atomic_t
 * @param i required value
 */
#define atomic_set(v,i) (((v)->counter) = (i))

/**
 * Add to the atomic variable
 * @param i integer value to add
 * @param v pointer of type atomic_t
 */
static inline void atomic_add( int i, atomic_t *v )
{
         (void)__sync_add_and_fetch(&v->counter, i);
}

/**
 * Subtract the atomic variable
 * @param i integer value to subtract
 * @param v pointer of type atomic_t
 *
 * Atomically subtracts @i from @v.
 */
static inline void atomic_sub( int i, atomic_t *v )
{
        (void)__sync_sub_and_fetch(&v->counter, i);
}

/**
 * Subtract value from variable and test result
 * @param i integer value to subtract
 * @param v pointer of type atomic_t
 *
 * Atomically subtracts @i from @v and returns
 * true if the result is zero, or false for all
 * other cases.
 */
static inline int atomic_sub_and_test( int i, atomic_t *v )
{
        return !(__sync_sub_and_fetch(&v->counter, i));
}

/**
 * Increment atomic variable
 * @param v pointer of type atomic_t
 *
 * Atomically increments @v by 1.
 */
static inline void atomic_inc( atomic_t *v )
{
       (void)__sync_fetch_and_add(&v->counter, 1);
}

/**
 * @brief decrement atomic variable
 * @param v: pointer of type atomic_t
 *
 * Atomically decrements @v by 1.
 */
static inline void atomic_dec( atomic_t *v )
{
       (void)__sync_fetch_and_sub(&v->counter, 1);
}

/**
 * @brief Decrement and test
 * @param v pointer of type atomic_t
 *
 * Atomically decrements @v by 1 and
 * returns true if the result is 0, or false for all other
 * cases.
 */
static inline int atomic_dec_and_test( atomic_t *v )
{
       return !(__sync_sub_and_fetch(&v->counter, 1));
}

/**
 * @brief Increment and test
 * @param v pointer of type atomic_t
 *
 * Atomically increments @v by 1
 * and returns true if the result is zero, or false for all
 * other cases.
 */
static inline int atomic_inc_and_test( atomic_t *v )
{
      return !(__sync_add_and_fetch(&v->counter, 1));
}

/**
 * @brief add and test if negative
 * @param v pointer of type atomic_t
 * @param i integer value to add
 *
 * Atomically adds @i to @v and returns true
 * if the result is negative, or false when
 * result is greater than or equal to zero.
 */
static inline int atomic_add_negative( int i, atomic_t *v )
{
       return (__sync_add_and_fetch(&v->counter, i) < 0);
}

#endif

Pretty straightforward, isn't it? It could be even more powerful and simpler if you don't need exact compatibility with atomic.h. For example, atomic_add could easily return the resulting value:

static inline int atomic_add( int i, atomic_t *v )
{
         return __sync_add_and_fetch(&v->counter, i);
}

As a second example, consider a compare-and-swap operation, frequently used in lock-free algorithms. Once again, it's trivial:

/**
 * @brief compare and swap
 * @param v pointer of type atomic_t
 * @param oldval the expected old value
 * @param newval the value to write
 *
 * If the current value of @b v is @b oldval,
 * then write @b newval into @b v. Returns #TRUE if
 * the comparison is successful and @b newval was
 * written.
 */
static inline int atomic_cas( atomic_t *v, int oldval, int newval )
{
        return __sync_bool_compare_and_swap(&v->counter, oldval, newval);
}
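To illustrate atomic_cas() in action, here is a small sketch of the classic CAS retry loop: an increment that saturates at a maximum instead of counting past it, something the plain atomic_inc() cannot express. The atomic_t bits are repeated so the snippet stands alone, and atomic_inc_saturated() is a made-up name for illustration:

```c
#include <assert.h>

typedef struct {
    volatile int counter;
} atomic_t;

static inline int atomic_read(const atomic_t *v)
{
    return v->counter;
}

static inline int atomic_cas(atomic_t *v, int oldval, int newval)
{
    return __sync_bool_compare_and_swap(&v->counter, oldval, newval);
}

/* Increment v, but stop at `max` instead of going past it.
 * Read the current value, compute the new one, and retry the
 * CAS until no other thread raced us in between. Returns the
 * value after the operation. */
static inline int atomic_inc_saturated(atomic_t *v, int max)
{
    int old;
    do {
        old = atomic_read(v);
        if (old >= max)
            return old;       /* already saturated, leave it alone */
    } while (!atomic_cas(v, old, old + 1));
    return old + 1;
}
```

The read-modify-CAS loop is the building block of most lock-free algorithms: any conditional or non-trivial update can be expressed this way, at the cost of a retry under contention.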

Found this useful? Leave a comment.