Securing Internal Applications

A common infrastructure problem is managing access to internal applications. Some patterns for solving that problem include fronting internal apps with HTTP Basic or HTTP Digest authentication through Apache or nginx, running the apps on secret ports, and bundling an authentication system into each application. Many off-the-shelf applications include no built-in authentication at all and count on upstream proxies/systems for access control, which limits the options for securing them. Google Auth Proxy is a tool we developed to better secure internal applications using Google Account authentication (like many other startups, we rely on Google Apps).

At bitly, we like fronting HTTP applications with nginx and use that approach for nearly all of our stack, from Tornado apps handling our core APIs to common internal tools (e.g. Nagios, Munin, Graphite) to homegrown tools like NSQ and our deploy system. For many of those systems, we restricted access by configuring HTTP Basic authentication in nginx. Using Basic Auth, however, meant the credentials ended up stored in configuration files and scripts, leaking them throughout the repository.

Some of our homegrown tools initially relied on bitly’s OAuth 2 service for authentication, which meant that sign-in was managed through bitly accounts via the OAuth flow. This left each application to handle authorization itself by maintaining a list of bitly accounts that actually had permission to access the app. It was an improvement over HTTP Basic Auth because applications no longer needed to accept or store passwords, but it was less than ideal because each app still had to store and manage its own list of authorized users. Over time, this led to a large number of disparate systems with separate authorization lists. The approach was also only feasible for tools we wrote ourselves, so it couldn’t be applied to the open source applications we were running.

We developed Google Auth Proxy as a new tool for securing internal applications. It is an HTTP reverse proxy that provides authentication using Google’s OAuth2 API, with the flexibility to authorize individual Google Accounts (by email address) or a whole Google Apps domain. For internal applications this is convenient because we can now allow our whole @bit.ly Google Apps domain without separately managing accounts or passwords.

Google Auth Proxy requires a limited set of privileges in order to authenticate users (asking only for the userinfo.email and userinfo.profile OAuth2 scopes). The authentication information is then passed to the upstream application as HTTP Basic Auth (with an empty value for the password) and in an HTTP header, X-Forwarded-User, for applications that need that context.
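For example, an upstream Tornado app can read the authenticated user straight from that header. This is a minimal sketch; the handler, route, and port here are illustrative, not part of Google Auth Proxy:

import tornado.ioloop
import tornado.web

class DashboardHandler(tornado.web.RequestHandler):
    def get(self):
        # Google Auth Proxy has already authenticated the request and
        # forwards the user's email in the X-Forwarded-User header.
        user = self.request.headers.get("X-Forwarded-User", "unknown")
        self.write("hello, %s" % user)

if __name__ == "__main__":
    tornado.web.Application([(r"/", DashboardHandler)]).listen(8080)
    tornado.ioloop.IOLoop.instance().start()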

Google Auth Proxy has been in production for 6 months. It has reduced the overhead of managing internal applications (no setup time for new or removed accounts) and has made it easier for us to open up access to important tools to the entire company. It’s also worth mentioning that if you use two-factor authentication with your Google accounts, that security carries over to everything behind Google Auth Proxy.

We chose to write Google Auth Proxy in Go because the language has built-in support for writing a concurrent reverse proxy in the net/http/httputil package. This meant that little work was needed to proxy requests, and we could focus on just writing the authentication layer.

If you find this useful, we’d love to hear about it @bitly

Forget Table

Forget Table is a solution to the problem of storing the recent dynamics of categorical distributions that change over time (i.e., non-stationary distributions).

What does this mean? Why is this useful? What makes this the most important thing since sliced bread?! Read on!

Storing distributions is crucial for doing any sort of statistics on a dataset. Normally, this problem is as easy as keeping counters of how many times we’ve seen certain quantities, but when dealing with data streams that constantly deliver new data, these simple counters no longer do the job. They fail because data we saw weeks ago carries the same weight as data we have just seen, even though the underlying distribution the data is describing may be changing. This also poses an engineering challenge, since the counters grow without bound and will soon use all the resources available to them. This is why we created Forget Table.

Background

A categorical distribution describes the probability of seeing each event occur out of a set of possible events. For example, every click coming through bitly’s system comes from one of a fixed number of country codes (there are about 260). We would like to maintain a categorical distribution that assigns a probability to each country, describing how likely any given click is to come from that country. With bitly’s data, this is going to give a high weight to the US and lower weights to, for example, Japan and Brazil.

It’s very useful for bitly to store distributions like this, as it gives us a good idea of what’s considered ‘normal’. Most of the time we produce analytics that show, for example, how many clicks came from a given country or referrer. While this gives our clients a sense of where their traffic is coming from, it doesn’t directly express how surprising that traffic is. This can be remedied by maintaining a distribution over countries for all of bitly, so that we can identify anomalous traffic, i.e., when a particular link is getting disproportionately more clicks from Oregon than we would normally expect.

The difficulty that Forget Table deals with comes from the fact that what bitly considers normal changes constantly. At 8am on the East Coast of the US, we’d be surprised to see a lot of traffic coming from San Francisco (they’re all still asleep over there); by 11am EST, however, we should expect to see tons of traffic coming from San Francisco. So unless we allow our understanding of what’s considered normal to change over time, we are going to have a skewed idea of normality.

This is a general problem over longer time scales, too. The behavior of the social web is different in December than it is in July, and it’s different in 2012 than it was in 2011. We need to make sure that our understanding of normality changes with time. One way of achieving this is to forget old data that’s no longer relevant to our current understanding. This is where Forget Table comes in.

Why forget in the first place?

The basic problem is that the fundamental distribution (i.e., the distribution the data is actually being generated from) is changing. This property is called being “non-stationary”. Simply storing counts to represent your categorical distribution makes this problem worse because it gives data seen weeks ago the same weight as data that was just observed. As a result, the total-count method simply shows the combination of every distribution the data was drawn from (which will approach a Gaussian by the central limit theorem).

A naive solution would be to simply have multiple categorical distributions in rotation. Similar to the way log files get rotated, we would have different distributions that get updated at different times. This approach, however, can lead to many problems.

When a distribution first gets rotated in, it has no information about the current state of the world. Similarly, at the end of a distribution’s rotation it is just as affected by events from the beginning of its rotation as it is by recent events. This creates artifacts and dynamics in the data which depend only on the time of, and time between, rotations. These effects are similar to the various pathologies that come from binning data or from simply keeping a total count.

On the other hand, by forgetting things smoothly using decay rates, we have a continuous window of time that our distribution is always describing. The further back in time an event is, the less of an effect it has on the distribution’s current state.

Forgetting Data Responsibly

Forget Table takes a principled approach to forgetting old data by defining the rate at which old data should decay. Each bin of a categorical distribution forgets its counts depending on how many counts it currently has and a user-specified rate. With this rule, bins that are more dominant decay faster than bins without many counts. The method has the benefit of being a very simple process (one inspired by the decay of radioactive particles) which can be calculated very quickly. In addition, since bins with high counts decay faster than bins with low counts, the process helps smooth out sudden spikes in the data automatically.

If the data suddenly stopped flowing into the Forget Table, then all the categorical distributions would eventually decay to the uniform distribution - each bin would have a count of 1 (at which point we stop decaying), and z would be equal to the number of bins (see a visualization of this happening). This captures the fact that we no longer have any information about the distribution of the variables in Forget Table.

The result of this approach is that the counts in each bin, in each categorical distribution, decay exponentially. Each time we decrement the count in a bin, we also decrement the normalising constant z. When using Forget Table, we can choose the rate at which things should decay depending on the dominant time constants of the system.

Building Categorical Distributions in Redis

We store the categorical distribution as a set of event counts, along with a ‘normalising constant’ which is simply the number of all the events we’ve stored. In the country example, we have ~260 bins, one per country, and in each bin we store the number of clicks we’ve seen from each country. Alongside it, our normalising constant stores the total number of clicks we’ve seen across all countries.

All this lives in a Redis sorted set, where the key describes the variable (in this case simply bitly_country) and the value is the categorical distribution. Each element in the set is a country, and the score of each element is the number of clicks from that country. We store a separate element in the set (traditionally called z) that records the total number of clicks stored in the set. When we want to report the categorical distribution, we extract the whole sorted set, divide each count by z, and report the result.

Storing the categorical distribution in this way allows us to make very rapid writes (simply increment the score of two elements of the sorted set) and means we can store millions of categorical distributions in memory. Storing a large number of these is important, as we’d often like to know the normal behavior of a particular key phrase, or the normal behavior of a topic, or a bundle, and so on.
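As a rough sketch of the write and read paths with redis-py (the function names are ours for illustration, and this ignores the read-time decay described below):

import redis

r = redis.StrictRedis(decode_responses=True)

def observe(key, category):
    # one event: bump the category's bin and the normalising constant z
    r.execute_command("ZINCRBY", key, 1, category)
    r.execute_command("ZINCRBY", key, 1, "z")

def distribution(key):
    # pull the whole sorted set and divide each bin by z
    counts = dict(r.zrange(key, 0, -1, withscores=True))
    z = counts.pop("z", 0) or 1
    return {category: score / z for category, score in counts.items()}

observe("bitly_country", "US")
observe("bitly_country", "JP")
print(distribution("bitly_country"))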

Forgetting Data Under Strenuous Circumstances

This seems simple enough, but we have two problems. The first is that we have potentially millions of categorical distributions. Bitly maintains information for over a million separate key phrases at any given time, and (for some super secret future projects) it is necessary to store a few distributions per key phrase. We are therefore unable to iterate through every key in our Redis table to do our decays, so a cron-like approach (i.e., decaying every distribution in the database every few minutes) isn’t feasible.

The second problem is that data is constantly flowing into multiple distributions: we sometimes see spikes of up to 3,000 clicks per second, which can correspond to dozens of increments per second. At this volume, there is simply too much contention between the decay process and the incoming increments to safely do both.

So the real contribution of Forget Table is an approach to forgetting data at read time. When we are interested in the current distribution of a particular variable, we extract whatever sorted set is stored against that variable’s key and decrement the counts at that time.

It turns out that, using the simple rate-based model of decay from above, we can decrement each bin by sampling an integer from a Poisson distribution whose rate is proportional to the current count of the bin and the length of time since we last decayed that bin. So, by storing one extra piece of information, the time at which we last successfully decayed the distribution, we can calculate the number of counts to discard very cheaply (this is an approximation to Gillespie’s algorithm, which is used to simulate stochastic systems).
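A sketch of that decay step (the function name is ours; rate is the user-specified decay rate and dt is the time since the bin was last decayed):

import numpy

def decayed_count(count, rate, dt):
    # counts decay at a rate proportional to how many there currently are;
    # a Poisson draw approximates how many to discard over the elapsed time dt
    if count <= 1:
        return count  # we stop decaying at a count of 1
    discard = numpy.random.poisson(rate * count * dt)
    return max(1, count - discard)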

In Redis we implement this using pipelines. Using a pipeline, we read the sorted set, form the distribution, calculate the amount of decay for each bin and then attempt to perform the decay. Assuming nothing’s written into the sorted set in that time, we decay each bin and update the time since last decayed. If the pipeline has detected a collision — either another process has decayed the set or a new event has arrived — we abandon the update. The algorithm we’ve chosen means that it’s not terribly important to actually store the decayed version of the distribution, so long as we know the time between the read and the last decay.
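Sketched with redis-py’s pipeline support (the function name, and keeping the last-decay time as a member of the sorted set, are illustrative choices rather than necessarily how Forget Table stores things):

import time
import numpy
import redis

r = redis.StrictRedis(decode_responses=True)

def read_and_decay(key, rate):
    # read the distribution, then try to write back its decayed form;
    # if anything else touches the key first, abandon the write
    pipe = r.pipeline()
    try:
        pipe.watch(key)  # optimistic lock on the sorted set
        raw = dict(pipe.zrange(key, 0, -1, withscores=True))
        now = time.time()
        dt = now - raw.pop("_last_decay", now)
        bins = {}
        for cat, count in raw.items():
            if cat == "z":
                continue
            # Poisson decay proportional to the current count and elapsed time
            discard = numpy.random.poisson(rate * count * dt) if count > 1 else 0
            bins[cat] = max(1, count - discard)
        pipe.multi()
        for cat, count in bins.items():
            pipe.execute_command("ZADD", key, count, cat)
        pipe.execute_command("ZADD", key, sum(bins.values()), "z")
        pipe.execute_command("ZADD", key, now, "_last_decay")
        pipe.execute()  # raises WatchError if the set changed under us
    except redis.WatchError:
        pass  # a collision: another writer got there first, so skip this decay
    finally:
        pipe.reset()
    return bins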

Get the Code and We’re Hiring!

The result is a wrapper on top of Redis that runs as a little HTTP service. Its API has an increment handler on /incr used for incrementing counts based on new events, and a get handler on /get used for retrieving a distribution. In addition, there is an /nmostprobable endpoint that returns the n categories which have the highest probability of occurring.

There are two versions, one in Go (for super speed) and one in Python. The code is open source and up on GitHub, available at http://bitly.github.com/forgettable.

As always, if you like what you see (or feel the need to make improvements), don’t forget that bitly is hiring!

Get ready for more posts on science at bitly, and if you have any specific questions, feel free to ping me on twitter @mynameisfiber.

spray some NSQ on it

We released NSQ on October 9th 2012. Supported by 3 talks and a blog post, it’s already the 4th most watched Go project on GitHub. There are client libraries in 7 languages and we continue to talk with folks experimenting with and transitioning to the platform.

This post aims to kickstart the documentation available for “getting started” and describe some NSQ patterns that solve a variety of common problems.

DISCLAIMER: this post makes some obvious technology suggestions to the reader but it generally ignores the deeply personal details of choosing proper tools, getting software installed on production machines, managing what service is running where, service configuration, and managing running processes (daemontools, supervisord, init.d, etc.).

Metrics Collection

Regardless of the type of web service you’re building, in most cases you’re going to want to collect some form of metrics in order to understand your infrastructure, your users, or your business.

For a web service, most often these metrics are produced by events that happen via HTTP requests, like an API. The naive approach would be to structure this synchronously, writing to your metrics system directly in the API request handler.

naive approach

  • What happens when your metrics system goes down?
  • Do your API requests hang and/or fail?
  • How will you handle the scaling challenge of increasing API request volume or breadth of metrics collection?

One way to resolve all of these issues is to somehow perform the work of writing into your metrics system asynchronously - that is, place the data in some sort of local queue and write into your downstream system via some other process (consuming that queue). This separation of concerns allows the system to be more robust and fault tolerant. At bitly, we use NSQ to achieve this.

Brief tangent: NSQ has the concept of topics and channels. Basically, think of a topic as a unique stream of messages (like our stream of API events above). Think of a channel as a copy of that stream of messages for a given set of consumers. Topics and channels are both independent queues, too. These properties enable NSQ to support both multicast (a topic copying each message to N channels) and distributed (a channel equally dividing its messages among N consumers) message delivery.

For a more thorough treatment of these concepts, read through the design doc and slides from our Golang NYC talk, specifically slides 19 through 33 describe topics and channels in detail.

architecture with NSQ

Integrating NSQ is straightforward, let’s take the simple case:

  1. Run an instance of nsqd on the same host that runs your API application.
  2. Update your API application to write to the local nsqd instance to queue events, instead of directly into the metrics system. To be able to easily introspect and manipulate the stream, we generally format this type of data in line-oriented JSON. Writing into nsqd can be as simple as performing an HTTP POST request to the /put endpoint.
  3. Create a consumer in your preferred language using one of our client libraries. This “worker” will subscribe to the stream of data and process the events, writing into your metrics system. It can also run locally on the host running both your API application and nsqd.

Here’s an example worker written with our official Python client library:

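A minimal sketch of that worker with pynsq (the handler body and the write_to_metrics function stand in for whatever actually writes into your metrics system):

import json
import nsq

def write_to_metrics(event):
    print(event)  # stand-in for your metrics system client

def process_message(message):
    event = json.loads(message.body)
    write_to_metrics(event)
    return True  # True tells nsqd to FIN the message; returning False re-queues it

r = nsq.Reader(topic="api_requests", channel="metrics",
               message_handler=process_message,
               nsqd_tcp_addresses=["127.0.0.1:4150"])
nsq.run()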

Beyond de-coupling, using one of our official client libraries means consumers degrade gracefully when message processing fails. Our libraries have two key features that help with this:

  1. Retries - when your message handler indicates failure, that information is sent to nsqd in the form of a REQ (re-queue) command. Also, nsqd will automatically time out (and re-queue) a message if it hasn’t been responded to in a configurable time window. These two properties are critical to providing a delivery guarantee.
  2. Exponential Backoff - when message processing fails, the reader library will delay the receipt of additional messages for a duration that scales exponentially with the number of consecutive failures. The opposite sequence happens when a reader in a backoff state begins to process successfully again, until the delay returns to zero.

In concert, these two features allow the system to respond gracefully to downstream failure, automagically.

Persistence

Ok, great, now you have the ability to withstand a situation where your metrics system is unavailable with no data loss and no degraded API service to other endpoints. You also have the ability to scale the processing of this stream horizontally by adding more worker instances to consume from the same channel.

But, it’s kinda hard ahead of time to think of all the types of metrics you might want to collect for a given API event.

Wouldn’t it be nice to have an archived log of this data stream for any future operation to leverage? Logs tend to be relatively easy to redundantly backup, making it a “plan z” of sorts in the event of catastrophic downstream data loss. But, would you want this same consumer to also have the responsibility of archiving the message data? Probably not, because of that whole “separation of concerns” thing.

Archiving an NSQ topic is such a common pattern that we built a utility, nsq_to_file, packaged with NSQ, that does exactly what you need.

Remember, in NSQ, each channel of a topic is independent and receives a copy of all the messages. You can use this to your advantage when archiving the stream by doing so over a new channel, archive. Practically, this means that if your metrics system is having issues and the metrics channel gets backed up, it won’t affect the separate archive channel you’ll be using to persist messages to disk.

So, add an instance of nsq_to_file to the same host and use a command line like the following:

/usr/local/bin/nsq_to_file --nsqd-tcp-address=127.0.0.1:4150 --topic=api_requests --channel=archive

archiving the stream

Distributed Systems

You’ll notice that the system has not yet evolved beyond a single production host, which is a glaring single point of failure.

Unfortunately, building a distributed system is hard. Fortunately, NSQ can help. The following changes demonstrate how NSQ alleviates some of the pain points of building distributed systems as well as how its design helps achieve high availability and fault tolerance.

Let’s assume for a second that this event stream is really important. You want to be able to tolerate host failures and continue to ensure that messages are at least archived, so you add another host.

adding a second host

Assuming you have some sort of load balancer in front of these two hosts you can now tolerate any single host failure.

Now, let’s say the process of persisting, compressing, and transferring these logs is affecting performance. How about splitting that responsibility off to a tier of hosts that have higher IO capacity?

separate archive hosts

This topology and configuration can easily scale to double-digit hosts, but you’re still managing the configuration of these services manually, which does not scale. Specifically, each consumer hard-codes the addresses of the nsqd instances it reads from, which is a pain. What you really want is for the configuration to evolve and be accessed at runtime based on the state of the NSQ cluster. This is exactly what we built nsqlookupd to address.

nsqlookupd is a daemon that records and disseminates the state of an NSQ cluster at runtime. nsqd instances maintain persistent TCP connections to nsqlookupd and push state changes across the wire. Specifically, an nsqd registers itself as a producer for a given topic as well as all channels it knows about. This allows consumers to query an nsqlookupd to determine who the producers are for a topic of interest, rather than hard-coding that configuration. Over time, they will learn about the existence of new producers and be able to route around failures.

The only changes you need to make are to point your existing nsqd and consumer instances at nsqlookupd (everyone explicitly knows where nsqlookupd instances are but consumers don’t explicitly know where producers are, and vice versa). The topology now looks like this:

adding nsqlookupd

At first glance this may look more complicated. It’s deceptive though, as the effect this has on a growing infrastructure is hard to communicate visually. You’ve effectively decoupled producers from consumers because nsqlookupd is now acting as a directory service in between. Adding additional downstream services that depend on a given stream is trivial, just specify the topic you’re interested in (producers will be discovered by querying nsqlookupd).
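In pynsq terms, that means pointing the Reader at your nsqlookupd instances instead of at specific nsqd hosts (the hostnames here are placeholders):

import nsq

def process_message(message):
    # handle the event as before
    return True

r = nsq.Reader(topic="api_requests", channel="metrics",
               message_handler=process_message,
               lookupd_http_addresses=["http://lookupd-1:4161",
                                       "http://lookupd-2:4161"])
nsq.run()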

But what about availability and consistency of the lookup data? We generally recommend basing the number of nsqlookupd instances you run on your desired availability requirements. nsqlookupd is not resource intensive and can easily be co-located with other services. Also, nsqlookupd instances do not need to coordinate or otherwise be consistent with each other. Consumers generally only require one nsqlookupd to be available with the information they need (and they will union the responses from all of the nsqlookupd instances they know about). Operationally, this makes it easy to migrate to a new set of nsqlookupd instances.

EOL

Using these same strategies, we’re now peaking at 65k messages per second delivered through our production NSQ cluster. Although applicable to many use cases, we’re only scratching the surface with the configurations described above.

If you’re interested in hacking on NSQ, check out the GitHub repo and the list of client libraries. We also publish binary distributions for Linux and Darwin to make it easy to get started.

Stay tuned for more posts on leveraging NSQ to build distributed systems. If you have any specific questions, feel free to ping me on twitter @imsnakes.

Improving Frontend Code Quality and Workflow

When I started at Bitly as a Frontend Engineer, we were about to launch the new bitly. It was exciting to be so close to a product launch and we were cranking out lots of code each day. It took me a few days to get my bearings in the sprawling codebase and I saw lots of opportunity for refactoring, cleanup, and style normalization as well as some architectural concepts we weren’t leveraging.

Product launch mode kept us from doing housekeeping for the next few months after we released the new product, as we collected feedback and iterated on several designs and pieces of functionality.

Once we had a chance to take a step back and regroup, I saw that we could be more productive if we invested in a common JavaScript style (both syntax and module level patterns) and re-evaluated which libraries we were leveraging to build the site.

When we decided the next feature work would be on save/share modal dialogs, I used this as an opportunity to evaluate my new picks for tools and libraries. The result was a success and allowed us to adopt the new tools and libraries for all new features (see Easily Save and Share the Links You Love).

Keep reading to see what tools we ended up using to make us more productive!

Coding style guidelines

If you’ve seen frontend JavaScript codebases older than six months with two or more developers working on them, you’ve seen how hard it is to enforce style.

The goal of a style guide (and code conventions in general) is to reduce the amount of friction when switching between different modules in a codebase and increase the consistency and readability of code. If every module follows the same patterns, you can easily get your bearings in code written by your coworkers. Every team should have a common style that is enforced in code review and ideally written in a document that can be referred to for new hires.

For JavaScript at Bitly, we mostly use the Google Style Guide, without the JSDoc requirements and with one variation: non-method variables are lowercase_with_underscores, e.g.

var Bitly = function() {
};

Bitly.prototype.doThing = function() {
    var an_object = {};
};

The bigger the team, the more you might have to diverge from your preferred style but you’ll still get tons of benefit from the consistency alone.

Coffeescript for teams

At Bitly, in addition to codifying the style guide, we gave Coffeescript a trial run and decided to continue using it for all new code (like GitHub does). Coffeescript is quite contentious in the JS community, but for our frontend team it brought a real improvement in workflow and maintainability. Since whitespace is significant and {} and ; are almost always omitted, there is a distinct Coffeescript style that is encouraged by default.

Here’s a snippet from a Backbone View that declares all the DOM event handlers for the view in a concise way.

events:
    'click': 'focusInput'
    'focusin .picker-input': 'onFocusIn'
    'focusout .picker-input': 'checkForFullEmail'
    'keydown .picker-display': 'keyDownCheckForFocusedChip'
    'keydown .picker-input': 'inputKeydown'
    'keyup .picker-input': 'inputKeyup'
    'input .picker-input': 'inputNewText'
    'keydown .picker-suggestions': 'suggestionKeydown'
    'mousedown .picker-suggestions a': 'suggestionMousedown'
    'click .picker-suggestions a': 'selectItem'
    'click .picked-item .remove': 'removeItem'

Fat arrow (=>) is another great feature used for binding a function to the current scope and obviates the var self = this; pattern you will see in almost all JS codebases.

Here, the fat arrow binds selectItem at construction time to the newly constructed instance of MultiPickerView (not the prototype), rather than to the default jQuery-wrapped current target, so I can access the View’s methods and members within the handler.

class App.MultiPickerView extends Backbone.View
# ...
# ...
  selectItem: (e) =>
    e.preventDefault()
    $item = $(e.currentTarget)
    @addEmail $item.data("email")
    # @$input is convention for cached jquery elements 
    # stored on the view instance
    @$input.val ""
    @hideSuggest()
    _.defer => @$input.focus()

Fast and (mostly) logic-less templates with Handlebars

Handlebars is a template language with partials, custom helpers, and minimal logic, which can be compiled on the client or server side. We take advantage of the watcher system we already had in place for compiling Coffeescript to automatically compile our Handlebars templates ahead of time in order to reduce the workload on clients.

Immediate feedback with Coffeescript + File watcher + Growl

One of the major benefits for using Coffeescript is the compile-time checking that catches syntax errors. Instead of using the built-in coffee -cw *.coffee to compile-on-change, I wrote a script that does this but also triggers Growl notifications if the compilation produced errors.

coffeescript-error

The script does the same for *.handlebars files (for which there is not a built-in watcher functionality).

handlebars-error

See the source: Multiwatcher gist

Backbone (Models, Views, Events)

Historically, frontend JavaScript is fertile ground for spaghetti code, tight coupling, repeated code and scattered entry points. The introduction of jQuery increased this trend by making it easy for developers to handle DOM events and add visual effects wherever it was most convenient. This scales to a point, but when you start doing lots of AJAX, manipulating application state, and generating HTML using the DOM APIs, things can quickly get out of hand.

Backbone introduced a few great primitives that can be leveraged to make more maintainable frontend code.

Models are a good abstraction when you have a lot of application state that can change based on user interaction and/or AJAX calls.

Views encapsulate DOM elements and provide a declarative way of setting your event listeners using jQuery’s delegated event listeners.

Events implements the pub/sub pattern, which allows you to easily decouple your models and views. This is probably the most important development for frontend application architecture. A typical sequence of events might go like this:

  1. Instantiate Model, View
  2. View listens on Model changes
  3. Model attribute changes (via AJAX callback or direct user interaction)
  4. Model fires a change event on its event channel
  5. View’s listener fires in response and performs some action

The decoupling is that the Model never has knowledge of the View. This means the Model handles only the business logic and can be much more easily tested.

There has been much written on Backbone and other MV* frameworks, so I won’t evangelize much here except to say that adding a View and Model layer in your frontend JavaScript will improve your architecture significantly.

Addy Osmani gives a good treatment as to why you would use MV* and how to select your tools in: Journey through the JavaScript MVC jungle

Unit/Integration Tests with Mocha, Sinon

Unit testing is a given for any modern web app with a long lifecycle and high volume of traffic.

There are many test runner frameworks, but you should choose one that matches your style. I like Mocha because it can handle BDD, TDD, and export styles, has a nice web browser interface, is being adopted in lots of places, and is written by TJ Holowaychuk, who has a great reputation in the node community. The two other big names in testing frameworks are Jasmine and QUnit, but they are more opinionated and less flexible.

Besides a test runner, you’ll want some helper libraries to make your tests easier to write.

Sinon is essential as it provides spy, stub, and mock functionality to make sure you are testing the unit under test and not its dependencies! The author wrote Test-Driven JavaScript Development and is the foremost expert on this topic.

To support different styles of test assertions, you may like Should.js which provides expressive methods so you can write tests like:

user.pets.should.have.length(5)

Here is a sample test file for our AJAX wrapper method (which uses mockjax):

$.mockjaxSettings.responseTime = 1
$.mockjax(
  url: '/data/beta/missing'
  status: 404
)
$.mockjax(
  url: '/data/beta/invalid'
  status: 200
  responseText:
    status_code: 500
    status_txt: 'MISSING_ARG'
    data: null
)
$.mockjax(
  url: '/data/beta/valid'
  status: 200
  responseText:
    status_code: 200
    status_txt: 'OK'
    data: []
)

describe 'BITLY', ->
  describe '#post', ->
    it 'should trigger .fail for non-200 status_code', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/missing', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .done((data) ->
          data.should.be.a 'object'
        )
        .fail(failSpy)
        .always(->
          doneSpy.called.should.be.false
          failSpy.calledTwice.should.be.true
          done()
        )
    it 'should trigger .fail for non-200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/invalid', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .fail(failSpy)
        .always(->
          doneSpy.called.should.be.false
          failSpy.calledTwice.should.be.true
          done()
        )
    it 'should trigger .done for 200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/valid', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .done((data) ->
          data.should.be.a 'object'
        )
        .fail(failSpy)
        .always(->
          failSpy.called.should.be.false
          doneSpy.calledTwice.should.be.true
          done()
        )

Here is a sample of our test suite and its test runner:

mocha tests

Always be looking to improve your toolkit and sweat the small stuff when it comes to code quality. Every engineer wants to work with other great engineers. Ask yourself “If not me, then who on my team will make the effort?”

If you like working with other great engineers, bitly is hiring.

Classifying Human Traffic with Random Forest Decision Trees

At bitly, we study human behavior on the social web, and we often need to figure out whether data was generated by a deliberate human action (organic data) or by a script or some other action taken without a human’s knowledge (inorganic data). bitly data scientist Brian Eoff recently gave a talk at PyData NYC 2012 on a fast random forest decision tree approach, implemented in Python with scikit-learn, to identifying organic vs. inorganic data in a realtime stream.


SciKit Random Forest - Brian Eoff from Continuum Analytics on Vimeo.

NSQ: realtime distributed message processing at scale

NSQ is a realtime message processing system designed to operate at bitly’s scale, handling billions of messages per day.

It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.

Operationally, NSQ is easy to configure and deploy (all parameters are specified on the command line and compiled binaries have no runtime dependencies). For maximum flexibility, it is agnostic to data format (messages can be JSON, MsgPack, Protocol Buffers, or anything else). Go and Python libraries are available out of the box.

This post aims to provide a detailed overview of NSQ, from the problems that inspired us to build a better solution to how it works inside and out. There’s a lot to cover so let’s start off with a little history…

Background

Before NSQ, there was simplequeue, a simple (shocking, right?) in-memory message queue with an HTTP interface, developed as part of our open source simplehttp suite of tools. Like its successor, simplequeue is agnostic to the type and format of the data it handles.

We used simplequeue as the foundation for a distributed message queue by siloing an instance on each host that produced messages. This effectively reduced the potential for data loss in a system which otherwise did not persist messages by guaranteeing that the loss of any single host would not prevent the rest of the message producers or consumers from functioning.

We also used pubsub, an HTTP server that aggregates streams and provides an endpoint for multiple clients to subscribe to. We used it to transmit streams across hosts (or datacenters) so they could be queued again for writing to various downstream services.

As a glue utility, we used ps_to_http to subscribe to a pubsub stream and write the data to simplequeue.

There are a couple of important properties of these tools with respect to message duplication and delivery. Each of the N clients of a pubsub receives all of the messages published (each message is delivered to all clients), whereas each of the N clients of a simplequeue receives 1/N of the messages queued (each message is delivered to 1 client). Consequently, when multiple applications need to consume data from a single producer, we set up the following workflow:

old school setup

The producer publishes to pubsub and for each downstream service we set up a dedicated simplequeue with a ps_to_http process to route all messages from the pubsub into the queue. Each service has its own set of “queuereaders” which we scale independently according to the service’s needs.

We used this foundation to process 100s of millions of messages a day. It was the core upon which bitly was built.

This setup had several nice properties:

  • producers are de-coupled from downstream consumers
  • no producer-side single point of failures
  • easy to interact with (all HTTP)

But, it also had its issues…

One is simply the operational overhead/complexity of having to set up and configure the various tools in the chain. Of particular note are the pubsub > ps_to_http links. Given this setup, consuming a stream in a way that avoids SPOFs is a challenge. There are two options, neither of which is ideal:

  1. just put the ps_to_http process on a single box and pray
  2. shard by consuming the full stream but processing only a percentage of it on each host (though this does not resolve the issue of seamless failover)

To make things even more complicated, we needed to repeat this for each stream of data we were interested in.

Also, messages traveling through the system had no delivery guarantee and the responsibility of re-queueing was placed on the client (for instance, if processing fails). This churn increased the potential for situations that result in message loss.

Enter NSQ

NSQ is designed to (in no particular order):

  • provide easy topology solutions that enable high-availability and eliminate SPOFs
  • address the need for stronger message delivery guarantees
  • bound the memory footprint of a single process (by persisting some messages to disk)
  • greatly simplify configuration requirements for producers and consumers
  • provide a straightforward upgrade path
  • improve efficiency

To introduce some NSQ concepts, let’s start off by discussing configuration.

Simplifying Configuration and Administration

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.

Topics and channels all buffer data independently of each other, preventing a slow consumer from causing a backlog for other channels (the same applies at the topic level).

A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client. For example:

nsqd clients

NSQ also includes a helper application, nsqlookupd, which provides a directory service where consumers can lookup the addresses of nsqd instances that provide the topics they are interested in subscribing to. In terms of configuration, this decouples the consumers from the producers (they both individually only need to know where to contact common instances of nsqlookupd, never each other), reducing complexity and maintenance.

At a lower level, each nsqd has a long-lived TCP connection to each nsqlookupd over which it periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
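For example (the hostname is a placeholder; 4161 is nsqlookupd’s default HTTP port):

import requests

resp = requests.get("http://lookupd-1:4161/lookup", params={"topic": "api_requests"})
print(resp.json())  # lists the nsqd producers (addresses and ports) currently providing the topic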

To introduce a new distinct consumer of a topic, simply start up an NSQ client configured with the addresses of your nsqlookupd instances. There are no configuration changes needed to add either new consumers or new publishers, greatly reducing overhead and complexity.

NOTE: in future versions, the heuristic nsqlookupd uses to return addresses could be based on depth, number of connected clients, or other “intelligent” strategies. The current implementation is simply all. Ultimately, the goal is to ensure that all producers are being read from such that depth stays near zero.

It is important to note that the nsqd and nsqlookupd daemons are designed to operate independently, without communication or coordination between siblings.

We also think that it’s really important to have a way to view, introspect, and manage the cluster in aggregate. We built nsqadmin to do this. It provides a web UI to browse the hierarchy of topics/channels/consumers and inspect depth and other key statistics for each layer. Additionally it supports a few administrative commands such as removing and emptying a channel (which is a useful tool when messages in a channel can be safely thrown away in order to bring depth back to 0).

nsqadmin

Straightforward Upgrade Path

This was one of our highest priorities. Our production systems handle a large volume of traffic, all built upon our existing messaging tools, so we needed a way to slowly and methodically upgrade specific parts of our infrastructure with little to no impact.

First, on the message producer side, we built nsqd to match simplequeue. Specifically, nsqd exposes an HTTP /put endpoint, just like simplequeue, for POSTing binary data (with the one caveat that the endpoint takes an additional query parameter specifying the “topic”). Services that wanted to switch to publishing to nsqd only had to make minor code changes.
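Publishing can be as simple as an HTTP POST (a sketch; the payload is illustrative and 4151 is nsqd’s default HTTP port):

import json
import requests

event = {"endpoint": "/v3/shorten", "status": 200}  # illustrative event payload
requests.post("http://127.0.0.1:4151/put", params={"topic": "api_requests"},
              data=json.dumps(event))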

Second, we built libraries in both Python and Go that matched the functionality and idioms we had been accustomed to in our existing libraries. This eased the transition on the message consumer side by limiting the code changes to bootstrapping. All business logic remained the same.

Finally, we built utilities to glue old and new components together. These are all available in the examples directory in the repository:

  • nsq_pubsub - expose a pubsub like HTTP interface to topics in an NSQ cluster
  • nsq_to_file - durably write all messages for a given topic to a file
  • nsq_to_http - perform HTTP requests for all messages in a topic to (multiple) endpoints

Eliminating SPOFs

NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to all instances providing the specified topic. There are no middle-men, no message brokers, and no SPOFs:

nsq clients

This topology eliminates the need to chain single, aggregated feeds. Instead, you consume directly from all producers. Technically, it doesn’t matter which client connects to which nsqd; as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all messages will eventually be processed.

For nsqlookupd, high availability is achieved by running multiple instances. They don’t communicate directly to each other and data is considered eventually consistent. Consumers poll all of their configured nsqlookupd instances and union the responses. Stale, inaccessible, or otherwise faulty nodes don’t grind the system to a halt.

Message Delivery Guarantees

NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and de-dupe or perform idempotent operations.

This guarantee is enforced as part of the protocol and works as follows (assume the client has successfully connected and subscribed to a topic):

  1. client indicates they are ready to receive messages
  2. NSQ sends a message and temporarily stores the data locally (in the event of re-queue or timeout)
  3. client replies FIN (finish) or REQ (re-queue), indicating success or failure respectively (if the client does not reply, nsqd will time out after a configurable duration and automatically re-queue the message)

This ensures that the only edge case that would result in message loss is an unclean shutdown of an nsqd process. In that case, any messages that were in memory (or any buffered writes not flushed to disk) would be lost.

If preventing message loss is of the utmost importance, even this edge case can be mitigated. One solution is to stand up redundant nsqd pairs (on separate hosts) that receive copies of the same portion of messages. Because you’ve written your consumers to be idempotent, doing double-time on these messages has no downstream impact and allows the system to endure any single node failure without losing messages.

The takeaway is that NSQ provides the building blocks to support a variety of production use cases and configurable degrees of durability.

Bounded Memory Footprint

nsqd provides a configuration option --mem-queue-size that will determine the number of messages that are kept in memory for a given queue. If the depth of a queue exceeds this threshold messages are transparently written to disk. This bounds the memory footprint of a given nsqd process to mem-queue-size * #_of_channels_and_topics:

message overflow

Also, an astute observer might have identified that this is a convenient way to gain an even higher guarantee of delivery by setting this value to something low (like 1 or even 0). The disk-backed queue is designed to survive unclean restarts (although messages might be delivered twice).

Also, related to message delivery guarantees, clean shutdowns (by sending a nsqd process the TERM signal) safely persist the messages currently in memory, in-flight, deferred, and in various internal buffers.

Note: a channel whose name ends in the string #ephemeral will not be buffered to disk; instead, it will drop messages once it exceeds mem-queue-size. This lets consumers that do not need message guarantees subscribe to a channel. Ephemeral channels also do not persist after their last client disconnects.

Efficiency

NSQ was designed to communicate over a “memcached-like” command protocol with simple size-prefixed responses. All message data is kept in the core, including metadata like the number of attempts, timestamps, etc. This eliminates copying data back and forth between server and client, an inherent property of the previous toolchain when re-queueing a message. It also simplifies clients, as they no longer need to be responsible for maintaining message state.

Also, by reducing configuration complexity, setup and development time is greatly reduced (especially in cases where there are >1 consumers of a topic).

For the data protocol, we made a key design decision that maximizes performance and throughput by pushing data to the client instead of waiting for it to pull. This concept, which we call RDY state, is essentially a form of client-side flow control.

When a client connects to nsqd and subscribes to a channel, it is placed in a RDY state of 0. This means that no messages will be sent to the client. When a client is ready to receive messages, it sends a command that updates its RDY state to the number of messages it is prepared to handle, say 100. Without any additional commands, 100 messages will be pushed to the client as they become available (each one decrementing the server-side RDY count for that client).

Client libraries are designed to send a command to update RDY count when it reaches ~25% of the configurable max-in-flight setting (and properly account for connections to multiple nsqd instances, dividing appropriately).

nsq protocol

This is a significant performance knob as some downstream systems are able to more-easily batch process messages and benefit greatly from a higher max-in-flight.
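With pynsq, that knob is just a Reader argument (the value here is arbitrary):

import nsq

def process_message(message):
    return True  # process the message and FIN it

# max_in_flight caps how many messages this reader has outstanding at once;
# the library spreads that budget across the nsqd connections it discovers
r = nsq.Reader(topic="api_requests", channel="metrics",
               message_handler=process_message,
               lookupd_http_addresses=["http://lookupd-1:4161"],
               max_in_flight=2500)
nsq.run()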

Notably, because it is both buffered and push-based, with the ability to satisfy the need for independent copies of streams (channels), we’ve produced a daemon that behaves like simplequeue and pubsub combined. This is powerful in terms of simplifying the topology of our systems where we would have traditionally maintained the older toolchain discussed above.

Go

We made a strategic decision early on to build the NSQ core in Go. We recently blogged about our use of Go at bitly and alluded to this very project - it might be helpful to browse through that post to get an understanding of our thinking with respect to the language.

Regarding NSQ, Go channels (not to be confused with NSQ channels) and the language’s built-in concurrency features are a perfect fit for the internal workings of nsqd. We leverage buffered channels to manage our in-memory message queues and seamlessly write overflow to disk.

The standard library makes it easy to write the networking layer and client code. The built-in memory and CPU profiling hooks highlight opportunities for optimization and require very little effort to integrate. We also found it really easy to test components in isolation, mock types using interfaces, and iteratively build functionality.

Overall, it’s been a fantastic project to use as an opportunity to really dig into the language and see what it’s capable of on a larger scale. We’ve been extremely happy with our choice to use golang, its performance, and how productive we are using it.

EOL

We’ve been using NSQ in production for several months and we’re excited to share this with the open source community.

Across the 13 services we’ve upgraded, we’re processing ~35,000 messages/second at peak through the cluster. It has proved both performant and stable and made our lives easier operating our production systems.

There is more work to be done though — so far we’ve converted ~40% of our infrastructure. Fortunately, the upgrade process has been straightforward and well worth the short-term time tradeoff.

We’re really curious to hear what you think, so grab the source from github and try it out.

Finally, this labor of love began as scratching an itch — bitly provided an environment to experiment, build, and open source it… we’re always hiring.

Static Analysis, Syntax Checking, Code Formatting

Nothing describes the need for consistent code style more than this quote from Martin Fowler: “Any fool can write code that a computer can understand. Good programmers write code that humans can understand.”

I would amend that quote though, because in reality it’s hard to write good code for computers to understand, too. That is where tools that perform Static Analysis and Syntax Checking come into play. They both help track down and identify classes of problems that might otherwise go undetected, resulting in production issues.

We want to share a few specific and practical ways we perform Static Analysis and produce consistently styled code at bitly.

pyflakes and pep8 for Python

For python, we like to run our code through pyflakes. There are a few different versions of pyflakes floating around, but we use this one because it’s been rewritten to use AST so it’s fast, and it has output we like for scanning a whole directory tree.

Pyflakes for us looks like this

$ pyflakes --quiet
.................F........

$ pyflakes
./graphite/conf/graphite_settings.py:23: redefinition of unused 
    'rrdtool' from line 21

There are ways to plug this type of check into various editors. Some of us use editor plugins for TextMate and Sublime Text to get immediate feedback during development.

Pyflakes is good at highlighting syntax issues, but it doesn’t help resolve inconsistent python style. Do you put whitespace around operators? Do you inline your if statements? Tabs or spaces? To address some of these, we use a version of pep8 which has the ability to automatically apply consistent formatting.

$ pep8 --ignore=W293,W391 --max-line-length=120 .
./deploy_ui.py:95:66: W291 trailing whitespace
./deploy_ui.py:253:62: E225 missing whitespace around operator
./deploy_ui.py:257:5: E303 too many blank lines (2>1)
./settings.py:7:10: E203 whitespace before ':'

$ pep8 --fix --inplace settings.py
settings.py:9:80: E501 line too long (121 characters)
settings.py:16:7: W291 trailing whitespace
settings.py:7:10: E203 whitespace before ':'
settings.py:9:54: E261 at least two spaces before inline comment
settings.py:50:1: E302 expected 2 blank lines, found 1
 - pep8 fix: Writing changes to settings.py
 - pep8 fix: Applying fix for E302 at line 50, offset 0
 - pep8 fix: Applying fix for E261 at line 45, offset 34
 - pep8 fix: Applying fix for E203 at line 45, offset 15
 - pep8 fix: Applying fix for E203 at line 44, offset 26
 - pep8 fix: Applying fix for E261 at line 43, offset 45
 [truncated]

Bash Syntax Checking

Bash has a built in way to syntax check a file by running bash -n $file. We wrap this in a short script to give us similar usage to pyflakes. If all goes well, and there are no errors, it’s one dot per file checked. The syntax checking isn’t as extensive as pyflakes, but it beats finding out in the middle of a production maintenance that the one script you needed to work has a syntax error.

$ test_shell_scripts.sh --quiet
.......................................F...................

$ test_shell_scripts.sh 
./test.sh
./test.sh: line 7: unexpected EOF while looking for matching `)'
./test.sh: line 245: syntax error: unexpected end of file
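A minimal version of that wrapper, sketched here in Python purely for illustration (the real test_shell_scripts.sh is a short shell script):

import glob
import subprocess
import sys

quiet = "--quiet" in sys.argv
failed = False

for path in sorted(glob.glob("./**/*.sh", recursive=True)):
    # bash -n parses the script without executing it
    result = subprocess.run(["bash", "-n", path], capture_output=True, text=True)
    if result.returncode == 0:
        if quiet:
            sys.stdout.write(".")
    else:
        failed = True
        if quiet:
            sys.stdout.write("F")
        else:
            print(path)
            sys.stdout.write(result.stderr)

if quiet:
    print()
sys.exit(1 if failed else 0)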

Go Syntax Checking

We have written previously about our experience adopting golang as a primary language at bitly. One of the Go design choices we really like is directly related to code formatting: the core developers chose to preempt coding style wars by shipping a formatting tool, go fmt, as part of the language distribution.

Additionally, things that many languages treat as warnings (or just swallow completely) are compiler errors in Go. Examples include unused variables, undefined variables, missing imports, mixing types, etc. These issues are raised by the compiler and will keep code from being compiled. To this end, the language itself enforces static analysis and syntax checking at compile time, which is nice because we don’t need any extra tools.

C Style with astyle

For C code we don’t use a static analysis tool (we use dynamic tools like valgrind), but we do programmatically apply One True Brace Style with astyle. (We suggest a recent version of astyle from trunk; it has bug fixes we found useful.) Our specific astyle settings are applied with the following command:

$ astyle --style=1tbs \
    --lineend=linux \
    --convert-tabs \
    --preserve-date \
    --fill-empty-lines \
    --pad-header \
    --indent-switches \
    --align-pointer=name \
    --align-reference=name \
    --pad-oper \
    --suffix=none \
    $file

Javascript with Closure Compiler

We use Google’s Closure Compiler to prepare our Javascript files for production use. In addition to concatenating and compressing our Javascript, the Closure Compiler also provides syntax and type checking, variable reference checking, and warnings about common Javascript pitfalls:

$ java -jar closure-compiler.jar \
    --js input.js \
    --js_output_file output-min.js
input.js:83: WARNING - Suspicious code. This code lacks side-effects.
        Is there a bug?
        if(!data.mode) { data.mode === "save"; }
input.js:1150: ERROR - Parse error. Internet Explorer has a
        non-standard interpretation of trailing commas. Arrays will
        have the wrong length and objects will not parse at all.

EOL

If this sounds like how you would like to write code, bitly is hiring.

Go Go Gadget

bitly’s stack isn’t fancy. We use a combination of languages and tools that interact well with each other and promote philosophies such as explicitness, simplicity, and robustness. Most of our services are Python, with many core components in C (some of which are open sourced under simplehttp). We believe wholeheartedly in a service-oriented approach where components do one thing and do it well.

C is a fantastic language, one which I highly encourage all developers to learn and experiment with as it pulls away the veil and provides insight into all the things that are happening behind those seemingly innocent lines of a scripting language. With great power comes great responsibility and sometimes that power comes at a cost – primarily in terms of development and debugging time and thus the time it takes for you to ship your product.

Python has also treated us well. Its readability, standard library, and obviousness make it a great language to work with on a team. It performs well and is flexible to the point where it can be used successfully in most any situation – whether it be data science, network daemons, or one off shell scripts.

Enter Go

We identified early on that Go had all the makings of a language that could supersede some of the places we would have traditionally turned to C and some of the places where we wanted to move away from Python.

For example, we’ve built many network daemons in C that don’t require strict, absolute, control over memory. This is where Go shines. The “cost” of a garbage collected language in these types of situations is overshadowed by the huge benefit to development time and flexibility. Code becomes more readable as a natural side effect of being able to focus more on the “meat” of the problem instead of having to worry about low level details. Mix in the “batteries included” standard library and you’re cookin’.

Also, all of our C apps are built on libevent - i.e. a single-threaded event loop with callbacks that enables us to efficiently handle many thousands of simultaneous open connections. We use the same technique in Python via tornado, but the amount of work a single Python process can do is limited largely by runtime overhead. Additionally, this style of code can be hard to read and debug when you’re not used to it. Go’s lightweight concurrency model allows you to write in an imperative style while providing a built-in way to leverage the multi-core architecture of the computers we operate on.

Just as in C, Go promotes a style of very explicit and localized error handling. While sometimes verbose it forces a developer to really think about how their program might fail and what to do about it. Unlike C, Go has the built-in capability of returning multiple values from a function, resulting in clearer and more concise error-handling code.
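
For instance, a hypothetical helper (names are ours, not from any bitly app) returns both a value and an error, and the caller decides what failure means right where it happens:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

// get returns the response body or an error; the caller handles the failure.
func get(url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

func main() {
	body, err := get("http://example.com/")
	if err != nil {
		log.Fatalf("request failed: %s", err)
	}
	fmt.Printf("read %d bytes\n", len(body))
}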

The points above are mostly low-level. Go’s true power comes from the fact that the language is small and the standard library is large. We’ve had developers go from zero experience with Go to writing working production code in a day. That is why we’re so excited about its place in our stack.

Think about it like this… if you can write something in Go just as fast as you could in Python and:

  • gain the speed and robustness of a compiled, statically typed language without all of the rope to hang yourself
  • clearly express concurrent solutions to parallelizable problems
  • sacrifice little to nothing in terms of functionality
  • unambiguously produce consistently styled code (thank you go fmt)

What would you choose?

And in an effort to encourage the team to experiment and learn about Go we created the “Go Gopher Award”! The most recent engineer to learn the language and get code into production gets to have the Go Gopher Squishable sit with them at their desk. This is an extremely prestigious award.

Go Isn’t Perfect

Let’s talk about garbage collection. It isn’t free. Go’s garbage collector is a conservative, stop-the-world, mark-and-sweep GC. Practically speaking, this means that the more careless you are about what, how many, and how often you allocate, the longer the world stops when the Go runtime cleans things up. Unsurprisingly, significant performance gains can be had by being more careful… not only in raw speed but in its consistency. Fortunately Go provides fantastic hooks (pprof) to introspect the runtime behavior of your application. It’s generally obvious when your bottleneck is the GC.
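
Wiring pprof into a long-running process can be as simple as the sketch below (a minimal example rather than our production setup); the blank import registers the /debug/pprof/ handlers that go tool pprof reads from:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/ handlers on the default mux
)

func main() {
	// Expose profiling endpoints on a side port so heap and CPU behavior
	// of the running process can be inspected.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	select {} // stand-in for the application's real work
}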

Interacting with JSON can be a bit clumsy. It’s not terrible… it just isn’t as elegant and readable as the equivalent in Python. Truthfully, it’s quite impressive that it’s as good as it is considering the type safety of the language. We’ve made an effort to make this even easier and open sourced simplejson, a package that exposes a clean, chainable API for interacting with JSON when the format may not be known ahead of time.
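
For a feel of the API, here is a sketch based on the package’s documented NewJson/Get/Must* helpers (using its current import path):

package main

import (
	"fmt"
	"log"

	"github.com/bitly/go-simplejson"
)

func main() {
	// Parse a blob whose exact shape may not be known ahead of time.
	js, err := simplejson.NewJson([]byte(`{"user": {"name": "bitly", "links": 42}}`))
	if err != nil {
		log.Fatal(err)
	}

	// Chain Get() calls and use the Must* helpers to fall back to zero values.
	fmt.Println(js.Get("user").Get("name").MustString())
	fmt.Println(js.Get("user").Get("links").MustInt())
}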

Packaging has also given us some trouble. This is an area where we’ve probably gone the most back-and-forth and are admittedly still in the early stages of finding what works for us. The biggest problem is somewhat related to its purported greatest strength, i.e. qualifying the full path when importing an open-source package. This has led to a whole crop of third-party utilities that work around this by re-writing fully qualified import paths to something the user specifies (and installing the package as the same, modified, name). We use go-install-as and our policy is to install non-standard packages under our own custom import paths to ensure that our apps are always compiled against the version and source we’ve strictly enforced to be installed on a machine.

One notable feature missing from the HTTP client in Go’s standard library is the ability to set timeouts. It is further complicated by the inability of the API to provide easy access to the connection (to do it yourself). We set out to bridge the gap and wrote go-httpclient (with the expectation that these features will eventually make it into the standard library).

Oh, and one night after way too much drinking they decided to use a radical syntax for formatting time as strings (instead of your standard strftime parameters). We open sourced go-strftime to remedy that.

All in all these “issues” are minor and ultimately we felt that the benefits far outweigh the tradeoffs – Go meets our needs and philosophies as software engineers.

Go In Production

Where are we actually using Go in production? Let’s take a step back and talk about how data moves through our infrastructure first.

In the spirit of keeping things simple, fault tolerant, and asynchronous, nearly everything written to downstream systems goes through message queues (formatted in JSON, with all communication over HTTP). Some of these message queues process thousands of messages per second. We call the workers queuereaders, all of which were originally written in Python. At the lower level are two C apps – simplequeue, an HTTP interface to an in-memory, data-agnostic queue, and pubsub, an HTTP interface for one-to-many streaming.

As volume increases, the overhead of processing each message becomes more significant, so these applications emerged as one key area where Go could replace Python. We started by re-writing some queuereaders in Go, building matching libraries to support the same API we expose in Python. Whereas in Python we would run multiple processes, in Go we leverage channels and goroutines to write straightforward code that parallelizes HTTP requests to downstream systems or aggregates writes to disk. The same features make it easy to batch-retrieve messages from the queue while still processing them independently, reducing round trips.
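
The pattern looks roughly like this (a sketch with invented names and endpoints, not our library code):

package queuereader

import (
	"bytes"
	"log"
	"net/http"
	"sync"
)

// deliver POSTs each message in a batch to a downstream endpoint in its own
// goroutine and waits for all of them to complete before acking the batch.
func deliver(endpoint string, messages [][]byte) {
	var wg sync.WaitGroup
	for _, msg := range messages {
		wg.Add(1)
		go func(body []byte) {
			defer wg.Done()
			resp, err := http.Post(endpoint, "application/json", bytes.NewReader(body))
			if err != nil {
				log.Printf("post to %s failed: %s", endpoint, err)
				return
			}
			resp.Body.Close()
		}(msg)
	}
	wg.Wait()
}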

By experimenting with non-critical systems at the edge of our infrastructure we were able to safely and methodically learn about Go’s behavior in production. Needless to say, we’ve been very happy. We’ve seen consistent, measurable performance gains without sacrificing stability.

We also wanted a bit more functionality out of the core combination of simplequeue and pubsub, so we decided to build a successor with the goal of improving message delivery guarantees, maintaining the spirit and simplicity of what we already had, increasing performance, solving some configuration and setup issues, and providing a straightforward upgrade path.

We realized Go would be perfect for this project. We’ll talk about it in depth in a future blog post (and don’t worry, it will be open source). Is it any good? Yes.

EOL

Finally, we’ve also open sourced a few other Go related items…

  • go-notify - a package to standardize one-to-many communication over channels
  • file2http - a utility to spray a line-oriented file at an HTTP endpoint

As always, bitly is hiring.

dablooms - an open source, scalable, counting bloom filter library

Overview

This project aims to demonstrate a novel bloom filter implementation that can scale and that provides not only the addition of new members but also the reliable removal of existing members.

Motivation

bitly has billions and billions of links and an infrastructure that serves thousands of decode requests per second, i.e. redirecting the user from bit.ly/short to the long.destination.url.com/with/path. Users not only expect this redirect to be fast, they also trust bitly not to redirect them to a malicious website. Unfortunately, a subset of destination urls are malicious, and we identify and mark them as spam via automated, realtime analysis. This spam dataset is constantly changing, with an increasing rate of additions and a small number of deletions.

In order to maintain performance and provide a warning to users who might be going to a spam website, we have to keep the contents of the spam dataset in resident memory for each decode request. This is complicated by the fact that the system is designed to support both domain and path roll up, meaning we can block an entire domain, subdomain, or top-level path (or any combination thereof), thus marking all children of that parent as spam.

Previously, we had implemented this as a simple hashtable, with existence in the table equating to membership in the spam dataset. This approach was simple and performant, but not very memory efficient. On the occasions when resident memory was cleared, say on a restart, it was necessary to reconstruct the dataset from the database. This made restarts painfully long during the dataset reconstruction, and the hashtable took up considerable amounts of memory, 2.5 gigabytes to be exact.

We began exploring bloom filters as a possible solution to our spam problems. Despite the many bloom filter variations out there - counting, compact, inverse, scalable, and bloomier - none of them met our criteria entirely. We needed the ability to scale and persist reliably while allowing for the removal of elements. It appeared that only a mixture of scalable and counting (elaborated on below) would solve our problems.

Bloom Filter Basics

Bloom filters are probabilistic data structures that provide space-efficient storage of elements at the cost of occasional false positives on membership queries, i.e. a bloom filter may state true on query when it in fact does not contain said element. A bloom filter is traditionally implemented as an array of M bits, where M is the size of the bloom filter. On initialization all bits are set to zero. A filter is also parameterized by a constant k that defines the number of hash functions used to set and test bits in the filter. Each hash function should output one index in M. When inserting an element x into the filter, the bits at the k indices h1(x), h2(x), ..., hk(x) are set.

In order to query a bloom filter, say for element x, it suffices to verify if all bits in indices h1(x), h2(x), ..., hk(x) are set. If one or more of these bits is not set then the queried element is definitely not present in the filter. However, if all these bits are set, then the element is considered to be in the filter. Given this procedure, an error probability exists for positive matches, since the tested indices might have been set by the insertion of other elements.

Counting Bloom Filters: Solving Removals

The same property that results in false positives also makes it difficult to remove an element from the filter, as there is no easy means of discerning whether another element hashes to the same bit. Unsetting a bit that is hashed by multiple elements would cause false negatives. Using a counter, instead of a bit, circumvents this issue: the counter is incremented when an element hashes to a given location, and decremented upon removal. Membership queries check whether a given counter is greater than zero. This reduces the exceptional space-efficiency provided by the standard bloom filter.
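
To make the mechanics concrete, here is a toy counting bloom filter in Go (purely illustrative; dablooms’ internals differ, and deriving the k indices from two halves of one 64-bit hash is just one common trick):

package bloom

import "hash/fnv"

// CountingBloom uses small counters instead of single bits so that elements
// can be removed as well as added.
type CountingBloom struct {
	counters []uint8
	k        uint32
}

func New(m int, k uint32) *CountingBloom {
	return &CountingBloom{counters: make([]uint8, m), k: k}
}

// indexes derives k positions for an item from two halves of a 64-bit hash.
func (c *CountingBloom) indexes(item string) []uint32 {
	h := fnv.New64a()
	h.Write([]byte(item))
	sum := h.Sum64()
	h1, h2 := uint32(sum), uint32(sum>>32)
	idx := make([]uint32, c.k)
	for i := uint32(0); i < c.k; i++ {
		idx[i] = (h1 + i*h2) % uint32(len(c.counters))
	}
	return idx
}

func (c *CountingBloom) Add(item string) {
	for _, i := range c.indexes(item) {
		c.counters[i]++
	}
}

func (c *CountingBloom) Remove(item string) {
	for _, i := range c.indexes(item) {
		if c.counters[i] > 0 {
			c.counters[i]--
		}
	}
}

// Check returns false only when the item is definitely not present; a true
// result may be a false positive.
func (c *CountingBloom) Check(item string) bool {
	for _, i := range c.indexes(item) {
		if c.counters[i] == 0 {
			return false
		}
	}
	return true
}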

Scalable Bloom Filters: Solving Scale

Another important property of a bloom filter is its linear relationship between size and storage capacity. If given a maximum acceptable false positive ratio, it is straightforward to determine how much space is needed.

If the maximum allowable error probability and the number of elements to store are both known, it is relatively straightforward to dimension an appropriate filter. However, it is not always possible to know a priori how many elements will need to be stored, leaving a trade-off between over-dimensioning filters and suffering a ballooning error probability as the filter fills.
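
When n and p are known, the classic sizing formulas make that dimensioning concrete: m = -n·ln(p)/(ln 2)² bits and k = (m/n)·ln 2 hash functions for n expected elements and a target false positive probability p. A small helper (illustrative, not part of dablooms):

package bloom

import "math"

// OptimalParams returns the classic bloom filter sizing for n expected
// elements and a target false-positive probability p.
func OptimalParams(n int, p float64) (bits int, hashes int) {
	m := -float64(n) * math.Log(p) / (math.Ln2 * math.Ln2)
	k := (m / float64(n)) * math.Ln2
	return int(math.Ceil(m)), int(math.Ceil(k))
}

For one million elements at a 1% error rate, that works out to roughly 9.6 million bits (about 1.2 MB) and 7 hash functions.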

Almeida, Baquero, Preguiça, and Hutchison published a paper in 2006 on Scalable Bloom Filters, which suggests a way to scale bloom filters by essentially creating a list of bloom filters that together act as one large bloom filter. When greater capacity is desired, a new filter is added to the list.

Membership queries are conducted on each filter, with a positive result if the element is found in any one of the filters. Naively, this leads to a compounding error probability, since the false positive probability of the structure as a whole evaluates to:

1 - ∏(1 - P)

It is possible to bound this error probability by adding a reducing tightening ratio, r. As a result, the bounded error probability is represented as:

1 - ∏(1 - P0 * r^i), where r is chosen such that 0 < r < 1

Since size is simply a function of error probability and capacity, any array of growth functions can be applied to scale the size of the bloom filter as necessary. We found it sufficient to pick 0.9 for r.

Problems with Mixing Scalable and Counting Bloom Filters

Scalable bloom filters do not allow for the removal of elements from the filter. In addition, simply converting each bloom filter in a scalable bloom filter into a counting filter also poses problems. Since an element can be in any filter, and bloom filters inherently allow for false positives, a given element may appear to be in two or more filters. If an element is inadvertently removed from a filter which did not contain it, it would introduce the possibility of false negatives.

If however, an element can be removed from the correct filter, it maintains the integrity of said filter, i.e. prevents the possibility of false negatives. Thus, a scaling, counting, bloom filter is possible if upon additions and deletions one can correctly decide which bloom filter contains the element.

There are several advantages to using a bloom filter. A bloom filter gives us cheap, memory efficient set operations, with no actual data stored about the given element. Rather, bloom filters allow us to test, with some given error probability, the membership of an item. It follows that the majority of operations performed on a bloom filter are membership queries, rather than additions and removals. Thus, for a scaling, counting, bloom filter, we can optimize for membership queries at the expense of additions and removals. This expense comes not in performance, but in the additional metadata concerning an element and its relation to the bloom filter. With some sort of identifier for an element, which does not need to be unique as long as it is fairly distributed, it is possible to correctly determine which filter an element belongs to, thereby maintaining the integrity of a given bloom filter with accurate additions and removals.

Enter dablooms

dablooms is one such implementation of a scaling, counting, bloom filter. It takes additional metadata during additions and deletions in the form of a monotonically increasing integer, such as a timestamp, used to classify elements. During additions and removals, this identifier makes it easy to classify an element into the correct bloom filter (essentially a comparison against a range).

dablooms is designed to scale itself using these monotonically increasing identifiers and the given capacity. When a bloom filter is at capacity, dablooms creates a new bloom filter using the to-be-added element’s identifier as the beginning identifier for the new bloom filter. Since the identifiers monotonically increase, new elements are added to the newest bloom filter. Note that, in theory and as implemented, nothing prevents one from adding an element to an “older” filter; you just run an increasing risk of the error probability growing beyond its bound as that filter becomes “overfilled”.

You can then remove any element from any bloom filter, using the identifier to intelligently pick which bloom filter to remove from. Consequently, as you continue to remove elements from bloom filters that you are no longer adding to, those bloom filters become more accurate.
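
In rough Go terms (names invented for illustration, reusing the toy CountingBloom sketch from earlier; this is not dablooms’ actual API), choosing the filter for an addition or removal amounts to finding the newest filter whose starting identifier is at or below the element’s identifier:

package bloom

// scalingFilter pairs a filter with the first identifier it accepted.
type scalingFilter struct {
	startID uint64
	filter  *CountingBloom
}

// filterFor returns the filter whose identifier range contains id, i.e. the
// newest filter whose startID is <= id.
func filterFor(filters []scalingFilter, id uint64) *CountingBloom {
	for i := len(filters) - 1; i >= 0; i-- {
		if filters[i].startID <= id {
			return filters[i].filter
		}
	}
	return nil
}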

For performance, the low-level operations are implemented in C. The filter is also memory-mapped, which provides async flushing and persistence at low cost. In an effort to maintain memory efficiency, rather than using integers, or even whole bytes, as counters, we use only four-bit counters. These four-bit counters allow up to 15 items to share a counter in the map. If more than a small handful of items were sharing a counter, the bloom filter would already be overloaded (resulting in excessive false positives) at any sane error rate, so there is no benefit in supporting larger counters.

The bloom filter also employs change sequence numbers to track the operations performed on it. These allow us to detect failed writes that leave a ‘dirty’ filter in which an element is only partially written. Upon restart, this information lets us determine whether a filter is clean, in which case we can simply load it into memory rather than rebuilding it. These counters also provide a means to identify a position at which the bloom filter is valid, so that operations can be replayed to “catch up”.

EOL

For our spam dataset at scale, initial tests have shown a significant reduction (98.8%) in resident memory usage on each machine using dablooms, while maintaining a rigid performance threshold. For our identifier, we simply use the timestamp at which the spam link was added, and query our database for the insertion timestamp on removals.

We have also decided to open source this project, so fork it and submit patches at the github repo.

If this sounds like a fun project to use or to work on, bitly is hiring, including interns!

Debugging a Specialized Database Cache

This blog post is a tale of interesting patterns in system and application metrics, optimizing memory use, and debugging. Before we get into that stuff, it helps to be familiar with the application at the center of it.

Introduction to the Network view

A significant new feature of Bitly’s recent consumer relaunch is the ability to browse links shortened by all your connections on Facebook and Twitter, on a page titled “Your Network”. Internally we call this service “network history”. Looking up all your connections and all their links every time you view the “Network” page is, at Bitly’s scale, a non-trivial problem. Instead, we structured the system to do the hard work at insertion time to make the read side performant. In a minute, we’ll dig into how exactly we tackled that problem.

We use a key-value store (leveldb via simpleleveldb) where the key is the username and the value is the list of bitly links created by their connections. Such an arrangement is very efficient in terms of storage, memory, and cpu usage. However, based on the number of connections a user has, the number of updates we have to perform could be significant (thousands) per new Bitly link. We needed a way to gain control over the number of writes to our persistent datastore (and thus disk seeks) and provide the performance needed to handle many thousands of inserts per second - enter nhproxy.

nhproxy mechanics

nhproxy, the caching proxy to the network history database, maintains in-memory:

  • a hash-table of all users for which it is caching network history
  • a hash-table of all links in users’ network history
  • attached to each user, a list of pointers to links, which is the network history

The actual link metadata is stored in a separate global hash-table so that it is not duplicated when multiple users have the same link in their history. Each user only stores a pointer into the global data structure.

nhproxy serves two main request types:

  • get requests specify a username to retrieve the full Network History for. If the user’s network history is already in memory it is quickly returned, otherwise it is fetched from the database and loaded into memory and then returned.

  • add requests specify a bitly link and a list of all the users for which the link will appear (all connections of the creator of the link). All users who aren’t already loaded into memory are fetched from the database and loaded into memory, and then a reference to the link is added to all their network history lists.

There are simple strategies for:

  • "pruning" old network history list elements when the list grows beyond a set maximum
  • saving a user’s changed network history to the database after a while
  • discarding the user’s record and history from memory to keep the total number of users in the cache below another set maximum.

It’s interesting to note that the longer one delays saving a user’s changed network history to the persistent datastore, the more changes accumulate per save, and the lower the overall rate of writes to the store; one can control that rate of writes by adjusting the delay. Don’t worry about what exactly these strategies are, as I’ll be focusing on other aspects of nhproxy in this article.

nhproxy memory load

In the initial deployed state, the memory usage of a box running both nhproxy and the database looked like this:

memory graph 1

The memory fell each night before exhausting all memory because we cleanly terminated and restarted nhproxy with a cron job. nhproxy saves and reloads everything it’s caching when it restarts, so no unflushed changes are lost. Still, we would much rather not have to restart it regularly. Also, notice that the data in nhproxy just before exiting and just after starting again is exactly the same (aside from the change timestamps the flushing logic uses), yet it uses very different amounts of physical memory.

There had already been some testing and analysis with valgrind; it wasn’t a simple memory leak in nhproxy. One theory was that the problem was fragmentation: the multitude of small memory regions allocated and deallocated left awkwardly sized regions free that were too small to be reused. We considered trying an alternative allocator like jemalloc which has fancier strategies for dealing with fragmentation, but decided to try some less drastic measures first.

After a bit of thought I realized that many of the small regions we allocated were dominated by the overhead of malloc itself - the extra space from rounding the allocated region up to a well-aligned size, and the metadata of the region size (typically prepended to the region). This most likely wouldn’t fix the unbounded memory growth problem, but I couldn’t resist first going after some low-hanging fruit to make overall memory use and fragmentation significantly better in the short term.

Initially, the data in memory was represented by structs that looked something like this:

struct user {
    char *username;
    struct linked_list *history;
    // plus a few miscellaneous flags and counters
};

struct linked_list {
    struct linked_list *next;
    struct linked_list *prev;
    struct bitly_link *link;
};

struct bitly_link {
    char *username;
    char *hash;
    int ref_count;
    // plus a few miscellaneous flags and counters
};

That’s a simplification, but what’s accurate is that there was a separate allocation for each username, each bitly link hash, and each link reference in a history list. That is quite a lot of small allocations, with a lot of overhead. For example, each bitly_link hash is (currently) 6 characters, so we malloc()ed 7 bytes to store the string and null terminator, which malloc() rounded up to 8 bytes and then prepended with the region size in another 8 bytes, ending up as a 16 byte allocation. Also, I suspected that the variable size of the username allocation was a significant cause of fragmentation.

So I refactored the code to use structures like this:

struct user {
    char username[MAX_USERNAME_LEN+1];
    struct bitly_link *history[MAX_HISTORY_LEN];
    int history_count;
    // plus a few miscellaneous flags and counters
};

struct bitly_link {
    char username[MAX_USERNAME_LEN+1];
    char hash[MAX_HASH_LEN+1];
    int ref_count;
    // plus a few miscellaneous flags and counters
};

Notice that, with the (minimum) 8-byte malloc() overhead on the 64-bit systems we use, any history list longer than 1/4 of the maximum uses less space in the new design: a fixed array of 1000 8-byte pointers in the new design, versus 4*8 bytes per link (two list pointers, a link pointer, and the malloc overhead) in the old design. In practice, most users in the nhproxy cache have full history lists (maintained at that length by pruning when they grow longer).

Satisfyingly, cpu usage was much improved, because all those extra malloc() and free() calls in the original design were significant. Here’s what it looked like before:

cpu graph 1

and here’s what it looked like after:

cpu graph 2

(that’s 100% per core)

But what we really cared about was memory use:

memory graph 2

That’s much lower, but it still grew until exhausting all memory; it just took 3 times longer to do so. Until the real problem was solved, at least it was an improvement, right?

nhproxy history list bug

Unfortunately, we couldn’t roll it out to production yet, because we noticed this other graph:

counters graph

We had a fair number of what we thought were pretty good tests, and the new version of nhproxy passed them all… but why on earth did the number of bitly links in the cache, and the number of references to them in history lists, gravitate to 2/3 of what they used to be? Fortunately, having solid instrumentation into the runtime behavior of the application enabled us to spot this discrepancy. We had theories about bugs in the history pruning strategy, but couldn’t find anything, so we had to write a tool to compare the state of the original system still running in production, and the new version we were testing and staging, which was getting the same add requests, but not servicing any of the get requests.

We couldn’t compare just what was cached in the nhproxy, because we had actually changed the user flush and drop strategy, and because even if the strategies were the same, the lack of get requests on the new system being staged would have made the list of users in nhproxy different. So I wrote a tool that took a list of user names, made get requests to both systems, compared the results, and output differences, with various options. We were hoping to see some pattern in which links were lost from which network history lists - maybe relatively old or new links? We looked in request logs to get some users with very new changes, or hour-old changes, or older, and fed these usernames to the tool.

Here’s some sample output:

login <user> common= 998   0_only=   0   1_only=   0   difference=  0%
login <user> common= 783   0_only=   0   1_only=   0   difference=  0%
...
login <user> common= 301   0_only= 315   1_only= 287   difference= 50%
login <user> common=1000   0_only=   0   1_only=   0   difference=  0%
Total successes=192 failures=3

We didn’t notice any clear patterns in the users, links, or ages. But we did notice that the loss wasn’t evenly distributed - some users matched completely between the systems, and others were over 50% different. For a few of the very different users, we manually queried and compared the caches and the backend databases, trying to find a clue. Finally, Jehiah noticed that the backend database was missing many list items for a user that the cache had, even after the user was flushed. We had no test that caused a user’s list to be flushed, then dropped, then re-fetched, so I added one. Soon after, snakes spotted the extra “i++” in the flush routine, iterating over the history list, introduced when a while-loop that already had an “i++” was changed to a for-loop in the memory-saving struct refactor - we were skipping over every other link and not writing it to the persistent datastore. It’s funny how much time such a little bug can take to track down, but our tests and tools are better for it.

libevent memory leak

At that point, nhproxy was much more efficient, and at least as correct as the original version, but it still had the unbounded memory use which becomes problematic after only 3 and a half days or so. When I wrote above that valgrind reported no memory leaks, I wasn’t being completely honest - it reported no memory leaks if one was careful to only use HTTP/1.0 requests without keep-alive, or HTTP/1.1 requests with Connection: close, when exercising or querying the nhproxy. There was a known leak with keep-alive requests, but it wasn’t attacked earlier because:

  • while our internal services do often hold persistent connections, they rarely close them, and we didn’t think we experienced this effect to a degree significant enough to devote resources to solving it
  • libevent is a sensitive and core part of our infrastructure, so we didn’t want to migrate to libevent-2.0.x right at that moment, nor did we want to maintain our own branch of libevent-1.4.x

Simultaneously, we were aware of a problem related to how heap memory was freed back to the OS, which we had already seen in other applications. Memory use would grow due to a large number of small allocations (using the glibc provided malloc()) caused by a temporary high load, but when the load fell, the memory use wouldn’t significantly fall back. Even if valgrind reported no real leaks, and even if load fell back to effectively zero, the OS could still report that the process was using a huge amount of private memory (RSS). We confirmed through some tests that a single tiny allocation on the top of the heap could prevent many gigabytes of contiguous free space from being released back to the OS, and when that top allocation was free()d, the rest would finally go.

Between the occasional tiny leak and the significant fragmentation, this high-water-mark effect could be a real problem for nhproxy. jemalloc would probably help, but at this point I really wanted to get rid of the leak. I wanted the output of memory use analysis tools like valgrind to be easier to use, and I didn’t want to wonder how much of an effect the leak was really having.

I started working with the original libevent-1.4.14b source, building and installing the shared library in a custom local path, and running nhproxy with it using LD_LIBRARY_PATH.

libevent uses a lot of callbacks of function pointers, and evhttp abstracts outgoing and incoming http connections to share many functions between them, so I started by putting in some useful printf()s to familiarize myself with the flow of the code in a simple minimal use. I used curl with and without the --http1.0 argument to exercise nhproxy.

Valgrind reports that the root of the leak is a struct evhttp_request allocated in evhttp_associate_new_request_with_connection(), which makes sense because that’s where all evhttp_requests are allocated. I eventually traced the exceptional case through

  1. evhttp_associate_new_request_with_connection()
  2. evhttp_request_new()
  3. evhttp_start_read()
  4. evhttp_read() (callback set in evhttp_start_read())
  5. evhttp_connection_done()
  6. evhttp_handle_request() (the (*req_cb)())
  7. evhttp_connection_fail()
  8. evhttp_connection_incoming_fail()
  9. evhttp_connection_free()

The very interesting bit is that evhttp_connection_incoming_fail() seemed to intentionally leak the evhttp_request, preventing evhttp_connection_free() from freeing it, with this comment:

/* 
 * these are cases in which we probably should just
 * close the connection and not send a reply.  this
 * case may happen when a browser keeps a persistent
 * connection open and we timeout on the read.  when
 * the request is still being used for sending, we
 * need to disassociated it from the connection here.
 */
if (!req->userdone) {
        /* remove it so that it will not be freed */
        TAILQ_REMOVE(&req->evcon->requests, req, next);
        /* indicate that this request no longer has a
         * connection object
         */
        req->evcon = NULL;
}

I’m still not sure exactly why that case is needed. But I did know that if evhttp_handle_request() is called with req->evcon->state == EVCON_DISCONNECTED, then both the user and the request are done. My minimal fix was:

@@ -2231,6 +2231,7 @@ evhttp_handle_request(struct evhttp_requ
    if (req->uri == NULL) {
        event_debug(("%s: bad request", __func__));
        if (req->evcon->state == EVCON_DISCONNECTED) {
+           req->userdone = 1;
            evhttp_connection_fail(req->evcon, EVCON_HTTP_EOF);
        } else {
            event_debug(("%s: sending error", __func__));

In our testing, that didn’t result in any use-after-free crashes or unusual failed requests. This is what memory use looked like under real load with that fix:

memory graph 3

That seemed to do it. We weren’t expecting the keep-alive connection leak to be that significant, but apparently it was. (The memory ramp-up before it stabilizes is fragmentation: object allocations keep spreading out, approaching a maximum, until the gaps they leave behind are large enough to be reused.)

Conclusion

Graphs of system and application metrics can reveal unexpected behaviors that reflect bugs. Create tools to help you investigate issues if you need them. Tiny fixes can have huge effects.