Building NSQ Client Libraries

Brace yourself, this is a long one.

The following guide was originally intended for client library developers to describe in detail all the important features and functionality we expected in an NSQ client library.

While writing it we began to realize that it had value beyond client library developers. It incorporates a comprehensive analysis of most of the capabilities of NSQ (both client and server) and is therefore interesting and useful for end-users as well (or anyone using or interested in infrastructure messaging platforms).

If you need some background on NSQ, please see our original blog post or its follow-up, “spray some NSQ on it”.

Intro

NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall cluster robustness and performance.

This guide attempts to outline the various responsibilities well-behaved client libraries need to fulfill. Because publishing to nsqd is trivial (just an HTTP POST to the /put endpoint), this document focuses on consumers.

By setting these expectations we hope to provide a foundation for achieving consistency across languages for NSQ users.

Overview

  1. Configuration
  2. Discovery (optional)
  3. Connection Handling
  4. Feature Negotiation
  5. Data Flow / Heartbeats
  6. Message Handling
  7. RDY State
  8. Backoff

Configuration

At a high level, our philosophy with respect to configuration is to design the system to have the flexibility to support different workloads, use sane defaults that run well “out of the box”, and minimize the number of dials.

A client subscribes to a topic on a channel over a TCP connection to nsqd instance(s). You can only subscribe to one topic per connection so multiple topic consumption needs to be structured accordingly.

Using nsqlookupd for discovery is optional so client libraries should support a configuration where a client connects directly to one or more nsqd instances or where it is configured to poll one or more nsqlookupd instances. When a client is configured to poll nsqlookupd the polling interval should be configurable. Additionally, because typical deployments of NSQ are in distributed environments with many producers and consumers, the client library should automatically add jitter based on a random % of the configured value. This will help avoid a thundering herd of connections. For more detail see Discovery.
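As a rough illustration of the jitter described above, the sketch below adds a random extra delay to each polling interval. The function name, the `jitter_fraction` parameter, and its default of 0.3 are assumptions made for this example, not values taken from any particular client library.

```python
import random


def next_poll_delay(interval_s, jitter_fraction=0.3, rng=random):
    """Return the number of seconds to wait before the next nsqlookupd poll.

    jitter_fraction is a hypothetical knob: the delay is the configured
    interval plus a random extra of up to jitter_fraction * interval, so
    clients that start together drift apart instead of polling in lockstep.
    """
    return interval_s + rng.random() * jitter_fraction * interval_s
```

With a 60 second interval and the assumed default, each poll would be scheduled somewhere between 60 and 78 seconds after the previous one.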

An important performance knob for clients is the number of messages a client can receive before nsqd expects a response. This pipelining facilitates buffered, batched, and asynchronous message handling. By convention this value is called max_in_flight and it affects how RDY state is managed. For more detail see RDY State.

Because NSQ is designed to gracefully handle failure, client libraries are expected to implement retry handling for failed messages and provide options for bounding that behavior in terms of number of attempts per message. For more detail see Message Handling.

Relatedly, when message processing fails, the client library is expected to automatically handle re-queueing the message. NSQ supports sending a delay along with the REQ command. Client libraries are expected to provide options for what this delay should be set to initially (for the first failure) and how it should change for subsequent failures. For more detail see Backoff.

Most importantly, the client library should support some method of configuring callback handlers for message processing. The signature of these callbacks should be simple, typically accepting a single parameter (an instance of a “message object”).

Discovery

An important component of NSQ is nsqlookupd, which provides a discovery service for consumers to locate producers of a given topic at runtime.

Although optional, using nsqlookupd greatly reduces the amount of configuration required to maintain and scale a large distributed NSQ cluster.

When a client uses nsqlookupd for discovery, the client library should manage the process of polling all nsqlookupd instances for an up-to-date set of nsqd producers and should manage the connections to those producers.

Querying an nsqlookupd instance is straightforward. Perform an HTTP request to the lookup endpoint, passing the topic the client is attempting to discover as a query parameter (e.g. /lookup?topic=clicks). The response format is JSON:

Example JSON Response From nsqlookupd

The broadcast_address and tcp_port should be used to connect to an nsqd producer. Because, by design, nsqlookupd instances don’t coordinate their lists of producers, the client library should union the lists it received from all nsqlookupd queries to build the final list of nsqd producers to connect to. The broadcast_address:tcp_port combination should be used as the unique key for this union.
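The union step can be sketched as follows. The dictionaries stand in for already-parsed JSON responses. Only the broadcast_address and tcp_port fields come from the text above; the top-level "producers" key and the host names are assumptions made for illustration.

```python
def merge_producers(lookup_responses):
    """Union the producers from several nsqlookupd responses.

    Each response is assumed to be a dict with a "producers" list.
    Producers are keyed by (broadcast_address, tcp_port), so an nsqd
    reported by more than one nsqlookupd appears only once.
    """
    producers = {}
    for response in lookup_responses:
        for producer in response.get("producers", []):
            key = (producer["broadcast_address"], producer["tcp_port"])
            producers.setdefault(key, producer)
    return producers


# Hypothetical responses from two nsqlookupd instances
lookupd_a = {"producers": [
    {"broadcast_address": "nsqd01.example.net", "tcp_port": 4150},
    {"broadcast_address": "nsqd02.example.net", "tcp_port": 4150},
]}
lookupd_b = {"producers": [
    {"broadcast_address": "nsqd02.example.net", "tcp_port": 4150},
    {"broadcast_address": "nsqd03.example.net", "tcp_port": 4150},
]}
merged = merge_producers([lookupd_a, lookupd_b])
```

Here `merged` holds three entries, one per distinct nsqd, even though four producer records were received.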

A periodic timer should be used to repeatedly poll the configured nsqlookupd instances so that clients will automatically discover new producers. The client library should automatically initiate connections to all newly found nsqd producers.

When client library execution begins it should bootstrap this polling process by kicking off an initial set of requests to the configured nsqlookupd instances.

Connection Handling

Once a client has an nsqd producer to connect to (via discovery or manual configuration), it should open a TCP connection to broadcast_address:tcp_port. A separate TCP connection should be made to each nsqd for each topic the client wants to consume.

When connecting to an nsqd instance, the client library should send the following data, in order:

  1. the magic identifier
  2. an IDENTIFY command (and payload) and read/verify response
  3. a SUB command (specifying desired topic) and read/verify response
  4. an initial RDY count of 1 (see RDY State).

(low-level details on the protocol are available in the spec)
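The connection steps above can be sketched as the sequence of byte strings a client would write to the socket. This is a simplified illustration based on our reading of the V2 protocol (a 4-byte magic identifier, newline-terminated commands, and a 4-byte big-endian size prefix on the IDENTIFY body). The helper names are hypothetical, and reading and verifying the IDENTIFY and SUB responses is left out.

```python
import json
import struct

MAGIC_V2 = b"  V2"  # two spaces followed by "V2"


def identify(body):
    """Encode an IDENTIFY command with a size-prefixed JSON body."""
    payload = json.dumps(body).encode("utf-8")
    return b"IDENTIFY\n" + struct.pack(">I", len(payload)) + payload


def subscribe(topic, channel):
    """Encode a SUB command for one topic/channel pair."""
    return ("SUB %s %s\n" % (topic, channel)).encode("ascii")


def ready(count):
    """Encode a RDY command."""
    return ("RDY %d\n" % count).encode("ascii")


def handshake_writes(topic, channel, identify_body):
    """Return, in order, what the client sends after connecting.

    A real client must read and verify the IDENTIFY and SUB responses
    before moving on to the next step; that is omitted here.
    """
    return [
        MAGIC_V2,
        identify(identify_body),
        subscribe(topic, channel),
        ready(1),  # initial RDY count of 1
    ]
```

For example, `handshake_writes("clicks", "archive", {"short_id": "app01"})` yields four byte strings ending with `b"RDY 1\n"`.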

Reconnection

Client libraries should automatically handle reconnection as follows:

Feature Negotiation

The IDENTIFY command can be used to set nsqd side metadata, modify client settings, and negotiate features. It satisfies two needs:

  1. In certain cases a client would like to modify how nsqd interacts with it (currently this is limited to modifying a client’s heartbeat interval but you can imagine this could evolve to include enabling compression, TLS, output buffering, etc.)
  2. nsqd responds to the IDENTIFY command with a JSON payload that includes important server side configuration values that the client should respect while interacting with the instance.

After connecting, based on the user’s configuration, a client library should send an IDENTIFY command (the body of an IDENTIFY command is a JSON payload), e.g.:

Example IDENTIFY Command JSON Payload

The feature_negotiation field indicates that the client can accept a JSON payload in return. The short_id and long_id are arbitrary text fields that are used by nsqd (and nsqadmin) to identify clients. heartbeat_interval configures the interval between heartbeats on a per-client basis.

nsqd will respond with OK if it does not support feature negotiation (introduced in nsqd v0.2.20+); otherwise it responds with a JSON payload:

Example IDENTIFY Response JSON Payload

More detail on the use of the max_rdy_count field is in the RDY State section.

Data Flow and Heartbeats

Once a client is in a subscribed state, data flow in the NSQ protocol is asynchronous. For consumers, this means that in order to build truly robust and performant client libraries they should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used to represent both OS-level threads and userland threads, like coroutines).

Additionally clients are expected to respond to periodic heartbeats from the nsqd instances they’re connected to. By default this happens at 30 second intervals. The client can respond with any command but, by convention, it’s easiest to simply respond with a NOP whenever a heartbeat is received. See the protocol spec for specifics on how to identify heartbeats.

A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the frame, and performing the multiplexing logic to route the data as appropriate. This is also conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol involves the following sequential steps:

  1. read 4 byte big endian uint32 size
  2. read size bytes data
  3. unpack data
  4. profit
  5. goto 1
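The read loop above can be sketched as follows. The sketch assumes a blocking file-like object (for example the result of `socket.makefile("rb")`) and assumes the frame layout of a 4-byte frame type followed by the frame body, with heartbeats arriving as response frames whose body is `_heartbeat_`. The function names are hypothetical; the protocol spec remains the authority on the details.

```python
import io
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2


def read_exact(stream, n):
    """Read exactly n bytes or raise EOFError if the stream ends early."""
    buf = b""
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError("connection closed mid-frame")
        buf += chunk
    return buf


def read_frame(stream):
    """Read one frame and return (frame_type, body)."""
    (size,) = struct.unpack(">I", read_exact(stream, 4))  # step 1
    data = read_exact(stream, size)                       # step 2
    (frame_type,) = struct.unpack(">i", data[:4])         # step 3
    return frame_type, data[4:]


def is_heartbeat(frame_type, body):
    """True when the frame is a heartbeat the client should answer with NOP."""
    return frame_type == FRAME_TYPE_RESPONSE and body == b"_heartbeat_"


# Usage with an in-memory stream standing in for the socket
body = b"_heartbeat_"
raw = struct.pack(">Ii", 4 + len(body), FRAME_TYPE_RESPONSE) + body
frame_type, frame_body = read_frame(io.BytesIO(raw))
```

In a real client the IO “thread” would call `read_frame` in a loop, reply `NOP\n` to heartbeats, and route everything else to the appropriate handler.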

A Brief Interlude on Errors

Due to their asynchronous nature, it would take a bit of extra state tracking in order to correlate protocol errors with the commands that generated them. Instead, we took the “fail fast” approach so the overwhelming majority of protocol-level error handling is fatal. This means that if the client sends an invalid command (or gets itself into an invalid state) the nsqd instance it’s connected to will protect itself (and the system) by forcibly closing the connection (and, if possible, sending an error to the client). This, coupled with the connection handling mentioned above, makes for a more robust and stable system.

The only errors that are not fatal are:

Because these errors are most often timing issues, they are not considered fatal. These situations typically occur when a message times out on the nsqd side and is re-queued and delivered to another client. The original recipient is no longer allowed to respond on behalf of that message.

Message Handling

When the IO loop unpacks a data frame containing a message, it should route that message to the configured handler for processing.
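A minimal sketch of that unpack-and-route step is shown below. It assumes the message layout of an 8-byte timestamp, a 2-byte attempts counter, and a 16-byte message ID followed by the body. The `Message` tuple and the `route` helper are hypothetical names used only for this example.

```python
import struct
from collections import namedtuple

# Hypothetical "message object" handed to user callbacks
Message = namedtuple("Message", "id body timestamp attempts")


def unpack_message(data):
    """Decode the body of a message frame into a Message."""
    timestamp, attempts, msg_id = struct.unpack(">qH16s", data[:26])
    return Message(id=msg_id, body=data[26:], timestamp=timestamp,
                   attempts=attempts)


def route(data, handler):
    """Decode a message frame body and pass it to the configured handler."""
    return handler(unpack_message(data))
```

A handler then receives a single argument, which matches the simple callback signature suggested in the configuration section.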

The nsqd producer expects to receive a reply within its configured message timeout (default: 60 seconds). There are a few possible scenarios:

  1. The handler indicates that the message was processed successfully.
  2. The handler indicates that the message processing was unsuccessful.
  3. The handler decides that it needs more time to process the message.
  4. The in-flight timeout expires and nsqd automatically re-queues the message.

In the first 3 cases, the client library should send the appropriate command on the client’s behalf (FIN, REQ, and TOUCH respectively).

The FIN command is the simplest of the bunch. It tells nsqd that it can safely discard the message. FIN can also be used to discard a message that you do not want to process or retry.

The REQ command tells nsqd that the message should be re-queued (with an optional parameter specifying the amount of time to defer additional attempts). If the optional parameter is not specified by the client, the client library should automatically calculate the duration in relation to the number of attempts to process the message (a multiple is typically sufficient). The client library should discard messages that exceed the configured max attempts. When this occurs, a user-supplied callback should be executed to notify and enable special handling.

If the message handler requires more time than the configured message timeout, the TOUCH command can be used to reset the timer on the nsqd side. This can be done repeatedly until the message is finished (FIN) or re-queued (REQ), up to the nsqd producer’s configured max timeout. Client libraries should never automatically TOUCH on behalf of the client.

If the nsqd instance receives no response, the message will time out and be automatically re-queued for delivery to an available client.

Finally, a property of each message is the number of attempts. Client libraries should compare this value against the configured max and discard messages that have exceeded it. When a message is discarded there should be a callback fired. Typical default implementations of this callback might include writing to a directory on disk, logging, etc. The user should be able to override the default handling.
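Putting the FIN, REQ, and max-attempts rules together, a client library's decision logic might look roughly like the sketch below. The function name, the default values, and the choice of `base_delay_ms * attempts` as the re-queue delay are assumptions made for illustration; message IDs are shown as plain strings for brevity.

```python
def reply_command(msg_id, attempts, handler, max_attempts=5,
                  base_delay_ms=1000, on_discard=None):
    """Run the handler and return the command line to send back to nsqd.

    max_attempts and base_delay_ms are hypothetical defaults.
    """
    if attempts > max_attempts:
        # Exceeded the configured max: notify the user and drop the message
        if on_discard is not None:
            on_discard(msg_id, attempts)
        return "FIN %s\n" % msg_id
    try:
        succeeded = handler(msg_id)
    except Exception:
        succeeded = False
    if succeeded:
        return "FIN %s\n" % msg_id
    # Defer the retry by a multiple of the number of attempts so far
    return "REQ %s %d\n" % (msg_id, base_delay_ms * attempts)
```

Note that a discarded message is still finished with FIN so that nsqd stops redelivering it; the `on_discard` callback is where logging or writing to disk would happen.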

RDY State

Because messages are pushed from nsqd to clients we needed a way to manage the flow of data in userland rather than relying on low-level TCP semantics. A client’s RDY state is NSQ’s flow control mechanism.

As outlined in the configuration section, a consumer is configured with a max_in_flight. This is a concurrency and performance knob, e.g. some downstream systems are able to more easily batch process messages and benefit greatly from a higher max_in_flight.

When a client connects to nsqd (and subscribes) it is placed in an initial RDY state of 0. No messages will be sent to the client.

Client libraries have a few responsibilities:

  1. bootstrap and evenly distribute the configured max_in_flight to all connections.
  2. never allow the aggregate sum of RDY counts for all connections (total_rdy_count) to exceed the configured max_in_flight.
  3. never exceed the per connection nsqd configured max_rdy_count.
  4. expose an API method to reliably indicate message flow starvation

1. Bootstrap and Distribution

There are a few considerations when choosing an appropriate RDY count for a connection (in order to evenly distribute max_in_flight):

To kickstart message flow a client library needs to send an initial RDY count. Because the eventual number of connections is often not known ahead of time it should start with a value of 1 so that the client library does not unfairly favor the first connection(s).

Additionally, after each message is processed, the client library should evaluate whether or not it’s time to update RDY state. An update should be triggered if the current value is 0 or if it is below ~25% of the last value sent.

The client library should always attempt to evenly distribute RDY count across all connections. Typically, this is implemented as max_in_flight / num_conns.
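The two rules above (when to update RDY and how much to give each connection) can be sketched as a pair of small functions. The names are hypothetical, and the 25% threshold is applied as a strict “below” comparison, which is one reading of the approximate figure given above.

```python
def per_connection_rdy(max_in_flight, num_conns):
    """Evenly distribute max_in_flight, rounding down.

    Returns 0 when there are no connections or when
    max_in_flight < num_conns, which needs separate handling.
    """
    if num_conns <= 0:
        return 0
    return max_in_flight // num_conns


def needs_rdy_update(remaining_rdy, last_rdy_sent):
    """True when the connection's RDY count should be refreshed."""
    return remaining_rdy == 0 or remaining_rdy < 0.25 * last_rdy_sent
```

For example, with max_in_flight of 10 and 3 connections each connection gets a RDY count of 3, leaving the aggregate at 9 rather than exceeding the ceiling.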

However, when max_in_flight < num_conns this simple formula isn’t sufficient. In this state, client libraries should perform a dynamic runtime evaluation of producer “liveness” by measuring the duration of time since it last received a message for a connection. After a configurable expiration, it should re-distribute whatever RDY count is available to a new (random) set of producers. By doing this, you guarantee that you’ll (eventually) find producers with messages. Clearly this has a latency impact.
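One possible interpretation of this redistribution is sketched below. Connections that hold a RDY count but have been idle for longer than the expiration give it up, and the freed count goes to a random selection of connections that previously had none. The function, its arguments, and the use of a RDY count of 1 per chosen connection are assumptions for this example.

```python
import random


def redistribute_rdy(rdy, last_message_at, now, idle_timeout_s,
                     max_in_flight, rng=random):
    """Return a new {conn_id: rdy_count} mapping.

    rdy maps each connection to its current RDY count and
    last_message_at maps it to the time its last message arrived.
    """
    new_rdy = dict(rdy)
    for conn, count in rdy.items():
        if count > 0 and now - last_message_at[conn] >= idle_timeout_s:
            new_rdy[conn] = 0  # this producer has gone quiet
    available = max(0, max_in_flight - sum(new_rdy.values()))
    # Only connections that had no RDY before are candidates, so a
    # connection that just expired is not immediately picked again
    candidates = [c for c, count in rdy.items() if count == 0]
    for conn in rng.sample(candidates, min(available, len(candidates))):
        new_rdy[conn] = 1
    return new_rdy
```

The aggregate RDY count never exceeds max_in_flight, and over repeated rounds every producer eventually gets a turn.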

2. Maintaining max_in_flight

The client library should maintain a ceiling for the maximum number of messages in flight for a given consumer. Specifically, the aggregate sum of each connection’s RDY count should never exceed the configured max_in_flight.

Below is example code in Python to determine whether or not the proposed RDY count is valid for a given connection:

send_ready.py
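A minimal sketch of such a check follows. The `Reader` and `Conn` classes are hypothetical stand-ins that carry only the state needed here, and this is an illustration of the rule rather than the exact code used by any client library.

```python
class Conn:
    """Hypothetical connection that tracks its RDY count."""

    def __init__(self):
        self.rdy_count = 0
        self.sent = []

    def send_ready(self, count):
        # A real connection would write "RDY <count>\n" to the socket
        self.sent.append(count)


class Reader:
    """Hypothetical consumer holding the aggregate RDY state."""

    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.total_ready_count = 0


def send_ready(reader, conn, count):
    """Send RDY only if the aggregate stays within max_in_flight."""
    # The new count replaces this connection's previous RDY count
    proposed_total = reader.total_ready_count - conn.rdy_count + count
    if proposed_total > reader.max_in_flight:
        return False
    conn.send_ready(count)
    reader.total_ready_count = proposed_total
    conn.rdy_count = count
    return True
```

Returning a boolean lets the caller decide whether to retry with a smaller count.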

3. Producer Max RDY Count

Each nsqd is configurable with a --max-rdy-count (see feature negotiation for more information on the handshake a client can perform to ascertain this value). If the client sends a RDY count that is outside of the acceptable range its connection will be forcefully closed. For backwards compatibility, this value should be assumed to be 2500 if the nsqd instance does not support feature negotiation.
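A small sketch of how a client might derive and apply this limit is shown below. It assumes the IDENTIFY response is either the literal OK or a JSON payload containing a max_rdy_count field, as described in the feature negotiation section. The function names are hypothetical.

```python
import json

DEFAULT_MAX_RDY_COUNT = 2500  # fallback when feature negotiation is unsupported


def max_rdy_count_from_identify(response_data):
    """Extract max_rdy_count from the raw IDENTIFY response bytes."""
    if response_data == b"OK":
        return DEFAULT_MAX_RDY_COUNT
    settings = json.loads(response_data.decode("utf-8"))
    return settings.get("max_rdy_count", DEFAULT_MAX_RDY_COUNT)


def clamp_rdy(count, max_rdy_count):
    """Keep a proposed RDY count inside the range nsqd will accept."""
    return max(0, min(count, max_rdy_count))
```

Clamping before sending avoids the forced disconnect that an out-of-range RDY count would trigger.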

4. Message Flow Starvation

Finally, the client library should provide an API method to indicate message flow starvation. It is insufficient for clients (in their message handlers) to simply compare the number of messages they have in-flight vs. their configured max_in_flight in order to decide to “process a batch”. There are two cases when this is problematic:

  1. When clients configure max_in_flight > 1, due to variable num_conns, there are cases where max_in_flight is not evenly divisible by num_conns. Because the contract states that you should never exceed max_in_flight, you must round down, and you end up with cases where the sum of all RDY counts is less than max_in_flight.
  2. Consider the case where only a subset of producers have messages. Because of the expected even distribution of RDY count, those active producers only have a fraction of the configured max_in_flight.

In both cases, a client will never actually receive max_in_flight # of messages. Therefore, the client library should expose a method is_starved that will evaluate whether any of the connections are starved, as follows:

is_starved.py
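A minimal sketch of such a method follows. The `Conn` tuple and the 0.85 threshold are assumptions for this example; a threshold below 1.0 is meant to anticipate starvation rather than wait for a connection to be completely drained.

```python
from collections import namedtuple

# Hypothetical per-connection state: messages currently in flight and
# the last RDY count sent on that connection
Conn = namedtuple("Conn", "in_flight last_rdy")


def is_starved(conns, threshold=0.85):
    """True if any connection has used up most of its last RDY count."""
    for conn in conns:
        if conn.in_flight > 0 and conn.in_flight >= conn.last_rdy * threshold:
            return True
    return False
```

A batching handler could append messages to a buffer and flush it either when the buffer reaches its target size or when `is_starved` returns true.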

The is_starved method should be used by message handlers to reliably identify when to process a batch of messages.

Backoff

The question of what to do when message processing fails is a complicated one to answer. The message handling section detailed client library behavior that would defer the processing of failed messages for some (increasing) duration of time. The other piece of the puzzle is whether or not to reduce throughput. The interplay between these two pieces of functionality is crucial for overall system stability.

By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system to recover from transient failure. However, this behavior should be configurable as it isn’t always desirable, such as situations where latency is prioritized.

Backoff should be implemented by sending RDY 0 to the appropriate producers, stopping message flow. The duration of time to remain in this state should be calculated based on the number of repeated failures (exponential). Similarly, successful processing should reduce this duration until the reader is no longer in a backoff state.

While a reader is in a backoff state, after the timeout expires, the client library should only ever send RDY 1 regardless of max_in_flight. This effectively “tests the waters” before returning to full throttle. Additionally, during a backoff timeout, the client library should ignore any success or failure results with respect to calculating backoff duration (i.e. it should only take into account one result per backoff timeout).
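The backoff bookkeeping can be sketched roughly as below. The class name, the doubling formula, and the `base_s` and `max_s` defaults are assumptions chosen for illustration. The rule of counting only one result per backoff timeout is left to the caller.

```python
def backoff_duration_s(level, base_s=1.0, max_s=120.0):
    """Exponential backoff: base_s, 2*base_s, 4*base_s, ... capped at max_s."""
    if level <= 0:
        return 0.0
    return min(max_s, base_s * 2 ** (level - 1))


class BackoffState:
    """Tracks how deep into backoff a reader currently is."""

    def __init__(self):
        self.level = 0

    def record(self, success):
        """Apply one processing result; return the next backoff duration."""
        if success:
            self.level = max(0, self.level - 1)
        else:
            self.level += 1
        return backoff_duration_s(self.level)

    def rdy_after_timeout(self, normal_rdy):
        """While backing off, only ever test the waters with RDY 1."""
        return 1 if self.level > 0 else normal_rdy
```

On each failure the reader would send RDY 0, wait for the returned duration, then send whatever `rdy_after_timeout` returns.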

client flow

Bringing It All Together

Distributed systems are fun.

The interactions between the various components of an NSQ cluster work in concert to provide a platform on which to build robust, performant, and stable infrastructure. We hope this guide shed some light as to how important the client’s role is.

In terms of actually implementing all of this, we treat pynsq and go-nsq as our reference codebases. The structure of pynsq can be broken down into three core components:

We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking for contributors to continue to expand our language support as well as flesh out functionality in existing libraries. The community has already open sourced client libraries for 7 languages:

If you’re interested in hacking on NSQ, check out the GitHub repo. We also publish binary distributions for Linux and Darwin to make it easy to get started.

If you have any specific questions, feel free to ping me on twitter @imsnakes.

Finally, a huge thank you to the crew at bitly: @mccutchen and @ploxiln for their tireless review, and @jehiah, my partner in crime on NSQ.

Speeding things up with Redshift

Recently we’ve started to experiment with using Redshift, Amazon’s new data warehousing service. More specifically, we’re using it to speed up and expand our ad hoc data analysis.

The Challenge

bitly sees billions of clicks and shortens each month. Often we have various questions about the data generated from this activity. Sometimes these questions are driven by business needs (how much traffic do we see from a potential enterprise customer), sometimes they are more technically driven (how much traffic will a new sub-system need to deal with), and sometimes we like to just have fun (what are the top trashy celeb stories this week).

Unfortunately, when working with that volume of data it can be pretty difficult to do much of anything quickly. Pre-Redshift, all of these questions were answered by writing map-reduce jobs to be run on our Hadoop cluster or on Amazon’s EMR.

Whenever we wanted to answer a question with our data, the process would look something like this:

  1. Write map-reduce job in Python
  2. Run it on some local test data
  3. Fix bugs.
  4. Run it on the Hadoop cluster
  5. Wait 20-30 minutes for results
  6. Get an error back from Hadoop
  7. Dig through the logs to find the error.
  8. GOTO 3

This is clearly not ideal when all you want to do is get a simple count.

For a lot of the work we do, Hadoop + Python make for an awesome combination, but for these ad hoc aggregation queries they’re very blunt instruments. Both are general-purpose tools that are super flexible, but slow and difficult to use for this specific use case.

Redshift, on the other hand, is specifically built and optimized for doing aggregation queries over large sets of data. When we want to answer a question with Redshift, we just write a SQL query and get an answer within a few minutes—if not seconds.

Overall, our experience with Redshift has been a positive one but we have run into some gotchas that we’ll get into below.

The Good News

User Experience

From a user perspective, we’re really happy with Redshift. Any one of our developers or data scientists just needs to write a SQL query and they have an answer to their question in less than 5 minutes. Moving from our old Hadoop-based workflow to an interactive console session with Redshift is a major improvement.

Additionally, since many of the user-facing bits of Redshift are based on PostgreSQL, there is a large ecosystem of mature, well-documented tools and libraries for us to take advantage of.

Finally, while it can be a bit slow at times, we’ve been very impressed with the web management console Amazon provides with Redshift. For a 1.0 product, the console is comprehensive and offers much more information than we expected it to.

Performance

For our current use case of ad hoc research queries, Redshift’s performance is adequate. Most queries return a response in less than five minutes and we rarely have many users executing queries concurrently.

That being said, we have done some experimentation with competing products (e.g. Vertica) and have seen better performance out of those tools. This is especially true for more complex queries that benefit from projections/secondary indexes and situations where the cluster’s resources are under contention.

Documentation

Just like the rest of AWS, Amazon provides reasonably comprehensive and thorough documentation. For everything that is directly exposed to us as users (e.g. loading operations, configuration params, etc) we are very happy with the documentation. The only places where we felt we wanted more information were those where Amazon makes things “just work”. Most significantly we would like to see more details about what exactly happens when a node in the cluster fails and how the cluster is expected to behave in that state.

Cost

We wouldn’t go so far as to call Redshift cheap, but compared to many competitors it is pretty cost effective. The biggest gotcha here is that while the simple model for scaling Redshift clusters and tuning performance within a cluster is nice as a user, it does mean that you have a bit of a one-size-fits-all situation.

In our case we are computationally and I/O constrained so we’re paying for a bunch of storage capacity and memory that we don’t use. At our current scale, things work out okay but as we continue to grow it may make sense to take advantage of something else that is more flexible in terms of both hardware and tuning.

The Bad News

Data Loading

We had to spend a lot of time getting our data into Redshift. This is partially our fault since our dataset is not the cleanest in the world, but overall this is the place that we felt the most pain from an immature tool chain.

The majority of bitly’s at-rest data is stored in line-oriented (i.e. one JSON blob per line) JSON files. The primary way to load data into Redshift is to use the COPY command to point the cluster at a pile of CSV or TSV files stored on S3. Clearly we needed to build out some kind of tool chain to transform our JSON logs into flat files.
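The core of such a transformation can be sketched in a few lines. The column names in the usage example are hypothetical, and replacing embedded tabs and newlines with spaces is just one simple way to keep each record on a single line.

```python
import json


def json_line_to_tsv(line, columns):
    """Flatten one JSON log line into a tab-separated row.

    Missing fields become empty cells. Tabs and newlines inside values
    are replaced with spaces so they cannot break the row apart.
    """
    record = json.loads(line)
    cells = []
    for column in columns:
        value = record.get(column)
        text = "" if value is None else str(value)
        cells.append(text.replace("\t", " ").replace("\n", " "))
    return "\t".join(cells)


# Usage with made-up field names
row = json_line_to_tsv(
    '{"ts": 1364774400, "country": "US", "referrer": "a\\tb"}',
    ["ts", "country", "referrer", "city"],
)
```

Because each line is processed independently, the same function works equally well in a standalone script or as the mapper of a batch job.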

Initially, we just wrote a simple Python script that would do the transformation. Unfortunately, we quickly discovered that this simple approach would be too slow. We estimated that it would have taken a month to process and load all the data we wanted in Redshift.

Next, we realized that we had a tool for easily doing highly parallelized, distributed text processing: Hadoop. Accordingly, we re-worked our quick Python script into a Hadoop job to transform our logs in a big batch. Since we already keep a copy of our raw logs in S3, EMR proved to be a great tool for this.

Overall this process worked well but we did still run into a few gotchas loading the flattened data into Redshift:

Now that we have a large body of data loaded into Redshift, we’re working on building out tooling based on NSQ to do our data prep work in a streaming fashion that should allow us to easily do smaller incremental updates.

In the end, we worked through our data loading issues with Redshift but it was one of the more acute pain points we encountered. From our conversations with Amazon, they’re definitely aware of this and we’re interested to see what they’ll come out with, but for now the provided tooling is pretty limited.

Availability

Long term, there are a number of periodic and online tasks that we’re thinking about using a tool like Redshift for. Unfortunately, as things stand today we would not be comfortable relying on Redshift as a highly available system.

Currently, if any node within a Redshift cluster fails, all outstanding queries will fail and Amazon will automatically start replacing the failed node. In theory this recovery should happen very quickly. The cluster will be available for querying as soon as the replacement node is added, but performance on the cluster will be degraded while data is restored on to the new node.

At this point, we have no data on how well this recovery process works in the real world. Additionally, we have concerns about how well this process will work when there are larger issues happening within an availability zone or region. Historically there are a number of cases where issues within one Amazon service (e.g. EBS) have cascaded into other services leading to long periods of unavailability or degradation.

Until there’s a significant track record behind a system like this, we’re hesitant to trust anything that will “automatically recover”.

There is the option of running a mirrored Redshift cluster in a different AZ or region, but that gets expensive fast. Additionally, we’d have to build out even more tooling to make sure those two clusters stay in sync with each other.

Limited Feature Set

Redshift is very impressive feature-wise for a 1.0 product. That being said, a number of the competing products have been around for a while and offer some major features that Redshift lacks.

The biggest missing feature for us with Redshift is some kind of secondary indexing or projections. Right now, if you sort or filter by any column other than the SORTKEY, Redshift will do a full table scan. Technically you could create a copy of that table with a different sort key but then it would become your problem to keep those tables in sync and to query the right table at the right time.

Some other “missing” features include tools for working with time series data, geospatial query tools, advanced HA features, and more mature data loading tools.

The Bottom Line

Redshift is great for our needs today, but we’ll see if Amazon’s development keeps up as our needs change going forward. Given Amazon’s impressive track record for quickly iterating on and improving products we’re hopeful, but we do have our eyes on competing products as our use of data warehousing tools matures.

by Sean O’Connor

Securing Internal Applications

A common infrastructure problem is managing access to internal applications. Some patterns for solving that problem include: fronting internal apps with HTTP Basic or HTTP Digest authentication through Apache or nginx, running the apps on secret ports, and bundling an authentication system as part of each application. Many off-the-shelf applications include no built-in authentication, and count on upstream proxies/systems for access control which limits options for securing them. Google Auth Proxy is a tool we have developed to give us a better ability to secure internal applications by using Google Account authentication (like many other startups, we rely on Google Apps).

At bitly, we like fronting HTTP applications with Nginx and use that approach for nearly all of our stack, from Tornado apps handling our core APIs to common internal tools (e.g. Nagios, Munin, Graphite) to homegrown tools like NSQ and our deploy system. For many of those systems, we took advantage of this by configuring HTTP Basic authentication to restrict access. Using Basic Auth however led to situations where the authentication information would be stored in configuration files and scripts, leaking it throughout the repository.

Some of our homegrown tools initially relied on bitly’s OAuth 2 service for authentication, which meant that sign in was managed through bitly accounts by the OAuth flow. This left each application to handle authorization by maintaining a list of bitly accounts that actually had permission to access the app. This was an improvement over HTTP Basic Auth as each application did not need to accept or store passwords, but it was less than ideal because each app still needed to store and manage its own list of authorized users. Over time, this led to a large number of disparate systems with separate authorization lists. This approach was only feasible for internal tools and thus couldn’t apply to other open source applications we were using.

We developed Google Auth Proxy as a new tool to use in securing internal applications. It is an HTTP reverse proxy that provides authentication using Google’s OAuth2 API with flexibility to authorize individual Google Accounts (by email address) or a whole Google Apps domain. For internal applications, this is convenient because we can now allow our whole @bit.ly Google Apps domain without separately managing accounts or passwords.

Google Auth Proxy requires a limited set of privileges in order to authenticate users (asking for only the userinfo.email and userinfo.profile OAuth2 scopes). This authentication information is then passed to the upstream application as HTTP Basic Auth (with an empty value for the password), and in an HTTP header as X-Forwarded-User for applications that need that context.
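To make the upstream contract concrete, the sketch below builds the two headers an upstream application would see. It is written in Python for illustration only (the proxy itself is written in Go), and exactly which value identifies the user, such as the full email address, is an assumption here.

```python
import base64


def upstream_auth_headers(user):
    """Build the headers forwarded to the upstream application.

    The Basic credentials carry the user name and an empty password.
    """
    token = base64.b64encode((user + ":").encode("utf-8")).decode("ascii")
    return {
        "Authorization": "Basic " + token,
        "X-Forwarded-User": user,
    }
```

An upstream application that already understands Basic Auth can read the user name from the Authorization header without any changes, while newer applications can simply read X-Forwarded-User.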

Google Auth Proxy has been in production for 6 months and has helped reduce overhead for managing internal applications (no setup time for new or removed accounts) and has made it easier for us to open up access for important tools to the entire company. It’s also worth mentioning that if you use two-factor authentication with your Google accounts, that security carries over to improved authentication for Google Auth Proxy.

We chose to write Google Auth Proxy in Go because golang has built-in support for writing a concurrent ReverseProxy in the net/http/httputil package. This meant that little work was needed to proxy requests and we could focus on just writing the authentication layer.

If you find this useful, we’d love to hear about it @bitly

Forget Table

Forget Table is a solution to the problem of storing the recent dynamics of categorical distributions that change over time (i.e. non-stationary distributions).

What does this mean? Why is this useful? What makes this the most important thing since sliced bread?! Read on!

Storing distributions is crucial for doing any sort of statistics on a dataset. Normally, this problem is as easy as keeping counters of how many times we’ve seen certain quantities, but when dealing with data streams that constantly deliver new data, these simple counters no longer do the job. They fail because data we saw weeks ago has the same weight as data we have just seen, even though the fundamental distribution the data is describing may be changing. In addition, they pose an engineering challenge, since the counters would strictly grow and very soon use all the resources available to them. This is why we created Forget Table.

Background

A categorical distribution describes the probability of seeing an event occur out of a set of possible events. So an example of this at bitly is that every click coming through our system comes from one of a fixed number of country codes (there are about 260). We would like to maintain a categorical distribution that assigns a probability to each country, describing how likely it is that any given click comes from that country. With bitly’s data, this is going to give a high weight to the US and lower weights to Japan and Brazil, for example.

It’s very useful for bitly to store distributions like this, as it gives us a good idea as to what’s considered ‘normal’. Most of the time we produce analytics that show, for example, how many clicks came from a given country, or referrer. While this gives our clients a sense of where their traffic is coming from, it doesn’t directly express how surprising their traffic is. This can be remedied by maintaining a distribution over countries for all of bitly, so that we can identify anomalous traffic, ie: when a particular link is getting disproportionately more clicks from Oregon than we would normally expect.

The difficulty that Forget Table deals with comes from the fact that what bitly considers normal changes constantly. At 8am on the East Coast of the US, we’d be surprised to see a lot of traffic coming from San Francisco (they’re all still asleep over there); however, at 11am EST we should expect to see tons of traffic coming from San Francisco. So unless we allow our understanding of what’s considered normal to change over time, we are going to have a skewed idea of normality.

This is a general problem over longer time scales, too. The behavior of the social web is different in December than it is in July, and it’s different in 2012 than it was in 2011. We need to make sure that our understanding of normality changes with time. One way of achieving this is to forget old data that’s no longer relevant to our current understanding. This is where Forget Table comes in.

Why forget in the first place?

The basic problem is that the fundamental distribution (ie: the distribution that the data is actually being created from) is changing. This property is called being “non-stationary”. Simply storing counts to represent your categorical distribution makes this problem worse because it keeps the same weight for data seen weeks ago as it does for the data that was just observed. As a result, the total count method simply shows the combination of every distribution the data was drawn from (which will approach a Gaussian by the central limit theorem).

A naive solution to this would be to simply have multiple categorical distributions that are in rotation. Similar to the way log files get rotated, we would have different distributions that get updated at different times. This approach, however, can lead to many problems.

When a distribution first gets rotated in, it has no information about the current state of the world. Similarly, at the end of a distribution’s rotation it is just as affected by events from the beginning of its rotation as it is by recent events. This creates artifacts and dynamics in the data which are only dependent on the time of and time between rotations. These effects are similar to various pathologies that come from binning data or by simply keeping a total count.

On the other hand, by forgetting things smoothly using rates, we have a continuous window of time that our distribution is always describing. The further back in time the event is, the less of an effect it has on the distribution’s current state.

Forgetting Data Responsibly

Forget Table takes a principled approach to forgetting old data, by defining the rate at which old data should decay. Each bin of a categorical distribution forgets its counts depending on how many counts it currently has and a user-specified rate. With this rule, bins that are more dominant get decayed faster than bins without many counts. This method also has the benefit of being a very simple process (one that was inspired by the decay of radioactive particles) which can be calculated very quickly. In addition, since bins with high counts get decayed faster than bins with low counts, this process helps smooth out sudden spikes in data automatically.

If the data suddenly stopped flowing into the Forget Table, then all the categorical distributions would eventually decay to the uniform distribution - each bin would have a count of 1 (at which point we stop decaying), and z, the normalising constant that stores the total count, would be equal to the number of bins (see a visualization of this happening). This captures the fact that we no longer have any information about the distribution of the variables in Forget Table.

The result of this approach is that the counts in each bin, in each categorical distribution, decay exponentially. Each time we decrement the count in a bin, we also decrement the normalising constant z. When using Forget Table, we can choose a rate at which things should decay, depending on the dominant time constants of the system.

Building Categorical Distributions in Redis

We store the categorical distribution as a set of event counts, along with a ‘normalising constant’ which is simply the number of all the events we’ve stored. In the country example, we have ~260 bins, one per country, and in each bin we store the number of clicks we’ve seen from each country. Alongside it, our normalising constant stores the total number of clicks we’ve seen across all countries.

All this lives in a Redis sorted set where the key describes the variable which, in this case, would simply be bitly_country and the value would be a categorical distribution. Each element in the set would be a country and the score of each element would be the number of clicks from that country. We store a separate element in the set (traditionally called z) that records the total number of clicks stored in the set. When we want to report the categorical distribution, we extract the whole sorted set, divide each count by z, and report the result.

Storing the categorical distribution in this way allows us to make very rapid writes (simply increment the score of two elements of the sorted set) and means we can store millions of categorical distributions in memory. Storing a large number of these is important, as we’d often like to know the normal behavior of a particular key phrase, or the normal behavior of a topic, or a bundle, and so on.
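As a rough sketch of the storage scheme just described, the following uses a plain Python dict as a stand-in for the Redis sorted set. The `CategoricalStore` class and its method names are hypothetical and are not Forget Table's actual code; against a real Redis, each increment would correspond to two ZINCRBY calls on the same key (one for the bin, one for z).

```python
from collections import defaultdict


class CategoricalStore:
    """In-memory stand-in for Redis sorted sets (hypothetical helper)."""

    Z = "z"  # element that records the total count, as in the post

    def __init__(self):
        # key -> {element: score}, mimicking one sorted set per variable
        self._sets = defaultdict(dict)

    def incr(self, key, bin_name, amount=1):
        """Record an event: bump the bin's score and the normaliser z."""
        zset = self._sets[key]
        zset[bin_name] = zset.get(bin_name, 0) + amount
        zset[self.Z] = zset.get(self.Z, 0) + amount

    def distribution(self, key):
        """Read the whole set and divide each count by z."""
        zset = self._sets.get(key, {})
        z = zset.get(self.Z, 0)
        if z == 0:
            return {}
        return {b: c / z for b, c in zset.items() if b != self.Z}


store = CategoricalStore()
for country in ["US", "US", "US", "JP"]:
    store.incr("bitly_country", country)
print(store.distribution("bitly_country"))
```

Each write touches exactly two elements, which is what keeps increments cheap regardless of how many bins the distribution has.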

Forgetting Data Under Strenuous Circumstances

This seems simple enough, but we have two problems. The first is that we have potentially millions of categorical distributions. Bitly maintains information for over a million separate key phrases at any given time, and (for some super secret future projects) it is necessary to store a few distributions per key phrase. We are therefore unable to iterate through each key of our Redis table in order to do our decays, so cron-like decays wouldn’t be feasible (ie: decaying every distribution in the database every several minutes).

The second problem is that data is constantly flowing into multiple distributions: we sometimes see spikes of up to 3 thousand clicks per second which can correspond to dozens of increments per second. At this sort of high volume, there is simply too much contention between the decay process and the incoming increments to safely do both.

So the real contribution of Forget Table is an approach to forgetting data at read time. When we are interested in the current distribution of a particular variable, we extract whatever sorted set is stored against that variable’s key and decrement the counts at that time.

It turns out that, using the simple rate based model of decay from above, we can decrement each bin by simply sampling an integer from a Poisson distribution whose rate is proportional to the current count of the bin and the length of time it has been since we last decayed that bin. So, by storing another piece of information, the time since we last successfully decayed this distribution, we can calculate the number of counts to discard very cheaply (this algorithm is an approximation to Gillespie’s algorithm used to simulate stochastic systems).
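A minimal sketch of this read-time decay, using only the Python standard library, might look like the following. The function names, the exact form of the Poisson rate (rate × count × elapsed time) and the normal approximation used for large rates are assumptions made for illustration; they are not taken from the Forget Table source. Counts are assumed to be at least 1.

```python
import math
import random


def sample_poisson(lam, rng):
    """Draw an integer from Poisson(lam) using only the standard library."""
    if lam <= 0:
        return 0
    if lam < 30:
        # Knuth's multiplication method, adequate for small rates.
        threshold = math.exp(-lam)
        k, p = 0, 1.0
        while True:
            p *= rng.random()
            if p <= threshold:
                return k
            k += 1
    # Normal approximation for large rates (an illustrative shortcut).
    return max(0, int(round(rng.gauss(lam, math.sqrt(lam)))))


def decay_distribution(counts, z, rate, elapsed, rng=None):
    """Decay every bin given the time elapsed since the last decay.

    counts  -- {bin name: count}, each count assumed to be >= 1
    z       -- normalising constant (sum of all counts)
    rate    -- user-specified decay rate per unit time (assumed form)
    elapsed -- time since this distribution was last decayed
    """
    rng = rng or random.Random()
    decayed = {}
    for bin_name, count in counts.items():
        lost = sample_poisson(rate * count * elapsed, rng)
        new_count = max(1, count - lost)  # stop decaying at a count of 1
        z -= count - new_count            # keep z in step with the bins
        decayed[bin_name] = new_count
    return decayed, z


counts = {"US": 1000, "JP": 50, "BR": 1}
print(decay_distribution(counts, sum(counts.values()), rate=0.01, elapsed=10))
```

Because the rate scales with the bin's current count, dominant bins lose more counts per call than sparse ones, matching the behaviour described earlier.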

In Redis we implement this using pipelines. Using a pipeline, we read the sorted set, form the distribution, calculate the amount of decay for each bin and then attempt to perform the decay. Assuming nothing’s written into the sorted set in that time, we decay each bin and update the time since last decayed. If the pipeline has detected a collision — either another process has decayed the set or a new event has arrived — we abandon the update. The algorithm we’ve chosen means that it’s not terribly important to actually store the decayed version of the distribution, so long as we know the time between the read and the last decay.
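The read, decay, conditional-write cycle can be sketched with a small in-memory stand-in for Redis. The `VersionedStore` class, its version counter and `try_decay` are hypothetical; a real implementation would lean on Redis's own check-and-set support (WATCH with MULTI/EXEC) rather than on anything shown here.

```python
class VersionedStore:
    """In-memory stand-in for a Redis key guarded by optimistic locking."""

    def __init__(self):
        self._values = {}
        self._versions = {}

    def read(self, key):
        """Return a copy of the stored value and the version that was seen."""
        return dict(self._values.get(key, {})), self._versions.get(key, 0)

    def write(self, key, value):
        """Unconditional write, e.g. an incoming increment."""
        self._values[key] = dict(value)
        self._versions[key] = self._versions.get(key, 0) + 1

    def write_if_unchanged(self, key, value, seen_version):
        """Write only if nobody else wrote since seen_version was read."""
        if self._versions.get(key, 0) != seen_version:
            return False  # collision: abandon the update
        self.write(key, value)
        return True


def try_decay(store, key, decay_fn):
    """Read, decay, and store the result unless a collision is detected.

    The decayed distribution is returned either way, since the caller can
    still report it even when the write is abandoned.
    """
    counts, version = store.read(key)
    decayed = decay_fn(counts)
    stored = store.write_if_unchanged(key, decayed, version)
    return decayed, stored
```

Abandoning the write on a collision is safe in this scheme because the next reader simply decays over a slightly longer elapsed time.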

Get the Code and We’re Hiring!

The result is a wrapper on top of Redis that runs as a little HTTP service. Its API has an increment handler on /incr used for incrementing counts based on new events, and a get handler on /get used for retrieving a distribution. In addition, there is an /nmostprobable endpoint that returns the n categories which have the highest probability of occurring.

There are two versions, one in Go (for super speed) and the other in Python. The code is open source and up on Github, available at http://bitly.github.com/forgettable.

As always, if you like what you see (or feel the need to make improvements), don’t forget that bitly is hiring!

Get ready for more posts on science at bitly, and if you have any specific questions, feel free to ping me on twitter @mynameisfiber.

spray some NSQ on it

We released NSQ on October 9th 2012. Supported by 3 talks and a blog post, it’s already the 4th most watched Go project on GitHub. There are client libraries in 7 languages and we continue to talk with folks experimenting with and transitioning to the platform.

This post aims to kickstart the documentation available for “getting started” and describe some NSQ patterns that solve a variety of common problems.

DISCLAIMER: this post makes some obvious technology suggestions to the reader but it generally ignores the deeply personal details of choosing proper tools, getting software installed on production machines, managing what service is running where, service configuration, and managing running processes (daemontools, supervisord, init.d, etc.).

Metrics Collection

Regardless of the type of web service you’re building, in most cases you’re going to want to collect some form of metrics in order to understand your infrastructure, your users, or your business.

For a web service, most often these metrics are produced by events that happen via HTTP requests, like an API. The naive approach would be to structure this synchronously, writing to your metrics system directly in the API request handler.

naive approach

One way to resolve all of these issues is to somehow perform the work of writing into your metrics system asynchronously - that is, place the data in some sort of local queue and write into your downstream system via some other process (consuming that queue). This separation of concerns allows the system to be more robust and fault tolerant. At bitly, we use NSQ to achieve this.

Brief tangent: NSQ has the concept of topics and channels. Basically, think of a topic as a unique stream of messages (like our stream of API events above). Think of a channel as a copy of that stream of messages for a given set of consumers. Topics and channels are both independent queues, too. These properties enable NSQ to support both multicast (a topic copying each message to N channels) and distributed (a channel equally dividing its messages among N consumers) message delivery.
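To make the two delivery modes concrete, here is a toy in-memory model. It is not how nsqd is implemented: the `Topic` and `Channel` classes are hypothetical, and round-robin is only one assumed way for a channel to divide messages among its consumers.

```python
class Channel:
    """A copy of the topic's stream, shared by a set of consumers."""

    def __init__(self, name):
        self.name = name
        self.consumers = []  # each consumer is modelled as a list (its inbox)
        self.backlog = []    # messages held while nobody is subscribed
        self._next = 0

    def subscribe(self, inbox):
        self.consumers.append(inbox)

    def deliver(self, message):
        if not self.consumers:
            self.backlog.append(message)
            return
        # Distributed delivery: each message goes to exactly one consumer.
        inbox = self.consumers[self._next % len(self.consumers)]
        inbox.append(message)
        self._next += 1


class Topic:
    """A stream of messages that is copied to every channel."""

    def __init__(self, name):
        self.name = name
        self.channels = {}

    def channel(self, name):
        return self.channels.setdefault(name, Channel(name))

    def publish(self, message):
        # Multicast delivery: every channel receives its own copy.
        for channel in self.channels.values():
            channel.deliver(message)


topic = Topic("api_requests")
worker_a, worker_b, archiver = [], [], []
topic.channel("metrics").subscribe(worker_a)
topic.channel("metrics").subscribe(worker_b)
topic.channel("archive").subscribe(archiver)
for i in range(4):
    topic.publish(i)
print(worker_a, worker_b, archiver)
```

In this model the two metrics workers split the stream between them while the archive consumer still sees every message.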

For a more thorough treatment of these concepts, read through the design doc and slides from our Golang NYC talk. Specifically, slides 19 through 33 describe topics and channels in detail.

architecture with NSQ

Integrating NSQ is straightforward. Let’s take the simple case:

  1. Run an instance of nsqd on the same host that runs your API application.
  2. Update your API application to write to the local nsqd instance to queue events, instead of directly into the metrics system. To be able to easily introspect and manipulate the stream, we generally format this type of data in line-oriented JSON. Writing into nsqd can be as simple as performing an HTTP POST request to the /put endpoint.
  3. Create a consumer in your preferred language using one of our client libraries. This “worker” will subscribe to the stream of data and process the events, writing into your metrics system. It can also run locally on the host running both your API application and nsqd.
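As a sketch of step 2, publishing an event from the API application can be done with nothing but the standard library. The default address (nsqd's HTTP interface on 127.0.0.1:4151), the `topic` query parameter, and the helper names below are assumptions for illustration; check them against the nsqd version you are running.

```python
import json
import urllib.parse
import urllib.request


def build_put_request(event, topic, nsqd_http="http://127.0.0.1:4151"):
    """Build an HTTP POST for nsqd's /put endpoint.

    The event is serialised as a single JSON document, in keeping with the
    line-oriented JSON convention mentioned above.
    """
    query = urllib.parse.urlencode({"topic": topic})
    body = json.dumps(event, sort_keys=True).encode("utf-8")
    return urllib.request.Request(f"{nsqd_http}/put?{query}", data=body)


def publish(event, topic, nsqd_http="http://127.0.0.1:4151", timeout=2.0):
    """Send the event to the local nsqd and return the HTTP status code."""
    request = build_put_request(event, topic, nsqd_http)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Example (requires a running nsqd):
# publish({"action": "click", "user": "abc123"}, "api_requests")
```

Keeping request construction separate from the network call makes the publishing path easy to unit test without a running nsqd.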

Here’s an example worker written with our official Python client library:

Python Example Code

In addition to de-coupling, by using one of our official client libraries, consumers will degrade gracefully when message processing fails. Our libraries have two key features that help with this:

  1. Retries - when your message handler indicates failure, that information is sent to nsqd in the form of a REQ (re-queue) command. Also, nsqd will automatically time out (and re-queue) a message if it hasn’t been responded to in a configurable time window. These two properties are critical to providing a delivery guarantee.
  2. Exponential Backoff - when message processing fails the reader library will delay the receipt of additional messages for a duration that scales exponentially based on the # of consecutive failures. The opposite sequence happens when a reader in a backoff state begins to process successfully: the delay steps back down until it reaches 0.

In concert, these two features allow the system to respond gracefully to downstream failure, automagically.
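The backoff behaviour can be sketched as a small counter. This is an illustration rather than the logic of any particular client library: the `BackoffTimer` name, the doubling factor, the base interval and the cap are all assumed values.

```python
class BackoffTimer:
    """Tracks failures and derives an exponentially scaled delay."""

    def __init__(self, base=1.0, max_interval=120.0):
        self.base = base                  # delay after the first failure (s)
        self.max_interval = max_interval  # upper bound on the delay (s)
        self.failures = 0

    def failure(self):
        """A message handler failed: step the backoff level up."""
        self.failures += 1

    def success(self):
        """A message was processed: step the backoff level down, until 0."""
        self.failures = max(0, self.failures - 1)

    def interval(self):
        """Seconds to wait before asking for more messages."""
        if self.failures == 0:
            return 0.0
        return min(self.max_interval, self.base * 2 ** (self.failures - 1))


timer = BackoffTimer()
for _ in range(3):
    timer.failure()
print(timer.interval())
timer.success()
print(timer.interval())
```

A consumer would consult `interval()` after each handler result and pause the receipt of new messages for that long before resuming.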

Persistence

Ok, great, now you have the ability to withstand a situation where your metrics system is unavailable with no data loss and no degraded API service to other endpoints. You also have the ability to scale the processing of this stream horizontally by adding more worker instances to consume from the same channel.

But, it’s kinda hard ahead of time to think of all the types of metrics you might want to collect for a given API event.

Wouldn’t it be nice to have an archived log of this data stream for any future operation to leverage? Logs tend to be relatively easy to back up redundantly, making them a “plan z” of sorts in the event of catastrophic downstream data loss. But, would you want this same consumer to also have the responsibility of archiving the message data? Probably not, because of that whole “separation of concerns” thing.

Archiving an NSQ topic is such a common pattern that we built a utility, nsq_to_file, packaged with NSQ, that does exactly what you need.

Remember, in NSQ, each channel of a topic is independent and receives a copy of all the messages. You can use this to your advantage when archiving the stream by doing so over a new channel, archive. Practically, this means that if your metrics system is having issues and the metrics channel gets backed up, it won’t affect the separate archive channel you’ll be using to persist messages to disk.

So, add an instance of nsq_to_file to the same host and use a command line like the following:

/usr/local/bin/nsq_to_file --nsqd-tcp-address=127.0.0.1:4150 --topic=api_requests --channel=archive

archiving the stream

Distributed Systems

You’ll notice that the system has not yet evolved beyond a single production host, which is a glaring single point of failure.

Unfortunately, building a distributed system is hard. Fortunately, NSQ can help. The following changes demonstrate how NSQ alleviates some of the pain points of building distributed systems as well as how its design helps achieve high availability and fault tolerance.

Let’s assume for a second that this event stream is really important. You want to be able to tolerate host failures and continue to ensure that messages are at least archived, so you add another host.

adding a second host

Assuming you have some sort of load balancer in front of these two hosts you can now tolerate any single host failure.

Now, let’s say the process of persisting, compressing, and transferring these logs is affecting performance. How about splitting that responsibility off to a tier of hosts that have higher IO capacity?

separate archive hosts

This topology and configuration can easily scale to double-digit hosts, but you’re still managing configuration of these services manually, which does not scale. Specifically, in each consumer, this setup is hard-coding the address of where nsqd instances live, which is a pain. What you really want is for the configuration to evolve and be accessed at runtime based on the state of the NSQ cluster. This is exactly what we built nsqlookupd to address.

nsqlookupd is a daemon that records and disseminates the state of an NSQ cluster at runtime. nsqd instances maintain persistent TCP connections to nsqlookupd and push state changes across the wire. Specifically, an nsqd registers itself as a producer for a given topic as well as all channels it knows about. This allows consumers to query an nsqlookupd to determine who the producers are for a topic of interest, rather than hard-coding that configuration. Over time, they will learn about the existence of new producers and be able to route around failures.

The only changes you need to make are to point your existing nsqd and consumer instances at nsqlookupd (everyone explicitly knows where nsqlookupd instances are but consumers don’t explicitly know where producers are, and vice versa). The topology now looks like this:

adding nsqlookupd

At first glance this may look more complicated. It’s deceptive though, as the effect this has on a growing infrastructure is hard to communicate visually. You’ve effectively decoupled producers from consumers because nsqlookupd is now acting as a directory service in between. Adding additional downstream services that depend on a given stream is trivial: just specify the topic you’re interested in (producers will be discovered by querying nsqlookupd).

But what about availability and consistency of the lookup data? We generally recommend choosing how many nsqlookupd instances to run based on your desired availability requirements. nsqlookupd is not resource intensive and can be easily homed with other services. Also, nsqlookupd instances do not need to coordinate or otherwise be consistent with each other. Consumers will generally only require one nsqlookupd to be available with the information they need (and they will union the responses from all of the nsqlookupd instances they know about). Operationally, this makes it easy to migrate to a new set of nsqlookupd.
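The union step can be sketched as a pure function over already-parsed lookup responses. The response shape used here (a `data` object holding a `producers` list whose entries carry `address` and `tcp_port`) is an assumption about nsqlookupd's JSON output and may differ between versions; `union_producers` is a hypothetical helper name.

```python
def union_producers(responses):
    """Merge producer lists from several nsqlookupd responses.

    responses -- parsed JSON bodies, one per nsqlookupd instance; use None
                 for an instance that was unreachable.
    Returns a list of unique (address, tcp_port) pairs in first-seen order.
    """
    seen = set()
    producers = []
    for response in responses:
        if response is None:
            continue  # an unavailable nsqlookupd does not block discovery
        for producer in response.get("data", {}).get("producers", []):
            endpoint = (producer["address"], producer["tcp_port"])
            if endpoint not in seen:
                seen.add(endpoint)
                producers.append(endpoint)
    return producers


lookupd_a = {"data": {"producers": [{"address": "host1", "tcp_port": 4150}]}}
lookupd_b = {"data": {"producers": [{"address": "host1", "tcp_port": 4150},
                                    {"address": "host2", "tcp_port": 4150}]}}
print(union_producers([lookupd_a, None, lookupd_b]))
```

Since the result is a union, a consumer keeps working as long as at least one reachable nsqlookupd knows about the producers it needs.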

EOL

Using these same strategies, we’re now peaking at 65k messages per second delivered through our production NSQ cluster. Although applicable to many use cases, we’re only scratching the surface with the configurations described above.

If you’re interested in hacking on NSQ, check out the GitHub repo and the list of client libraries. We also publish binary distributions for Linux and Darwin to make it easy to get started.

Stay tuned for more posts on leveraging NSQ to build distributed systems. If you have any specific questions, feel free to ping me on twitter @imsnakes.

Improving Frontend Code Quality and Workflow

When I started at Bitly as a Frontend Engineer, we were about to launch the new bitly. It was exciting to be so close to a product launch and we were cranking out lots of code each day. It took me a few days to get my bearings in the sprawling codebase and I saw lots of opportunity for refactoring, cleanup, and style normalization as well as some architectural concepts we weren’t leveraging.

Product launch mode kept us from doing housekeeping for the next few months after we released the new product as we collected feedback and iterated on several designs/functionalities.

Once we had a chance to take a step back and regroup, I saw that we could be more productive if we invested in a common JavaScript style (both syntax and module level patterns) and re-evaluated which libraries we were leveraging to build the site.

When we decided the next feature work would be on save/share modal dialogs, I used this as an opportunity to evaluate my new picks for tools and libraries. The result was a success and allowed us to adopt the new tools and libraries for all new features (see Easily Save and Share the Links You Love).

Keep reading to see what tools we ended up using to make us more productive!

Coding style guidelines

If you’ve seen frontend JavaScript codebases older than six months with two or more developers working on them, you’ve seen how hard it is to enforce style.

The goal of a style guide (and code conventions in general) is to reduce the amount of friction when switching between different modules in a codebase and increase the consistency and readability of code. If every module follows the same patterns, you can easily get your bearings in code written by your coworkers. Every team should have a common style that is enforced in code review and ideally written in a document that can be referred to for new hires.

For JavaScript at Bitly, we mostly use the Google Style Guide without the JSDoc and with a variation that non-method variables are lowercase_with_underscores e.g.

var Bitly = function() {
};

Bitly.prototype.doThing = function() {
    var an_object = {};
};

The bigger the team, the more you might have to diverge from your preferred style but you’ll still get tons of benefit from the consistency alone.

Coffeescript for teams

At Bitly, in addition to codifying the style guide, we gave Coffeescript a trial run and decided to continue using it for all new code (like Github does). Coffeescript is quite contentious in the JS community, but for our frontend team we saw a great improvement in workflow and maintainability. Since whitespace is significant and {} and ; are almost always omitted, there is a distinct Coffeescript style that is encouraged by default.

Here’s a snippet from a Backbone View that declares all the DOM event handlers for the view in a concise way.

events:
    'click': 'focusInput'
    'focusin .picker-input': 'onFocusIn'
    'focusout .picker-input': 'checkForFullEmail'
    'keydown .picker-display': 'keyDownCheckForFocusedChip'
    'keydown .picker-input': 'inputKeydown'
    'keyup .picker-input': 'inputKeyup'
    'input .picker-input': 'inputNewText'
    'keydown .picker-suggestions': 'suggestionKeydown'
    'mousedown .picker-suggestions a': 'suggestionMousedown'
    'click .picker-suggestions a': 'selectItem'
    'click .picked-item .remove': 'removeItem'

Fat arrow (=>) is another great feature used for binding a function to the current scope and obviates the var self = this; pattern you will see in almost all JS codebases.

Here, fat arrow binds selectItem at constructor time to the newly constructed instance of MultiPickerView (not the prototype) instead of the default jQuery wrapped current target so I can access View methods/members within the handler.

class App.MultiPickerView extends Backbone.View
# ...
# ...
  selectItem: (e) =>
    e.preventDefault()
    $item = $(e.currentTarget)
    @addEmail $item.data("email")
    # @$input is convention for cached jquery elements 
    # stored on the view instance
    @$input.val ""
    @hideSuggest()
    _.defer => @$input.focus()

Fast and (mostly) logic-less templates with Handlebars

Handlebars is a template language with partials, custom helpers, and minimal logic, which can be compiled on the client or server side. We take advantage of the watcher system we already had in place for compiling Coffeescript to automatically compile our Handlebars templates ahead of time in order to reduce the workload on clients.

Immediate feedback with Coffeescript + File watcher + Growl

One of the major benefits of using Coffeescript is the compile-time checking that catches syntax errors. Instead of using the built-in coffee -cw *.coffee to compile-on-change, I wrote a script that does this but also triggers Growl notifications if the compilation produced errors.

coffeescript-error

The script does the same for *.handlebars files (for which there is not a built-in watcher functionality).

handlebars-error

See the source: Multiwatcher gist

Backbone (Models, Views, Events)

Historically, frontend JavaScript is fertile ground for spaghetti code, tight coupling, repeated code and scattered entry points. The introduction of jQuery increased this trend by making it easy for developers to handle DOM events and add visual effects wherever it was most convenient. This scales to a point, but when you start doing lots of AJAX, manipulating application state, and generating HTML using the DOM APIs, things can quickly get out of hand.

Backbone introduced a few great primitives that can be leveraged to make more maintainable frontend code.

Models are a good abstraction when you have a lot of application state that can change based on user interaction and/or AJAX calls.

Views encapsulate DOM elements and provide a declarative way of setting your event listeners using jQuery’s delegated event listeners.

Events implements the pub/sub pattern, which allows you to easily decouple your models and views. This is probably the most important development for frontend application architecture. The order of events generally might go like this:

  1. Instantiate Model, View
  2. View listens on Model changes
  3. Model attribute changes (via AJAX callback or direct user interaction)
  4. Model fires change event on its event channel
  5. View’s listener fires in response and performs some action

The decoupling is that the Model never has knowledge of the View. This means the Model handles only the business logic and can be much more easily tested.

There has been much written on Backbone and other MV* frameworks, so I won’t evangelize much here except to say that adding a View and Model layer in your frontend JavaScript will improve your architecture significantly.

Addy Osmani gives a good treatment as to why you would use MV* and how to select your tools in: Journey through the JavaScript MVC jungle

Unit/Integration Tests with Mocha, Sinon

Unit testing is a given for any modern web app with a long lifecycle and high volume of traffic.

There are many test runner frameworks for testing but you should choose one that matches your style. I like Mocha because it can handle BDD, TDD, and export styles, has a nice web browser interface, is being adopted in lots of places, and is written by TJ Holowaychuk who has a great reputation in the node community. The two other big names in testing frameworks are Jasmine and QUnit, but they are more opinionated and less flexible.

Besides a test runner, you’ll want some helper libraries to make your tests easier to write.

Sinon is essential as it provides spy, stub, and mock functionality to make sure you are testing the unit under test and not its dependencies! The author wrote Test-Driven JavaScript Development and is the foremost expert on this topic.

To support different styles of test assertions, you may like Should.js which provides expressive methods so you can write tests like:

user.pets.should.have.length(5)

Here is a sample test file for our AJAX wrapper method (which uses mockjax):

$.mockjaxSettings.responseTime = 1
$.mockjax(
  url: '/data/beta/missing'
  status: 404
)
$.mockjax(
  url: '/data/beta/invalid'
  status: 200
  responseText:
    status_code: 500
    status_txt: 'MISSING_ARG'
    data: null
)
$.mockjax(
  url: '/data/beta/valid'
  status: 200
  responseText:
    status_code: 200
    status_txt: 'OK'
    data: []
)

describe 'BITLY', ->
  describe '#post', ->
    it 'should trigger .fail for non-200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/missing', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .done((data) ->
          data.should.be.a 'object'
        )
        .fail(failSpy)
        .always(->
          doneSpy.called.should.be.false
          failSpy.calledTwice.should.be.true
          done()
        )
    it 'should trigger .fail for non-200 status_code', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/invalid', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .fail(failSpy)
        .always(->
          doneSpy.called.should.be.false
          failSpy.calledTwice.should.be.true
          done()
        )
    it 'should trigger .done for 200 response', (done) ->
      failSpy = sinon.spy()
      doneSpy = sinon.spy()
      BITLY.post('/data/beta/valid', {},
        success: doneSpy
        error: failSpy
      )
        .done(doneSpy)
        .done((data) ->
          data.should.be.a 'object'
        )
        .fail(failSpy)
        .always(->
          failSpy.called.should.be.false
          doneSpy.calledTwice.should.be.true
          done()
        )

Here is a sample of our test suite and its test runner:

mocha tests

Always be looking to improve your toolkit and sweat the small stuff when it comes to code quality. Every engineer wants to work with other great engineers. Ask yourself “If not me, then who on my team will make the effort?”

If you like working with other great engineers, bitly is hiring.

Classifying Human Traffic with Random Forest Decision Trees

At bitly, we study human behavior on the social web, and we often need to figure out whether data is generated by a deliberate human action (organic data) or by an action taken by a script or without a human’s knowledge (inorganic data). bitly data scientist Brian Eoff recently gave a talk at PyData NYC 2012 on a fast random forest decision tree approach, implemented in Python with scikit-learn, to identifying organic vs inorganic data in a realtime stream.


SciKit Random Forest - Brian Eoff from Continuum Analytics on Vimeo.

NSQ: realtime distributed message processing at scale

NSQ is a realtime message processing system designed to operate at bitly’s scale, handling billions of messages per day.

It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.

Operationally, NSQ is easy to configure and deploy (all parameters are specified on the command line and compiled binaries have no runtime dependencies). For maximum flexibility, it is agnostic to data format (messages can be JSON, MsgPack, Protocol Buffers, or anything else). Go and Python libraries are available out of the box.

This post aims to provide a detailed overview of NSQ, from the problems that inspired us to build a better solution to how it works inside and out. There’s a lot to cover so let’s start off with a little history…

Background

Before NSQ, there was simplequeue, a simple (shocking, right?) in-memory message queue with an HTTP interface, developed as part of our open source simplehttp suite of tools. Like its successor, simplequeue is agnostic to the type and format of the data it handles.

We used simplequeue as the foundation for a distributed message queue by siloing an instance on each host that produced messages. This effectively reduced the potential for data loss in a system which otherwise did not persist messages by guaranteeing that the loss of any single host would not prevent the rest of the message producers or consumers from functioning.

We also used pubsub, an HTTP server to aggregate streams and provide an endpoint for multiple clients to subscribe. We used it to transmit streams across hosts (or datacenters) and be queued again for writing to various downstream services.

As a glue utility, we used ps_to_http to subscribe to a pubsub stream and write the data to simplequeue.

There are a couple of important properties of these tools with respect to message duplication and delivery. Each of the N clients of a pubsub receives all of the messages published (each message is delivered to all clients), whereas each of the N clients of a simplequeue receives 1 / N of the messages queued (each message is delivered to 1 client). Consequently, when multiple applications need to consume data from a single producer, we set up the following workflow:

old school setup

The producer publishes to pubsub and for each downstream service we set up a dedicated simplequeue with a ps_to_http process to route all messages from the pubsub into the queue. Each service has its own set of “queuereaders” which we scale independently according to the service’s needs.

We used this foundation to process 100s of millions of messages a day. It was the core upon which bitly was built.

This setup had several nice properties:

But, it also had its issues…

One is simply the operational overhead/complexity of having to set up and configure the various tools in the chain. Of particular note are the pubsub > ps_to_http links. Given this setup, consuming a stream in a way that avoids SPOFs is a challenge. There are two options, neither of which is ideal:

  1. just put the ps_to_http process on a single box and pray
  2. shard by consuming the full stream but processing only a percentage of it on each host (though this does not resolve the issue of seamless failover)

To make things even more complicated, we needed to repeat this for each stream of data we were interested in.

Also, messages traveling through the system had no delivery guarantee and the responsibility of re-queueing was placed on the client (for instance, if processing fails). This churn increased the potential for situations that result in message loss.

Enter NSQ

NSQ is designed to (in no particular order):

To introduce some NSQ concepts, let’s start off by discussing configuration.

Simplifying Configuration and Administration

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.

Topics and channels all buffer data independently of each other, preventing a slow consumer from causing a backlog for other channels (the same applies at the topic level).

A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client. For example:

nsqd clients
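
As a rough illustration of the fan-out just described, here is a minimal in-memory sketch. The Topic class, its method names, and the channel names "metrics" and "archive" are hypothetical and exist only for this example; it models the delivery rule (one copy per channel, one client per copy) and nothing else about nsqd.

```python
import random
from collections import defaultdict


class Topic:
    """Toy model: every channel gets a copy of each message, and within a
    channel each message goes to exactly one connected client."""

    def __init__(self, rng=None):
        self.channels = defaultdict(list)  # channel name -> list of client inboxes
        self.rng = rng or random.Random()

    def subscribe(self, channel):
        inbox = []
        self.channels[channel].append(inbox)
        return inbox

    def publish(self, message):
        for clients in self.channels.values():
            # one copy per channel, handed to a single randomly chosen client
            self.rng.choice(clients).append(message)


topic = Topic()
metrics_a = topic.subscribe("metrics")   # two clients share the "metrics" channel
metrics_b = topic.subscribe("metrics")
archive = topic.subscribe("archive")     # one client on the "archive" channel

for i in range(100):
    topic.publish(f"msg-{i}")
```

The two "metrics" clients split the 100 messages between them, while the single "archive" client sees all 100.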

NSQ also includes a helper application, nsqlookupd, which provides a directory service where consumers can look up the addresses of nsqd instances that provide the topics they are interested in subscribing to. In terms of configuration, this decouples the consumers from the producers (they both individually only need to know where to contact common instances of nsqlookupd, never each other), reducing complexity and maintenance.

At a lower level each nsqd has a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.

To introduce a new distinct consumer of a topic, simply start up an NSQ client configured with the addresses of your nsqlookupd instances. There are no configuration changes needed to add either new consumers or new publishers, greatly reducing overhead and complexity.

NOTE: in future versions, the heuristic nsqlookupd uses to return addresses could be based on depth, number of connected clients, or other “intelligent” strategies. The current implementation is simply all. Ultimately, the goal is to ensure that all producers are being read from such that depth stays near zero.

It is important to note that the nsqd and nsqlookupd daemons are designed to operate independently, without communication or coordination between siblings.

We also think that it’s really important to have a way to view, introspect, and manage the cluster in aggregate. We built nsqadmin to do this. It provides a web UI to browse the hierarchy of topics/channels/consumers and inspect depth and other key statistics for each layer. Additionally it supports a few administrative commands such as removing and emptying a channel (which is a useful tool when messages in a channel can be safely thrown away in order to bring depth back to 0).

nsqadmin

Straightforward Upgrade Path

This was one of our highest priorities. Our production systems handle a large volume of traffic, all built upon our existing messaging tools, so we needed a way to slowly and methodically upgrade specific parts of our infrastructure with little to no impact.

First, on the message producer side we built nsqd to match simplequeue. Specifically, nsqd exposes an HTTP /put endpoint, just like simplequeue, to POST binary data (with the one caveat that the endpoint takes an additional query parameter specifying the “topic”). Services that want to switch to publishing to nsqd only have to make minor code changes.
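
To give a sense of how small the producer-side change is, here is a hedged sketch of building such a request with Python's standard library. The address 127.0.0.1:4151, the topic name "clicks", and the helper name build_put_request are assumptions made for the example; only the /put endpoint and the topic query parameter come from the description above.

```python
import urllib.parse
import urllib.request


def build_put_request(nsqd_http_address, topic, body):
    """Build (but do not send) a POST to nsqd's /put endpoint."""
    query = urllib.parse.urlencode({"topic": topic})
    url = f"http://{nsqd_http_address}/put?{query}"
    return urllib.request.Request(url, data=body, method="POST")


# The address and topic below are placeholders, not values from the post.
request = build_put_request("127.0.0.1:4151", "clicks", b"hello world")

# Actually publishing requires a running nsqd:
#     urllib.request.urlopen(request, timeout=5)
```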

Second, we built libraries in both Python and Go that matched the functionality and idioms we had been accustomed to in our existing libraries. This eased the transition on the message consumer side by limiting the code changes to bootstrapping. All business logic remained the same.

Finally, we built utilities to glue old and new components together. These are all available in the examples directory in the repository:

Eliminating SPOFs

NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to all instances providing the specified topic. There are no middle-men, no message brokers, and no SPOFs:

nsq clients

This topology eliminates the need to chain single, aggregated feeds. Instead you consume directly from all producers. Technically, it doesn’t matter which client connects to which NSQ instance: as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all will eventually be processed.

For nsqlookupd, high availability is achieved by running multiple instances. They don’t communicate directly to each other and data is considered eventually consistent. Consumers poll all of their configured nsqlookupd instances and union the responses. Stale, inaccessible, or otherwise faulty nodes don’t grind the system to a halt.
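
The poll-and-union behaviour can be sketched as follows. This is an illustration only: the lookup callable, the (host, port) result shape, and all addresses are hypothetical stand-ins for the real HTTP /lookup request, whose response format is not covered here.

```python
def discover_producers(lookupd_addresses, topic, lookup):
    """Poll every configured nsqlookupd and union the nsqd addresses returned.

    `lookup(address, topic)` is a hypothetical callable that performs the
    HTTP /lookup request and returns an iterable of (host, port) pairs.
    """
    producers = set()
    for address in lookupd_addresses:
        try:
            producers.update(lookup(address, topic))
        except OSError:
            # a stale or unreachable nsqlookupd should not stop discovery
            continue
    return producers


def fake_lookup(address, topic):
    """Stand-in for the network call so the sketch runs without a cluster."""
    responses = {
        "lookupd-1:4161": [("nsqd-a", 4150), ("nsqd-b", 4150)],
        "lookupd-2:4161": [("nsqd-b", 4150), ("nsqd-c", 4150)],
    }
    if address not in responses:
        raise ConnectionError("unreachable")
    return responses[address]


producers = discover_producers(
    ["lookupd-1:4161", "lookupd-2:4161", "lookupd-3:4161"], "clicks", fake_lookup
)
```

Here the third nsqlookupd is unreachable, yet the consumer still learns about all three nsqd instances from the other two.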

Message Delivery Guarantees

NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and de-dupe or perform idempotent operations.

This guarantee is enforced as part of the protocol and works as follows (assume the client has successfully connected and subscribed to a topic):

  1. client indicates they are ready to receive messages
  2. NSQ sends a message and temporarily stores the data locally (in the event of re-queue or timeout)
  3. client replies FIN (finish) or REQ (re-queue) indicating success or failure respectively. If the client does not reply, NSQ will time out after a configurable duration and automatically re-queue the message.

This ensures that the only edge case that would result in message loss is an unclean shutdown of an nsqd process. In that case, any messages that were in memory (or any buffered writes not flushed to disk) would be lost.
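
The three steps above can be modelled with a small in-memory sketch. The Channel class, its method names, and the 60 second timeout are assumptions for illustration; this is not how nsqd is implemented, only a model of the bookkeeping that makes delivery at-least-once.

```python
from collections import deque


class Channel:
    """Toy model of the send / FIN / REQ / timeout hand-off."""

    def __init__(self, msg_timeout=60.0):
        self.queue = deque()     # messages waiting to be sent
        self.in_flight = {}      # message id -> (body, deadline)
        self.msg_timeout = msg_timeout

    def put(self, msg_id, body):
        self.queue.append((msg_id, body))

    def send(self, now):
        """Step 2: hand out a message but keep a copy until the client replies."""
        msg_id, body = self.queue.popleft()
        self.in_flight[msg_id] = (body, now + self.msg_timeout)
        return msg_id, body

    def fin(self, msg_id):
        """Step 3, success: the message is done and can be forgotten."""
        del self.in_flight[msg_id]

    def req(self, msg_id):
        """Step 3, failure: put the message back on the queue."""
        body, _ = self.in_flight.pop(msg_id)
        self.queue.append((msg_id, body))

    def expire(self, now):
        """No reply before the deadline: re-queue automatically."""
        for msg_id, (_, deadline) in list(self.in_flight.items()):
            if now >= deadline:
                self.req(msg_id)


channel = Channel(msg_timeout=60.0)
channel.put("a", b"first")
channel.put("b", b"second")

first_id, _ = channel.send(now=0.0)
channel.fin(first_id)                       # processed successfully

second_id, _ = channel.send(now=0.0)
channel.expire(now=61.0)                    # client never replied, so "b" is re-queued
redelivered_id, _ = channel.send(now=61.0)  # and delivered a second time
```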

If preventing message loss is of the utmost importance, even this edge case can be mitigated. One solution is to stand up redundant nsqd pairs (on separate hosts) that receive copies of the same portion of messages. Because you’ve written your consumers to be idempotent, doing double-time on these messages has no downstream impact and allows the system to endure any single node failure without losing messages.
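
One common way to make a consumer tolerate duplicates is to remember which message IDs have already been handled. The sketch below assumes each message carries a stable ID; the IdempotentHandler name and the in-memory set are illustrative choices, and a real service would need to bound or persist that set.

```python
class IdempotentHandler:
    """Wrap a handler so a redelivered message is processed only once."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # illustrative only; unbounded and not persisted

    def __call__(self, msg_id, body):
        if msg_id in self.seen:
            return False        # duplicate: skip the work
        self.handler(body)
        self.seen.add(msg_id)   # record only after the work succeeded
        return True


totals = []
handle = IdempotentHandler(totals.append)

# "m1" arrives twice, as it could when redundant nsqd pairs carry the same message
results = [handle("m1", 5), handle("m2", 7), handle("m1", 5)]
```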

The takeaway is that NSQ provides the building blocks to support a variety of production use cases and configurable degrees of durability.

Bounded Memory Footprint

nsqd provides a configuration option --mem-queue-size that will determine the number of messages that are kept in memory for a given queue. If the depth of a queue exceeds this threshold messages are transparently written to disk. This bounds the memory footprint of a given nsqd process to mem-queue-size * #_of_channels_and_topics:

message overflow
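
To make the overflow behaviour concrete, here is a minimal sketch of a queue that holds a fixed number of messages in memory and spills the rest to a file. The OverflowQueue class, the 4-byte length prefix, and the use of a temporary file are assumptions for the example rather than nsqd's actual on-disk format, and the sketch makes no attempt to preserve ordering between memory and disk.

```python
import tempfile
from collections import deque


class OverflowQueue:
    """Keep at most `mem_queue_size` messages in memory; spill the rest to disk."""

    def __init__(self, mem_queue_size):
        self.mem_queue_size = mem_queue_size
        self.memory = deque()
        self.disk = tempfile.TemporaryFile()  # stand-in for a disk-backed queue
        self.read_pos = 0
        self.disk_depth = 0

    def put(self, body):
        if len(self.memory) < self.mem_queue_size:
            self.memory.append(body)
            return
        # memory is full: append a 4-byte length prefix plus the body to disk
        self.disk.seek(0, 2)
        self.disk.write(len(body).to_bytes(4, "big") + body)
        self.disk_depth += 1

    def get(self):
        if self.memory:
            return self.memory.popleft()
        if self.disk_depth == 0:
            raise IndexError("queue is empty")
        self.disk.seek(self.read_pos)
        size = int.from_bytes(self.disk.read(4), "big")
        body = self.disk.read(size)
        self.read_pos = self.disk.tell()
        self.disk_depth -= 1
        return body


queue = OverflowQueue(mem_queue_size=2)
for body in (b"a", b"b", b"c", b"d"):
    queue.put(body)

in_memory, on_disk = len(queue.memory), queue.disk_depth
drained = [queue.get() for _ in range(4)]
```

With a mem_queue_size of 0 every message in this sketch goes straight to the file.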

Also, an astute observer might have identified that this is a convenient way to gain an even higher guarantee of delivery by setting this value to something low (like 1 or even 0). The disk-backed queue is designed to survive unclean restarts (although messages might be delivered twice).

Also, related to message delivery guarantees, clean shutdowns (by sending a nsqd process the TERM signal) safely persist the messages currently in memory, in-flight, deferred, and in various internal buffers.

Note, a channel whose name ends in the string #ephemeral will not be buffered to disk and will instead drop messages after passing the mem-queue-size. This enables consumers which do not need message guarantees to subscribe to a channel. An ephemeral channel will also not persist after its last client disconnects.

Efficiency

NSQ was designed to communicate over a “memcached-like” command protocol with simple size-prefixed responses. All message data is kept in the core including metadata like number of attempts, timestamps, etc. This eliminates the copying of data back and forth from server to client, an inherent property of the previous toolchain when re-queueing a message. This also simplifies clients as they no longer need to be responsible for maintaining message state.

Also, by reducing configuration complexity, setup and development time is greatly reduced (especially in cases where there are >1 consumers of a topic).

For the data protocol, we made a key design decision that maximizes performance and throughput by pushing data to the client instead of waiting for it to pull. This concept, which we call RDY state, is essentially a form of client-side flow control.

When a client connects to nsqd and subscribes to a channel it is placed in a RDY state of 0. This means that no messages will be sent to the client. When a client is ready to receive messages it sends a command that updates its RDY state to the number of messages it is prepared to handle, say 100. Without any additional commands, 100 messages will be pushed to the client as they are available (each time decrementing the server-side RDY count for that client).

Client libraries are designed to send a command to update RDY count when it reaches ~25% of the configurable max-in-flight setting (and properly account for connections to multiple nsqd instances, dividing appropriately).

nsq protocol
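
The client-side bookkeeping for RDY can be sketched roughly as below. The Connection class, the per_connection_rdy helper, and the "RDY 100" string recorded in place of a socket write are all illustrative assumptions; the 25% threshold and the division across connections follow the description above, and the floor of 1 per connection is a simplification of this sketch.

```python
class Connection:
    """Client-side bookkeeping for one nsqd connection."""

    def __init__(self, rdy_target, threshold=0.25):
        self.rdy_target = rdy_target   # count sent with each RDY update
        self.low_water = max(1, int(rdy_target * threshold))
        self.rdy_remaining = 0         # mirrors the server-side RDY count
        self.rdy_commands = []         # what we would write to the socket

    def send_rdy(self):
        self.rdy_commands.append(f"RDY {self.rdy_target}")
        self.rdy_remaining = self.rdy_target

    def on_message(self):
        # the server decrements its RDY count for every message it pushes
        self.rdy_remaining -= 1
        if self.rdy_remaining <= self.low_water:
            self.send_rdy()


def per_connection_rdy(max_in_flight, num_connections):
    """Divide max-in-flight across connections (floor of 1 is a simplification)."""
    return max(1, max_in_flight // num_connections)


# max-in-flight of 200 spread over two nsqd connections gives 100 each
conn = Connection(rdy_target=per_connection_rdy(max_in_flight=200, num_connections=2))
conn.send_rdy()
for _ in range(75):
    conn.on_message()   # the 75th message leaves 25 remaining and triggers a new RDY
```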

This is a significant performance knob as some downstream systems are able to more-easily batch process messages and benefit greatly from a higher max-in-flight.

Notably, because it is both buffered and push based with the ability to satisfy the need for independent copies of streams (channels), we’ve produced a daemon that behaves like simplequeue and pubsub combined. This is powerful in terms of simplifying the topology of our systems where we would have traditionally maintained the older toolchain discussed above.

Go

We made a strategic decision early on to build the NSQ core in Go. We recently blogged about our use of Go at bitly and alluded to this very project - it might be helpful to browse through that post to get an understanding of our thinking with respect to the language.

Regarding NSQ, Go channels (not to be confused with NSQ channels) and the language’s built in concurrency features are a perfect fit for the internal workings of nsqd. We leverage buffered channels to manage our in memory message queues and seamlessly write overflow to disk.

The standard library makes it easy to write the networking layer and client code. The built in memory and cpu profiling hooks highlight opportunities for optimization and require very little effort to integrate. We also found it really easy to test components in isolation, mock types using interfaces, and iteratively build functionality.

Overall, it’s been a fantastic project to use as an opportunity to really dig into the language and see what it’s capable of on a larger scale. We’ve been extremely happy with our choice to use golang, its performance, and how productive we are using it.

EOL

We’ve been using NSQ in production for several months and we’re excited to share this with the open source community.

Across the 13 services we’ve upgraded, we’re processing ~35,000 messages/second at peak through the cluster. It has proved both performant and stable and made our lives easier operating our production systems.

There is more work to be done though — so far we’ve converted ~40% of our infrastructure. Fortunately, the upgrade process has been straightforward and well worth the short-term time tradeoff.

We’re really curious to hear what you think, so grab the source from github and try it out.

Finally, this labor of love began as scratching an itch — bitly provided an environment to experiment, build, and open source it… we’re always hiring.

Static Analysis, Syntax Checking, Code Formatting

Nothing describes the need for consistent code style more than this quote from Martin Fowler: “Any fool can write code that a computer can understand. Good programmers write code that humans can understand.”

I would amend that quote though, because in reality it’s hard to write good code for computers to understand, too. That is where tools that perform Static Analysis and Syntax Checking come into play. They both help track down and identify classes of problems that might otherwise go undetected, resulting in production issues.

We want to share a few specific and practical ways we perform Static Analysis and produce consistently styled code at bitly.

pyflakes and pep8 for Python

For python, we like to run our code through pyflakes. There are a few different versions of pyflakes floating around, but we use this one because it’s been rewritten to use AST so it’s fast, and it has output we like for scanning a whole directory tree.

Pyflakes for us looks like this:

$ pyflakes --quiet
.................F........

$ pyflakes
./graphite/conf/graphite_settings.py:23: redefinition of unused 
    'rrdtool' from line 21

There are ways to plug this type of check into various editors. Some of us use editor plugins for Textmate and Sublime to give feedback immediately during development.

Pyflakes is good at highlighting syntax issues, but it doesn’t help resolve inconsistent python style. Do you put whitespace around operators? Do you inline your if statements? Tabs or spaces? To address some of these, we use a version of pep8 which has the ability to automatically apply consistent formatting.

$ pep8 --ignore=W293 --ignore=W391 --max-line-length=120 .
./deploy_ui.py:95:66: W291 trailing whitespace
./deploy_ui.py:253:62: E225 missing whitespace around operator
./deploy_ui.py:257:5: E303 too many blank lines (2>1)
./settings.py:7:10: E203 whitespace before ':'

$ pep8 --fix --inplace settings.py
settings.py:9:80: E501 line too long (121 characters)
settings.py:16:7: W291 trailing whitespace
settings.py:7:10: E203 whitespace before ':'
settings.py:9:54: E261 at least two spaces before inline comment
settings.py:50:1: E302 expected 2 blank lines, found 1
 - pep8 fix: Writing changes to settings.py
 - pep8 fix: Applying fix for E302 at line 50, offset 0
 - pep8 fix: Applying fix for E261 at line 45, offset 34
 - pep8 fix: Applying fix for E203 at line 45, offset 15
 - pep8 fix: Applying fix for E203 at line 44, offset 26
 - pep8 fix: Applying fix for E261 at line 43, offset 45
 [truncated]

Bash Syntax Checking

Bash has a built in way to syntax check a file by running bash -n $file. We wrap this in a short script to give us similar usage to pyflakes. If all goes well, and there are no errors, it’s one dot per file checked. The syntax checking isn’t as extensive as pyflakes, but it beats finding out in the middle of a production maintenance that the one script you needed to work has a syntax error.

$ test_shell_scripts.sh --quiet
.......................................F...................

$ test_shell_scripts.sh 
./test.sh
./test.sh: line 7: unexpected EOF while looking for matching `)'
./test.sh: line 245: syntax error: unexpected end of file
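
The wrapper itself is not shown here, but a minimal sketch of the same idea might look like the following. It is written in Python rather than shell, the function names check_script and check_all are hypothetical, and it assumes bash is available on the PATH; only the bash -n check and the dot-per-file output come from the description above.

```python
import subprocess
import sys


def check_script(path):
    """Return (ok, errors) from `bash -n`, which parses a file without running it."""
    result = subprocess.run(["bash", "-n", path], capture_output=True, text=True)
    return result.returncode == 0, result.stderr


def check_all(paths, quiet=False, out=sys.stdout):
    """Check each script; print dots in quiet mode, full errors otherwise."""
    failures = 0
    for path in paths:
        ok, errors = check_script(path)
        if quiet:
            out.write("." if ok else "F")
        elif not ok:
            out.write(f"{path}\n{errors}")
        failures += 0 if ok else 1
    if quiet:
        out.write("\n")
    return failures
```

Returning the failure count lets a caller turn any failure into a non-zero exit status, which is what a pre-deploy check would want.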

Go Syntax Checking

We have written previously about our experience adopting golang as a primary language at bitly. One of the golang choices we really like is directly related to code formatting. The Go core developers made a choice to preempt coding style wars by designing a formatting tool go fmt as part of the language distribution.

Additionally, things that many languages list as warnings (or just swallow completely) are compiler errors in Go. Examples of errors in Go are: unused variables, undefined variables, missing imports, mixing types, etc. These issues are raised by the compiler and will keep code from being compiled. To this end, the language itself enforces static analysis and syntax checking at compile time, which is nice because we don’t need any extra tools.

C Style with astyle

For C code we don’t use a static analysis tool (we use dynamic tools like valgrind), but we do programmatically apply One True Brace Style with astyle. (We suggest a recent version of astyle from trunk; it has bug fixes we found useful.) Our specific astyle syntax is applied with the following command:

$ astyle --style=1tbs \
    --lineend=linux \
    --convert-tabs \
    --preserve-date \
    --fill-empty-lines \
    --pad-header \
    --indent-switches \
    --align-pointer=name \
    --align-reference=name \
    --pad-oper \
    --suffix=none \
    $file

Javascript with Closure Compiler

We use Google’s Closure Compiler to prepare our Javascript files for production use. In addition to concatenating and compressing our Javascript, the Closure Compiler also provides syntax and type checking, variable reference checking, and warnings about common Javascript pitfalls:

$ java -jar closure-compiler.jar \
    --js input.js \
    --js_output_file output-min.js
input.js:83: WARNING - Suspicious code. This code lacks side-effects.
        Is there a bug?
        if(!data.mode) { data.mode === "save"; }
input.js:1150: ERROR - Parse error. Internet Explorer has a
        non-standard intepretation of trailing commas. Arrays will
        have the wrong length and objects will not parse at all.

EOL

If this sounds like how you would like to write code, bitly is hiring.

Go Go Gadget

bitly’s stack isn’t fancy. We use a combination of languages and tools that interact well with each other and promote philosophies such as explicitness, simplicity, and robustness. Most of our services are Python with many core components in C (of which some are open sourced under simplehttp). We believe whole-heartedly in a services oriented approach where components do one thing and do it well.

C is a fantastic language, one which I highly encourage all developers to learn and experiment with as it pulls away the veil and provides insight into all the things that are happening behind those seemingly innocent lines of a scripting language. With great power comes great responsibility and sometimes that power comes at a cost – primarily in terms of development and debugging time and thus the time it takes for you to ship your product.

Python has also treated us well. Its readability, standard library, and obviousness make it a great language to work with on a team. It performs well and is flexible to the point where it can be used successfully in most any situation – whether it be data science, network daemons, or one off shell scripts.

Enter Go

We identified early on that Go had all the makings of a language that could supersede some of the places we would have traditionally turned to C and some of the places where we wanted to move away from Python.

For example, we’ve built many network daemons in C that don’t require strict, absolute, control over memory. This is where Go shines. The “cost” of a garbage collected language in these types of situations is overshadowed by the huge benefit to development time and flexibility. Code becomes more readable as a natural side effect of being able to focus more on the “meat” of the problem instead of having to worry about low level details. Mix in the “batteries included” standard library and you’re cookin’.

Also, all of our C apps are built on libevent - i.e. a single-threaded event loop with callbacks that enables us to efficiently handle many thousands of simultaneous open connections. We use the same technique in Python via tornado, but the amount of work a single Python process can do is limited largely by runtime overhead. Additionally, this style of code can be hard to read and debug when you’re not used to it. Go’s lightweight concurrency model allows you to write in an imperative style while providing a built-in way to leverage the multi-core architecture of the computers we operate on.

Just as in C, Go promotes a style of very explicit and localized error handling. While sometimes verbose it forces a developer to really think about how their program might fail and what to do about it. Unlike C, Go has the built-in capability of returning multiple values from a function, resulting in clearer and more concise error-handling code.

The points above are mostly low-level. Go’s true power comes from the fact that the language is small and the standard library is large. We’ve had developers start from zero experience with Go to writing working, production, code in a day. That is why we’re so excited about its place in our stack.

Think about it like this… if you can write something in Go just as fast as you could in Python and:

What would you choose?

And in an effort to encourage the team to experiment and learn about Go we created the “Go Gopher Award”! The most recent engineer to learn the language and get code into production gets to have the Go Gopher Squishable sit with them at their desk. This is an extremely prestigious award.

Go Isn’t Perfect

Let’s talk about garbage collection. It isn’t free. Go’s garbage collector is a conservative, stop-the-world, mark-and-sweep GC. Practically speaking this means that the more careless you are in what, how many, and how often you allocate the longer the world stops when the Go runtime cleans things up. Unsurprisingly, significant performance gains can be had by being more careful… not only in raw speed but in the consistency of it. Fortunately Go provides fantastic hooks (pprof) to introspect the runtime behavior of your application. It’s generally obvious when your bottleneck is the GC.

Interacting with JSON can be a bit clumsy. It’s not terrible… just isn’t as elegant and readable as the alternative in Python. Truthfully it’s quite impressive that it’s as good as it is considering the type safety of the language. We’ve made an effort to make this even easier and open sourced simplejson, a package that exposes a clean, chainable, API for interacting with JSON where the format may not be known ahead of time.

Packaging has also given us some trouble. This is an area where we’ve probably gone the most back-and-forth and are admittedly still in the early stages of finding what works for us. The biggest problem is somewhat related to its purported greatest strength, i.e. qualifying the full path when importing an open-source package. This has led to a whole crop of third-party utilities that work around this by re-writing fully qualified import paths to something the user specifies (and installing the package as the same, modified, name). We use go-install-as and our policy is to install non-standard packages under our own custom import paths to ensure that our apps are always compiled against the version and source we’ve strictly enforced to be installed on a machine.

One notable feature missing from the HTTP client in Go’s standard library is the ability to set timeouts. It is further complicated by the inability of the API to provide easy access to the connection (to do it yourself). We set out to bridge the gap and wrote go-httpclient (with the expectation that these features will eventually make it into the standard library).

Oh, and one night after way too much drinking they decided to use a radical syntax for formatting time as strings (instead of your standard strftime parameters). We open sourced go-strftime to remedy that.

All in all these “issues” are minor and ultimately we felt that the benefits far outweigh the tradeoffs – Go meets our needs and philosophies as software engineers.

Go In Production

Where are we actually using Go in production? Let’s take a step back and talk about how data moves through our infrastructure first.

In the spirit of keeping things simple, fault tolerant, and asynchronous, most everything that gets written to downstream systems is done so via message queues (formatted in JSON with all communication over HTTP). Some of these message queues process volumes in the thousands per second. We call the workers queuereaders, all of which were originally written in Python. At the lower level are two C apps – simplequeue, an HTTP interface to an in-memory data agnostic queue, and pubsub, an HTTP interface for one-to-many streaming.

As volume increases, the overhead of processing each message becomes more significant. Therefore, these applications emerged as one key area where Go could replace Python. We started by re-writing some queuereaders in Go by building matching libraries to support the same API we expose in Python. Also, whereas in Python we would run multiple processes, in Go we leverage channels and goroutines to write straightforward code that parallelizes HTTP requests to downstream systems or aggregates writes to disk. These same features make it really easy to batch retrieve messages from the queue while still processing them independently, reducing round-trips.

By experimenting with non-critical systems at the edge of our infrastructure we were able to safely and methodically learn about Go’s behavior in production. Needless to say, we’ve been very happy. We’ve seen consistent, measurable performance gains without sacrificing stability.

We also wanted a bit more functionality out of the core combination of simplequeue and pubsub, so we decided to build a successor with the goal of improving message delivery guarantees, maintaining the spirit and simplicity of what we already had, increasing performance, solving some configuration and setup issues, and providing a straightforward upgrade path.

We realized Go would be perfect for this project. We’ll talk about it in depth in a future blog post (and don’t worry, it will be open source). Is it any good? Yes.

EOL

Finally, we’ve also open sourced a few other Go related items…

As always, bitly is hiring.