My goal here is to have a walk through the lower layers of the Tornado
asynchronous web server. I take a bottom-up approach, starting with the
polling loop and going up to the application layer, pointing out the interesting
things I see on my way.
So if you plan on reading the source code of the Tornado web
framework, or you are just curious to see how an asynchronous web server works, I
would love to be your guide.
After reading this, you will:
- be able to write the server part of Comet applications, even if
to do it from scratch
- have a better understanding of the Tornado web framework, if you
plan do develop
- have a bit more informed opinion in the tornado-twisted debate
I'll start with a few words of introduction to the Tornado
project, in case you have no idea what it is and why you might be interested in it.
If you're already interested in it, just jump to the next section.
Tornado is a asynchronous http server and web
framework written in Python. It is the framework that powers the FriendFeed website, recently acquired by Facebook. FriendFeed has quite
a few users and many
real-time features, so performance and scalability must have high priorities there.
Since it is open source now (kudos to Facebook), we can all have a look inside it
to see how it works.
I also feel obliged to talk a bit on nonblocking IO or
asynchronous IO (AIO) .
If you already know what it is,
goto next_section. I'll try to
with a simple example.
Let's suppose you're writing an application that has to query another
for some data (the database for example, or some sort of remote API) and it's
known that this query can take a long time. Let's say, 5 seconds.
In many web frameworks the handler would look something like:
1 2 3
def handler_request(self, request): answ = self.remote_server.query(request) # this takes 5 seconds request.write_response(answ)
If you do this in a single thread, you will serve one client every 5
the five secs, all other have to wait, so you're serving clients with a whooping
rate of 0.2 requests per second. Awesome!
Of course, nobody is that naive, so most will use a multi-threaded
server to be able
to support more clients at once. Lets say you have 20 threads.
You improved performance 20 times, so the rate is now 4 request per
second. Still, way too small.
You can keep throwing threads at the problem, but threads are expensive in terms of
memory usage and scheduling.
I doubt you'll ever reach hundreds of requests per second this way.
With AIO, however, thousands of such requests per second are a
breeze. The handler
has to be changed to look something like this:
1 2 3 4 5
def handler_request(self, request): self.remote_server.query_async(request, self.response_received) def response_received(self, request, answ): # this is called 5 seconds later request.write(answ)
The idea is that we're not blocking while waiting for the answer to
we give the framework a callback function to call us when the answer has come.
In the mean time, we're free to serve other clients.
This is also the downside of AIO: the code will be a bit... well,
twisted. Also, if
you're in a single threaded AIO server like Tornado, you have to be careful never
to block, because all the pending requests will be delayed by that.
A great resource to learn more (than this over simplistic intro) about
is The C10K problem page.
The project is hosted at github.
You can get it, although you don't need it for reading this article, with:
git clone git://github.com/facebook/tornado.git
tornado subdirectory contains a
.py file for each of the modules
can easily identify them if you have a checkout of the repository.
In each source file, you will find at least one large doc string explaining the module,
and giving an example or two on how to use it.
Lets go directly into the core of the server and look at the
This module is the heart of the asynchronous mechanism. It keeps a list of the
open file descriptors and handlers for each. Its job is to select the ones that are
ready for reading or writing and call the associated handler.
To add a socket to the
IOLoop, the application calls the
1 2 3 4
def add_handler(self, fd, handler, events): """Registers the given handler to receive the given events for fd.""" self._handlers[fd] = handler self._impl.register(fd, events | self.ERROR)
_handlers dictionary maps the file descriptor with the function to
when the file descriptor is ready (handler in Tornado terminology).
Then, the descriptor is registered to the epoll list. Tornado cares about three
types of events:
ERROR. As you can see,
added in for you.
Now lets see the actual main loop, somehow weirdly placed in the
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
def start(self): """Starts the I/O loop. The loop will run until one of the I/O handlers calls stop(), which will make the loop stop after the current event iteration completes. """ self._running = True while True: [ ... ] if not self._running: break [ ... ] try: event_pairs = self._impl.poll(poll_timeout) except Exception, e: if e.args == (4, "Interrupted system call"): logging.warning("Interrupted system call", exc_info=1) continue else: raise # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: self._handlers[fd](fd, events) except KeyboardInterrupt: raise except OSError, e: if e == errno.EPIPE: # Happens when the client closes the connection pass else: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) except: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True)
poll() function returns a dictionary with
(fd: events) pairs,
event_pairs variable. The
"Interrupted system call" special
exception is needed
because the C library
poll() function can return
EINTR (which has
value of 4), when a signal comes before any events occurred.
See man poll for details.
The inner while loop takes the pairs from the
one by one
and calls the associated handler. The pipe error exception is silenced here. To keep
the generality of this class it would have been perhaps a better idea to catch this
in the http handlers, but it was probably easier like this.
The comment explains why the dictionary had to be parsed using
popitem() rather than
the more obvious:
for fd, events in self._events.items():
In a nutshell, the dictionary can be modified
during the loop, inside the handlers. See, for example, the
The method extracts the fd from the
_events dictionary, so that the
handler is not
called even if it was selected by the current poll iteration.
1 2 3 4 5 6 7 8
def remove_handler(self, fd): """Stop listening for events on fd.""" self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except OSError: logging.debug("Error deleting fd from IOLoop", exc_info=True)
The (pointless) loop termination trick
A nice trick is how the loop is stopped. The
self._running variable is
used to break from it
and it can be set to
False from the handlers by using the
Normally, that would just be the end of it, but
stop() method might be also called from a signal handler. If 1)
the loop is in
poll(), 2) no requests are coming to the server and 3) the signal is
not delivered to
the right thread by the OS, you would have to wait for the poll to
timeout. Considering how unlikely this is and that
0.2 seconds by default,
that's hardly a tragedy, really.
But anyway, to do it they use an anonymous pipe with one end in the set
of polled file descriptors. When terminating,
it writes something on the other end, effectively waking up the loop from poll.
Here is the selected code for it:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
def __init__(self, impl=None): [...] # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) self._waker_reader = os.fdopen(r, "r", 0) self._waker_writer = os.fdopen(w, "w", 0) self.add_handler(r, self._read_waker, self.WRITE) def _wake(self): try: self._waker_writer.write("x") except IOError: pass
In fact, it seems to be a bug in the above code: the read file
opened for reading, is registered with the
WRITE event, which cannot
occur. As I said
earlier, it hardly makes a difference so I'm
not surprised that they actually didn't noticed this is not working.
about this, but I got no answer so far.
Another nice feature of the
IOLoop module is the simple timers
implementation. A list of
timers is maintained sorted by expiration time, by using python's bisect
1 2 3 4 5
def add_timeout(self, deadline, callback): """Calls the given callback at the time deadline from the I/O loop.""" timeout = _Timeout(deadline, callback) bisect.insort(self._timeouts, timeout) return timeout
Inside the main loop, the callbacks from all the expired timers are
executed in that order, until the current time is reached. The poll timeout is
adjusted such as the next timer is not delayed if no new requests arrive.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
self._running = True while True: poll_timeout = 0.2 [ ... ] if self._timeouts: now = time.time() while self._timeouts and self._timeouts.deadline <= now: timeout = self._timeouts.pop(0) self._run_callback(timeout.callback) if self._timeouts: milliseconds = self._timeouts.deadline - now poll_timeout = min(milliseconds, poll_timeout) [ ... poll ]
Selecting the select method
Let's now have a quick look at the code that selects the poll/select
implementation. Python 2.6
has epoll support in the standard library, which is sniffed with
hasattr() on the
If on python \< 2.6, Tornado will try to use its on C epoll module. You can find its sources in the
tornado/epoll.c file. Finally, if that fails (epoll is specific to
Linux), it will fallback to
_EPoll classes are wrappers for
select.epoll API. Before doing your benchmarks, make
sure you use
has poor performance with large sets of file descriptors.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# Choose a poll implementation. Use epoll if it is available, fall back to # select() for non-Linux platforms if hasattr(select, "epoll"): # Python 2.6+ on Linux _poll = select.epoll else: try: # Linux systems with our C module installed import epoll _poll = _EPoll except: # All other systems import sys if "linux" in sys.platform: logging.warning("epoll module not found; using select()") _poll = _Select
With this, we've covered most of the
IOLoop module. As advertised, it
is indeed a
nice and simple piece of code.
From sockets to streams
Let's have a look now at the
Its purpose is to provide a small level of abstraction over nonblocking sockets, by
offering three functions:
read_until(), which reads from the socket until it finds a given string. This is
convenient for reading the HTTP headers until the empty line delimiter.
read_bytes(), which reads a give number of bytes from the socket. This is convenient
for reading the body of the HTTP message.
write()which writes a given buffer to the socket and keeps retrying until the whole
buffer is sent.
All of them can call a callback when they are done, in asynchronous style.
write() implementation buffers the data provided by the caller
writes it whenever
IOLoop calls its handler, because the socket is
ready for writing:
1 2 3 4 5 6 7 8 9 10 11 12
def write(self, data, callback=None): """Write the given data to this stream. If callback is given, we call it when all of the buffered write data has been successfully written to the stream. If there was previously buffered write data and an old write callback, that callback is simply overwritten with this new callback. """ self._check_closed() self._write_buffer += data self._add_io_state(self.io_loop.WRITE) self._write_callback = callback
The function that handles the
WRITE event simply does
is hit or the buffer is finished:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
def _handle_write(self): while self._write_buffer: try: num_bytes = self.socket.send(self._write_buffer) self._write_buffer = self._write_buffer[num_bytes:] except socket.error, e: if e in (errno.EWOULDBLOCK, errno.EAGAIN): break else: logging.warning("Write error on %d: %s", self.socket.fileno(), e) self.close() return if not self._write_buffer and self._write_callback: callback = self._write_callback self._write_callback = None callback()
Reading does the reverse process. The read event handler keeps reading
is gathered in the read buffer. This means either it has the required length (if
read_bytes()) or it contains the requested delimiter (if
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
def _handle_read(self): try: chunk = self.socket.recv(self.read_chunk_size) except socket.error, e: if e in (errno.EWOULDBLOCK, errno.EAGAIN): return else: logging.warning("Read error on %d: %s", self.socket.fileno(), e) self.close() return if not chunk: self.close() return self._read_buffer += chunk if len(self._read_buffer) >= self.max_buffer_size: logging.error("Reached maximum read buffer size") self.close() return if self._read_bytes: if len(self._read_buffer) >= self._read_bytes: num_bytes = self._read_bytes callback = self._read_callback self._read_callback = None self._read_bytes = None callback(self._consume(num_bytes)) elif self._read_delimiter: loc = self._read_buffer.find(self._read_delimiter) if loc != -1: callback = self._read_callback delimiter_len = len(self._read_delimiter) self._read_callback = None self._read_delimiter = None callback(self._consume(loc + delimiter_len))
_consume() function, that is used above, makes sure that no more
that what was
requested is taken out of the stream, and subsequent reads will get the immediate
1 2 3 4
def _consume(self, loc): result = self._read_buffer[:loc] self._read_buffer = self._read_buffer[loc:] return result
Also worth noting in the
_handle_read() function above is the capping
read buffer at
self.max_buffer_size. The default value for it is
seems a bit large to me. For example, if an attacker makes just 100 connections
to the server and keeps pushing headers to it without the end headers delimiter,
Tornado will need 10 GB of RAM to serve the requests.
Even if the RAM is not a problem, the copying operations done with a buffer of this size (like in the
_consume() method above) will likely overload the server. Note also
searches the delimiter in the whole buffer on each iteration, so if the attacker sends
the huge data in small chunks, the server has to do a lot of searches. Bottom of line,
you might want to tune this parameter unless you really expect requests that big and you have
the hardware for it.
The HTTP server
Armed with the
IOStream modules, writing an
asynchronous HTTP server is just one step away, and that step is done in
HTTPServer class itself only does the accepting of the new
connections by adding
their sockets to the IOLoop. The listening socket itself is part of IOLoop, as seen in
1 2 3 4 5 6 7 8 9 10 11 12
def listen(self, port, address=""): assert not self._socket self._socket = socket.(socket.AF_INET, socket.SOCK_STREAM, 0) flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setblocking(0) self._socket.bind((address, port)) self._socket.listen(128) self.io_loop.add_handler(self._socket.fileno(), self._handle_events, self.io_loop.READ)
In addition to binding to given address and port, the code above sets
the "close on exec"
and "reuse address" flags. The former is useful in the case the application spawns new processes. In
this case, we don't want them to keep the socket open. The latter is useful for avoiding the
"Address already in use" error when restarting the server.
As you can see, the connection backlog is set to 128. This means that if
128 connection are waiting to
be accepted, new connections will be rejected until the server has time to accept some of them.
I suggest trying to increase this one when doing benchmarks, as it directly affects when the new
connections are dropped.
_handle_events() handler, registered above, accepts the new
connection, creates the
associated with the socket and starts a
HTTPConnection class, which
is responsible for the
rest of the interaction with it:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
def _handle_events(self, fd, events): while True: try: connection, address = self._socket.accept() except socket.error, e: if e in (errno.EWOULDBLOCK, errno.EAGAIN): return raise try: stream = iostream.IOStream(connection, io_loop=self.io_loop) HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True)
Note that this method accepts all the pending connections in a single
iteration. It stays in
while True loop until
EWOULDBLOCK is returned, which means that
there are no more
new connections pending to be accepted.
HTTPConnection class starts parsing the HTTP headers right in its
1 2 3 4 5 6 7 8 9 10
def __init__(self, stream, address, request_callback, no_keep_alive=False, xheaders=False): self.stream = stream self.address = address self.request_callback = request_callback self.no_keep_alive = no_keep_alive self.xheaders = xheaders self._request = None self._request_finished = False self.stream.read_until("\r\n\r\n", self._on_headers)
If you're wondering what
xheaders parameter means, see this comment:
If xheaders is True, we support the X-Real-Ip and X-Scheme headers, which override the remote IP and HTTP scheme for all requests. These headers are useful when running Tornado behind a reverse proxy or load balancer.
_on_headers() callback parses the headers and uses
to read the
content of the request, if present. The
arguments and then calls the application callback:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
def _on_headers(self, data): eol = data.find("\r\n") start_line = data[:eol] method, uri, version = start_line.split(" ") if not version.startswith("HTTP/"): raise Exception("Malformed HTTP version in HTTP Request-Line") headers = HTTPHeaders.parse(data[eol:]) self._request = HTTPRequest( connection=self, method=method, uri=uri, version=version, headers=headers, remote_ip=self.address) content_length = headers.get("Content-Length") if content_length: content_length = int(content_length) if content_length > self.stream.max_buffer_size: raise Exception("Content-Length too long") if headers.get("Expect") == "100-continue": self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n") self.stream.read_bytes(content_length, self._on_request_body) return self.request_callback(self._request) def _on_request_body(self, data): self._request.body = data content_type = self._request.headers.get("Content-Type", "") if self._request.method == "POST": if content_type.startswith("application/x-www-form-urlencoded"): arguments = cgi.parse_qs(self._request.body) for name, values in arguments.iteritems(): values = [v for v in values if v] if values: self._request.arguments.setdefault(name, ).extend( values) elif content_type.startswith("multipart/form-data"): boundary = content_type[30:] if boundary: self._parse_mime_body(boundary, data) self.request_callback(self._request)
Writing the answer to the request is handled through the
class, which you can
see instantiated in the
_on_headers() method above. It just proxies
the write to the
1 2 3
def write(self, chunk): assert self._request, "Request closed" self.stream.write(chunk, self._on_write_complete)
To be continued?
With this, I covered all the way from the bare sockets to the
This should give you a pretty clear image of how Tornado works inside. All in all,
I would say it was a pleasant code hike which I hope you enjoyed as well.
There are still large parts of the framework that remain unexplored,
which is actually what your application is interacting with, or the template engine.
If there is enough interest, I'll cover those parts as well. Encourage me by subscribing
to my RSS feed.