dablooms - an open source, scalable, counting bloom filter library
Overview
This project aims to demonstrate a novel bloom filter implementation that can scale, and provide not only the addition of new members, but reliable removal of existing members.
Motivation
bitly has billions and billions of links and an infrastructure that serves
thousands of decode requests per second, i.e. redirect the user from bit.ly/short
to the long.destination.url.com/with/path. Users not only expect this redirect
to be fast, but they also trust bitly to not redirect them to a malicious website.
Unfortunately, a subset of destination urls are malicious, and we identify and
mark them as spam via automated, realtime analysis. This spam dataset is constantly
changing with an increasing rate of additions and a small number of deletions. In
order to maintain performance and provide a warning to users who might be going to a
spam website, we have to keep the content of the spam dataset in resident memory
for each decode request. This is complicated by the fact that the system is designed to
support both domain and path roll up, meaning we can block an entire domain, subdomain,
or top-level path (or any combination thereof) thus marking all children of that parent as
spam. Previously, we had implemented this as a simple hashtable with existence in the table
equating to membership in the spam dataset. This approach was simple and performant, but
not very memory efficient. On the occasions when resident memory was cleared, say on a
restart, it was necessary to reconstruct the dataset from the database. This made restarts
painfully long during the dataset reconstruction, and the hashtable took up considerable
amounts of memory, 2.5 gigabytes to be exact.
We began exploring bloom filters as a possible solution to our spam problems. Despite the many bloom filter variations out there - counting, compact, inverse, scalable, and bloomier - none of them met our criteria entirely. We needed the ability to scale and persist reliably while allowing for the removal of elements. It appeared that only a mixture of scalable and counting (elaborated on below) would solve our problems.
Bloom Filter Basics
Bloom filters are probabilistic data structures that provide
space-efficient storage of elements at the cost of occasional false positives on
membership queries, i.e. a bloom filter may state true on query when it in fact does
not contain said element. A bloom filter is traditionally implemented as an array of
M bits, where M is the size of the bloom filter. On initialization all bits are
set to zero. A filter is also parameterized by a constant k that defines the number
of hash functions used to set and test bits in the filter. Each hash function should
output one index in M. When inserting an element x into the filter, the bits
in the k indices h1(x), h2(x), ..., hk(x) are set.
In order to query a bloom filter, say for element x, it suffices to verify if
all bits in indices h1(x), h2(x), ..., hk(x) are set. If one or more of these
bits is not set then the queried element is definitely not present in the
filter. However, if all these bits are set, then the element is considered to
be in the filter. Given this procedure, an error probability exists for positive
matches, since the tested indices might have been set by the insertion of other
elements.
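The insert and query procedures above can be sketched in a few lines of C. This is only an illustration, not the dablooms API: the names (bloom_t, bloom_add, bloom_check) are hypothetical, and deriving the k indices from two FNV-1a hashes is an assumption chosen for brevity.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical names throughout; this is not the dablooms API. */
typedef struct {
    uint8_t *bits;  /* M bits, packed 8 per byte, all zero after init */
    size_t   m;     /* number of bits (M) */
    unsigned k;     /* number of hash functions */
} bloom_t;

/* 64-bit FNV-1a, used here only because it is short. */
static uint64_t fnv1a(const char *s, uint64_t seed)
{
    uint64_t h = 14695981039346656037ULL ^ seed;
    for (; *s; s++) {
        h ^= (uint8_t)*s;
        h *= 1099511628211ULL;
    }
    return h;
}

/* i-th index for x, derived from two base hashes (an assumption). */
static size_t bloom_index(const bloom_t *b, const char *x, unsigned i)
{
    uint64_t h1 = fnv1a(x, 0);
    uint64_t h2 = fnv1a(x, 0x9e3779b97f4a7c15ULL) | 1;
    return (size_t)((h1 + (uint64_t)i * h2) % b->m);
}

int bloom_init(bloom_t *b, size_t m, unsigned k)
{
    assert(m > 0 && k > 0);
    b->bits = calloc((m + 7) / 8, 1);
    b->m = m;
    b->k = k;
    return b->bits ? 0 : -1;
}

/* Set the k bits for x. */
void bloom_add(bloom_t *b, const char *x)
{
    for (unsigned i = 0; i < b->k; i++) {
        size_t idx = bloom_index(b, x, i);
        b->bits[idx / 8] |= (uint8_t)(1u << (idx % 8));
    }
}

/* Returns 0 if x is definitely absent, 1 if x is possibly present. */
int bloom_check(const bloom_t *b, const char *x)
{
    for (unsigned i = 0; i < b->k; i++) {
        size_t idx = bloom_index(b, x, i);
        if (!(b->bits[idx / 8] & (1u << (idx % 8))))
            return 0;
    }
    return 1;
}
```

A return value of 1 from bloom_check only means "possibly present"; a return value of 0 is always correct.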
Counting Bloom Filters: Solving Removals
The same property that results in false positives also makes it difficult to remove an element from the filter, as there is no easy means of discerning whether another element hashes to the same bit. Unsetting a bit that is shared by multiple elements can cause false negatives. Using a counter instead of a bit circumvents this issue: the counter is incremented when an element is hashed to a given location and decremented upon removal, and membership queries test whether each counter is greater than zero. The cost is that counters give up some of the exceptional space efficiency of the standard bloom filter.
Scalable Bloom Filters: Solving Scale
Another important property of a bloom filter is its linear relationship between size and storage capacity. If given a maximum acceptable false positive ratio, it is straightforward to determine how much space is needed.
If the maximum allowable error probability and the number of elements to store are both known, it is relatively straightforward to dimension an appropriate filter. However, it is not always possible to know a priori how many elements will need to be stored. There is a trade-off between over-dimensioning a filter and suffering a ballooning error probability as it fills.
Almeida, Baquero, Preguiça, and Hutchison published a paper in 2006 on Scalable Bloom Filters, which proposed building a scalable bloom filter from what is essentially a list of bloom filters that act as one large bloom filter. When greater capacity is desired, a new filter is added to the list.
Membership queries are conducted on each filter, and the result is positive if the element is found in any one of the filters. Naively, this leads to a compounding error probability, since a false positive in any single filter is a false positive for the whole structure. With a per-filter error probability P, the error probability of the structure evaluates to:
1 - ∏(1 - P)
It is possible to bound this error probability by applying a tightening
ratio, r, to each successive filter. As a result, the bounded error probability is represented as:
1 - ∏(1 - P0 * r^i) where r is chosen as 0 < r < 1
Since size is simply a function of error probability and capacity, any of
a variety of growth functions can be applied to scale the size of the bloom filter
as necessary. We found it sufficient to pick 0.9 for r.
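To make the effect of the tightening ratio concrete, here is a small sketch that evaluates the compound error probability, taken as one minus the product of (1 - P0 * r^i) over the filters in the list. The function names are hypothetical. The bound P0 / (1 - r) follows from the union bound and the geometric series, so with r = 0.9 the whole structure stays under ten times the error probability of the first filter, no matter how many filters are added.

```c
#include <assert.h>

/* Compound false positive probability of n stacked filters where
 * filter i has error probability p0 * r^i:  1 - prod(1 - p0 * r^i).
 * r == 1.0 models the naive case with no tightening. */
double compound_error(double p0, double r, int n)
{
    assert(p0 > 0.0 && p0 < 1.0 && r > 0.0 && r <= 1.0 && n >= 0);
    double none = 1.0;  /* probability that no filter gives a false positive */
    double p = p0;
    for (int i = 0; i < n; i++) {
        none *= 1.0 - p;
        p *= r;
    }
    return 1.0 - none;
}

/* Upper bound for any n: the sum of the geometric series p0 * r^i. */
double compound_error_bound(double p0, double r)
{
    assert(r > 0.0 && r < 1.0);
    return p0 / (1.0 - r);
}
```

With P0 = 0.01, a thousand untightened filters (r = 1.0) push the compound error above 0.99, while the same thousand filters with r = 0.9 stay below the bound of 0.1.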
Problems with Mixing Scalable and Counting Bloom Filters
Scalable bloom filters do not allow for the removal of elements from the filter. In addition, simply converting each bloom filter in a scalable bloom filter into a counting filter also poses problems. Since an element can be in any filter, and bloom filters inherently allow for false positives, a given element may appear to be in two or more filters. If an element is inadvertently removed from a filter which did not contain it, it would introduce the possibility of false negatives.
If, however, an element can be removed from the correct filter, it maintains the integrity of said filter, i.e. prevents the possibility of false negatives. Thus, a scaling, counting bloom filter is possible if upon additions and deletions one can correctly decide which bloom filter contains the element.
There are several advantages to using a bloom filter. A bloom filter gives us cheap, memory-efficient set operations, with no actual data stored about the given element. Rather, bloom filters allow us to test, with some given error probability, the membership of an item. This leads to the conclusion that the majority of operations performed on bloom filters are membership queries, rather than additions and removals of elements. Thus, for a scaling, counting bloom filter, we can optimize for membership queries at the expense of additions and removals. This expense comes not in performance, but in the addition of more metadata concerning an element and its relation to the bloom filter. With the addition of some sort of identification of an element, which does not need to be unique as long as it is fairly distributed, it is possible to correctly determine which filter an element belongs to, thereby maintaining the integrity of a given bloom filter with accurate additions and removals.
Enter dablooms
dablooms is one such implementation of a scaling, counting bloom filter. It takes additional metadata during additions and deletions in the form of a monotonically increasing integer, such as a timestamp, to classify elements. This is used during additions/removals to easily classify an element into the correct bloom filter (essentially a comparison against a range).
dablooms is designed to scale itself using these monotonically increasing identifiers and the given capacity. When a bloom filter is at capacity, dablooms will create a new bloom filter using the to-be-added element’s identifier as the beginning identifier for the new bloom filter. Given the fact that the identifiers monotonically increase, new elements will be added to the newest bloom filter. Note that, in theory and as implemented, nothing prevents one from adding an element to any “older” filter. You just run the increasing risk of the error probability growing beyond the bound as the filter becomes “overfilled”.
You can then remove any element from any bloom filter using the identifier to intelligently pick which bloom filter to remove from. Consequently, as you continue to remove elements from bloom filters that you are not continuing to add to, these bloom filters will become more accurate.
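The "comparison against a range" can be sketched as follows. This is a minimal illustration, not the dablooms source: it assumes each filter records the identifier that opened it, keeps filters ordered oldest to newest, and uses the hypothetical name pick_filter.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical layout: filters are ordered oldest to newest, and
 * start_id[i] is the identifier that opened filter i. */
size_t pick_filter(const uint64_t *start_id, size_t nfilters, uint64_t id)
{
    assert(start_id != NULL && nfilters > 0);
    /* Walk from the newest filter back to the first one whose
     * range starts at or before id. */
    for (size_t i = nfilters; i-- > 1; ) {
        if (id >= start_id[i])
            return i;
    }
    /* Anything older than every later range belongs to the first filter. */
    return 0;
}
```

Because the same identifier is supplied on removal, the removal is routed to the same filter that received the addition, which is what keeps each counting filter free of false negatives.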
For performance, the low-level operations are implemented in C. The filter is also memory mapped, which provides async flushing and persistence at low cost. In an effort to maintain memory efficiency, rather than using integers, or even whole bytes, as counters, we use only four-bit counters. These four-bit counters allow up to 15 items to share a counter in the map. If more than a small handful are sharing said counter, the bloom filter would be overloaded (resulting in excessive false positives anyway) at any sane error rate, so there is no benefit in supporting larger counters.
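A four-bit counter map packs two counters into each byte. The sketch below shows one way to do that; the nibble order, the function names, and the choice to refuse an increment at 15 or a decrement at 0 are assumptions for illustration and may not match how dablooms handles those edge cases.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Two 4-bit counters per byte: even indices in the low nibble,
 * odd indices in the high nibble (an arbitrary choice for this sketch). */
unsigned nibble_get(const uint8_t *map, size_t idx)
{
    uint8_t byte = map[idx / 2];
    return (idx % 2) ? (unsigned)(byte >> 4) : (unsigned)(byte & 0x0F);
}

static void nibble_set(uint8_t *map, size_t idx, unsigned v)
{
    assert(v <= 15);
    uint8_t *byte = &map[idx / 2];
    if (idx % 2)
        *byte = (uint8_t)((*byte & 0x0F) | (v << 4));
    else
        *byte = (uint8_t)((*byte & 0xF0) | v);
}

/* Returns 0 on success, -1 if the counter is already at its maximum of 15. */
int nibble_increment(uint8_t *map, size_t idx)
{
    unsigned v = nibble_get(map, idx);
    if (v == 15)
        return -1;
    nibble_set(map, idx, v + 1);
    return 0;
}

/* Returns 0 on success, -1 if the counter is already zero. */
int nibble_decrement(uint8_t *map, size_t idx)
{
    unsigned v = nibble_get(map, idx);
    if (v == 0)
        return -1;
    nibble_set(map, idx, v - 1);
    return 0;
}
```

Compared with one byte per counter, this halves the size of the map while still allowing 15 elements to share a slot.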
The bloom filter also employs change sequence numbers to track operations performed on the bloom filter. These allow us to detect failed writes, which leave us with a ‘dirty’ filter where an element is only partially written. Upon restart, this information allows us to determine whether a filter is clean, in which case we can just load it into memory locally rather than having to rebuild it. The sequence numbers also provide a means for us to identify a position at which the bloom filter is valid in order to replay operations to “catch up”.
EOL
For our spam dataset at scale, initial tests have shown a significant reduction (98.8%) in resident memory usage on each machine using dablooms while maintaining a rigid performance threshold. For our identifier, we simply use the timestamp of when the spam link was added, and on removals we query our database for the insertion timestamp.
We have also decided to open source this project, so fork it and submit patches at the github repo.
If this sounds like a fun project to use or to work on, bitly is hiring, including interns!
Debugging a Specialized Database Cache
This blog post is a tale of interesting patterns in system and application metrics, optimizing memory use, and debugging. Before we get into that stuff, it helps to be familiar with the application at the center of it.
Introduction to the Network view
A significant new feature of Bitly’s recent consumer relaunch is the ability to browse links shortened by all your connections on Facebook and Twitter, on a page titled “Your Network”. Internally we call this service “network history”. Looking up all your connections and all their links every time you view the “Network” page is, due to Bitly’s scale, a non-trivial problem. Instead we structured the system to do the hard work at insertion time, to make the read side performant. In a minute, we’ll dig into how exactly we tackled that problem.
We use a key-value store (leveldb via simpleleveldb) where the key is the username and the value is the list of bitly links created by their connections. Such an arrangement is very efficient in terms of storage, memory, and cpu usage. However, based on the number of connections a user has, the number of updates we have to perform could be significant (thousands) per new Bitly link. We needed a way to gain control over the number of writes to our persistent datastore (and thus disk seeks) and provide the performance needed to handle many thousands of inserts per second - enter nhproxy.
nhproxy mechanics
nhproxy, the caching proxy to the network history database, maintains in-memory:
- a hash-table of all users for which it is caching network history
- a hash-table of all links in users’ network history
- attached to each user, a list of pointers to links, which is the network history
The actual link metadata is stored in a separate global hash-table so that it is not duplicated when multiple users have the same link in their history. Each user only stores a pointer into the global data structure.
nhproxy serves two main request types:
get requests specify a username to retrieve the full Network History for. If the user’s network history is already in memory it is quickly returned, otherwise it is fetched from the database and loaded into memory and then returned.
add requests specify a bitly link and a list of all the users for which the link will appear (all connections of the creator of the link). All users who aren’t already loaded into memory are fetched from the database and loaded into memory, and then a reference to the link is added to all their network history lists.
There are simple strategies for:
- “pruning” old network history list elements when the list grows beyond a set maximum
- saving a user’s changed network history to the database after a while
- discarding the user’s record and history from memory to keep the total number of users in the cache below another set maximum.
It’s interesting to note that the longer one delays saving a user’s changed network history to the persistent datastore, the more changes it accumulates per save, and the lower the overall rate of writes to the store. One can control that rate of writes by adjusting the delay. Don’t worry about what exactly these strategies are, as I’ll be focussing on other aspects of nhproxy in this article.
nhproxy memory load
In the initial deployed state, the memory usage of a box running both nhproxy and the database looked like this:

It fell each night before exhausting all memory because we cleanly terminated and restarted nhproxy with a cron job. nhproxy saves and reloads everything it’s caching when it restarts, so no un-flushed changes are lost. Still, we would much rather not need to have it regularly restarted. Also, notice that the data in nhproxy just before exiting and just after starting again is exactly the same (besides change timestamps the flushing logic uses), but uses up very different amounts of physical memory.
There had already been some testing and analysis with valgrind; it wasn’t a simple memory leak in nhproxy. One theory was that the problem was fragmentation: the multitude of small memory regions allocated and deallocated left awkwardly sized regions free that were too small to be reused. We considered trying an alternative allocator like jemalloc which has fancier strategies for dealing with fragmentation, but decided to try some less drastic measures first.
After a bit of thought I realized that many of the small regions we allocated were dominated by the overhead of malloc itself - the extra space from rounding the allocated region up to a well-aligned size, and the metadata of the region size (typically prepended to the region). This most likely wouldn’t fix the unbounded memory growth problem, but I couldn’t resist first going after some low-hanging fruit to make overall memory use and fragmentation significantly better in the short term.
Initially, the data in memory was represented by structs that looked something like this:
struct user {
    char *username;
    struct linked_list *history;
    // plus a few miscellaneous flags and counters
};
struct linked_list {
    struct linked_list *next;
    struct linked_list *prev;
    struct bitly_link *link;
};
struct bitly_link {
    char *username;
    char *hash;
    int ref_count;
    // plus a few miscellaneous flags and counters
};
That’s a simplification, but what’s accurate is that there was a separate allocation for each username, each bitly link hash, and each link reference in a history list. That is quite a lot of small allocations, with a lot of overhead. For example, each bitly_link hash is (currently) 6 characters, so we malloc()ed 7 bytes to store the string and null terminator, which malloc() rounded up to 8 bytes, and then prepended with the length in another 8 bytes, to end up as a 16 byte allocation. Also, I suspected that the variable size of the username allocation was a significant cause of fragmentation.
So I refactored the code to use structures like this:
struct user {
    char username[MAX_USERNAME_LEN+1];
    struct bitly_link *history[MAX_HISTORY_LEN];
    int history_count;
    // plus a few miscellaneous flags and counters
};
struct bitly_link {
    char username[MAX_USERNAME_LEN+1];
    char hash[MAX_HASH_LEN+1];
    int ref_count;
    // plus a few miscellaneous flags and counters
};
Notice that, with the (minimum) 8-byte malloc() overhead, on 64-bit systems such as we use, a history list of any size over 1/4 of the maximum uses less space in the new design: an array of 1000 8-byte pointers in the new design, vs 4*8 bytes per link in the old design (three 8-byte pointers plus the malloc() overhead). In practice, most users in the nhproxy cache have full history lists (maintained at that length by pruning when they grow longer).
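The break-even point works out as follows. This sketch only restates the arithmetic from the figures in the text (8-byte pointers, a minimum 8-byte malloc() overhead, a maximum history length of 1000); the function names are hypothetical and it ignores the flags and counters in the real structs.

```c
#include <assert.h>
#include <stddef.h>

/* Figures taken from the text. */
enum { PTR_SIZE = 8, MALLOC_OVERHEAD = 8, MAX_HISTORY_LEN = 1000 };

/* Old design: one malloc()ed node (next, prev, link) per history entry. */
size_t old_history_bytes(size_t nlinks)
{
    assert(nlinks <= MAX_HISTORY_LEN);
    return nlinks * (3 * PTR_SIZE + MALLOC_OVERHEAD);
}

/* New design: a fixed array of pointers embedded in struct user,
 * paid for in full regardless of how many entries are in use. */
size_t new_history_bytes(void)
{
    return (size_t)MAX_HISTORY_LEN * PTR_SIZE;
}
```

At 250 links both designs cost 8000 bytes; a full list of 1000 links costs 32000 bytes in the old design against the same 8000 in the new one.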
Satisfyingly, cpu usage was much improved, because all those extra malloc() and free() calls in the original design were significant. Here’s what it looked like before:

and here’s what it looked like after:

(that’s 100% per core)
But what we really cared about was memory use:

That’s much lower, but still grew until exhausting all memory, and it only took 3 times longer to do so. Until the real problem was solved, at least it was an improvement, right?
nhproxy history list bug
Unfortunately, we couldn’t roll it out to production yet, because we noticed this other graph:

We had a fair number of what we thought were pretty good tests, and the new version of nhproxy passed them all… but why on earth did the number of bitly links in the cache, and the number of references to them in history lists, gravitate to 2/3 of what they used to be? Fortunately, having solid instrumentation into the runtime behavior of the application enabled us to spot this discrepancy. We had theories about bugs in the history pruning strategy, but couldn’t find anything, so we had to write a tool to compare the state of the original system still running in production, and the new version we were testing and staging, which was getting the same add requests, but not servicing any of the get requests.
We couldn’t compare just what was cached in the nhproxy, because we had actually changed the user flush and drop strategy, and because even if the strategies were the same, the lack of get requests on the new system being staged would have made the list of users in nhproxy different. So I wrote a tool that took a list of user names, made get requests to both systems, compared the results, and output differences, with various options. We were hoping to see some pattern in which links were lost from which network history lists - maybe relatively old or new links? We looked in request logs to get some users with very new changes, or hour-old changes, or older, and fed these usernames to the tool.
Here’s some sample output:
login <user> common= 998 0_only= 0 1_only= 0 difference= 0%
login <user> common= 783 0_only= 0 1_only= 0 difference= 0%
...
login <user> common= 301 0_only= 315 1_only= 287 difference= 50%
login <user> common=1000 0_only= 0 1_only= 0 difference= 0%
Total successes=192 failures=3
We didn’t notice any clear patterns in the users, links, or ages. But we did notice that the loss wasn’t evenly distributed - some users matched completely between the systems, and others were over 50% different. For a few of the very different users, we manually queried and compared the caches and the backend databases, trying to find a clue. Finally, Jehiah noticed that the backend database was missing many list items for a user that the cache had, even after the user was flushed. We had no test that caused a user’s list to be flushed, then dropped, then re-fetched, so I added one. Soon after, snakes spotted the extra “i++” in the flush routine, iterating over the history list, introduced when a while-loop that already had an “i++” was changed to a for-loop in the memory-saving struct refactor - we were skipping over every other link and not writing it to the persistent datastore. It’s funny how much time such a little bug can take to track down, but our tests and tools are better for it.
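The shape of that bug is easy to reproduce. The following is a hypothetical reconstruction, not the nhproxy source: flush_buggy keeps the leftover increment inside the body of a for-loop that already increments, while flush_fixed does not.

```c
#include <assert.h>
#include <stddef.h>

/* Buggy shape: the body still carries the i++ left over from the
 * while-loop, so each pass advances by two and every other entry
 * is never written. Returns the number of entries written to out. */
size_t flush_buggy(const int *history, size_t count, int *out)
{
    assert(history != NULL && out != NULL);
    size_t written = 0;
    for (size_t i = 0; i < count; i++) {
        out[written++] = history[i];
        i++;  /* leftover increment from the old while-loop */
    }
    return written;
}

/* Fixed shape: the for-loop header is the only place i advances. */
size_t flush_fixed(const int *history, size_t count, int *out)
{
    assert(history != NULL && out != NULL);
    size_t written = 0;
    for (size_t i = 0; i < count; i++)
        out[written++] = history[i];
    return written;
}
```

A test that only reads back from the cache never notices the difference, which is why the flush, drop, and re-fetch test was needed to expose it.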
libevent memory leak
At that point, nhproxy was much more efficient, and at least as correct as the original version, but it still had the unbounded memory use which becomes problematic after only 3 and a half days or so. When I wrote above that valgrind reported no memory leaks, I wasn’t being completely honest - it reported no memory leaks if one was careful to only use HTTP/1.0 requests without keep-alive, or HTTP/1.1 requests with Connection: close, when exercising or querying the nhproxy. There was a known leak with keep-alive requests, but it wasn’t attacked earlier because:
- while our internal services do often hold persistent connections, they rarely close them, and we didn’t think we experienced this effect to a degree significant enough to devote resources to solving it
- libevent is a sensitive and core part of our infrastructure, so we didn’t want to migrate to libevent-2.0.x right at that moment, nor did we want to maintain our own branch of libevent-1.4.x
Simultaneously, we were aware of a problem related to how heap memory was freed back to the
OS, which we had already seen in other applications. Memory use would grow due to a large
number of small allocations (using the glibc provided malloc()) caused by a temporary
high load, but when the load fell, the memory use wouldn’t significantly fall back. Even if
valgrind reported no real leaks, and even if load fell back to effectively zero, the OS
could still report that the process was using a huge amount of private memory (RSS). We
confirmed through some tests that a single tiny allocation on the top of the heap could
prevent many gigabytes of contiguous free space from being released back to the OS, and
when that top allocation was free()d, the rest would finally go.
Between the occasional tiny leak and the significant fragmentation, this high-water-mark effect could be a real problem for nhproxy. jemalloc would probably help, but at this point I really wanted to get rid of the leak. I wanted the output of memory use analysis tools like valgrind to be easier to use, and I didn’t want to wonder how much of an effect the leak was really having.
I started working with the original libevent-1.4.14b source, building and installing the shared library in a custom local path, and running nhproxy with it using LD_LIBRARY_PATH.
libevent uses a lot of callbacks of function pointers, and evhttp abstracts outgoing and
incoming http connections to share many functions between them, so I started by putting
in some useful printf()s to familiarize myself with the flow of the code in a simple
minimal use. I used curl with and without the --http1.0 argument to exercise nhproxy.
Valgrind reports that the root of the leak is a struct evhttp_request allocated in
evhttp_associate_new_request_with_connection(), which makes sense because that’s
where all evhttp_requests are allocated. I eventually traced the exceptional case
through
- evhttp_associate_new_request_with_connection()
- evhttp_request_new()
- evhttp_start_read()
- evhttp_read() (callback set in evhttp_start_read())
- evhttp_connection_done()
- evhttp_handle_request() (the (*req_cb)())
- evhttp_connection_fail()
- evhttp_connection_incoming_fail()
- evhttp_connection_free()
The very interesting bit is that evhttp_connection_incoming_fail() seemed to
intentionally leak the evhttp_request, preventing evhttp_connection_free() from
freeing it, with this comment:
/*
 * these are cases in which we probably should just
 * close the connection and not send a reply. this
 * case may happen when a browser keeps a persistent
 * connection open and we timeout on the read. when
 * the request is still being used for sending, we
 * need to disassociated it from the connection here.
 */
if (!req->userdone) {
    /* remove it so that it will not be freed */
    TAILQ_REMOVE(&req->evcon->requests, req, next);
    /* indicate that this request no longer has a
     * connection object
     */
    req->evcon = NULL;
}
I’m still not sure exactly why that case is needed. But I did know that if
evhttp_handle_request() is called with req->evcon->state == EVCON_DISCONNECTED,
then both the user and the request are done. My minimal fix was:
@@ -2231,6 +2231,7 @@ evhttp_handle_request(struct evhttp_requ
     if (req->uri == NULL) {
         event_debug(("%s: bad request", __func__));
         if (req->evcon->state == EVCON_DISCONNECTED) {
+            req->userdone = 1;
             evhttp_connection_fail(req->evcon, EVCON_HTTP_EOF);
         } else {
             event_debug(("%s: sending error", __func__));
In our testing, that didn’t result in any use-after-free crashes or unusual failed requests. This is what memory use looked like under real load with that fix:

That seemed to do it. We weren’t expecting the keep-alive connection leak to be that significant, but apparently it was. (The memory ramp up before it stabilizes is fragmentation of object allocations in memory approaching a maximum before gaps formed are large enough to be reused.)
Conclusion
Graphs of system and application metrics can reveal unexpected behaviors that reflect bugs. Create tools to help you investigate issues if you need them. Tiny fixes can have huge effects.
Metrics - Building Clickatron
One of the core product features of bitly has always been metrics. Unlike tools like google analytics, bitly metrics have always been a way to gather website metrics when you don’t control the website. Want metrics for a book you wrote on Amazon? Use a bitly link. Want metrics for a blog post? You can use a bitly link for that, too.
As bitly usage has grown, one of the systems that has evolved several times along the way is our metrics platform. Originally implemented as log files with a hierarchical timestamp key, later metrics data was loaded into a flat mysql table. A subsequent revision moved that data into a cluster of tokyocabinet servers. That was eventually supplemented with some additional metrics datasets in mongodb. By early 2011 we were using 3 different metrics systems, and had outgrown 3 others. We started with a set of goals to build a scalable time series database application, which we now call Clickatron.
Goals
The set of goals we wanted to achieve with a new metrics system were:
1) Move from daily to hourly granularity (not everyone should have to pretend they live in EST). We wanted to be able to satisfy read requests on the fly in any unit of time (hour, day, week, month) for any timezone based on the single hourly dataset, removing the duplication present in other systems.
2) Compact on-disk format. One of the previous metrics systems had a very inefficient data structure which was causing both bloated data set sizes and performance problems resulting from insufficient disk IO. In order to satisfy read requests at different units of time, it was also storing several copies of the data.
3) Ability to bulk-load new datasets. There were two problems with previous metrics systems that inhibited adding new metrics datasets. Previous systems lacked proper namespacing which caused essentially duplicate systems for each dataset, and there was not enough excess capacity to insert new metrics going backwards in time.
4) Scalability. Our existing metrics systems were at their limits in terms of handling real-time increments and we didn’t have a simple path forward to add capacity to those systems.
Architecture
As mentioned in our post about sortdb, many datasets have a small subset that is heavily updated, and a larger related dataset that does not change. Metrics data is a textbook example where updates are limited to a small time range, and an overwhelming majority of the data is static.
We chose to take advantage of this and implement an architecture that split data into two separate storage systems. A Realtime System is responsible for per-hour data covering the past 48 hours and also handles increment operations. A separate Archive System stores all of the older data and is updated daily with a new set of static database files from a Rebuild System.
Each of these two systems is itself a cluster of key/value databases. The keys are a combination of
namespace, record key, and time components (explained in more detail below). Data is spread across the shards
of each cluster by a simple crc(shard_key) % number_shards. The shard key is composed of only parts of the
actual key in order to improve data locality on a per-request basis. For example, the shard key excludes time components
so that all data for a request is stored on the same shard regardless of the time range requested.
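As a rough sketch of crc(shard_key) % number_shards: the text does not say which CRC is used, so the standard CRC-32 below is an assumption, and the function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC-32 (reflected polynomial 0xEDB88320). The text only
 * says "crc"; this particular variant is an assumption. */
uint32_t crc32_bytes(const void *data, size_t len)
{
    const uint8_t *p = data;
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; i++) {
        crc ^= p[i];
        for (int bit = 0; bit < 8; bit++)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return ~crc;
}

/* Map a shard key (the key with its time components left out)
 * to a shard number in [0, number_shards). */
unsigned shard_for(const char *shard_key, unsigned number_shards)
{
    assert(shard_key != NULL && number_shards > 0);
    return crc32_bytes(shard_key, strlen(shard_key)) % number_shards;
}
```

Because the time components never reach shard_for, every hour of data for a given record lands on the same shard.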
Realtime System
The Realtime System is comprised of a 3-position ring of storage engines with positions for Today, Yesterday, and
Tomorrow. The storage engines in the Today position handle increments. The storage engine in the Yesterday
position has a 24-hour window to be exported before being cleared for re-use in the Tomorrow position.
Each spot on the ring corresponds to a cluster of simpletokyo / ttserver pairs across which data is
sharded. Records in this system are stored in <key>,<value> format, where each record represents
an integer value for a single hour.
Rebuild System
The Rebuild System serves 3 purposes. Its primary role is to do the export from the Realtime System every 24 hours and combine that new dataset with all previous datasets to generate new data files for the Archive System. As part of that export process, the Rebuild System generates a second copy of the merged data files to serve as backup. Because it generates new data files from scratch every day, it also provides a spot to introduce new historical data independent of the number of records added. We have been able to use this to introduce new datasets as large as 13 billion keys. Good luck waiting for that many inserts into [insert database name].
The rebuild process is a set of steps that involve sorting, merging, reformatting keys, and combining records. That’s followed by a final sharding and another sort and merge operation. These steps happen largely in parallel and are each split into smaller segments of work that get distributed across several machines. Many of these steps also intelligently avoid redoing unnecessary work by keeping intermediate files split by time range.
Archive System
The Archive System consists of static csv files accessed through sortdb. These files are spread across hosts as needed depending on desired read performance. Multiple sortdb instances for the same data files can be used on separate physical hosts for read availability and fault tolerance.
API
The API layer directs increment requests to the right spot on the Realtime System ring. For increments, the API layer also writes out a local oplog for data durability in case there is a hardware failure in the Realtime System. For data fetches, the API layer handles the potential need to pull data from both the Realtime System and the Archive System, merging the two data sets into a single response. The components of the API involved in incrementing are written in C using simplehttp, and the components for querying are written in a combination of python using tornadoweb and C using simplehttp.
Architecture Overview

Storage Formats
Database Key Optimizations
Two optimizations directly aimed at lowering disk storage needs are a compact time format and a global lookup table for records with long keys. The choice to use sortdb also directly reduced our storage requirements as the data index is stored with the data (more specifically the index is the data).
For time values we use a compact 4 character YMDH representation. Using this format c3p1 corresponds to 1am on March 25th, 2012.
YMDH == _b32(year % 100) + _b32(month) + _b32(day) + _b32(hour)
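A minimal sketch of that encoding follows. The alphabet (digits followed by a to v) is an assumption; it was chosen because it reproduces the c3p1 example above. A single character can only hold values 0 to 31, so this sketch rejects years whose last two digits are 32 or more rather than guess how _b32 handles them. The name ymdh_encode is hypothetical.

```c
#include <assert.h>

/* Assumed alphabet: one character per value 0..31. */
static const char B32_ALPHABET[] = "0123456789abcdefghijklmnopqrstuv";

/* Writes the 4-character YMDH form plus a terminating NUL into out. */
void ymdh_encode(int year, int month, int day, int hour, char out[5])
{
    int yy = year % 100;
    assert(yy >= 0 && yy < 32);  /* limit of one base-32 character */
    assert(month >= 1 && month <= 12);
    assert(day >= 1 && day <= 31);
    assert(hour >= 0 && hour <= 23);
    out[0] = B32_ALPHABET[yy];
    out[1] = B32_ALPHABET[month];
    out[2] = B32_ALPHABET[day];
    out[3] = B32_ALPHABET[hour];
    out[4] = '\0';
}
```

Calling ymdh_encode(2012, 3, 25, 1, buf) fills buf with "c3p1".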
Any time a key contains a reserved character (<space> : , . |) or is 12 or more characters, we create a
12-character hash of it and store a lookup record. This means that if we want to count the number of referrers from
http://www.facebook.com/, instead of storing that 24-character string every time we want to count it, we count a
smaller 12-character hash such that _hash('http://www.facebook.com/') == 'h0aMB8AuNw4=', and do a reverse lookup to
expand the hash back to the full value at query time. It may not seem like much, but a savings of even a few bytes for
repeated values makes a big difference on datasets with billions of records.
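The rule for when a key is replaced by a hash can be sketched as follows. The hash function itself is not described here, so `_hash` below is a stand-in (the first 8 bytes of a SHA-256 digest, base64 encoded to 12 characters) and will not reproduce the values shown above. `needs_lookup`, `store_key` and the in-memory `lookup_table` are likewise hypothetical.

```python
import base64
import hashlib

RESERVED = set(" :,.|")

def needs_lookup(key):
    """True when a key is 12+ characters or contains a reserved character."""
    return len(key) >= 12 or any(ch in RESERVED for ch in key)

def _hash(key):
    # Stand-in only: 8 digest bytes base64 encode to exactly 12 characters.
    digest = hashlib.sha256(key.encode("utf-8")).digest()[:8]
    return base64.b64encode(digest).decode("ascii")

def store_key(key, lookup_table):
    """Return the key to store, recording a lookup record when it is hashed."""
    if not needs_lookup(key):
        return key
    hashed = _hash(key)
    lookup_table[hashed] = key  # reverse lookup used at query time
    return hashed

lookup_table = {}
stored = store_key("http://www.facebook.com/", lookup_table)
print(len(stored), lookup_table[stored])
```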
Record Types
Our time series database is made up of 3 types of records: Lookup Records, which are the expansions for the 12-character hashes above; Total Records, which are a single key with values per hour over time; and Subtotal Records, which represent the line item values whose sum rolls up to the Total Record for each hour.
We use a different data format between the Realtime System and the Archive System for storing Total Records and Subtotal Records. The data format used for the Realtime System is optimized for writes while the format used for the Archive System is optimized for compactness and reads.
In the Realtime System records have one data point, but in the Archive System records are multi-column and have many values. Many records in the Realtime System will be collapsed to a single multi-column record in the Archive System. Records in the Realtime System are always for a single hour, but Total Records in the Archive System are for all time ranges, and Subtotal Records are similarly for all subtotal keys (for an hour). This facilitates retrieval, but it also provides more compact storage (the record key is stored only once for that multi-column record compared with the Realtime System). Metrics data is very sparse (many keys only have values for a few hours) so the time values in a Total Record also serve as an index of what data to expect in the equivalent subtotal data set.
Record Formats
Total Records - Realtime System
A Total Record as stored in the Realtime System consists of a namespace, a total key, an hourly time
value, and the counter value.

Example: These are two Total Records in the Realtime System representing total clicks for a single user in two
separate hours. u is our namespace for “user clicks”, jehiah is my username, c413 and c41l are time components
and these records have values of 2 and 5 respectively.
u|jehiah.c413,2
u|jehiah.c41l,5
Total Records - Archive System
In the Archive System, all Total Records for the same namespace + total key combination are collapsed into one
record with the individual time value and counter value pairs as the multi-column component.

Example: These are the same two Total Records from the Realtime System as stored in the Archive System. Here the key is
only the u namespace and jehiah total key, but the value is multi-column pairs of time components c413 and c41l
and values of 2 and 5. In this small example, the archive record is 27% more compact.
u|jehiah,c413:2 c41l:5
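To make the relationship between the two formats concrete, here is a minimal Python sketch that collapses Realtime Total Records into the Archive layout. It only illustrates the record shapes and is not the actual rebuild process. `collapse_totals` is a hypothetical name.

```python
from collections import defaultdict

def collapse_totals(realtime_lines):
    """Group 'ns|key.time,count' lines into 'ns|key,time:count ...' records."""
    grouped = defaultdict(list)
    for line in realtime_lines:
        key, count = line.rsplit(",", 1)
        # '.' is a reserved character, so the last '.' always precedes the time.
        total_key, time_code = key.rsplit(".", 1)
        grouped[total_key].append((time_code, count))
    return [
        total_key + "," + " ".join(f"{t}:{c}" for t, c in sorted(pairs))
        for total_key, pairs in sorted(grouped.items())
    ]

print(collapse_totals(["u|jehiah.c413,2", "u|jehiah.c41l,5"]))
# ['u|jehiah,c413:2 c41l:5']
```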
Subtotal Records - Realtime System
A Subtotal Record as stored in the Realtime System consists of a subtotal namespace, a total
namespace, a total key, a subtotal key, an hourly time value, and a counter value. This record is structured such
that the (total namespace + total key) matches exactly to a Total Record. Further, the count values are related
such that the sum of count values for all subtotal keys for a given subtotal namespace, total namespace, total key,
and time value exactly equals the matching value in a Total Record.

Example: Here are 4 Subtotal Records that represent two different datasets related to the Total Record above as
stored in the Realtime System. One dataset with the c subtotal namespace represents the clicks by country, and the
other r subtotal namespace represents the clicks by referrer. Each of these datasets (4 + 1 and 3 + 2) sum up
to the 5 in the Total Record above. The US and JP are country values in the subtotal key, and 5bmehgOB4+w= and
h0aMB8AuNw4= are 12 character lookup values that are placeholders for longer subtotal keys. These records in the
Realtime System are per-hour so they can be incremented individually.
c.u|jehiah.US.c41l,4
c.u|jehiah.JP.c41l,1
r.u|jehiah.5bmehgOB4+w=.c41l,3
r.u|jehiah.h0aMB8AuNw4=.c41l,2
Subtotal Records - Archive System
In the Archive System all Subtotal Records for a given hour for a subtotal namespace collapse so that the subtotal
key and count values create the multi-column data.

Example: The same 4 values in the Realtime System are represented as two multi-column records in the Archive System. Here the records are still one per hour, but all subtotal keys for that hour are stored together. For this small example, the archive format is 31% smaller.
c.u|jehiah.c41l,US:4 JP:1
r.u|jehiah.c41l,5bmehgOB4+w=:3 h0aMB8AuNw4=:2
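The same collapse can be sketched for Subtotal Records. This is an illustration of the two layouts under the key structure described above, not the production rebuild code. `collapse_subtotals` is a hypothetical name, and it keeps subtotal keys in input order to match the example.

```python
def collapse_subtotals(realtime_lines):
    """Group per-subtotal-key lines into one multi-column record per hour."""
    grouped = {}
    for line in realtime_lines:
        key, count = line.rsplit(",", 1)
        # The key ends in '.<subtotal key>.<time>'; neither part contains a '.'.
        prefix, subtotal_key, time_code = key.rsplit(".", 2)
        columns = grouped.setdefault(f"{prefix}.{time_code}", [])
        columns.append(f"{subtotal_key}:{count}")
    return [key + "," + " ".join(cols) for key, cols in sorted(grouped.items())]

realtime = [
    "c.u|jehiah.US.c41l,4",
    "c.u|jehiah.JP.c41l,1",
    "r.u|jehiah.5bmehgOB4+w=.c41l,3",
    "r.u|jehiah.h0aMB8AuNw4=.c41l,2",
]
for record in collapse_subtotals(realtime):
    print(record)
```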
Record Visualization
This is a visualization of the data contained within a pair of records from the Archive System: a Total Record (green) and a Subtotal Record (purple). On the right are the same records but with example values filled in.

Queries
Since Clickatron stores data with hourly granularity, the API merges that hourly data on the fly to fulfill requests for
other granularities. This approach enables the API to accept an hour_offset parameter in order to fulfill queries for
any timezone aligned on the hour (with apologies to India and other locales aligned on the half hour). In our public API
endpoint we look up hour_offset from a more generic timezone parameter. The API also uses the technique for fulfilling day
queries to respond to week and month queries, and can even respond with week units that start on Monday instead of
Sunday (we call this “mweek”). You can see an example of these parameters in our API Documentation
for user and link metrics.
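A minimal sketch of this kind of on-the-fly merge is shown below. It assumes stored hours are in UTC, which is not stated above, and the function names are hypothetical rather than part of the Clickatron API.

```python
from collections import Counter
from datetime import date, datetime, timedelta

def merge_hours_to_days(hourly_counts, hour_offset=0):
    """Sum hourly counts into days after shifting each hour by hour_offset."""
    daily = Counter()
    for stored_hour, count in hourly_counts.items():
        local_hour = stored_hour + timedelta(hours=hour_offset)
        daily[local_hour.date()] += count
    return dict(daily)

def week_start(day, monday_first=False):
    """First day of the week containing day (Monday for 'mweek' units)."""
    offset = day.weekday() if monday_first else (day.weekday() + 1) % 7
    return day - timedelta(days=offset)

# The two Total Records used earlier: c413 (3am) and c41l (9pm) on April 1st, 2012.
hourly = {datetime(2012, 4, 1, 3): 2, datetime(2012, 4, 1, 21): 5}
print(merge_hours_to_days(hourly))      # one day holding all 7 clicks
print(merge_hours_to_days(hourly, -4))  # the 3am hour moves to the previous day
```

Week and month units follow the same pattern: shift first, then group by `week_start` or by (year, month).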
Performance in the Real World
Clickatron has been in production for over a year now. It has successfully replaced several other metrics systems, reduced hardware requirements, improved data granularity, and reduced query latency. As previously mentioned, we have leveraged its rebuild functionality to bulk load datasets with as many as 13 billion data points at once with zero service impact - directly contributing to our ability to expand the breadth of our data. Despite an increase in the breadth of metrics, the granularity of data storage, the timespan of accumulated data, and a significant increase in traffic, Clickatron’s current persistent footprint is less than one quarter the size of the disparate systems it replaced a year ago. Clickatron currently tracks well over 100 billion data points, a number that increases every day, and we feel well situated to handle future needs as they arise.
EOL
If this sounds like a fun metrics system to use or to work on, bitly is hiring.
sortdb - static key value database
As dataset sizes grow, you need to buy larger/faster database machines, or shard data across multiple machines. sortdb is a database server we have written to help fill a specific need for performant access to static data.
One observation about many datasets is that there is often a small dataset that is heavily updated, and a larger older related dataset that does not change. It is this second case where sortdb can be used to expose a query interface to static datasets.
sortdb has an HTTP interface built on libevent, and exposes get, mget and fwmatch endpoints to key / value data in
a sorted csv data file. Internally, the data file is mmapped and a binary search is
performed to find rows in the file. It is part of
simplehttp, a family of libraries and daemons for building
scalable web infrastructures.
By mmap’ing the data file sortdb allows the operating system to handle maintaining the most important parts of the data in memory while retrieving others from disk transparently. Since the file is searched using a binary search, it means the first few common steps of the search tree are quickly cached in ram and execute very quickly. The operating system will also share mmap’d data between multiple processes. Practically, this means you can start a second sortdb process pointed at the same data file for higher read throughput.
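The idea of binary searching an mmapped, sorted file can be sketched in a few lines of Python. This is only an illustration of the technique, not sortdb's C implementation. It assumes rows are sorted bytewise by key and separated by newlines, and `get` here is a local helper, not the HTTP endpoint.

```python
import mmap
import os
import tempfile

def get(data, key, sep=b","):
    """Return the value stored for key in sorted 'key,value' rows, or None."""
    lo, hi = 0, len(data)  # lo and hi always sit on row boundaries
    while lo < hi:
        mid = (lo + hi) // 2
        start = data.rfind(b"\n", 0, mid) + 1  # start of the row containing mid
        end = data.find(b"\n", start)
        if end == -1:
            end = len(data)
        row_key, _, value = data[start:end].partition(sep)
        if row_key == key:
            return value
        if row_key < key:
            lo = end + 1  # continue in the rows after this one
        else:
            hi = start    # continue in the rows before this one
    return None

rows = b"c.u|jehiah.c3hh,DE:1 None:5 US:7\nu|jehiah.c3hh,13\nu|jehiah.c3i0,7\n"
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(rows)
with open(tmp.name, "rb") as f:
    db = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    print(get(db, b"u|jehiah.c3hh"))  # b'13'
    db.close()
os.unlink(tmp.name)
```

Because only the pages touched by the search are read, the operating system decides which parts of the file stay in memory.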
In database terms, a sorted key/value datafile is essentially a covering index where all the values are stored with the key. Because there is no need to store a separate index of the data, this is a compact on-disk representation (removing the need for separate indexes can often give a 20-30% savings in disk usage).
Operationally managing sortdb is easy because the data files it uses are static and read-only. This makes it easy to
copy the file to multiple servers and distribute requests to multiple sortdb instances on separate servers to handle a
higher workload, or to add redundancy. It is also possible to point sortdb to a new file with zero downtime (literally
mv data.csv old.csv && mv new.csv data.csv && kill -s HUP $pid).
Using sortdb
At bitly we use sortdb successfully as a core component of our metrics system and are very happy with its performance characteristics. Our long term metrics system is composed of two different storage systems. One “realtime” system stores metrics for the past 48 hours and processes all increment requests. A second “static” storage system is built on sortdb and stores all data prior to the past 48 hours. Every day, 24 hours of data is exported from the realtime system, and new static data files are re-built for the static system comprising all the existing data, and the new 24 hour data segment.
A small slice of our “static” metrics data looks like this:
c.u|jehiah.c3h5,US:2
c.u|jehiah.c3h7,US:1
c.u|jehiah.c3h9,None:2 US:3
c.u|jehiah.c3ha,None:2
c.u|jehiah.c3hb,None:2 US:3
c.u|jehiah.c3hc,None:3 US:4
c.u|jehiah.c3hd,None:5 US:4
c.u|jehiah.c3he,None:2 US:6
c.u|jehiah.c3hf,None:3 US:5
c.u|jehiah.c3hg,None:4 US:5
c.u|jehiah.c3hh,DE:1 None:5 US:7
c.u|jehiah.c3hi,CA:1 None:6 US:5
u|jehiah.c3hh,13
u|jehiah.c3i0,7
You will notice that these are key,value records where the key is a compound key often made up of a namespace and
sometimes a time component, and the value is sometimes a repeating (key + : + value) sequence.
With that dataset in a plain text file named data.csv, you can point sortdb at it with this run command (setting comma
as the field separator):
$ sortdb --field-separator=, --db-file=data.csv
Now it’s possible to query for a single record
$ curl 'http://127.0.0.1:8080/get?key=u|jehiah.c3hh'
13
$ curl 'http://127.0.0.1:8080/get?key=c.u|jehiah.c3hh'
DE:1 None:5 US:7
Or forward match a range of records
$ curl 'http://127.0.0.1:8080/fwmatch?key=u|jehiah.'
u|jehiah.c3hh,13
u|jehiah.c3i0,7
EOL
If this sounds like a fun project to hack on, bitly is hiring.
Infrastructure as a Platform
Introduction
At bitly, infrastructure has two core responsibilities.
- The obvious - systems architecture, performance, scaling, technology choices, and implementation details.
- The not-so-obvious - new hire on-boarding, developer tools, and idioms that enable non-infrastructure engineers (science/research, application developers, etc.) to build scalable, performant, operationally sound, and easy-to-develop pieces of your product.
I say not-so-obvious because many of these things aren’t pressing issues when teams are small, everyone is in one room, and it’s crystal clear what the most important goal is from an engineering perspective.
As an engineering team grows, it becomes more and more important to develop and evangelize a common way of solving problems and getting things done. Without these guidelines you end up with inconsistent solutions that have a dramatic impact on the ability to operationally handle production systems (let alone the time cost of engineers constantly re-inventing the wheel).
Maybe you’re responsible 24/7 for production systems - consider any combination of the following situations:
- logs are in 14 different places
- an engineer chose a different one of the far too many Python web frameworks
- services are started by the current flavor-of-the-week process manager
- response formats from APIs differ
- techniques used to process data vary
- monitoring is done differently or not at all
- backups aren’t consistent
- no tests
Sounds like one helluva mess to debug. (and it’s always at 3am)
Let’s talk about how we’ve made progress over the past year solving some of these problems.
Development Workflow
A big win here was our switch to Git (and GitHub). The freedom and flexibility this gave us to design a powerful code-flow (develop -> review -> merge -> deploy) has proved extremely valuable.
We keep our code in one repository (application code, infrastructure tools, configuration and system dependencies). When possible we work to move things into external open source projects that are paired with an install script in our main repo. Every engineer develops in their own fork. Code makes it into master in one of two ways:
- cherry-picking - a small, well defined changeset or high-priority bug fix can be cherry-picked into master.
- pull request - anything else follows the process of creating a branch off master, developing the feature, and opening a pull request.
It should be noted that another member of the engineering team will review either of the above and provide constructive feedback, ultimately being the one to bring the code into master. Code review is a first class member of our development workflow - it is hugely beneficial to both the developer and the reviewer. The better you are at reading code, the better you are at writing it. For efficiency, and to respect each other’s time, we often exchange commit hashes and play tit-for-tat with pull-request reviewing.
Another benefit of this model is that master maintains a clean history, always moving forward. In doing so we avoid the pitfalls of coordinating a destructive history change with so many outstanding branches. Still, we understand the value of Git’s power - engineers are encouraged to re-write history to their heart’s content while developing in their own branch. When ready, a comment on the pull request stating “ready for review @github_user” will notify the corresponding person to begin review at their discretion.
Reviewing takes the form of comments on the pull request. As each round of review is completed, the developer will comment “resolved” where appropriate and push additional commits to the branch. This culminates in one final rebase and squash before a merge. We find that looking back you rarely need the back-and-forth of many small commits and we prefer to see the change as a whole as it relates to the issue documented in the pull request.
Developing in a Virtual Machine
If you’re not:
- writing code in an environment that mimics production
- working in an environment set up by the same scripts and processes that stand up production hosts
- exposed to the challenges of making things play nice together
then as soon as your code is actually running in production, it’s probably not going to go well.
We believe whole-heartedly in the VM approach. Every developer (and even every designer!) has a local VM for developing changes to any component in our infrastructure, front to back.
Most importantly, it forces you to design your applications to be “environment aware”. Your application code should only know the how. The where and how many is the responsibility of configuration files that pivot on the environment the code is running in. (see @jehiah’s post for more info about how we structure this)
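As a purely illustrative sketch of “environment aware” code, the application below asks for settings by name while a table keyed on environment decides the where and how many. Every name here (`SETTINGS`, `APP_ENV`, the hosts and worker counts) is hypothetical and not taken from bitly’s configuration.

```python
import os

# Hypothetical settings: one entry per environment the code can run in.
SETTINGS = {
    "dev": {"queue_hosts": ["127.0.0.1:8081"], "workers": 1},
    "production": {"queue_hosts": ["queue01:8081", "queue02:8081"], "workers": 8},
}

def get_settings(environment=None):
    """Pick the settings for the current environment (defaults to 'dev')."""
    environment = environment or os.environ.get("APP_ENV", "dev")
    return SETTINGS[environment]

def describe_workers(settings):
    """Application code only knows the how; hosts and counts come from config."""
    return [
        f"worker {n} -> {host}"
        for host in settings["queue_hosts"]
        for n in range(settings["workers"])
    ]

print(describe_workers(get_settings("dev")))
```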
The benefits of this are easy to understand - if you can get it working, tested, and deployable in the VM then getting it running successfully in production should be as simple as a deploy.
However, the choice of using a VM doesn’t come without challenges:
- When you’re service-oriented, as the overall architecture grows, it becomes increasingly difficult to “fit” all services in a single VM on your local machine (RAM, etc.).
- Not all services are equal and different engineers tend to touch different components more often.
- Setup, maintenance, and learning curve are a time sink.
We’ve only begun to scratch the surface in addressing these issues. We’ve begun to logically group services so that they can be spun up/down on demand, giving some control over what’s running simultaneously. We’ll surely post a follow up with our findings as we continue to make progress in this area.
The Way of the Bitly
The value of choosing sane and repeatable solutions to common problems becomes clearer and clearer as things grow - whether it’s data, engineering team size, or number of services in your infrastructure.
Often the # of operations engineers doesn’t scale linearly with the # of services they have to monitor in production. It becomes extremely important to make their lives easy by developing good frameworks and solutions for solving problems, and make those solutions readily available, well documented, and easy to use.
We begin this education process early on with new engineers.
An engineer’s first day sets the tone for their future at your company. A new engineer at bitly can expect that on the first day they’ll have some shiny new hardware to play with as well as one simple goal: get your bio on the bitly about page. Since we put a strong focus on working in a VM, that implies that the existing operations and infrastructure team has done their job to have all the appropriate accounts set up, such as access to email, wiki, GitHub, and of course, a VM.
This simple task facilitates the process of familiarizing themselves with the dev workflow described above as well as providing the satisfaction of seeing code go into production on day 1.
We try to expose new engineers to all different facets of bitly infrastructure and encourage them to explore the codebase. Fixing bugs, adding small pieces of functionality, and developing brand new services are common first week tasks.
We think it’s important to step away from the computer, too. We schedule internal tech talks where we discuss the successes (and challenges) of the current state of the infrastructure. It’s an open forum where questions are answered, odd names get definitions, and bitly idioms are discussed.
EOL
There’s lots more to talk about:
- how we deploy…
- what are some of those bitly idioms…
- how we make technology decisions…
- meetings and communication…
and more. We’ll save all that for future posts.
As always, if any of this sounds interesting to you, bitly is hiring.
Introducing Asyncdynamo
When Amazon announced its DynamoDB service in late January, we quickly identified it as a promising candidate to meet many of our database demands: high availability and fault tolerance, low latency, and low maintenance. SSD hardware promised to grant us similar performance to what we experience with an in-memory datastore, while replication handled by Amazon promised to let our ops team sleep soundly at night (a cranky ops team is never a good thing). While it would be impractical for us to move all of our core infrastructure over to hosted services, we felt that Dynamo could be a good persistence option for applications already deployed to EC2.
Since all interactions with Dynamo are carried out over HTTP, there’s no real need for a custom client library to begin performing database operations. Our applications are written primarily in Python, and Boto provided just enough helper methods to handle authentication and proper request formatting (neither of which are very straightforward tasks). However, Boto’s network calls are executed using Python’s built-in httplib, whose blocking nature makes it impractical for use with Tornado, an asynchronous framework.
Consequently, we set out to develop a library that would make use of Boto’s facilities for Dynamo-specific operations (request formatting and signing, as well as response parsing) but would leverage Tornado’s async HTTP client to execute the actual requests. For the benefit of any other Tornado users hoping to make use of DynamoDB, we are proud to announce Asyncdynamo.
Asyncdynamo requires Boto and Tornado to be installed, and must be run with Python 2.7. It replaces Boto’s synchronous calls to Dynamo and to Amazon STS (to retrieve session tokens) with non-blocking Tornado calls. For the end user its interface seeks to mimic that of Boto Layer1, with each method now requiring an additional callback parameter.
A little trickery was involved in making this work: Boto’s methods to sign requests expect to be operating on an instance of boto.connection.HTTPRequest, so we needed to trick them into accepting our tornado.httpclient.HTTPRequest. Working with a dynamic language makes this kind of type-hijacking much easier than it might otherwise be, and after a short bit of trial and error we were able to successfully fire off our hybrid requests to Amazon.
Currently, Boto Layer1 equivalents of the essential operations - Get, Put, and Query - have all been implemented, and any other interaction with Dynamo is possible using a generic make_request method. Once initialized with your AWS keys, Asyncdynamo will handle all authentication and session token management behind the scenes. Our immediate development plans are to fully replicate the methods offered in Boto Layer1, and in the longer term we hope to be able to add a further layer of abstraction similar to Boto Layer2.
Interested in working on these kinds of projects? We’re hiring. We’ll also be at PyCon this week if you’d like to chat about this project, small links, big data, or anything in between.
Use Amazon SNS for Nagios Alerts
Amazon recently added SMS publishing capability to their Simple Notification Service (SNS) platform.
Using SNS to send nagios alerts:
At Bitly, we used individual carriers’ email SMS gateways (e.g. 123456789@text.att.net) to send pages to the on-call person as well as other people on the ops team. This solution was not cutting it as messages were heavily throttled on the carrier’s side (the gateway being a free service provided by the carrier meant that we couldn’t really complain or get the problem rectified). This basically meant that pages arrived hours later in some cases and defeated the purpose of an alert during an emergency.
Once SMS capability was announced, it was pretty much a no-brainer to switch to SNS.
I wrote a quick and simple Python script with the help of a popular AWS Python library called boto. You can find the source over at http://bit.ly/pyamazonsns.
This script can be used to send any kind of SNS message, not just SMS.
Here is a snippet of how this would tie into nagios:
define command{
    command_name    notify-host-by-txt
    command_line    printf "%b" "$HOSTALIAS$" | send_sns.py $CONTACTPAGER$
}
Feel free to fork it, improve on it and add features beyond the scope of our usage. Drop us a line to let us know :)
Introduction to simplehttp
Part of our engineering philosophy is to keep things fast and simple. Aim to serve one purpose and serve it well. Speak HTTP and encode in JSON. Prototype in Python and speed it up in C.
There are a few components that follow these tenets and sit at the core of our infrastructure. We’ve open-sourced them under the simplehttp moniker. They serve as the architectural foundation for higher-order functionality.
simplehttp
At the lowest level is the simplehttp library, an abstraction of libevent’s evhttp functions, aimed at trivializing the task of writing an evented HTTP server in C. It’s dead simple yet provides high-level features such as:
- Tornado inspired options parsing
- Tornado inspired logging
- Automatic per-endpoint stat tracking (request counts, 95% times, averages)
- Clean API to perform async HTTP requests
Built on top of simplehttp, perhaps the most important daemons are simplequeue and pubsub.
simplequeue
A rock-solid in-memory message queue for arbitrary message bodies (we use JSON), providing basic /get and /put endpoints. We use this in several key areas to serve as a work queue for asynchronous processing. During a maintenance window or situations where backend services are degraded it also acts as a buffer, queueing up work that needs to be done when backend services are restored.
We use long-lived Python “queuereaders” to poll the simplequeue and perform work. This work might be writing to a database, logging, aggregating, or anything else that you might not want to perform in a blocking fashion during a request cycle. These queuereaders have built-in backoff timers which slow down the processing rate when errors are detected to allow a struggling backend to recover gracefully and to reduce the load on the machine running the queuereader.
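The backoff behavior can be sketched roughly as below. This is not the actual queuereader code: `BackoffTimer`, `run_queuereader`, and the doubling and halving policy are assumptions for illustration, and `fetch_message` stands in for an HTTP request to the simplequeue /get endpoint.

```python
import time

class BackoffTimer:
    """Grow the delay between messages on errors, shrink it on successes."""

    def __init__(self, step=0.5, max_interval=30.0):
        self.step = step
        self.max_interval = max_interval
        self.interval = 0.0

    def failure(self):
        self.interval = min(self.max_interval, max(self.step, self.interval * 2))

    def success(self):
        self.interval = self.interval / 2 if self.interval > self.step else 0.0

def run_queuereader(fetch_message, handle_message, timer, sleep=time.sleep,
                    idle_interval=1.0, max_iterations=None):
    """Poll for messages, slowing down while handle_message keeps failing."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        message = fetch_message()  # hypothetical: GET /get on the local simplequeue
        if message is None:        # queue is empty, wait before polling again
            sleep(idle_interval)
            continue
        try:
            handle_message(message)
        except Exception:
            timer.failure()        # a real reader would also re-queue the message
        else:
            timer.success()
        if timer.interval > 0:
            sleep(timer.interval)
```

Passing `sleep` and `fetch_message` in as callables keeps the loop easy to exercise without a running queue.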
Generally, we silo a simplequeue and its associated queuereaders on each host of the service, meaning a simplequeue on hostA will only contain messages from requests received by that host, and its queuereaders will only process messages from their local simplequeue. We do this to address single point of failure issues.
pubsub
We have many different types of data at bitly, each classified into a stream. There are streams of encodes (shortens), decodes (“clicks”), user events, etc. In order to provide a central, consistent, means for developers to access data in realtime we expose these streams via pubsub.
Publishing a message is a simple HTTP request to the /pub endpoint. In our case this usually happens in a queuereader whose sole purpose is to read off a specific simplequeue and write to a specific pubsub.
A client consumes the stream via a long-lived HTTP request to the /sub endpoint. Messages are transmitted as newline-delimited JSON.
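On the consuming side, the main detail is that a long-lived response arrives in arbitrary chunks, so a client has to buffer until it sees a newline. A minimal sketch, assuming the chunks come from whatever HTTP client holds the /sub connection open (`iter_messages` is a hypothetical helper, not part of pubsubclient):

```python
import json

def iter_messages(chunks):
    """Yield one decoded JSON message per newline-terminated line."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():  # skip blank lines between messages
                yield json.loads(line.decode("utf-8"))

# A message split across two chunks is only emitted once its newline arrives.
stream = [b'{"a": 1}\n{"b"', b': 2}\n', b'\n{"c": 3}\n']
for message in iter_messages(stream):
    print(message)
```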
To pair with pubsub the repository contains three additional utilities built on pubsubclient. ps_to_file and ps_to_http are fairly self-explanatory. One archives a pubsub stream to a file (automatically rolling the output files for you based on a configurable strftime format string), and the other writes a stream of data to destination HTTP endpoints. The latter can be used to send messages to a simplequeue, another pubsub stream, or any other HTTP endpoint. Additionally, pubsub_filtered repeats a pubsub stream and provides the option to remove or obfuscate fields, creating a filtered view on a subset of the data. At bitly we use these tools to archive our data streams and to pass data published by one application into another application (or another datacenter).
EOL
If any of these things sound like fun projects to hack on, bitly is hiring.
Welcome to the bitly Engineering Blog
At bitly, we have been happy to contribute to a number of open source projects, and we have even started a few of our own. We look forward to talking about those, and other engineering details here. You can stay up-to-date by following @bitly