Building NSQ Client Libraries
Brace yourself, this is a long one.
The following guide was originally intended for client library developers to describe in detail all the important features and functionality we expected in an NSQ client library.
While writing it we began to realize that it had value beyond client library developers. It incorporates a comprehensive analysis of most of the capabilities of NSQ (both client and server) and is therefore interesting and useful for end-users as well (or anyone using or interested in infrastructure messaging platforms).
If you need some background on NSQ please see our original blog post or its follow up, spray some NSQ on it.
Intro
NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall cluster robustness and performance.
This guide attempts to outline the various responsibilities well-behaved client libraries need to
fulfill. Because publishing to nsqd is trivial (just an HTTP POST to the /put endpoint), this
document focuses on consumers.
By setting these expectations we hope to provide a foundation for achieving consistency across languages for NSQ users.
Overview
- Configuration
- Discovery (optional)
- Connection Handling
- Feature Negotiation
- Data Flow / Heartbeats
- Message Handling
- RDY State
- Backoff
Configuration
At a high level, our philosophy with respect to configuration is to design the system to have the flexibility to support different workloads, use sane defaults that run well “out of the box”, and minimize the number of dials.
A client subscribes to a topic on a channel over a TCP connection to nsqd instance(s). You can
only subscribe to one topic per connection so multiple topic consumption needs to be structured
accordingly.
Using nsqlookupd for discovery is optional so client libraries should support a configuration
where a client connects directly to one or more nsqd instances or where it is configured to poll
one or more nsqlookupd instances. When a client is configured to poll nsqlookupd the polling
interval should be configurable. Additionally, because typical deployments of NSQ are in distributed
environments with many producers and consumers, the client library should automatically add jitter
based on a random % of the configured value. This will help avoid a thundering herd of connections.
For more detail see Discovery.
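A minimal sketch of the jitter described above. The function name, the `jitter_fraction` parameter, and its default of 0.3 are assumptions made for illustration, not values taken from any NSQ client library.

```python
import random


def jittered_interval(base_seconds, jitter_fraction=0.3, rng=random):
    """Return the poll interval plus a random extra delay.

    jitter_fraction is a hypothetical knob: the extra delay is a random
    share of (base_seconds * jitter_fraction).
    """
    if base_seconds <= 0:
        raise ValueError("base_seconds must be positive")
    # rng.random() is in [0.0, 1.0), so each client lands on a different delay
    return base_seconds + base_seconds * jitter_fraction * rng.random()
```

Computing a fresh value before every polling round (rather than once at startup) keeps clients that started together from staying in lockstep.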
An important performance knob for a client is the number of messages it can receive before nsqd
expects a response. This pipelining facilitates buffered, batched, and asynchronous message
handling. By convention this value is called max_in_flight and it affects how RDY state is
managed. For more detail see RDY State.
Because NSQ is designed to gracefully handle failure, client libraries are expected to implement retry handling for failed messages and provide options for bounding that behavior in terms of number of attempts per message. For more detail see Message Handling.
Relatedly, when message processing fails, the client library is expected to automatically handle
re-queueing the message. NSQ supports sending a delay along with the REQ command. Client libraries
are expected to provide options for what this delay should be set to initially (for the first
failure) and how it should change for subsequent failures. For more detail see Backoff.
Most importantly, the client library should support some method of configuring callback handlers for message processing. The signature of these callbacks should be simple, typically accepting a single parameter (an instance of a “message object”).
Discovery
An important component of NSQ is nsqlookupd, which provides a discovery service for consumers to
locate producers of a given topic at runtime.
Although optional, using nsqlookupd greatly reduces the amount of configuration required to
maintain and scale a large distributed NSQ cluster.
When a client uses nsqlookupd for discovery, the client library should manage the process of
polling all nsqlookupd instances for an up-to-date set of nsqd producers and should manage the
connections to those producers.
Querying an nsqlookupd instance is straightforward. Perform an HTTP request to the lookup endpoint
with a query parameter of the topic the client is attempting to discover (e.g.
/lookup?topic=clicks). The response is JSON and lists the known producers of that topic.
The broadcast_address and tcp_port should be used to connect to an nsqd producer. Because, by
design, nsqlookupd instances don’t coordinate their lists of producers, the client library should
union the lists it received from all nsqlookupd queries to build the final list of nsqd
producers to connect to. The broadcast_address:tcp_port combination should be used as the unique
key for this union.
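The union step can be sketched as follows. This assumes each nsqlookupd response has already been parsed down to a list of producer dicts carrying `broadcast_address` and `tcp_port`; the function name and the hostnames in the usage lines are hypothetical.

```python
def merge_producers(lookup_results):
    """Union producers reported by several nsqlookupd instances.

    lookup_results: one list of producer dicts per nsqlookupd queried
    (an assumed, already-parsed shape).
    Returns a dict keyed by "broadcast_address:tcp_port".
    """
    merged = {}
    for producers in lookup_results:
        for producer in producers:
            key = "%s:%d" % (producer["broadcast_address"], producer["tcp_port"])
            # the first sighting wins; duplicates from other lookupds are ignored
            merged.setdefault(key, producer)
    return merged


# two lookupd instances that only partially agree (made-up hosts)
lookupd_a = [{"broadcast_address": "nsqd01.example.com", "tcp_port": 4150}]
lookupd_b = [
    {"broadcast_address": "nsqd01.example.com", "tcp_port": 4150},
    {"broadcast_address": "nsqd02.example.com", "tcp_port": 4150},
]
producers = merge_producers([lookupd_a, lookupd_b])
```

Here `producers` ends up with one entry per distinct nsqd, regardless of how many nsqlookupd instances reported it.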
A periodic timer should be used to repeatedly poll the configured nsqlookupd so that clients will
automatically discover new producers. The client library should automatically initiate connections
to all newly found nsqd producers.
When client library execution begins it should bootstrap this polling process by kicking off an
initial set of requests to the configured nsqlookupd instances.
Connection Handling
Once a client has an nsqd producer to connect to (via discovery or manual configuration), it
should open a TCP connection to broadcast_address:tcp_port. A separate TCP connection should be made
to each nsqd for each topic the client wants to consume.
When connecting to an nsqd instance, the client library should send the following data, in order:
- the magic identifier
- an IDENTIFY command (and payload) and read/verify response
- a SUB command (specifying desired topic) and read/verify response
- an initial RDY count of 1 (see RDY State).
(low-level details on the protocol are available in the spec)
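As a rough illustration of that sequence, the sketch below builds the bytes a client would write, in order. The wire formats here (a four-byte magic of two spaces plus "V2", newline-terminated commands, and a 4-byte big-endian size before the IDENTIFY body) reflect our reading of the protocol spec and should be checked against it; the helper names are hypothetical.

```python
import json
import struct

MAGIC_V2 = b"  V2"  # assumed magic identifier: two spaces followed by "V2"


def identify_command(payload):
    """IDENTIFY, a newline, a 4-byte big-endian body size, then the JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return b"IDENTIFY\n" + struct.pack(">I", len(body)) + body


def sub_command(topic, channel):
    """SUB names both the topic and the channel to subscribe to."""
    return ("SUB %s %s\n" % (topic, channel)).encode("ascii")


def rdy_command(count):
    return ("RDY %d\n" % count).encode("ascii")


# the handshake, in the order listed above (responses must be read in between)
handshake = [
    MAGIC_V2,
    identify_command({"short_id": "app01", "feature_negotiation": True}),
    sub_command("clicks", "archive"),
    rdy_command(1),
]
```

A real client would read and verify the response after IDENTIFY and after SUB before sending the next item.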
Reconnection
Client libraries should automatically handle reconnection as follows:
- If the client is configured with a specific list of nsqd instances, reconnection should be handled by delaying the retry attempt in an exponential backoff manner (i.e. try to reconnect in 8s, 16s, 32s, etc., up to a max).
- If the client is configured to discover instances via nsqlookupd, reconnection should be handled automatically based on the polling interval (i.e. if a client disconnects from an nsqd, the client library should only attempt to reconnect if that instance is discovered by a subsequent nsqlookupd polling round). This ensures that clients can learn about producers that are introduced to the topology and ones that are removed (or failed).
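For the first case, the retry schedule might be generated like this. It is a sketch only: the 8 second starting point comes from the example above, while the generator name and the default ceiling of 128 seconds are assumptions.

```python
import itertools


def reconnect_delays(initial=8, maximum=128):
    """Yield reconnect delays in seconds: 8, 16, 32, ... capped at maximum."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * 2, maximum)


# the first five delays for a connection that keeps failing
first_five = list(itertools.islice(reconnect_delays(8, 60), 5))
```

A fresh generator should be created after each successful connection so that the next failure starts again from the shortest delay.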
Feature Negotiation
The IDENTIFY command can be used to set nsqd side metadata, modify client settings,
and negotiate features. It satisfies two needs:
- In certain cases a client would like to modify how nsqd interacts with it (currently this is limited to modifying a client’s heartbeat interval but you can imagine this could evolve to include enabling compression, TLS, output buffering, etc.)
- nsqd responds to the IDENTIFY command with a JSON payload that includes important server side configuration values that the client should respect while interacting with the instance.
After connecting, based on the user’s configuration, a client library should send an IDENTIFY
command. The body of an IDENTIFY command is a JSON payload.
The feature_negotiation field indicates that the client can accept a JSON payload in return. The
short_id and long_id are arbitrary text fields that are used by nsqd (and nsqadmin) to
identify clients. heartbeat_interval configures the interval between heartbeats on a per-client
basis.
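An illustrative payload built from the four fields just described. The values are made up, the helper name is hypothetical, and heartbeat_interval is assumed here to be expressed in milliseconds.

```python
import json


def build_identify_payload(short_id, long_id, heartbeat_interval_ms=30000):
    """Assemble the IDENTIFY body from the fields described above."""
    return {
        "short_id": short_id,
        "long_id": long_id,
        # assumed unit: milliseconds (30000 matches the 30 second default)
        "heartbeat_interval": heartbeat_interval_ms,
        "feature_negotiation": True,
    }


payload = build_identify_payload("app01", "app01.example.com")
body = json.dumps(payload, sort_keys=True)
```

`body` is the JSON text that would be sent as the IDENTIFY command's payload.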
nsqd will respond with OK if it does not support feature negotiation (introduced in nsqd
v0.2.20); otherwise it responds with a JSON payload that includes max_rdy_count.
More detail on the use of the max_rdy_count field is in the RDY State section.
Data Flow and Heartbeats
Once a client is in a subscribed state, data flow in the NSQ protocol is asynchronous. For consumers, this means that in order to build truly robust and performant client libraries they should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used to represent both OS-level threads and userland threads, like coroutines).
Additionally clients are expected to respond to periodic heartbeats from the nsqd instances
they’re connected to. By default this happens at 30 second intervals. The client can respond with
any command but, by convention, it’s easiest to simply respond with a NOP whenever a heartbeat
is received. See the protocol spec for specifics on how to identify heartbeats.
A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the frame, and performing the multiplexing logic to route the data as appropriate. This is also conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol involves the following sequential steps:
1. read 4 byte big endian uint32 size
2. read size bytes data
3. unpack data
4. …
5. profit
6. goto 1
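The first two steps can be sketched against any file-like object (for a socket, something like `sock.makefile("rb")`). The helper names are hypothetical, and unpacking the frame contents is deliberately left out since it depends on the protocol spec.

```python
import io
import struct


def read_exact(stream, n):
    """Read exactly n bytes or raise EOFError if the stream ends early."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError("stream closed with %d bytes still expected" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_frame(stream):
    """Read one size-prefixed frame and return its raw data."""
    (size,) = struct.unpack(">I", read_exact(stream, 4))  # big endian uint32
    return read_exact(stream, size)


# two frames back to back, as they would arrive on the wire
wire = io.BytesIO(struct.pack(">I", 5) + b"hello" + struct.pack(">I", 2) + b"ok")
frames = [read_frame(wire), read_frame(wire)]
```

Looping on short reads matters because a single socket read may return fewer bytes than requested.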
A Brief Interlude on Errors
Due to their asynchronous nature, it would take a bit of extra state tracking in order to
correlate protocol errors with the commands that generated them. Instead, we took the “fail fast”
approach so the overwhelming majority of protocol-level error handling is fatal. This means that if
the client sends an invalid command (or gets itself into an invalid state) the nsqd instance it’s
connected to will protect itself (and the system) by forcibly closing the connection (and, if
possible, sending an error to the client). This, coupled with the connection handling mentioned
above, makes for a more robust and stable system.
The only errors that are not fatal are:
- E_FIN_FAILED - The client tried to send a FIN command for an invalid message ID.
- E_REQ_FAILED - The client tried to send a REQ command for an invalid message ID.
- E_TOUCH_FAILED - The client tried to send a TOUCH command for an invalid message ID.
Because these errors are most often timing issues, they are not considered fatal. These situations
typically occur when a message times out on the nsqd side and is re-queued and delivered to
another client. The original recipient is no longer allowed to respond on behalf of that message.
Message Handling
When the IO loop unpacks a data frame containing a message, it should route that message to the configured handler for processing.
The nsqd producer expects to receive a reply within its configured message timeout (default: 60
seconds). There are a few possible scenarios:
- The handler indicates that the message was processed successfully.
- The handler indicates that the message processing was unsuccessful.
- The handler decides that it needs more time to process the message.
- The in-flight timeout expires and nsqd automatically re-queues the message.
In the first 3 cases, the client library should send the appropriate command on the client’s behalf
(FIN, REQ, and TOUCH respectively).
The FIN command is the simplest of the bunch. It tells nsqd that it can safely discard the
message. FIN can also be used to discard a message that you do not want to process or retry.
The REQ command tells nsqd that the message should be re-queued (with an optional parameter
specifying the amount of time to defer additional attempts). If the optional parameter is not
specified by the client, the client library should automatically calculate the duration in relation
to the number of attempts to process the message (a multiple is typically sufficient). The client
library should discard messages that exceed the configured max attempts. When this occurs, a
user-supplied callback should be executed to notify and enable special handling.
If the message handler requires more time than the configured message timeout, the TOUCH command
can be used to reset the timer on the nsqd side. This can be done repeatedly until the message is
either FIN or REQ, up to the nsqd producer’s configured max timeout. Client libraries should
never automatically TOUCH on behalf of the client.
If the nsqd instance receives no response, the message will time out and be automatically
re-queued for delivery to an available client.
Finally, a property of each message is the number of attempts. Client libraries should compare this value against the configured max and discard messages that have exceeded it. When a message is discarded there should be a callback fired. Typical default implementations of this callback might include writing to a directory on disk, logging, etc. The user should be able to override the default handling.
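Putting the FIN, REQ, and max attempts rules together, a client library's dispatch step might look roughly like this. The `Message` class is a hypothetical stand-in that records commands instead of writing to a socket, and the delay multiple (and its millisecond unit) is an assumption.

```python
class Message:
    """Hypothetical stand-in for a client library's message object."""

    def __init__(self, body, attempts):
        self.body = body
        self.attempts = attempts
        self.sent = []  # commands that would be written to nsqd

    def fin(self):
        self.sent.append(("FIN",))

    def req(self, delay_ms):
        self.sent.append(("REQ", delay_ms))


def process(message, handler, max_attempts, on_giving_up, delay_multiple_ms=90000):
    """Run the handler and respond with FIN or REQ on its behalf."""
    if message.attempts > max_attempts:
        on_giving_up(message)  # user-supplied callback: log, write to disk, etc.
        message.fin()          # FIN discards the message without another retry
        return
    try:
        succeeded = handler(message)
    except Exception:
        succeeded = False      # an exception counts as a failed attempt
    if succeeded:
        message.fin()
    else:
        # defer the retry in proportion to the number of attempts so far
        message.req(message.attempts * delay_multiple_ms)
```

TOUCH is intentionally absent: as noted above, it should only be sent when the handler asks for it.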
RDY State
Because messages are pushed from nsqd to clients we needed a way to manage the flow of data in
userland rather than relying on low-level TCP semantics. A client’s RDY state is NSQ’s flow
control mechanism.
As outlined in the configuration section, a consumer is configured with a
max_in_flight. This is a concurrency and performance knob, e.g. some downstream systems are able
to more easily batch process messages and benefit greatly from a higher max_in_flight.
When a client connects to nsqd (and subscribes) it is placed in an initial RDY state of 0. No
messages will be sent to the client.
Client libraries have a few responsibilities:
- bootstrap and evenly distribute the configured max_in_flight to all connections.
- never allow the aggregate sum of RDY counts for all connections (total_rdy_count) to exceed the configured max_in_flight.
- never exceed the per connection nsqd configured max_rdy_count.
- expose an API method to reliably indicate message flow starvation
1. Bootstrap and Distribution
There are a few considerations when choosing an appropriate RDY count for a connection (in order
to evenly distribute max_in_flight):
- the # of connections is dynamic, oftentimes not even known in advance (i.e. when discovering producers via nsqlookupd).
- max_in_flight may be lower than your number of connections
To kickstart message flow a client library needs to send an initial RDY count. Because the
eventual number of connections is often not known ahead of time it should start with a value of 1
so that the client library does not unfairly favor the first connection(s).
Additionally, after each message is processed, the client library should evaluate whether or not
it’s time to update RDY state. An update should be triggered if the current value is 0 or if it
is below ~25% of the last value sent.
The client library should always attempt to evenly distribute RDY count across all connections.
Typically, this is implemented as max_in_flight / num_conns.
However, when max_in_flight < num_conns this simple formula isn’t sufficient. In this state,
client libraries should perform a dynamic runtime evaluation of producer “liveness” by measuring the
duration of time since it last received a message for a connection. After a configurable expiration,
it should re-distribute whatever RDY count is available to a new (random) set of producers. By
doing this, you guarantee that you’ll (eventually) find producers with messages. Clearly this has a
latency impact.
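The three rules in this subsection (even distribution, the update trigger, and random re-distribution when max_in_flight < num_conns) can be sketched as small helpers. All names are hypothetical, and giving each chosen connection a RDY count of 1 in the last case is our assumption.

```python
import random


def per_connection_rdy(max_in_flight, num_conns):
    """Even share of max_in_flight, rounded down so the total is never exceeded."""
    if num_conns <= 0:
        return 0
    return max_in_flight // num_conns


def needs_rdy_update(current_rdy, last_rdy_sent):
    """True when the remaining RDY count is 0 or below ~25% of the last value sent."""
    return current_rdy == 0 or current_rdy < 0.25 * last_rdy_sent


def pick_connections(conns, max_in_flight, rng=random):
    """When there are more connections than max_in_flight, pick a random
    subset (one per available RDY count) to receive RDY 1."""
    conns = list(conns)
    if max_in_flight >= len(conns):
        return conns
    return rng.sample(conns, max_in_flight)
```

Calling `pick_connections` again after the configured expiration moves the available RDY count to a different random subset of producers.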
2. Maintaining max_in_flight
The client library should maintain a ceiling for the maximum number of messages in flight for a
given consumer. Specifically, the aggregate sum of each connection’s RDY count should never exceed
the configured max_in_flight.
Before sending a new RDY count on a connection, the client library should check that the resulting aggregate total stays within max_in_flight.
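One way to express that check, as a sketch: the `Conn` class and the function name are hypothetical, and the library is assumed to track the last RDY count it sent on each connection.

```python
class Conn:
    """Hypothetical connection that remembers its current RDY count."""

    def __init__(self, rdy_count=0):
        self.rdy_count = rdy_count


def can_send_rdy(conns, conn, proposed, max_in_flight):
    """True if setting conn's RDY count to `proposed` keeps the aggregate
    sum across all connections at or below max_in_flight."""
    others = sum(c.rdy_count for c in conns if c is not conn)
    return others + proposed <= max_in_flight


a, b = Conn(rdy_count=3), Conn(rdy_count=1)
allowed = can_send_rdy([a, b], b, 2, max_in_flight=5)   # 3 + 2 fits within 5
rejected = can_send_rdy([a, b], b, 3, max_in_flight=5)  # 3 + 3 would exceed 5
```

The connection's own current count is excluded from the sum because the new RDY value replaces it rather than adding to it.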
3. Producer Max RDY Count
Each nsqd is configurable with a --max-rdy-count (see feature
negotiation for more information on the handshake a client can perform to
ascertain this value). If the client sends a RDY count that is outside of the acceptable range its
connection will be forcefully closed. For backwards compatibility, this value should be assumed to
be 2500 if the nsqd instance does not support feature negotiation.
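A sketch of how a client might derive and respect this limit, assuming the IDENTIFY response arrives as raw bytes that are either `OK` or a JSON object containing max_rdy_count. The helper names are hypothetical.

```python
import json

DEFAULT_MAX_RDY_COUNT = 2500  # assumed when feature negotiation is unsupported


def max_rdy_from_identify(response_body):
    """Extract max_rdy_count from an IDENTIFY response, with the fallback."""
    if response_body == b"OK":
        return DEFAULT_MAX_RDY_COUNT
    data = json.loads(response_body.decode("utf-8"))
    return data.get("max_rdy_count", DEFAULT_MAX_RDY_COUNT)


def clamp_rdy(count, max_rdy_count):
    """Keep a proposed RDY count inside the range nsqd will accept."""
    return max(0, min(count, max_rdy_count))
```

Clamping before every RDY command avoids the forced disconnect described above.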
4. Message Flow Starvation
Finally, the client library should provide an API method to indicate message flow starvation. It is
insufficient for clients (in their message handlers) to simply compare the number of messages they
have in-flight vs. their configured max_in_flight in order to decide to “process a batch”. There
are two cases when this is problematic:
- When clients configure max_in_flight > 1, due to variable num_conns, there are cases where max_in_flight is not evenly divisible by num_conns. Because the contract states that you should never exceed max_in_flight, you must round down, and you end up with cases where the sum of all RDY counts is less than max_in_flight.
- Consider the case where only a subset of producers have messages. Because of the expected even distribution of RDY count, those active producers only have a fraction of the configured max_in_flight.
In both cases, a client will never actually receive max_in_flight # of messages. Therefore, the
client library should expose a method is_starved that will evaluate whether any of the
connections are starved.
The is_starved method should be used by message handlers to reliably identify when to process a
batch of messages.
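A possible shape for is_starved, offered as a sketch. It assumes each connection tracks how many messages it has in flight and the last RDY count it sent, and it treats a connection as starved once in-flight messages reach a high fraction of that count; the 0.85 threshold and the attribute names are assumptions.

```python
from collections import namedtuple

# hypothetical per-connection bookkeeping
ConnState = namedtuple("ConnState", ["in_flight", "last_rdy"])


def is_starved(conns, threshold=0.85):
    """True if any connection has (nearly) used up its last RDY count.

    A threshold below 1.0 anticipates starvation instead of waiting for it.
    """
    for conn in conns:
        if conn.in_flight > 0 and conn.in_flight >= conn.last_rdy * threshold:
            return True
    return False


busy = is_starved([ConnState(in_flight=9, last_rdy=10), ConnState(0, 10)])
idle = is_starved([ConnState(in_flight=1, last_rdy=10)])
```

In the first call one connection holds 9 of its 10 allowed messages, so the handler should flush its batch; in the second there is still plenty of headroom.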
Backoff
The question of what to do when message processing fails is a complicated one to answer. The message handling section detailed client library behavior that would defer the processing of failed messages for some (increasing) duration of time. The other piece of the puzzle is whether or not to reduce throughput. The interplay between these two pieces of functionality is crucial for overall system stability.
By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system to recover from transient failure. However, this behavior should be configurable as it isn’t always desirable, such as situations where latency is prioritized.
Backoff should be implemented by sending RDY 0 to the appropriate producers, stopping message
flow. The duration of time to remain in this state should be calculated based on the number of
repeated failures (exponential). Similarly, successful processing should reduce this duration until
the reader is no longer in a backoff state.
While a reader is in a backoff state, after the timeout expires, the client library should only ever
send RDY 1 regardless of max_in_flight. This effectively “tests the waters” before returning to
full throttle. Additionally, during a backoff timeout, the client library should ignore any success
or failure results with respect to calculating backoff duration (i.e. it should only take into
account one result per backoff timeout).
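The duration bookkeeping could be sketched like this. The class name, the one second base, and the 120 second ceiling are assumptions; sending RDY 0 and RDY 1 is left to the caller.

```python
class BackoffTimer:
    """Track consecutive failures and derive an exponential backoff duration."""

    def __init__(self, base_seconds=1.0, max_seconds=120.0):
        self.base_seconds = base_seconds
        self.max_seconds = max_seconds
        self.level = 0  # 0 means the reader is not in a backoff state

    def failure(self):
        self.level += 1

    def success(self):
        # each success steps the duration back down until backoff ends
        self.level = max(0, self.level - 1)

    def in_backoff(self):
        return self.level > 0

    def duration(self):
        """Seconds to stay at RDY 0 before testing the waters with RDY 1."""
        if self.level == 0:
            return 0.0
        return min(self.max_seconds, self.base_seconds * 2 ** (self.level - 1))
```

To honor the one result per backoff timeout rule, the caller would invoke `failure()` or `success()` only for the single message received after sending RDY 1.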

Bringing It All Together
Distributed systems are fun.
The interactions between the various components of an NSQ cluster work in concert to provide a platform on which to build robust, performant, and stable infrastructure. We hope this guide shed some light as to how important the client’s role is.
In terms of actually implementing all of this, we treat pynsq and go-nsq as our reference codebases. The structure of pynsq can be broken down into three core components:
- Message - a high-level message object, which exposes stateful methods for responding to the nsqd producer (FIN, REQ, TOUCH, etc.) as well as metadata such as attempts and timestamp.
- Connection - a high-level wrapper around a TCP connection to a specific nsqd producer, which has knowledge of in flight messages, its RDY state, negotiated features, and various timings.
- Reader - the front-facing API a user interacts with, which handles discovery, creates connections (and subscribes), bootstraps and manages RDY state, parses raw incoming data, creates Message objects, and dispatches messages to handlers.
We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking for contributors to continue to expand our language support as well as flesh out functionality in existing libraries. The community has already open sourced client libraries for 7 languages:
- go-nsq Go (official)
- pynsq Python (official)
- libnsq C
- nsq-java Java
- TrendrrNSQClient Java
- nsqjava Java
- nsq-client Node.js
- nodensq Node.js
- nsqphp PHP
- ruby_nsq Ruby
If you’re interested in hacking on NSQ, check out the GitHub repo. We also publish binary distributions for Linux and Darwin to make it easy to get started.
If you have any specific questions, feel free to ping me on twitter @imsnakes.
Finally, a huge thank you to the crew at bitly: @mccutchen and @ploxiln for their tireless review, and @jehiah, my partner in crime on NSQ.
Speeding things up with Redshift
Recently we’ve started to experiment with using Redshift, Amazon’s new data warehousing service. More specifically, we’re using it to speed up and expand our ad hoc data analysis.
The Challenge
bitly sees billions of clicks and shortens each month. Often we have various questions about the data generated from this activity. Sometimes these questions are driven by business needs (how much traffic do we see from a potential enterprise customer), sometimes they are more technically driven (how much traffic will a new sub-system need to deal with), and sometimes we like to just have fun (what are the top trashy celeb stories this week).
Unfortunately, when working with that volume of data it can be pretty difficult to do much of anything quickly. Pre-Redshift, all of these questions were answered by writing map-reduce jobs to be run on our Hadoop cluster or on Amazon’s EMR.
Whenever we wanted to answer a question with our data, the process would look something like this:
1. Write map-reduce job in Python
2. Run it on some local test data
3. Fix bugs.
4. Run it on the Hadoop cluster
5. Wait 20-30 minutes for results
6. Get an error back from Hadoop
7. Dig through the logs to find the error.
8. GOTO 3
This is clearly not ideal when all you want to do is get a simple count.
For a lot of the work we do Hadoop + Python make for an awesome combination, but for these ad hoc aggregation queries they’re very blunt instruments. In both cases, they are general purpose tools that are super flexible, but slow and difficult to use for this specific use case.
Redshift, on the other hand, is specifically built and optimized for doing aggregation queries over large sets of data. When we want to answer a question with Redshift, we just write a SQL query and get an answer within a few minutes—if not seconds.
Overall, our experience with Redshift has been a positive one but we have run into some gotchas that we’ll get into below.
The Good News
User Experience
From a user perspective, we’re really happy with Redshift. Any one of our developers or data scientists just needs to write a SQL query and they have an answer to their question in less than 5 minutes. Moving from our old Hadoop-based workflow to an interactive console session with Redshift is a major improvement.
Additionally, since much of the user facing bits of Redshift are based on PostgreSQL there is a large ecosystem of mature, well-documented tools and libraries for us to take advantage of.
Finally, while it can be a bit slow at times, we’ve been very impressed with the web management console Amazon provides with Redshift. For a 1.0 product, the console is comprehensive and offers much more information than we expected it to.
Performance
For our current use case of ad hoc research queries, Redshift’s performance is adequate. Most queries return a response in less than five minutes and we rarely have many users executing queries concurrently.
That being said, we have done some experimentation with competing products (e.g. Vertica) and have seen better performance out of those tools. This is especially true for more complex queries that benefit from projections/secondary indexes and situations where the cluster’s resources are under contention.
Documentation
Just like the rest of AWS, Amazon provides reasonably comprehensive and thorough documentation. For everything that is directly exposed to us as users (e.g. loading operations, configuration params, etc) we are very happy with the documentation. The only places where we felt we wanted more information were those where Amazon makes things “just work”. Most significantly we would like to see more details about what exactly happens when a node in the cluster fails and how the cluster is expected to behave in that state.
Cost
We wouldn’t go so far as to call Redshift cheap, but compared to many competitors it is pretty cost effective. The biggest gotcha here is that while the simple model for scaling Redshift clusters and tuning performance within a cluster is nice as a user, it does mean that you have a bit of a one-size-fits-all situation.
In our case we are computationally and I/O constrained so we’re paying for a bunch of storage capacity and memory that we don’t use. At our current scale, things work out okay but as we continue to grow it may make sense to take advantage of something else that is more flexible in terms of both hardware and tuning.
The Bad News
Data Loading
We had to spend a lot of time getting our data into Redshift. This is partially our fault since our dataset is not the cleanest in the world, but overall this is the place that we felt the most pain from an immature tool chain.
The majority of bitly’s at-rest data is stored in line-oriented (i.e. one JSON blob per line) JSON files. The primary way to load data into Redshift is to use the COPY command to point the cluster at a pile of CSV or TSV files stored on S3. Clearly we needed to build out some kind of tool chain to transform our JSON logs into flat files.
Initially, we just wrote a simple Python script that would do the transformation. Unfortunately, we quickly discovered that this simple approach would be too slow. We estimated that it would have taken a month to process and load all the data we wanted in Redshift.
Next, we realized that we had a tool for easily doing highly parallelized, distributed text processing: Hadoop. Accordingly, we re-worked our quick Python script into a Hadoop job to transform our logs in a big batch. Since we already keep a copy of our raw logs in S3, EMR proved to be a great tool for this.
Overall this process worked well but we did still run into a few gotchas loading the flattened data into Redshift:
- Redshift only supports a subset of UTF-8: characters can be at most 3 bytes long. Amazon does mention this in their docs, but it still bit us a few times.
- varchar field lengths are in terms of bytes, not characters. In Python, byte strings expose these as one and the same. Unicode strings on the other hand do not. It took us a little while to realize this was happening and to get our code set up to do byte length truncations without truncating mid-character in unicode strings. To get an idea of how we went about doing unicode aware truncation, check out this relevant Stack Overflow thread.
- Moving floats between different systems always has some issues. In Python the default string output of small float values is scientific notation (e.g. 3.32e-09). Unfortunately Redshift doesn’t recognize this format so we needed to force our data prep job to always output floats in decimal format.
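The three gotchas above can each be handled with a small helper. This is a Python 3 sketch rather than the code we actually ran; the function names and the default of 12 decimal places are assumptions.

```python
def strip_4byte_chars(text):
    """Drop characters that need 4 bytes in UTF-8 (code points above U+FFFF)."""
    return "".join(ch for ch in text if ord(ch) <= 0xFFFF)


def truncate_utf8(text, max_bytes):
    """Truncate to at most max_bytes of UTF-8 without splitting a character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops an incomplete multi-byte sequence left at the cut
    return encoded[:max_bytes].decode("utf-8", "ignore")


def format_float(value, places=12):
    """Render a float in plain decimal notation, never scientific."""
    return format(value, ".%df" % places)


clean = strip_4byte_chars("a\U0001F600b")  # the emoji needs 4 bytes
short = truncate_utf8("h\u00e9llo", 2)     # cutting at 2 bytes would split the e-acute
small = format_float(3.32e-9)
```

Here `short` keeps only the leading "h", since the two-byte character does not fit within the limit.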
Now that we have a large body of data loaded into Redshift, we’re working on building out tooling based on NSQ to do our data prep work in a streaming fashion that should allow us to easily do smaller incremental updates.
In the end, we worked through our data loading issues with Redshift but it was one of the more acute pain points we encountered. From our conversations with Amazon, they’re definitely aware of this and we’re interested to see what they’ll come out with, but for now the provided tooling is pretty limited.
Availability
Long term, there are a number of periodic and online tasks that we’re thinking about using a tool like Redshift for. Unfortunately, as things stand today we would not be comfortable relying on Redshift as a highly available system.
Currently, if any node within a Redshift cluster fails, all outstanding queries will fail and Amazon will automatically start replacing the failed node. In theory this recovery should happen very quickly. The cluster will be available for querying as soon as the replacement node is added, but performance on the cluster will be degraded while data is restored on to the new node.
At this point, we have no data on how well this recovery process works in the real world. Additionally, we have concerns about how well this process will work when there are larger issues happening within an availability zone or region. Historically there are a number of cases where issues within one Amazon service (e.g. EBS) have cascaded into other services leading to long periods of unavailability or degradation.
Until there’s a significant track record behind a system like this, we’re hesitant to trust anything that will “automatically recover”.
There is the option of running a mirrored Redshift cluster in a different AZ or region, but that gets expensive fast. Additionally, we’d have to build out even more tooling to make sure those two clusters stay in sync with each other.
Limited Feature Set
Redshift is very impressive feature-wise for a 1.0 product. That being said, a number of the competing products have been around for a while and offer some major features that Redshift lacks.
The biggest missing feature for us with Redshift is some kind of secondary indexing or projections. Right now, if you sort or filter by any column other than the SORTKEY, Redshift will do a full table scan. Technically you could create a copy of that table with a different sort key but then it would become your problem to keep those tables in sync and to query the right table at the right time.
Some other “missing” features include tools for working with time series data, geospatial query tools, advanced HA features, and more mature data loading tools.
The Bottom Line
Redshift is great for our needs today, but we’ll see if Amazon’s development keeps up as our needs change going forward. Given Amazon’s impressive track record for quickly iterating on and improving products we’re hopeful, but we do have our eyes on competing products as our use of data warehousing tools matures.
Securing Internal Applications
A common infrastructure problem is managing access to internal applications. Some patterns for solving that problem include: fronting internal apps with HTTP Basic or HTTP Digest authentication through Apache or nginx, running the apps on secret ports, and bundling an authentication system as part of each application. Many off-the-shelf applications include no built-in authentication, and count on upstream proxies/systems for access control which limits options for securing them. Google Auth Proxy is a tool we have developed to give us a better ability to secure internal applications by using Google Account authentication (like many other startups, we rely on Google Apps).
At bitly, we like fronting HTTP applications with Nginx and use that approach for nearly all of our stack, from Tornado apps handling our core APIs to common internal tools (e.g. Nagios, Munin, Graphite) to homegrown tools like NSQ and our deploy system. For many of those systems, we took advantage of this by configuring HTTP Basic authentication to restrict access. Using Basic Auth however led to situations where the authentication information would be stored in configuration files and scripts, leaking it throughout the repository.
Some of our homegrown tools initially relied on bitly’s OAuth 2 service for authentication, which meant that sign in was managed through bitly accounts by the OAuth flow. This left each application to handle authorization by maintaining a list of bitly accounts that actually had permission to access the app. This was an improvement over HTTP Basic Auth as each application did not need to accept or store passwords, but it was less than ideal because each app still needed to store and manage its own list of authorized users. Over time, this led to a large number of disparate systems with separate authorization lists. This approach was only feasible for internal tools and thus couldn’t apply to other open source applications we were using.
We developed Google Auth Proxy as a new tool to use in securing internal applications. It is an HTTP
reverse proxy that provides authentication using Google’s OAuth2 API with flexibility to authorize
individual Google Accounts (by email address) or a whole Google Apps domain.
For internal applications, this is convenient because we can now allow our whole @bit.ly Google Apps domain without
separately managing accounts or passwords.
Google Auth Proxy requires a limited set of privileges in order to authenticate users (asking for only the
userinfo.email and userinfo.profile OAuth2 scopes). This authentication information
is then passed to the upstream application as HTTP Basic Auth (with an empty value for the password), and in an HTTP
header as X-Forwarded-User for applications that need that context.
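To make the hand-off to the upstream application concrete, here is a minimal Python sketch of the two headers described above. It is not taken from Google Auth Proxy itself (which is written in Go); the function name upstream_auth_headers is hypothetical, and using the full email address as the Basic Auth username is an assumption made for illustration.

```python
import base64


def upstream_auth_headers(email: str) -> dict:
    """Build the headers a proxy might add for the upstream app (illustrative only)."""
    # Basic Auth credentials have the form "user:password"; the password is left empty.
    # Assumption: the authenticated email address is used as the username.
    token = base64.b64encode(f"{email}:".encode("utf-8")).decode("ascii")
    return {
        "Authorization": f"Basic {token}",
        "X-Forwarded-User": email,
    }
```

An upstream application that only understands Basic Auth can read the username from the Authorization header, while one that wants the identity directly can read X-Forwarded-User.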
Google Auth Proxy has been in production for 6 months and has helped reduce overhead for managing internal applications (no setup time for new or removed accounts) and has made it easier for us to open up access for important tools to the entire company. It’s also worth mentioning that if you use two-factor-authentication with your Google accounts, that security carries over to improved authentication for Google Auth Proxy.
We chose to write Google Auth Proxy in Go because golang has built-in support for writing a concurrent
ReverseProxy in the net/http/httputil package. This meant that little work was needed to proxy
requests and we could focus on just writing the authentication layer.
If you find this useful, we’d love to hear about it @bitly
Forget Table
Forget Table is a solution to the problem of storing the recent dynamics of categorical distributions that change over time (ie: non-stationary distributions).
What does this mean? Why is this useful? What makes this the most important thing since sliced bread?! Read on!
Storing distributions is crucial for doing any sort of statistics on a dataset. Normally, this problem is as easy as keeping counters of how many times we’ve seen certain quantities, but when dealing with data streams that constantly deliver new data, these simple counters no longer do the job. They fail because data we saw weeks ago has the same weight as data we have just seen, even though the fundamental distribution the data is describing may be changing. In addition, they pose an engineering challenge, since the counters would grow without bound and very soon use all the resources available to them. This is why we created Forget Table.
Background
A categorical distribution describes the probability of seeing an event occur out of a set of possible events. An example of this at bitly is that every click coming through our system comes from one of a fixed number of country codes (there are about 260). We would like to maintain a categorical distribution that assigns a probability to each country, describing how likely it is that any given click comes from that country. With bitly’s data, this is going to give a high weight to the US and lower weights to Japan and Brazil, for example.
It’s very useful for bitly to store distributions like this, as it gives us a good idea as to what’s considered ‘normal’. Most of the time we produce analytics that show, for example, how many clicks came from a given country, or referrer. While this gives our clients a sense of where their traffic is coming from, it doesn’t directly express how surprising their traffic is. This can be remedied by maintaining a distribution over countries for all of bitly, so that we can identify anomalous traffic, ie: when a particular link is getting disproportionately more clicks from Oregon than we would normally expect.
The difficulty that Forget Table deals with comes from the fact that what bitly considers normal changes constantly. At 8am on the East Coast of the US, we’d be surprised to see a lot of traffic coming from San Francisco (they’re all still asleep over there) however at 11am EST we should expect to see tons of traffic coming from San Francisco. So unless we allow our understanding of what’s considered normal to change over time, we are going to have a skewed idea of normality.
This is a general problem over longer time scales, too. The behavior of the social web is different in December than it is in July, and it’s different in 2012 than it was in 2011. We need to make sure that our understanding of normality changes with time. One way of achieving this is to forget old data that’s no longer relevant to our current understanding. This is where Forget Table comes in.
Why forget in the first place?
The basic problem is that the fundamental distribution (ie: the distribution that the data is actually being created from) is changing. This property is called being “non-stationary”. Simply storing counts to represent your categorical distribution makes this problem worse because it keeps the same weight for data seen weeks ago as it does for the data that was just observed. As a result, the total count method simply shows the combination of every distribution the data was drawn from (which will approach a Gaussian by the central limit theorem).
A naive solution to this would be to simply have multiple categorical distributions that are in rotation. Similar to the way log files get rotated, we would have different distributions that get updated at different times. This approach, however, can lead to many problems.
When a distribution first gets rotated in, it has no information about the current state of the world. Similarly, at the end of a distribution’s rotation it is just as affected by events from the beginning of its rotation as it is by recent events. This creates artifacts and dynamics in the data which are only dependent on the time of and time between rotations. These effects are similar to various pathologies that come from binning data or by simply keeping a total count.
On the other hand, by forgetting things smoothly using rates, we have a continuous window of time that our distribution is always describing. The further back in time an event is, the less of an effect it has on the distribution’s current state.
Forgetting Data Responsibly
Forget Table takes a principled approach to forgetting old data, by defining the rate at which old data should decay. Each bin of a categorical distribution forgets its counts depending on how many counts it currently has and a user-specified rate. With this rule, bins that are more dominant get decayed faster than bins without many counts. This method also has the benefit of being a very simple process (one that was inspired by the decay of radioactive particles) which can be calculated very quickly. In addition, since bins with high counts get decayed faster than bins with low counts, this process helps smooth out sudden spikes in data automatically.
If the data suddenly stopped flowing into the Forget Table, then all the categorical distributions would eventually decay to the uniform distribution - each bin would have a count of 1 (at which point we stop decaying), and z would be equal to the number of bins (see a visualization of this happening). This captures the fact that we no longer have any information about the distribution of the variables in Forget Table.
The result of this approach is that the counts in each bin, in each categorical distribution, decay exponentially. Each time we decrement the count in a bin, we also decrement the normalising constant z. When using ForgetTable, we can choose a rate at which things should decay, depending on the dominant time constants of the system.
Building Categorical Distributions in Redis
We store the categorical distribution as a set of event counts, along with a ‘normalising constant’ which is simply the number of all the events we’ve stored. In the country example, we have ~260 bins, one per country, and in each bin we store the number of clicks we’ve seen from each country. Alongside it, our normalising constant stores the total number of clicks we’ve seen across all countries.
All this lives in a Redis sorted set where
the key describes the variable which, in this case, would simply be
bitly_country and the value would be a categorical distribution. Each element
in the set would be a country and the score of each element would be the number
of clicks from that country. We store a separate element in the set
(traditionally called z) that records the total number of clicks stored in
the set. When we want to report the categorical distribution, we extract the
whole sorted set, divide each count by z, and report the result.
Storing the categorical distribution in this way allows us to make very rapid writes (simply increment the score of two elements of the sorted set) and means we can store millions of categorical distributions in memory. Storing a large number of these is important, as we’d often like to know the normal behavior of a particular key phrase, or the normal behavior of a topic, or a bundle, and so on.
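The storage scheme can be sketched in a few lines of Python. This is an in-memory stand-in rather than Forget Table's actual Redis code: the class name CategoricalCounts and its methods are hypothetical, and a plain dict plays the role of the sorted set, with z kept alongside it.

```python
from collections import defaultdict


class CategoricalCounts:
    """In-memory stand-in for one sorted set: per-bin counts plus the total z."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.z = 0  # normalising constant: total number of events stored

    def incr(self, category: str, amount: int = 1) -> None:
        # A write touches exactly two values: the bin and the normalising constant.
        self.counts[category] += amount
        self.z += amount

    def distribution(self) -> dict:
        # Report the distribution by dividing each count by z.
        if self.z == 0:
            return {}
        return {category: count / self.z for category, count in self.counts.items()}
```

For example, after three incr("US") calls and one incr("JP") call, distribution() reports 0.75 for US and 0.25 for JP.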
Forgetting Data Under Strenuous Circumstances
This seems simple enough, but we have two problems. The first is that we have potentially millions of categorical distributions. Bitly maintains information for over a million separate key phrases at any given time, and (for some super secret future projects) it is necessary to store a few distributions per key phrase. As a result, we are unable to iterate through each key of our Redis table in order to do our decays, which means cron-like decays (ie: decaying every distribution in the database every several minutes) wouldn’t be feasible.
The second problem is that data is constantly flowing into multiple distributions: we sometimes see spikes of up to 3 thousand clicks per second which can correspond to dozens of increments per second. At this sort of high volume, there is simply too much contention between the decay process and the incoming increments to safely do both.
So the real contribution of Forget Table is an approach to forgetting data at read time. When we are interested in the current distribution of a particular variable, we extract whatever sorted set is stored against that variable’s key and decrement the counts at that time.
It turns out that, using the simple rate-based model of decay from above, we can decrement each bin by simply sampling an integer from a Poisson distribution whose rate is proportional to the current count of the bin and the length of time it has been since we last decayed that bin. So, by storing another piece of information, the time since we last successfully decayed this distribution, we can calculate the number of counts to discard very cheaply (this algorithm is an approximation to Gillespie’s algorithm used to simulate stochastic systems).
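As a rough illustration of the read-time decay, the Python sketch below samples the number of counts to discard per bin from a Poisson distribution with mean rate * count * elapsed. It is not Forget Table's actual code: the function names, the rate and elapsed parameters, and the use of Knuth's sampling method (suitable only for small means, since exp(-mean) underflows for very large ones) are all assumptions made for the example. It also assumes every stored bin already has a count of at least 1.

```python
import math
import random


def sample_poisson(mean: float, rng: random.Random) -> int:
    """Sample from Poisson(mean) with Knuth's method (small means only)."""
    threshold = math.exp(-mean)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= threshold:
            return k
        k += 1


def decay(counts: dict, rate: float, elapsed: float, rng: random.Random):
    """Return (decayed_counts, z) after `elapsed` time units at decay `rate`."""
    decayed = {}
    for category, count in counts.items():
        # The expected loss is proportional to the current count and the elapsed time.
        lost = sample_poisson(rate * count * elapsed, rng)
        # Bins stop decaying once they reach a count of 1.
        decayed[category] = max(1, count - lost)
    # z is kept equal to the sum of the stored counts.
    return decayed, sum(decayed.values())
```

Because the mean scales with the bin's count, dominant bins lose more counts per read than sparse ones, which matches the behaviour described earlier.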
In Redis we implement this using pipelines. Using a pipeline, we read the sorted set, form the distribution, calculate the amount of decay for each bin and then attempt to perform the decay. Assuming nothing’s written into the sorted set in that time, we decay each bin and update the time since last decayed. If the pipeline has detected a collision — either another process has decayed the set or a new event has arrived — we abandon the update. The algorithm we’ve chosen means that it’s not terribly important to actually store the decayed version of the distribution, so long as we know the time between the read and the last decay.
Get the Code and We’re Hiring!
The result is a wrapper on top of Redis that runs as a little HTTP service. Its
API has an increment handler on /incr used for incrementing counts based on
new events, and a get handler on /get used for retrieving a distribution. In
addition, there is an /nmostprobable endpoint that returns the n categories
which have the highest probability of occurring.
There are two versions, one in Go (for super speed) and the other in Python. The code is open source and up on GitHub, available at http://bitly.github.com/forgettable.
As always, if you like what you see (or feel the need to make improvements), don’t forget that bitly is hiring!
Get ready for more posts on science at bitly, and if you have any specific questions, feel free to ping me on twitter @mynameisfiber.
spray some NSQ on it
We released NSQ on October 9th 2012. Supported by 3 talks and a blog post, it’s already the 4th most watched Go project on GitHub. There are client libraries in 7 languages and we continue to talk with folks experimenting with and transitioning to the platform.
This post aims to kickstart the documentation available for “getting started” and describe some NSQ patterns that solve a variety of common problems.
DISCLAIMER: this post makes some obvious technology suggestions to the reader but it generally ignores the deeply personal details of choosing proper tools, getting software installed on production machines, managing what service is running where, service configuration, and managing running processes (daemontools, supervisord, init.d, etc.).
Metrics Collection
Regardless of the type of web service you’re building, in most cases you’re going to want to collect some form of metrics in order to understand your infrastructure, your users, or your business.
For a web service, most often these metrics are produced by events that happen via HTTP requests, like an API. The naive approach would be to structure this synchronously, writing to your metrics system directly in the API request handler.

- What happens when your metrics system goes down?
- Do your API requests hang and/or fail?
- How will you handle the scaling challenge of increasing API request volume or breadth of metrics collection?
One way to resolve all of these issues is to somehow perform the work of writing into your metrics system asynchronously - that is, place the data in some sort of local queue and write into your downstream system via some other process (consuming that queue). This separation of concerns allows the system to be more robust and fault tolerant. At bitly, we use NSQ to achieve this.
Brief tangent: NSQ has the concept of topics and channels. Basically, think of a topic as a unique stream of messages (like our stream of API events above). Think of a channel as a copy of that stream of messages for a given set of consumers. Topics and channels are both independent queues, too. These properties enable NSQ to support both multicast (a topic copying each message to N channels) and distributed (a channel equally dividing its messages among N consumers) message delivery.
For a more thorough treatment of these concepts, read through the design doc and slides from our Golang NYC talk, specifically slides 19 through 33 describe topics and channels in detail.
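To make the multicast and distributed behaviour concrete, here is a toy Python model of topics and channels. It is only a sketch of the semantics described above, not how nsqd is implemented: the Topic and Channel classes are hypothetical, consumers are plain lists, and strict round-robin stands in for the way a channel divides messages among its consumers.

```python
from collections import deque
from itertools import cycle


class Channel:
    """A queue whose messages are divided among its consumers."""

    def __init__(self, consumers):
        self.queue = deque()
        # Assumption: simple round-robin; each consumer is a list that collects messages.
        self._next_consumer = cycle(consumers)

    def deliver_all(self):
        while self.queue:
            next(self._next_consumer).append(self.queue.popleft())


class Topic:
    """A stream of messages that is copied to every channel."""

    def __init__(self):
        self.channels = {}

    def publish(self, message):
        # Multicast: every channel receives its own copy of the message.
        for channel in self.channels.values():
            channel.queue.append(message)
```

With a "metrics" channel that has two consumers and an "archive" channel that has one, publishing four messages leaves two on each metrics consumer and all four on the archive consumer.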

Integrating NSQ is straightforward, let’s take the simple case:
- Run an instance of nsqd on the same host that runs your API application.
- Update your API application to write to the local nsqd instance to queue events, instead of directly into the metrics system. To be able to easily introspect and manipulate the stream, we generally format this type of data in line-oriented JSON. Writing into nsqd can be as simple as performing an HTTP POST request to the /put endpoint.
- Create a consumer in your preferred language using one of our client libraries. This “worker” will subscribe to the stream of data and process the events, writing into your metrics system. It can also run locally on the host running both your API application and nsqd.
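The publishing step in the list above can be sketched with nothing but the Python standard library. The helper name build_put_request is hypothetical, and the address 127.0.0.1:4151 and the topic query parameter are assumptions about a default local nsqd; adjust them to match your setup.

```python
import json
import urllib.parse
import urllib.request


def build_put_request(event: dict, topic: str,
                      nsqd_http: str = "http://127.0.0.1:4151") -> urllib.request.Request:
    """Build (but do not send) an HTTP POST that queues one JSON event in nsqd."""
    # One event per message, serialized as a single line of JSON.
    body = json.dumps(event).encode("utf-8")
    # Assumption: the topic is passed as a query parameter on the /put endpoint.
    url = f"{nsqd_http}/put?{urllib.parse.urlencode({'topic': topic})}"
    return urllib.request.Request(url, data=body, method="POST")
```

Passing the returned request to urllib.request.urlopen would actually send it; in an API request handler you would typically do so with a short timeout so that a slow local nsqd cannot stall the request.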
Here’s an example worker written with our official Python client library:
In addition to de-coupling, by using one of our official client libraries, consumers will degrade gracefully when message processing fails. Our libraries have two key features that help with this:
- Retries - when your message handler indicates failure, that information is sent to nsqd in the form of a REQ (re-queue) command. Also, nsqd will automatically time out (and re-queue) a message if it hasn’t been responded to in a configurable time window. These two properties are critical to providing a delivery guarantee.
- Exponential Backoff - when message processing fails the reader library will delay the receipt of additional messages for a duration that scales exponentially based on the number of consecutive failures. The opposite sequence happens when a reader is in a backoff state and begins to process successfully: the delay steps back down until it reaches 0.
In concert, these two features allow the system to respond gracefully to downstream failure, automagically.
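The backoff behaviour can be sketched as a small counter. This is not the code from our client libraries: the class name Backoff, the base_seconds and max_seconds parameters, and the doubling schedule are assumptions chosen to illustrate the idea of stepping up on failure and back down on success.

```python
class Backoff:
    """Track a backoff level that rises on failure and falls on success."""

    def __init__(self, base_seconds: float = 1.0, max_seconds: float = 120.0):
        self.base_seconds = base_seconds
        self.max_seconds = max_seconds
        self.level = 0  # 0 means "not backing off"

    def failure(self) -> None:
        self.level += 1

    def success(self) -> None:
        # Step back down one level at a time until we reach 0.
        self.level = max(0, self.level - 1)

    def delay(self) -> float:
        """Seconds to wait before asking for more messages."""
        if self.level == 0:
            return 0.0
        # The delay doubles with each level, capped at max_seconds.
        return min(self.max_seconds, self.base_seconds * 2 ** (self.level - 1))
```

A reader would call failure() or success() after each message and pause for delay() seconds before requesting more.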
Persistence
Ok, great, now you have the ability to withstand a situation where your metrics system is unavailable with no data loss and no degraded API service to other endpoints. You also have the ability to scale the processing of this stream horizontally by adding more worker instances to consume from the same channel.
But, it’s kinda hard ahead of time to think of all the types of metrics you might want to collect for a given API event.
Wouldn’t it be nice to have an archived log of this data stream for any future operation to leverage? Logs tend to be relatively easy to redundantly back up, making it a “plan z” of sorts in the event of catastrophic downstream data loss. But, would you want this same consumer to also have the responsibility of archiving the message data? Probably not, because of that whole “separation of concerns” thing.
Archiving an NSQ topic is such a common pattern that we built a utility, nsq_to_file, packaged with NSQ, that does exactly what you need.
Remember, in NSQ, each channel of a topic is independent and receives a
copy of all the messages. You can use this to your advantage when archiving
the stream by doing so over a new channel, archive. Practically, this means
that if your metrics system is having issues and the metrics channel gets
backed up, it won’t affect the separate archive channel you’ll be using to
persist messages to disk.
So, add an instance of nsq_to_file to the same host and use a command line
like the following:
/usr/local/bin/nsq_to_file --nsqd-tcp-address=127.0.0.1:4150 --topic=api_requests --channel=archive

Distributed Systems
You’ll notice that the system has not yet evolved beyond a single production host, which is a glaring single point of failure.
Unfortunately, building a distributed system is hard. Fortunately, NSQ can help. The following changes demonstrate how NSQ alleviates some of the pain points of building distributed systems as well as how its design helps achieve high availability and fault tolerance.
Let’s assume for a second that this event stream is really important. You want to be able to tolerate host failures and continue to ensure that messages are at least archived, so you add another host.

Assuming you have some sort of load balancer in front of these two hosts you can now tolerate any single host failure.
Now, let’s say the process of persisting, compressing, and transferring these logs is affecting performance. How about splitting that responsibility off to a tier of hosts that have higher IO capacity?

This topology and configuration can easily scale to double-digit hosts, but
you’re still managing configuration of these services manually, which does not
scale. Specifically, in each consumer, this setup is hard-coding the address
of where nsqd instances live, which is a pain. What you really want is for
the configuration to evolve and be accessed at runtime based on the state of
the NSQ cluster. This is exactly what we built nsqlookupd to
address.
nsqlookupd is a daemon that records and disseminates the state of an NSQ
cluster at runtime. nsqd instances maintain persistent TCP connections to
nsqlookupd and push state changes across the wire. Specifically, an nsqd
registers itself as a producer for a given topic as well as all channels it
knows about. This allows consumers to query an nsqlookupd to determine who
the producers are for a topic of interest, rather than hard-coding that
configuration. Over time, they will learn about the existence of new
producers and be able to route around failures.
The only changes you need to make are to point your existing nsqd and
consumer instances at nsqlookupd (everyone explicitly knows where
nsqlookupd instances are but consumers don’t explicitly know where
producers are, and vice versa). The topology now looks like this:

At first glance this may look more complicated. It’s deceptive though, as
the effect this has on a growing infrastructure is hard to communicate
visually. You’ve effectively decoupled producers from consumers because
nsqlookupd is now acting as a directory service in between. Adding
additional downstream services that depend on a given stream is trivial, just
specify the topic you’re interested in (producers will be discovered by
querying nsqlookupd).
But what about availability and consistency of the lookup data? We generally
recommend choosing how many nsqlookupd instances to run based on your
availability requirements. nsqlookupd is not resource intensive and
can be easily homed with other services. Also, nsqlookupd instances do not
need to coordinate or otherwise be consistent with each other. Consumers will
generally only require one nsqlookupd to be available with the information
they need (and they will union the responses from all of the nsqlookupd
instances they know about). Operationally, this makes it easy to migrate to a
new set of nsqlookupd instances.
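The "union the responses" behaviour can be sketched as a simple merge. The function name union_producers and the (host, port) tuple shape are assumptions for illustration; a real consumer would first query each nsqlookupd over HTTP and parse its response into a list like this.

```python
def union_producers(responses):
    """Merge producer lists from several nsqlookupd instances, dropping duplicates."""
    seen = set()
    merged = []
    for producers in responses:
        for producer in producers:
            # A producer is identified by its (host, port) pair.
            if producer not in seen:
                seen.add(producer)
                merged.append(producer)
    return merged
```

Because the result is a union, a consumer still finds every known producer as long as at least one nsqlookupd that knows about it responds, which is why the instances do not need to coordinate with each other.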
EOL
Using these same strategies, we’re now peaking at 65k messages per second delivered through our production NSQ cluster. Although applicable to many use cases, we’re only scratching the surface with the configurations described above.
If you’re interested in hacking on NSQ, check out the GitHub repo and the list of client libraries. We also publish binary distributions for Linux and Darwin to make it easy to get started.
Stay tuned for more posts on leveraging NSQ to build distributed systems. If you have any specific questions, feel free to ping me on twitter @imsnakes.
Improving Frontend Code Quality and Workflow
When I started at Bitly as a Frontend Engineer, we were about to launch the new bitly. It was exciting to be so close to a product launch and we were cranking out lots of code each day. It took me a few days to get my bearings in the sprawling codebase and I saw lots of opportunity for refactoring, cleanup, and style normalization as well as some architectural concepts we weren’t leveraging.
Product launch mode kept us from doing housekeeping for the next few months after we released the new product as we collected feedback and iterated on several designs/functionalities.
Once we had a chance to take a step back and regroup, I saw that we could be more productive if we invested in a common JavaScript style (both syntax and module level patterns) and re-evaluated which libraries we were leveraging to build the site.
When we decided the next feature work would be on save/share modal dialogs, I used this as an opportunity to evaluate my new picks for tools and libraries. The result was a success and allowed us to adopt the new tools and libraries for all new features (see Easily Save and Share the Links You Love).
Keep reading to see what tools we ended up using to make us more productive!
Coding style guidelines
If you’ve seen frontend JavaScript codebases older than six months with two or more developers working on them, you’ve seen how hard it is to enforce style.
The goal of a style guide (and code conventions in general) is to reduce the amount of friction when switching between different modules in a codebase and increase the consistency and readability of code. If every module follows the same patterns, you can easily get your bearings in code written by your coworkers. Every team should have a common style that is enforced in code review and ideally written in a document that can be referred to for new hires.
For JavaScript at Bitly, we mostly use the Google Style Guide without the JSDoc and with a variation that non-method variables are lowercase_with_underscores e.g.
var Bitly = function() {
};
Bitly.prototype.doThing = function() {
    var an_object = {};
};
The bigger the team, the more you might have to diverge from your preferred style but you’ll still get tons of benefit from the consistency alone.
Coffeescript for teams
At Bitly, in addition to codifying the style guide, we gave Coffeescript a trial run and decided to continue using it for all new code (like Github does). Coffeescript is quite contentious in the JS community, but for our frontend team we saw a great improvement in workflow and maintainability.
Since whitespace is significant and {} and ; are almost always omitted, there is a distinct Coffeescript style that is encouraged by default.
Here’s a snippet from a Backbone View that declares all the DOM event handlers for the view in a concise way.
events:
  'click': 'focusInput'
  'focusin .picker-input': 'onFocusIn'
  'focusout .picker-input': 'checkForFullEmail'
  'keydown .picker-display': 'keyDownCheckForFocusedChip'
  'keydown .picker-input': 'inputKeydown'
  'keyup .picker-input': 'inputKeyup'
  'input .picker-input': 'inputNewText'
  'keydown .picker-suggestions': 'suggestionKeydown'
  'mousedown .picker-suggestions a': 'suggestionMousedown'
  'click .picker-suggestions a': 'selectItem'
  'click .picked-item .remove': 'removeItem'
Fat arrow (=>) is another great feature used for binding a function to the current scope and obviates the var self = this; pattern you will see in almost all JS codebases.
Here, fat arrow binds selectItem at constructor time to the newly constructed instance of MultiPickerView (not the prototype) instead of the default jQuery wrapped current target so I can access View methods/members within the handler.
class App.MultiPickerView extends Backbone.View
  # ...
  # ...
  selectItem: (e) =>
    e.preventDefault()
    $item = $(e.currentTarget)
    @addEmail $item.data("email")
    # @$input is convention for cached jquery elements
    # stored on the view instance
    @$input.val ""
    @hideSuggest()
    _.defer => @$input.focus()
Fast and (mostly) logic-less templates with Handlebars
Handlebars is a template language with partials, custom helpers, and minimal logic, which can be compiled on the client or server side. We take advantage of the watcher system we already had in place for compiling Coffeescript to automatically compile our Handlebars templates ahead of time in order to reduce the workload on clients.
Immediate feedback with Coffeescript + File watcher + Growl
One of the major benefits of using Coffeescript is the compile-time checking that catches syntax errors. Instead of using the built-in coffee -cw *.coffee to compile-on-change, I wrote a script that does this but also triggers Growl notifications if the compilation produced errors.

The script does the same for *.handlebars files (for which there is not a built-in watcher functionality).

See the source: Multiwatcher gist
Backbone (Models, Views, Events)
Historically, frontend JavaScript is fertile ground for spaghetti code, tight coupling, repeated code and scattered entry points. The introduction of jQuery increased this trend by making it easy for developers to handle DOM events and add visual effects wherever it was most convenient. This scales to a point, but when you start doing lots of AJAX, manipulating application state, and generating HTML using the DOM APIs, things can quickly get out of hand.
Backbone introduced a few great primitives that can be leveraged to make more maintainable frontend code.
Models are a good abstraction when you have a lot of application state that can change based on user interaction and/or AJAX calls.
Views encapsulate DOM elements and provide a declarative way of setting your event listeners using jQuery’s delegated event listeners.
Events implements the pub/sub pattern, which allows you to easily decouple your models and views. This is probably the most important development for frontend application architecture. The order of events generally might go like this:
- Instantiate Model, View
- View listens on Model changes
- Model attribute changes (via AJAX callback or direct user interaction)
- Model fires change event on its event channel
- View’s listener fires in response and performs some action
The decoupling is that the Model never has knowledge of the View. This means the Model handles only the business logic and can be much more easily tested.
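The sequence above can be sketched in a few lines. The example below is written in Python rather than with Backbone's actual API, so the Events, Model, and View classes and their methods are hypothetical stand-ins that only illustrate the pub/sub flow.

```python
class Events:
    """Minimal pub/sub: register callbacks by event name and trigger them later."""

    def __init__(self):
        self._handlers = {}

    def on(self, name, callback):
        self._handlers.setdefault(name, []).append(callback)

    def trigger(self, name, *args):
        for callback in list(self._handlers.get(name, [])):
            callback(*args)


class Model(Events):
    """Holds state and announces changes; it never references a View."""

    def __init__(self, **attrs):
        super().__init__()
        self.attrs = dict(attrs)

    def set(self, key, value):
        # Only fire a change event when the value actually changes.
        if self.attrs.get(key) != value:
            self.attrs[key] = value
            self.trigger("change", key, value)


class View:
    """Listens for Model changes and records what it would render."""

    def __init__(self, model):
        self.rendered = []
        model.on("change", self.render)

    def render(self, key, value):
        self.rendered.append(f"{key}={value}")
```

Since the Model only calls trigger, it can be unit tested by registering a plain callback in place of a View.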
There has been much written on Backbone and other MV* frameworks, so I won’t evangelize much here except to say that adding a View and Model layer in your frontend JavaScript will improve your architecture significantly.
Addy Osmani gives a good treatment as to why you would use MV* and how to select your tools in: Journey through the JavaScript MVC jungle
Unit/Integration Tests with Mocha, Sinon
Unit testing is a given for any modern web app with a long lifecycle and high volume of traffic.
There are many test runner frameworks for testing but you should choose one that matches your style. I like Mocha because it can handle BDD, TDD, and export styles, has a nice web browser interface, is being adopted in lots of places, and is written by TJ Holowaychuk who has a great reputation in the node community. The two other big names in testing frameworks are Jasmine and QUnit, but they are more opinionated and less flexible.
Besides a test runner, you’ll want some helper libraries to make your tests easier to write.
Sinon is essential as it provides spy, stub, and mock functionality to make sure you are testing the unit under test and not its dependencies! The author wrote Test-Driven JavaScript Development and is the foremost expert on this topic.
To support different styles of test assertions, you may like Should.js which provides expressive methods so you can write tests like:
user.pets.should.have.length(5)
Here is a sample test file for our AJAX wrapper method (which uses mockjax):
$.mockjaxSettings.responseTime = 1
$.mockjax(
  url: '/data/beta/missing'
  status: 404
)
$.mockjax(
  url: '/data/beta/invalid'
  status: 200
  responseText:
    status_code: 500
    status_txt: 'MISSING_ARG'
    data: null
)
$.mockjax(
  url: '/data/beta/valid'
  status: 200
  responseText:
    status_code: 200
    status_txt: 'OK'
    data: []
)
describe 'BITLY', ->
  describe '#post', ->
    it 'should trigger .fail for non-200 status_code', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/missing', {},
        success: doneSpy
        error: failSpy
      )
      .done(doneSpy)
      .done((data) ->
        data.should.be.a 'object'
      )
      .fail(failSpy)
      .always(->
        doneSpy.called.should.be.false
        failSpy.calledTwice.should.be.true
        done()
      )
    it 'should trigger .fail for non-200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/invalid', {},
        success: doneSpy
        error: failSpy
      )
      .done(doneSpy)
      .fail(failSpy)
      .always(->
        doneSpy.called.should.be.false
        failSpy.calledTwice.should.be.true
        done()
      )
    it 'should trigger .done for 200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/valid', {},
        success: doneSpy
        error: failSpy
      )
      .done(doneSpy)
      .done((data) ->
        data.should.be.a 'object'
      )
      .fail(failSpy)
      .always(->
        failSpy.called.should.be.false
        doneSpy.calledTwice.should.be.true
        done()
      )
Here is a sample of our test suite and its test runner:

Always be looking to improve your toolkit and sweat the small stuff when it comes to code quality. Every engineer wants to work with other great engineers. Ask yourself “If not me, then who on my team will make the effort?”
If you like working with other great engineers, bitly is hiring.
Classifying Human Traffic with Random Forest Decision Trees
At bitly, we study human behavior on the social web, and we often need to figure out when data is generated by a deliberate human action (organic data) or by an action taken by a script or without a human’s knowledge (inorganic data). bitly data scientist Brian Eoff recently gave a talk at PyData NYC 2012 on a fast random forest decision tree approach, implemented in Python with scikits-learn, to identifying organic vs inorganic data in a realtime stream.
SciKit Random Forest - Brian Eoff from Continuum Analytics on Vimeo.
NSQ: realtime distributed message processing at scale
NSQ is a realtime message processing system designed to operate at bitly’s scale, handling billions of messages per day.
It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.
Operationally, NSQ is easy to configure and deploy (all parameters are specified on the command line and compiled binaries have no runtime dependencies). For maximum flexibility, it is agnostic to data format (messages can be JSON, MsgPack, Protocol Buffers, or anything else). Go and Python libraries are available out of the box.
This post aims to provide a detailed overview of NSQ, from the problems that inspired us to build a better solution to how it works inside and out. There’s a lot to cover so let’s start off with a little history…
Background
Before NSQ, there was simplequeue, a simple (shocking, right?) in-memory message queue
with an HTTP interface, developed as part of our open source simplehttp suite of tools. Like
its successor, simplequeue is agnostic to the type and format of the data it handles.
We used simplequeue as the foundation for a distributed message queue by siloing an instance on
each host that produced messages. This effectively reduced the potential for data loss in a system
which otherwise did not persist messages by guaranteeing that the loss of any single host would
not prevent the rest of the message producers or consumers from functioning.
We also used pubsub, an HTTP server that aggregates streams and provides an endpoint for multiple clients to subscribe to. We used it to transmit streams across hosts (or datacenters) so they could be queued again for writing to various downstream services.
As a glue utility, we used ps_to_http to subscribe to a pubsub stream and write the data to
simplequeue.
There are a couple of important properties of these tools with respect to message duplication and
delivery. Each of the N clients of a pubsub receives all of the messages published (each
message is delivered to all clients), whereas each of the N clients of a simplequeue receives
1 / N of the messages queued (each message is delivered to 1 client). Consequently, when
multiple applications need to consume data from a single producer, we set up the following workflow:

The producer publishes to pubsub and for each downstream service we set up a dedicated
simplequeue with a ps_to_http process to route all messages from the pubsub into the queue.
Each service has its own set of “queuereaders” which we scale independently according to the
service’s needs.
We used this foundation to process 100s of millions of messages a day. It was the core upon which bitly was built.
This setup had several nice properties:
- producers are de-coupled from downstream consumers
- no producer-side single points of failure
- easy to interact with (all HTTP)
But, it also had its issues…
One is simply the operational overhead/complexity of having to set up and configure the various tools
in the chain. Of particular note are the pubsub > ps_to_http links. Given this setup, consuming a
stream in a way that avoids SPOFs is a challenge. There are two options, neither of which is ideal:
- just put the ps_to_http process on a single box and pray
- shard by consuming the full stream but processing only a percentage of it on each host (though this does not resolve the issue of seamless failover)
To make things even more complicated, we needed to repeat this for each stream of data we were interested in.
Also, messages traveling through the system had no delivery guarantee and the responsibility of re-queueing was placed on the client (for instance, if processing fails). This churn increased the potential for situations that result in message loss.
Enter NSQ
NSQ is designed to (in no particular order):
- provide easy topology solutions that enable high-availability and eliminate SPOFs
- address the need for stronger message delivery guarantees
- bound the memory footprint of a single process (by persisting some messages to disk)
- greatly simplify configuration requirements for producers and consumers
- provide a straightforward upgrade path
- improve efficiency
To introduce some NSQ concepts, let’s start off by discussing configuration.
Simplifying Configuration and Administration
A single nsqd instance is designed to handle multiple streams of data at once. Streams are called
“topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the
messages for a topic. In practice, a channel maps to a downstream service consuming a topic.
Topics and channels all buffer data independently of each other, preventing a slow consumer from causing a backlog for other channels (the same applies at the topic level).
A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client. For example:
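In code, these semantics can be sketched as a toy in-memory model. This is only an illustration of "every channel gets a copy, one client per channel receives it", not how nsqd is implemented; the `Topic` class, its methods, and the channel and client names are hypothetical.

```python
import random
from collections import defaultdict


class Topic:
    """Toy model (hypothetical): each channel gets a copy, one client per channel receives it."""

    def __init__(self, rng=None):
        # channel name -> list of client names subscribed to that channel
        self.channels = defaultdict(list)
        self.rng = rng or random.Random()

    def subscribe(self, channel, client):
        self.channels[channel].append(client)

    def publish(self, message):
        """Return {channel: (client, message)} describing who received the message."""
        deliveries = {}
        for channel, clients in self.channels.items():
            if clients:
                # every channel receives its own copy; one random client handles it
                deliveries[channel] = (self.rng.choice(clients), message)
        return deliveries


topic = Topic(rng=random.Random(0))
topic.subscribe("metrics", "metrics-1")
topic.subscribe("metrics", "metrics-2")
topic.subscribe("archive", "archive-1")

deliveries = topic.publish("click")
```

Both channels receive the message, but within the "metrics" channel only one of the two clients does.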

NSQ also includes a helper application, nsqlookupd, which provides a directory service where
consumers can look up the addresses of nsqd instances that provide the topics they are interested
in subscribing to. In terms of configuration, this decouples the consumers from the producers (they
both individually only need to know where to contact common instances of nsqlookupd, never each
other), reducing complexity and maintenance.
At a lower level each nsqd has a long-lived TCP connection to nsqlookupd over which it
periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will
give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
To introduce a new distinct consumer of a topic, simply start up an NSQ client configured with
the addresses of your nsqlookupd instances. There are no configuration changes needed to add
either new consumers or new publishers, greatly reducing overhead and complexity.
NOTE: in future versions, the heuristic nsqlookupd uses to return addresses could be based on
depth, number of connected clients, or other “intelligent” strategies. The current implementation is
simply all. Ultimately, the goal is to ensure that all producers are being read from such that
depth stays near zero.
It is important to note that the nsqd and nsqlookupd daemons are designed to operate
independently, without communication or coordination between siblings.
We also think that it’s really important to have a way to view, introspect, and manage the
cluster in aggregate. We built nsqadmin to do this. It provides a web UI to browse the hierarchy
of topics/channels/consumers and inspect depth and other key statistics for each layer. Additionally
it supports a few administrative commands such as removing and emptying a channel (which is a useful
tool when messages in a channel can be safely thrown away in order to bring depth back to 0).

Straightforward Upgrade Path
This was one of our highest priorities. Our production systems handle a large volume of traffic, all built upon our existing messaging tools, so we needed a way to slowly and methodically upgrade specific parts of our infrastructure with little to no impact.
First, on the message producer side we built nsqd to match simplequeue. Specifically, nsqd
exposes an HTTP /put endpoint, just like simplequeue, to POST binary data (with the one caveat
that the endpoint takes an additional query parameter specifying the “topic”). Services that wanted
to switch to start publishing to nsqd only have to make minor code changes.
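As a rough sketch of what such a publisher might look like in Python, using only the standard library: the address `127.0.0.1:4151`, the topic name, and both helper function names are assumptions made for illustration. Only the /put endpoint and its topic query parameter come from the description above.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen


def build_put_request(nsqd_http_address, topic, body):
    """Build a POST to nsqd's /put endpoint with the topic as a query parameter."""
    url = "http://%s/put?%s" % (nsqd_http_address, urlencode({"topic": topic}))
    return Request(url, data=body, method="POST")


def publish(nsqd_http_address, topic, body, timeout=2.0):
    """Send the message; needs a reachable nsqd at the (assumed) address."""
    request = build_put_request(nsqd_http_address, topic, body)
    with urlopen(request, timeout=timeout) as response:
        return response.status


# Building the request does not touch the network.
# The address and topic below are placeholders, not values from the post.
request = build_put_request("127.0.0.1:4151", "clicks", b'{"url": "http://bit.ly"}')
```

Separating request construction from sending keeps the publishing code easy to test without a running nsqd.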
Second, we built libraries in both Python and Go that matched the functionality and idioms we had been accustomed to in our existing libraries. This eased the transition on the message consumer side by limiting the code changes to bootstrapping. All business logic remained the same.
Finally, we built utilities to glue old and new components together. These are all available in the
examples directory in the repository:
- nsq_pubsub - expose a pubsub-like HTTP interface to topics in an NSQ cluster
- nsq_to_file - durably write all messages for a given topic to a file
- nsq_to_http - perform HTTP requests for all messages in a topic to (multiple) endpoints
Eliminating SPOFs
NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to
all instances providing the specified topic. There are no middle-men, no message brokers, and no
SPOFs:

This topology eliminates the need to chain single, aggregated feeds. Instead you consume directly from all producers. Technically, it doesn’t matter which client connects to which NSQ instance: as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all will eventually be processed.
For nsqlookupd, high availability is achieved by running multiple instances. They don’t
communicate directly with each other and data is considered eventually consistent. Consumers poll
all of their configured nsqlookupd instances and union the responses. Stale, inaccessible, or
otherwise faulty nodes don’t grind the system to a halt.
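The "poll everything and union the responses" step might look roughly like the following sketch. It deliberately starts from already-collected results and does not assume any particular /lookup response format; the function name, the input shape, and all host names are hypothetical.

```python
def union_producers(lookup_results):
    """Union nsqd addresses gathered from several nsqlookupd polls.

    `lookup_results` (a hypothetical shape) maps each nsqlookupd address to
    either a list of (host, port) tuples or the Exception raised by that poll.
    """
    producers = set()
    for lookupd, result in lookup_results.items():
        if isinstance(result, Exception):
            # a stale or unreachable nsqlookupd must not stop discovery
            continue
        producers.update(result)
    return sorted(producers)


# Placeholder addresses for illustration only.
results = {
    "lookupd-a:4161": [("nsqd-1", 4150), ("nsqd-2", 4150)],
    "lookupd-b:4161": [("nsqd-2", 4150), ("nsqd-3", 4150)],
    "lookupd-c:4161": ConnectionError("timed out"),
}
producers = union_producers(results)
```

Because the result is a set union, a producer only needs to be known to one healthy nsqlookupd to be discovered, and duplicates collapse naturally.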
Message Delivery Guarantees
NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and de-dupe or perform idempotent operations.
This guarantee is enforced as part of the protocol and works as follows (assume the client has successfully connected and subscribed to a topic):
- client indicates they are ready to receive messages
- NSQ sends a message and temporarily stores the data locally (in the event of re-queue or timeout)
- client replies FIN (finish) or REQ (re-queue) indicating success or failure respectively. If the client does not reply, NSQ will time out after a configurable duration and automatically re-queue the message.
This ensures that the only edge case that would result in message loss is an unclean shutdown of an
nsqd process. In that case, any messages that were in memory (or any buffered writes not flushed
to disk) would be lost.
If preventing message loss is of the utmost importance, even this edge case can be mitigated. One
solution is to stand up redundant nsqd pairs (on separate hosts) that receive copies of the same
portion of messages. Because you’ve written your consumers to be idempotent, doing double-time on
these messages has no downstream impact and allows the system to endure any single node failure
without losing messages.
The takeaway is that NSQ provides the building blocks to support a variety of production use cases and configurable degrees of durability.
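The send / FIN / REQ / timeout exchange described in this section can be modeled with a small in-memory sketch. It is a toy illustration of at-least-once delivery, not nsqd's implementation; the class name, method names, and the 60 second timeout are assumptions.

```python
import time


class InFlightChannel:
    """Toy model (hypothetical): send, then wait for FIN, REQ, or a timeout."""

    def __init__(self, msg_timeout=60.0, clock=time.monotonic):
        self.queue = []        # messages waiting to be delivered
        self.in_flight = {}    # message id -> (body, deadline)
        self.msg_timeout = msg_timeout  # assumed value; the real one is configurable
        self.clock = clock

    def put(self, msg_id, body):
        self.queue.append((msg_id, body))

    def send(self):
        """Deliver the next message and remember it until the client responds."""
        msg_id, body = self.queue.pop(0)
        self.in_flight[msg_id] = (body, self.clock() + self.msg_timeout)
        return msg_id, body

    def fin(self, msg_id):
        """Client succeeded: forget the message."""
        del self.in_flight[msg_id]

    def req(self, msg_id):
        """Client failed: put the message back on the queue."""
        body, _ = self.in_flight.pop(msg_id)
        self.queue.append((msg_id, body))

    def requeue_timed_out(self):
        """Re-queue every in-flight message whose deadline has passed."""
        now = self.clock()
        expired = [m for m, (_, deadline) in self.in_flight.items() if deadline <= now]
        for msg_id in expired:
            self.req(msg_id)


now = [0.0]  # fake clock so the example runs instantly
channel = InFlightChannel(msg_timeout=60.0, clock=lambda: now[0])
channel.put("a", b"first")
channel.put("b", b"second")

channel.fin(channel.send()[0])   # "a" is processed successfully
channel.send()                   # "b" is delivered, but the client never replies
now[0] = 61.0
channel.requeue_timed_out()      # "b" is queued again for redelivery
```

The redelivery of "b" is exactly why consumers should de-dupe or be idempotent: a slow client that eventually finishes its work will still see the message a second time.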
Bounded Memory Footprint
nsqd provides a configuration option --mem-queue-size that will determine the number of messages
that are kept in memory for a given queue. If the depth of a queue exceeds this threshold messages
are transparently written to disk. This bounds the memory footprint of a given nsqd process to
mem-queue-size * #_of_channels_and_topics:

Also, an astute observer might have identified that this is a convenient way to gain an even higher guarantee of delivery by setting this value to something low (like 1 or even 0). The disk-backed queue is designed to survive unclean restarts (although messages might be delivered twice).
Also, related to message delivery guarantees, clean shutdowns (by sending an nsqd process the
TERM signal) safely persist the messages currently in memory, in-flight, deferred, and in various
internal buffers.
Note, a channel whose name ends in the string #ephemeral will not be buffered to disk and will
instead drop messages after passing the mem-queue-size. This enables consumers which do not need
message guarantees to subscribe to a channel. An ephemeral channel will also not persist after
its last client disconnects.
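The overflow behavior described in this section can be sketched as follows. This is a toy model under stated assumptions: plain lists stand in for the in-memory and disk-backed queues, and the `BoundedQueue` class and its attributes are hypothetical names, not nsqd internals.

```python
class BoundedQueue:
    """Toy model (hypothetical): keep at most mem_queue_size messages in memory."""

    def __init__(self, mem_queue_size, ephemeral=False):
        self.mem_queue_size = mem_queue_size
        self.ephemeral = ephemeral
        self.memory = []   # stands in for the in-memory queue
        self.disk = []     # stands in for the disk-backed queue
        self.dropped = 0

    def put(self, message):
        if len(self.memory) < self.mem_queue_size:
            self.memory.append(message)
        elif self.ephemeral:
            self.dropped += 1          # ephemeral channels drop instead of persisting
        else:
            self.disk.append(message)  # overflow is written to disk


durable = BoundedQueue(mem_queue_size=2)
ephemeral = BoundedQueue(mem_queue_size=2, ephemeral=True)
for i in range(5):
    durable.put(i)
    ephemeral.put(i)
```

After five messages, the durable queue holds two in memory and three on "disk", while the ephemeral one has dropped three. The sketch does not model draining the queue or reading back from disk.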
Efficiency
NSQ was designed to communicate over a “memcached-like” command protocol with simple size-prefixed responses. All message data is kept in the core including metadata like number of attempts, timestamps, etc. This eliminates the copying of data back and forth from server to client, an inherent property of the previous toolchain when re-queueing a message. This also simplifies clients as they no longer need to be responsible for maintaining message state.
Also, by reducing configuration complexity, setup and development time are greatly reduced (especially in cases where a topic has more than one consumer).
For the data protocol, we made a key design decision that maximizes performance and throughput by
pushing data to the client instead of waiting for it to pull. This concept, which we
call RDY state, is essentially a form of client-side flow control.
When a client connects to nsqd and subscribes to a channel it is placed in a RDY state of 0.
This means that no messages will be sent to the client. When a client is ready to receive messages
it sends a command that updates its RDY state to some number it is prepared to handle, say 100. Without
any additional commands, 100 messages will be pushed to the client as they are available (each time
decrementing the server-side RDY count for that client).
Client libraries are designed to send a command to update RDY count when it reaches ~25% of the
configurable max-in-flight setting (and properly account for connections to multiple nsqd
instances, dividing appropriately).

This is a significant performance knob as some downstream systems are able to more-easily batch
process messages and benefit greatly from a higher max-in-flight.
Notably, because it is both buffered and push based with the ability to satisfy the need for
independent copies of streams (channels), we’ve produced a daemon that behaves like simplequeue
and pubsub combined. This is powerful in terms of simplifying the topology of our systems where
we would have traditionally maintained the older toolchain discussed above.
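The RDY bookkeeping described above can be sketched for a single connection as follows. This is a simplified model, assuming one nsqd connection (so no dividing of max-in-flight across instances) and a refill threshold of exactly 25%; the class and method names are hypothetical.

```python
class RdyConnection:
    """Toy model (hypothetical) of one client connection's RDY bookkeeping."""

    def __init__(self, max_in_flight, threshold=0.25):
        self.max_in_flight = max_in_flight
        self.threshold = threshold
        self.rdy = 0            # connections start at RDY 0: nothing is pushed
        self.rdy_commands = []  # RDY values the client has sent so far

    def send_rdy(self, count):
        self.rdy = count
        self.rdy_commands.append(count)

    def on_message(self):
        """A pushed message decrements RDY; refill when it falls to ~25%."""
        if self.rdy <= 0:
            raise RuntimeError("a message was pushed while RDY was 0")
        self.rdy -= 1
        if self.rdy <= self.max_in_flight * self.threshold:
            self.send_rdy(self.max_in_flight)


conn = RdyConnection(max_in_flight=100)
conn.send_rdy(100)
for _ in range(80):
    conn.on_message()
```

After 75 messages the count reaches 25 and the client sends a second RDY 100, so the server never has to stall waiting for the client to ask for more.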
Go
We made a strategic decision early on to build the NSQ core in Go. We recently blogged about our use of Go at bitly and alluded to this very project - it might be helpful to browse through that post to get an understanding of our thinking with respect to the language.
Regarding NSQ, Go channels (not to be confused with NSQ channels) and the language’s built-in
concurrency features are a perfect fit for the internal workings of nsqd. We leverage buffered
channels to manage our in-memory message queues and seamlessly write overflow to disk.
The standard library makes it easy to write the networking layer and client code. The built-in memory and CPU profiling hooks highlight opportunities for optimization and require very little effort to integrate. We also found it really easy to test components in isolation, mock types using interfaces, and iteratively build functionality.
Overall, it’s been a fantastic project to use as an opportunity to really dig into the language and see what it’s capable of on a larger scale. We’ve been extremely happy with our choice to use golang, its performance, and how productive we are using it.
EOL
We’ve been using NSQ in production for several months and we’re excited to share this with the open source community.
Across the 13 services we’ve upgraded, we’re processing ~35,000 messages/second at peak through the cluster. It has proved both performant and stable and made our lives easier operating our production systems.
There is more work to be done though — so far we’ve converted ~40% of our infrastructure. Fortunately, the upgrade process has been straightforward and well worth the short-term time tradeoff.
We’re really curious to hear what you think, so grab the source from github and try it out.
Finally, this labor of love began as scratching an itch — bitly provided an environment to experiment, build, and open source it… we’re always hiring.
Static Analysis, Syntax Checking, Code Formatting
Nothing describes the need for consistent code style more than this quote from Martin Fowler: “Any fool can write code that a computer can understand. Good programmers write code that humans can understand.”
I would amend that quote though, because in reality it’s hard to write good code for computers to understand, too. That is where tools that perform Static Analysis and Syntax Checking come into play. They both help track down and identify classes of problems that might otherwise go undetected, resulting in production issues.
We want to share a few specific and practical ways we perform Static Analysis and produce consistently styled code at bitly.
pyflakes and pep8 for Python
For python, we like to run our code through pyflakes. There are a few different versions of pyflakes floating around, but we use this one because it’s been rewritten to use AST so it’s fast, and it has output we like for scanning a whole directory tree.
Pyflakes for us looks like this
$ pyflakes --quiet
.................F........
$ pyflakes
./graphite/conf/graphite_settings.py:23: redefinition of unused
'rrdtool' from line 21
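To give a feel for what an AST-based check involves, here is a deliberately tiny sketch that reports imported names that are never referenced. It is not how pyflakes is implemented and covers far fewer cases (for example, it ignores scopes and star imports); the function name `unused_imports` is hypothetical.

```python
import ast


def unused_imports(source):
    """Return sorted (line, name) pairs for imported names that are never referenced."""
    tree = ast.parse(source)
    imported = {}   # bound name -> line number of the import
    used = set()
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                # "import a.b" binds "a"; "import a as x" binds "x"
                name = alias.asname or alias.name.split(".")[0]
                imported[name] = node.lineno
        elif isinstance(node, ast.Name):
            used.add(node.id)
    return sorted((line, name) for name, line in imported.items() if name not in used)


source = "import os\nimport sys\n\nprint(sys.argv)\n"
findings = unused_imports(source)
```

Because the source is only parsed and never executed, a check like this is safe to run over an entire directory tree.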
There are ways to plug this type of check into various editors. Some of us use editor plugins for TextMate and Sublime Text to get feedback immediately during development.
Pyflakes is good at highlighting syntax issues, but it doesn’t help resolve inconsistent python style. Do you put whitespace around operators? Do you inline your if statements? Tabs or spaces? To address some of these, we use a version of pep8 which has the ability to automatically apply consistent formatting.
$ pep8 --ignore=W293 --ignore=W391 --max-line-length=120 .
./deploy_ui.py:95:66: W291 trailing whitespace
./deploy_ui.py:253:62: E225 missing whitespace around operator
./deploy_ui.py:257:5: E303 too many blank lines (2>1)
./settings.py:7:10: E203 whitespace before ':'
$ pep8 --fix --inplace settings.py
settings.py:9:80: E501 line too long (121 characters)
settings.py:16:7: W291 trailing whitespace
settings.py:7:10: E203 whitespace before ':'
settings.py:9:54: E261 at least two spaces before inline comment
settings.py:50:1: E302 expected 2 blank lines, found 1
- pep8 fix: Writing changes to settings.py
- pep8 fix: Applying fix for E302 at line 50, offset 0
- pep8 fix: Applying fix for E261 at line 45, offset 34
- pep8 fix: Applying fix for E203 at line 45, offset 15
- pep8 fix: Applying fix for E203 at line 44, offset 26
- pep8 fix: Applying fix for E261 at line 43, offset 45
[truncated]
Bash Syntax Checking
Bash has a built in way to syntax check a file by running bash -n $file. We wrap this in a short
script to give us similar usage to pyflakes. If all goes well, and there are no
errors, it’s one dot per file checked. The syntax checking isn’t as extensive as pyflakes, but it beats finding out in
the middle of a production maintenance that the one script you needed to work has a syntax error.
$ test_shell_scripts.sh --quiet
.......................................F...................
$ test_shell_scripts.sh
./test.sh
./test.sh: line 7: unexpected EOF while looking for matching `)'
./test.sh: line 245: syntax error: unexpected end of file
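A wrapper along these lines could be sketched in Python as shown below. This is not our actual test_shell_scripts.sh; the function names are hypothetical, and the only thing taken from the description above is `bash -n` plus the one-mark-per-file output.

```python
import subprocess


def bash_syntax_ok(path):
    """Return True when `bash -n` parses the file without errors."""
    result = subprocess.run(["bash", "-n", path], capture_output=True)
    return result.returncode == 0


def check_scripts(paths, check=bash_syntax_ok):
    """Return a progress string ('.' per clean file, 'F' per failure) and the failures."""
    marks = []
    failures = []
    for path in paths:
        if check(path):
            marks.append(".")
        else:
            marks.append("F")
            failures.append(path)
    return "".join(marks), failures


# A stand-in checker keeps this example independent of bash and of real files;
# the file names are placeholders.
progress, failed = check_scripts(
    ["deploy.sh", "test.sh", "backup.sh"],
    check=lambda path: path != "test.sh",
)
```

Passing the checker in as a parameter makes the reporting logic testable on machines without bash, while the default still shells out to `bash -n`.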
Go Syntax Checking
We have written previously about our experience adopting golang as a primary language at bitly. One of the
golang choices we really like is directly related to code formatting. The Go core developers made a choice to preempt
coding style wars by designing a formatting tool go fmt as part of the language distribution.
Additionally, things that many languages list as warnings (or just swallow completely) are compiler errors in Go. Examples of errors in Go are: unused variables, undefined variables, missing imports, mixing types, etc. These issues are raised by the compiler and will keep code from being compiled. To this end, the language itself enforces static analysis and syntax checking at compile time, which is nice because we don’t need any extra tools.
C Style with astyle
For C code we don’t use a static analysis tool (we use dynamic tools like valgrind), but we do programmatically apply One True Brace Style with astyle. (We suggest a recent version of astyle from trunk; it has bug fixes we found useful). Our specific astyle syntax is applied with the following command
$ astyle --style=1tbs \
--lineend=linux \
--convert-tabs \
--preserve-date \
--fill-empty-lines \
--pad-header \
--indent-switches \
--align-pointer=name \
--align-reference=name \
--pad-oper \
--suffix=none \
$file
Javascript with Closure Compiler
We use Google’s Closure Compiler to prepare our Javascript files for production use. In addition to concatenating and compressing our Javascript, the Closure Compiler also provides syntax and type checking, variable reference checking, and warnings about common Javascript pitfalls:
$ java -jar closure-compiler.jar \
--js input.js \
--js_output_file output-min.js
input.js:83: WARNING - Suspicious code. This code lacks side-effects.
Is there a bug?
if(!data.mode) { data.mode === "save"; }
input.js:1150: ERROR - Parse error. Internet Explorer has a
non-standard intepretation of trailing commas. Arrays will
have the wrong length and objects will not parse at all.
EOL
If this sounds like how you would like to write code, bitly is hiring.
Go Go Gadget
bitly’s stack isn’t fancy. We use a combination of languages and tools that interact well with each other and promote philosophies such as explicitness, simplicity, and robustness. Most of our services are Python with many core components in C (of which some are open sourced under simplehttp). We believe whole-heartedly in a services oriented approach where components do one thing and do it well.
C is a fantastic language, one which I highly encourage all developers to learn and experiment with as it pulls away the veil and provides insight into all the things that are happening behind those seemingly innocent lines of a scripting language. With great power comes great responsibility and sometimes that power comes at a cost – primarily in terms of development and debugging time and thus the time it takes for you to ship your product.
Python has also treated us well. Its readability, standard library, and obviousness make it a great language to work with on a team. It performs well and is flexible to the point where it can be used successfully in most any situation – whether it be data science, network daemons, or one off shell scripts.
Enter Go
We identified early on that Go had all the makings of a language that could supersede some of the places we would have traditionally turned to C and some of the places where we wanted to move away from Python.
For example, we’ve built many network daemons in C that don’t require strict, absolute, control over memory. This is where Go shines. The “cost” of a garbage collected language in these types of situations is overshadowed by the huge benefit to development time and flexibility. Code becomes more readable as a natural side effect of being able to focus more on the “meat” of the problem instead of having to worry about low level details. Mix in the “batteries included” standard library and you’re cookin’.
Also, all of our C apps are built on libevent - i.e. a single-threaded event loop with callbacks that enables us to efficiently handle many thousands of simultaneous open connections. We use the same technique in Python via tornado, but the amount of work a single Python process can do is limited largely by runtime overhead. Additionally, this style of code can be hard to read and debug when you’re not used to it. Go’s lightweight concurrency model allows you to write in an imperative style while providing a built-in way to leverage the multi-core architecture of the computers we operate on.
Just as in C, Go promotes a style of very explicit and localized error handling. While sometimes verbose it forces a developer to really think about how their program might fail and what to do about it. Unlike C, Go has the built-in capability of returning multiple values from a function, resulting in clearer and more concise error-handling code.
The points above are mostly low-level. Go’s true power comes from the fact that the language is small and the standard library is large. We’ve had developers start from zero experience with Go to writing working, production, code in a day. That is why we’re so excited about its place in our stack.
Think about it like this… if you can write something in Go just as fast as you could in Python and:
- gain the speed and robustness of a compiled, statically typed language without all of the rope to hang yourself
- clearly express concurrent solutions to parallelizable problems
- sacrifice little to nothing in terms of functionality
- unambiguously produce consistently styled code (thank you go fmt)
What would you choose?
And in an effort to encourage the team to experiment and learn about Go we created the “Go Gopher Award”! The most recent engineer to learn the language and get code into production gets to have the Go Gopher Squishable sit with them at their desk. This is an extremely prestigious award.
Go Isn’t Perfect
Let’s talk about garbage collection. It isn’t free. Go’s garbage collector is a conservative, stop-the-world, mark-and-sweep GC. Practically speaking this means that the more careless you are in what, how many, and how often you allocate the longer the world stops when the Go runtime cleans things up. Unsurprisingly, significant performance gains can be had by being more careful… not only in raw speed but in the consistency of it. Fortunately Go provides fantastic hooks (pprof) to introspect the runtime behavior of your application. It’s generally obvious when your bottleneck is the GC.
Interacting with JSON can be a bit clumsy. It’s not terrible… just isn’t as elegant and readable as the alternative in Python. Truthfully it’s quite impressive that it’s as good as it is considering the type safety of the language. We’ve made an effort to make this even easier and open sourced simplejson, a package that exposes a clean, chainable, API for interacting with JSON where the format may not be known ahead of time.
Packaging has also given us some trouble. This is an area where we’ve probably gone the most back-and-forth and are admittedly still in the early stages of finding what works for us. The biggest problem is somewhat related to its purported greatest strength, i.e. qualifying the full path when importing an open-source package. This has led to a whole crop of third-party utilities that work around this by re-writing fully qualified import paths to something the user specifies (and installing the package as the same, modified, name). We use go-install-as and our policy is to install non-standard packages under our own custom import paths to ensure that our apps are always compiled against the version and source we’ve strictly enforced to be installed on a machine.
One notable feature missing from the HTTP client in Go’s standard library is the ability to set timeouts. It is further complicated by the inability of the API to provide easy access to the connection (to do it yourself). We set out to bridge the gap and wrote go-httpclient (with the expectation that these features will eventually make it into the standard library).
Oh, and one night after way too much drinking they decided to use a radical syntax for
formatting time as strings (instead of your standard strftime parameters). We open sourced
go-strftime to remedy that.
All in all these “issues” are minor and ultimately we felt that the benefits far outweigh the tradeoffs – Go meets our needs and philosophies as software engineers.
Go In Production
Where are we actually using Go in production? Let’s take a step back and talk about how data moves through our infrastructure first.
In the spirit of keeping things simple, fault tolerant, and asynchronous, most everything that gets
written to downstream systems is done so via message queues (formatted in JSON with all
communication over HTTP). Some of these message queues process volumes in the thousands per second.
We call the workers queuereaders, all of which were originally written in Python. At the lower
level are two C apps – simplequeue, an HTTP interface to an in-memory data agnostic queue, and
pubsub, an HTTP interface for one-to-many streaming.
As volume increases, the overhead of processing each message becomes more significant. Therefore, these applications emerged as one key area where Go could replace Python. We started by re-writing some queuereaders in Go by building matching libraries to support the same API we expose in Python. Also, whereas in Python we would run multiple processes, in Go we leverage channels and goroutines to write straightforward code that parallelizes HTTP requests to downstream systems or aggregates writes to disk. These same features make it really easy to batch retrieve messages from the queue while still processing them independently, reducing round-trips.
By experimenting with non-critical systems at the edge of our infrastructure we were able to safely and methodically learn about Go’s behavior in production. Needless to say, we’ve been very happy. We’ve seen consistent, measurable performance gains without sacrificing stability.
We also wanted a bit more functionality out of the core combination of simplequeue and pubsub,
so we decided to build a successor with the goal of improving message delivery guarantees,
maintaining the spirit and simplicity of what we already had, increasing performance, solving some
configuration and setup issues, and providing a straightforward upgrade path.
We realized Go would be perfect for this project. We’ll talk about it in depth in a future blog post (and don’t worry, it will be open source). Is it any good? Yes.
EOL
Finally, we’ve also open sourced a few other Go related items…
- go-notify - a package to standardize one-to-many communication over channels
- file2http - a utility to spray a line-oriented file at an HTTP endpoint
As always, bitly is hiring.