During development of Python web applications, there are a lot of tools that
can help provide clues about what caused a bug or exception. For
example, Django’s default
debug error page prints out local variables at
every stack frame when there’s an unhandled exception. The popular
django_debugtoolbar adds more information. In a similar vein there are things
like Pyramid’s pyramid_debugtoolbar and its predecessor WebError.
But when troubleshooting production issues, those tools aren’t available, for good reason: We need the useful information to be exposed to developers, not overwhelming our end users nor exposing sensitive information to malicious eyes.
So, we turn to logging instead. But that provides a lot less information out of the box. So at bitly, we often found ourselves in an irritating cycle that went something like this:
Sometime soon after a deploy, we notice an unusual number of exceptions being logged. (Let’s say that old Python chestnut,
'NoneType' object is unsubscriptable.) So we know that we have some code that is expecting a dict or list and getting None instead.
We may decide the error is not critical enough to roll back without first trying to diagnose the underlying cause, but of course we want to fix it quickly. The pressure is on.
We find the traceback in our logs. It looks like:
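(The original traceback isn't reproduced here; a stand-in consistent with the discussion below, with invented file names and line numbers, might be:)

Traceback (most recent call last):
  File "/bitly/app/handlers.py", line 93, in get
    total = response['data']['subtotal'] + entry['count']
TypeError: 'NoneType' object is unsubscriptable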
But where’s the None? Is it response['data'] or is it one of the entry values? We have no way to see. (Tip: We can actually tell that it’s not response['data']['subtotal']. Do you know why? Would you remember that while under pressure to fix a production bug?)
We try, and fail, to provoke the error on our local development or staging environments.
Then we add some logging code.
We get a quick code review and deploy the logging code.
We wait for the error to happen again.
We find the log message and realize we forgot to log everything we wanted to see, or we need more contextual information. Go back 3 steps and repeat as needed.
Finally we can see what the bad value was and we finally have enough clues that we can understand the cause of the error and fix it.
This is a frustrating amount of overhead when you’re trying to diagnose a production issue as quickly as possible, even with multiple people helping out.
Fortunately, if you make liberal use of
logging.exception() — for example,
if your web framework calls it on any unhandled exception, and if you take care
to call it on unknown exceptions that you deliberately catch — then it’s
actually easy to change your logging configuration such that you get a lot more
useful information logged. And you can do this globally, without sprinkling
logging changes throughout your code.
Here’s one possible way to hack your logging config to do this:
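(The post's original snippet isn't reproduced here; a minimal sketch of the idea, a Formatter subclass that appends the innermost frame's locals, might look like this:)

import logging

MAX_VARS_LINES = 30

class VerboseExceptionFormatter(logging.Formatter):
    """Append the innermost frame's local variables to formatted tracebacks."""

    def formatException(self, exc_info):
        # Standard traceback formatting first.
        formatted = logging.Formatter.formatException(self, exc_info)
        tb = exc_info[2]
        if tb is None:
            return formatted
        while tb.tb_next:  # walk to the innermost frame
            tb = tb.tb_next
        lines = ['Locals at innermost frame:']
        for name, value in sorted(tb.tb_frame.f_locals.items()):
            if len(lines) > MAX_VARS_LINES:
                lines.append('... truncated ...')
                break
            lines.append('%s = %r' % (name, value))
        return formatted + '\n' + '\n'.join(lines)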
Then, in whatever code sets up your logging configuration, you can add this to the relevant handler like so:
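(Again a sketch; the handler and format string stand in for whatever your app already uses:)

import logging

root = logging.getLogger()
handler = logging.StreamHandler()  # or your existing file/syslog handler
handler.setFormatter(
    VerboseExceptionFormatter('%(asctime)s %(levelname)s %(name)s %(message)s'))
root.addHandler(handler)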
The MAX_VARS_LINES value is used to limit the amount of data we dump out. And so far we’re only logging the local variables from the innermost frame, on the assumption that those are the most likely to be relevant.
Having deployed this logging configuration, our hypothetical traceback would look more like this:
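(Again a stand-in rather than the original output, but it captures the shape:)

Traceback (most recent call last):
  File "/bitly/app/handlers.py", line 93, in get
    total = response['data']['subtotal'] + entry['count']
TypeError: 'NoneType' object is unsubscriptable
Locals at innermost frame:
entry = {'count': 2, 'id': 'a1b2c3'}
response = {'data': None, 'error': 'upstream service timed out'}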
Aha. Now we can clearly see that some upstream service failed, and our code is failing to handle that. And we can take action immediately.
This technique is not a magic crystal ball, certainly; sometimes the important
clues are elsewhere in the stack, or they’re in the current global scope. If
we wanted to, it would be trivial to log locals and globals at every frame,
just by grabbing tb.tb_frame.f_locals and tb.tb_frame.f_globals at each step
of a while tb.tb_next loop. But that could get very verbose, and some
of our services get hit very hard and log a lot, so we haven’t gone that far yet.
We have already used these enhanced log entries to quickly diagnose a couple of bugs in production, so we’re happy with this little tweak. Hopefully you will find it useful too.
If you’d like to get your hands directly on our code, we’re hiring.
Here at bitly, we are big fans of data, tubes and especially tubes that carry data.
This is a story about asking tubes to carry too much data.
A physical hadoop cluster has been a significant part of bitly’s infrastructure and a core tool of the bitly Data Science and Ops/Infra teams for some time. Long enough that we needed to cycle in a new cluster, copy data, and fold the old into the new.
Branches were opened, work was done, servers provisioned and the new cluster was stood up. Time to take a step back from the story and flesh out some technical details:
bitly operates at a consequential scale of data: at the time of this migration the hadoop cluster held just over 150TB of consumed disk space of compressed stream data, that is, data that is the valuable output of our various applications after having been manipulated and expanded on by other applications.
bitly’s physical presence is colocated with our data center partner. There are three physical chassis classes (application, storage and database) racked together in contiguous cabinets in rows. At the time of this story each chassis had three physical 1Gb Ethernet connections, each logically isolated by VLANs: frontlink, backlink and lights-out (for out-of-band management of the server chassis). Each connection, after a series of cabinet-specific patch panels and switches, connects to our core switches over 10Gb glass in a hub and spoke topology.
While bitly also operates at a consequential physical scale (hundreds of physical server chassis), we depend on our data center partner for network infrastructure and topology. This means that within most levels of the physical networking stack, we have severely limited control and visibility.
Back to the story:
The distcp tool bundled with hadoop allowed us to quickly copy data from one cluster to the other. Put simply, distcp creates a mapreduce job to shuffle data from one hdfs cluster to another, in a many-to-many node copy. Distcp was fast, which was good.
bitly broke, and the Ops/Infra team was very sad.
Errors and latent responses were being returned to website users and api clients. We discovered that services were getting errors from other services, database calls were timing out and even DNS queries internal to our network were failing. We determined that the copy had caused unforeseen duress on the network tubes, particularly the tubes of the network that carried traffic across physical cabinets. However the information given to us by our data center partner only added to the confusion: no connection, not even cabinet to core, showed signs of saturation, congestion or errors.
We now had two conflicting problems: a need to continue making headway with the hadoop migration as well as troubleshooting and understanding our network issues.
We limited the number of mapreduce mappers that the distcp tool used to copy data between clusters, which artificially throttled throughput for the copies, allowing us to resume moving forward with the migration. Eventually the copies completed, and we were able to swap the new cluster in for the old.
The new cluster had more nodes, which meant that hadoop was faster.
The Data Science team was happy.
Unfortunately, with hadoop being larger and faster, more data was getting shipped around to more nodes during mapreduce workloads, which had the unintended effect of:
bitly broke, a lot. The Ops/Infra team was very sad.
The first response action was to turn hadoop off.
The Data Science team was sad.
A turned off hadoop cluster is bad (just not as bad as breaking bitly), so we time warped the cluster back to 1995 by forcing all NICs to re-negotiate at 100Mbps (as opposed to 1Gbps) using ethtool -s eth1 speed 100 duplex full autoneg on. Now we could safely turn hadoop on, but it was painfully slow.
The Data Science team was still sad.
In fact it was so slow and congested that data ingestion and scheduled ETL/reporting jobs began to fail frequently, triggering alarms that woke Ops/Infra team members up in the middle of the night.
The Ops/Infra team was still sad.
Because of our lack of sufficient visibility into the state of the network, working through triage and troubleshooting with our data center partner was going to be an involved and lengthy task. Something had to be done to get hadoop into a usable state, while protecting bitly from breaking.
Time to take another step back:
Some tools we have at bitly:
roles.json: A list of servers (app01, app02, userdb01, hadoop01 etc), roles (userdb, app, web, monitoring, hadoop_node etc), and the mapping of servers into roles (app01,02 -> app, hadoop01,02 -> hadoop_node etc).
$datacenter/jsons/*: A directory containing a json file per logical server, with attributes describing the server such as ip address, names, provisioning information and, most importantly for this story, cabinet location.
Since we could easily identify which servers do which things and where each server is racked, and could leverage all the benefits of Linux, this was a solvable problem, and we got to work.
And the Ops/Infra team was sad.
Because Linux’s networking Traffic Control (tc) syntax was clunky and awkward and its documentation intimidating. After much swearing and keyboard smashing, perseverance paid off and working examples of tc magic surfaced. Branches were opened, scripts were written, deploys done, benchmarks run and finally some test nodes were left with the following:
$ tc class show dev eth1
class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b
class htb 1:10 root prio 0 rate 819200Kbit ceil 819200Kbit burst 1433b cburst 1433b
class htb 1:20 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b

$ tc filter show dev eth1
filter parent 1: protocol ip pref 49128 u32
filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1
filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key ht 818 bkt 0 flowid 1:20 match 7f000001/ffffffff at 16
filter parent 1: protocol ip pref 49129 u32
filter parent 1: protocol ip pref 49129 u32 fh 817: ht divisor 1
filter parent 1: protocol ip pref 49129 u32 fh 817::800 order 2048 key ht 817 bkt 0 flowid 1:10 match 7f000002/ffffffff at 16
filter parent 1: protocol ip pref 49130 u32
filter parent 1: protocol ip pref 49130 u32 fh 816: ht divisor 1
filter parent 1: protocol ip pref 49130 u32 fh 816::800 order 2048 key ht 816 bkt 0 flowid 1:20 match 7f000003/ffffffff at 16
<snipped>

$ tc qdisc show
qdisc mq 0: dev eth2 root
qdisc mq 0: dev eth0 root
qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100 direct_packets_stat 24
In plain English, there are three traffic control
classes. Each class
represents a logical group, to which a filter
can be subscribed, such as:
class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b
Each class represents a ceiling or throughput limit of outgoing traffic aggregated across all filters subscribed to that class.
Each filter is a specific rule for a specific IP (unfortunately each IP is printed in hex), so the filter:
filter parent 1: protocol ip pref 49128 u32
filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1
filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key ht 818 bkt 0 flowid 1:20 match 7f000001/ffffffff at 16
can be read as “subscribe hadoop14 to the class 1:20” where “7f000001” can be read as the IP for hadoop14 and “flowid 1:20” is the class being subscribed to.
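If you'd rather not decode the hex by hand, a quick helper (illustrative, not part of our tooling) does it:

import socket
import struct

def tc_match_to_ip(match):
    """Convert a tc u32 match like '7f000001' to dotted-quad notation."""
    return socket.inet_ntoa(struct.pack('!I', int(match, 16)))

print(tc_match_to_ip('7f000001'))  # -> 127.0.0.1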
Finally there is a qdisc, which is more or less the active queue for the eth1 device. That queue defaults to placing any host that is not otherwise defined in a filter for a class, into the 1:100 class.
qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100 direct_packets_stat 24
With this configuration, any host, hadoop or not, that is in the same cabinet as the host being configured gets a filter that is assigned to the “1:10” class, which allows up to ~800Mbps for the class as a whole. Similarly, there is a predefined list of roles that are deemed “roles of priority hosts”, which get a filter created on the same “1:100” rule. These are hosts that do uniquely important things, like running the hadoop namenode or jobtracker services, and also our monitoring hosts.
Any other hadoop host that is not in the same cabinet is attached to the “1:20” class, which is limited to a more conservative ~200Mbps class.
As mentioned before, any host not specified by a filter gets caught by the default class for the eth1 qdisc, which is “1:100”.
What does this actually look like? Here is a host that is caught by the catch all “1:100” rule:
[root@hadoop27 ~]# iperf -t 30 -c NONHADOOPHOST
------------------------------------------------------------
Client connecting to NONHADOOPHOST, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[ 3] local hadoop27 port 35897 connected with NONHADOOPHOST port 5001
[ ID] Interval       Transfer     Bandwidth
[ 3]  0.0-30.1 sec   735 MBytes   205 Mbits/sec
Now when connecting to another host in the same cabinet, which falls under the “1:10” rule:
[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER
------------------------------------------------------------
Client connecting to CABINETPEER, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[ 3] local hadoop27 port 39016 connected with CABINETPEER port 5001
[ ID] Interval       Transfer     Bandwidth
[ 3]  0.0-30.0 sec   2.86 GBytes  820 Mbits/sec
Now what happens when connecting to two servers that match the “1:10” rule?
[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER1
------------------------------------------------------------
Client connecting to CABINETPEER1, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[ 3] local hadoop27 port 39648 connected with CABINETPEER1 port 5001
[ ID] Interval       Transfer     Bandwidth
[ 3]  0.0-30.0 sec   1.47 GBytes  421 Mbits/sec

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER2
------------------------------------------------------------
Client connecting to 10.241.28.160, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[ 3] local hadoop27 port 38218 connected with CABINETPEER2 port 5001
[ ID] Interval       Transfer     Bandwidth
[ 3]  0.0-30.0 sec   1.43 GBytes  408 Mbits/sec
So the traffic got halved? Sounds about right.
Even better, trending the data was relatively easy by mangling the stats output to our trending services:
$ /sbin/tc -s class show dev eth1 classid 1:100
class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b
 Sent 5876292240 bytes 41184081 pkt (dropped 0, overlimits 0 requeues 0)
 rate 3456bit 2pps backlog 0b 0p requeues 0
 lended: 40130273 borrowed: 0 giants: 0
 tokens: 906 ctokens: 906
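The “mangling” can be as simple as a script that scrapes those counters and emits graphite-style lines; a rough sketch (metric naming is illustrative):

import re
import socket
import subprocess
import time

CLASS_RE = re.compile(r'class htb (\S+) .*?Sent (\d+) bytes', re.DOTALL)

def emit_tc_stats(dev='eth1'):
    out = subprocess.check_output(
        ['/sbin/tc', '-s', 'class', 'show', 'dev', dev]).decode()
    now = int(time.time())
    host = socket.gethostname()
    for classid, sent in CLASS_RE.findall(out):
        # graphite plaintext protocol: "<path> <value> <timestamp>"
        print('%s.tc.%s.bytes_sent %s %d'
              % (host, classid.replace(':', '_'), sent, now))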
After testing, we cycled through the hadoop hosts, re-enabling their links to 1Gb after applying the traffic control rules. With deploys done, hadoop was usably performant.
The Data Science team was happy.
The Ops/Infra team could begin tackling longer term troubleshooting and solutions while being able to sleep at night, knowing that bitly was not being broken.
The Ops/Infra team was happy.
In dire moments: your toolset for managing your environment is as important as the environment itself. Because we already had the toolset available to holistically control the environment, we were able to dig ourselves out of the hole almost as quickly as we had fallen into it.
Don’t get into dire moments: Understand the environment that you live in. In this case, we should have had a better understanding and appreciation for the scope of the hadoop migration and its possible impacts.
Linux TC is a high cost, high reward tool. It was almost certainly written by people with the very longest of beards, and requires time and patience to implement. However we found it to be an incredibly powerful tool that helped save us from ourselves.
This story is a good reminder of the "Law of Murphy for devops". Temporary solutions like those in this story afforded us the time to complete troubleshooting of our network and implement permanent fixes. We have since unthrottled hadoop and moved it to its own dedicated network, worked around shoddy network hardware to harden our primary network and much more. Stay tuned.
Recently we gave a talk at Devopsdays Portland about how we deploy all day at bitly without breaking the Internet. Some of the topics we covered include automation, state management, systems architecture, distributed messaging, and peer review. Check it out!
For the past 15 years, I’ve worked as a remote employee. I currently live in Denver and work for Manhattan-based Bitly. Before that, I was on a Virginia-based team at AOL for 11 years, working from Philly, Dallas, Philly again, New York and then Denver. During that time, I built up some personal tenets for working remotely which have served me well. They are in some ways simple, but when applied consistently, I have found them to foster quite a positive experience for those working with me as a remote individual.
Here they are, captured for others who might benefit from them:
1) Make sure your online presence is accurate. If you’re online on IM, you should really be there. If you’re away from your desk, put up your away message or sign off. You should be almost as easy to reach as people in the office.
2) Call in 5 minutes early to every meeting. When people join a conference call, you should always already be there, on the call, ready to go. You want people to react this way about you, “Of course Michael’s there. He’s always there.” With Skype, this translates simply to, “Answer the moment they call.” With Google Hangouts, it applies directly — be the first one to join.
3) Speak up on calls. Make your presence known and heard. If you never have anything to say, why are you there?
4) If you can’t hear, tell them you can’t hear. If what people are saying has any importance, you should be able to hear it. Same goes for seeing on video calls. If you can’t see what’s being discussed, try to get it remedied.
5) Be productive. If there’s ever a question that you’re not getting your work done as expected, you’ll be the low-hanging fruit precisely because you’re remote. I survived layoff rounds at AOL practically every 6 months for years. I was never concerned, because I knew I always delivered.
6) Make regular trips to the mothership. In-person contact is still needed periodically. I have found about every 6 to 8 weeks to be a pretty good interval.
7) Make your core hours match the company’s core hours. This isn’t always possible, depending on the time difference, but you should strive to achieve at least 3 or 4 hours of overlap. When I worked with a QA group in India for a time, where the time difference was 12.5 hours, we all adjusted our work hours so we could achieve a 3-hour overlap. Before we did that, question-and-answer cycles often took literally 2 days. To answer one question!
8) Figure out what you need others to do differently. You may need to make people aware that working with a remote individual may take some work on their part as well. They may need to be reminded to take extra time to think, “Does Michael need to know about this decision we just made while brewing coffee?” If you are invited to a meeting, is there a call-in number set up? If Skype, who is calling whom?
I find that it doesn’t much matter where I’m working — which tends either to be home or Starbucks — as long as I am able to follow the above tenets. That means if connectivity is important for a given day (which it usually is), it’s my responsibility to make sure I have access to reliable connectivity. If I’m supposed to do a 4-hour planning session, home is the right choice. Solid coding session? Starbucks affords just the right amount of ambient noise, random distraction and caffeine.
It’s worth mentioning that being a remote employee is certainly not for everyone. It takes discipline. You also definitely miss out on at least some of the camaraderie of the in-office experience, which at Bitly is saying something. I haven’t once, for instance, partaken in Drink Cart Friday. This is a problem. Perhaps I need to adjust my travel times….
Oh, and we’re hiring. (Are we hiring remotes? I guess that depends on your tenets for working remotely.)
Here is a tale of how we leverage redundant datacenters, redundant code, and multi-tiered fallbacks in the quest for uptime.
High availability is important for any site operating at scale, but at bitly it is particularly important; people expect bitly links to work, no matter what. We have enterprise customers who rely on them for key metrics, users who share them on social networks, and websites with custom short domains that trust us to serve requests with their name on them. A bitly link not working in any of these scenarios would make our users look bad, so it is something we take very seriously.
No matter how redundant, distributed, and fault tolerant your main infrastructure is, things can always go wrong. Recently Google Apps and Search, probably the most distributed infrastructure in existence, went down. There are unknowns everywhere, and ultimately you have to plan for any part of your infrastructure breaking for unknown reasons. Under failure, a distributed system should degrade gracefully, not suddenly. This is why we created Z Proxy.
So What is Z Proxy?
Z Proxy is an application that serves decodes (this is what we call redirecting from a short bitly link to its long URL, and what happens every time you click on a bitly link) without relying on any other part of the bitly infrastructure. This means that it does not use our primary database of urls, or any of our other servers, to do lookups. So how does it work?
How it Works
Z Proxy is essentially a self contained wrapper around S3, written in Go. When all of bitly is running properly, every time a link is shortened, a message is put on NSQ, which a queuereader later grabs. A queuereader then writes the short and long urls into S3 so that Z Proxy can perform lookups against S3 by short url, get the long url, and serve a 301 or 302 redirect. To the browser, nothing different happened.
There are multiple hosts running Z Proxy in EC2. This location provides proximity to S3, high availability, and most importantly an availability profile different from that of the main decode infrastructure, which exists outside of AWS. EC2 and S3 can have problems, but the chance of that happening at the same time as a failure in our own datacenter is extremely low, and having both gives us flexibility.
Each host has a local memcached instance used to cache the slow S3 lookups. Usually there are many more steps involved with decodes, but Z Proxy skips most that are not critically essential, such as spam checking. Because it has fewer features than the main decode path, and because it is written in optimized Go, this is a lightweight way to serve our decodes (thousands a second) in a failure scenario. We keep sufficient capacity on these systems to be ready for a failure at any time.
Metrics & Background Processing
Because we use NSQ, even if the primary infrastructure is down, hosts running Z Proxy (we call these “lastresort” hosts) can create and queue messages corresponding to each decode request. That means when everything is back up and running, the primary infrastructure will process messages from these hosts. Info+ pages will be updated with clicks that happened when everything was down, ratelimits will be adjusted, realtime will find new trends based on these clicks, and more.
Z Proxy also records metrics for internal use. It sends data to graphite recording response times, types of requests, etc., but of course since it makes no assumptions about anything in our infrastructure working, graphite included, it also aggregates some stats locally.
Normally our DNS points to our load balancers, which send requests off to frontend webservers. Nginx on each frontend webserver is configured to handle local timeouts and failures by transparently retrying the request against a lastresort host. Nginx on each lastresort host then sends the request to one of a few local Z Proxy instances. This is great because it allows failovers on a per request basis, but if our frontend servers or load balancers are taken out (i.e. we lose datacenter connectivity), it doesn’t help. In this case, we can point DNS for all of our domains directly at the lastresort hosts.
But What if Z Proxy Doesn’t Work?
The trust-nobody approach of Z Proxy makes it very stable, but ultimately it could still break, so even the Go app isn’t enough.
To have an additional level of safety, the S3 key is the short link, but the value isn’t actually the long url itself. The S3 value is an HTML blob containing a meta refresh to the destination url. This allows Z Proxy to parse out the long url, but also allows nginx on lastresort hosts to proxy the 200 responses directly from S3 if Z Proxy goes down. This multi-tier approach to failures gives us increasing levels of availability with decreasing levels of features, metrics, performance, and consistency.
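A sketch of the idea in Python (illustrative, not bitly's actual Go code):

import re

def make_s3_value(long_url):
    """The blob stored in S3: browsers can follow it directly via meta refresh."""
    return ('<html><head><meta http-equiv="refresh" '
            'content="0; url=%s"></head></html>' % long_url)

META_RE = re.compile(r'url=([^"]+)')

def extract_long_url(blob):
    """Z Proxy parses the long url back out to serve a real 301/302."""
    return META_RE.search(blob).group(1)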
This system gives us confidence that we can serve decodes with high availability, and in the event of an outage or failure, it gives us options for where to send traffic. Because our link resolution dataset is immutable, S3 is an invaluable tool. While we might take slightly different approaches with dynamic data, providing layers of fallbacks from S3 and transparent retrying across datacenters is simple and effective at providing high availability.
- Because we have tens of billions of links, this makes our Z Proxy bucket about 1.25% of all S3 objects (about 4TB in size).
- The main decode path is written in Tcl, a strange and interesting language that I had never seen before working at bitly.
Brace yourself, this is a long one.
The following guide was originally intended for client library developers to describe in detail all the important features and functionality we expected in an NSQ client library.
While writing it we began to realize that it had value beyond client library developers. It incorporates a comprehensive analysis of most of the capabilities of NSQ (both client and server) and is therefore interesting and useful for end-users as well (or anyone using or interested in infrastructure messaging platforms).
NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall cluster robustness and performance.
This guide attempts to outline the various responsibilities well-behaved client libraries need to
fulfill. Because publishing to
nsqd is trivial (just an HTTP POST to the
/put endpoint), this
document focuses on consumers.
By setting these expectations we hope to provide a foundation for achieving consistency across languages for NSQ users.
- Discovery (optional)
- Connection Handling
- Feature Negotiation
- Data Flow / Heartbeats
- Message Handling
- RDY State
At a high level, our philosophy with respect to configuration is to design the system to have the flexibility to support different workloads, use sane defaults that run well “out of the box”, and minimize the number of dials.
A client subscribes to a topic on a channel over a TCP connection to nsqd instance(s). You can only subscribe to one topic per connection, so multiple topic consumption needs to be structured accordingly.
Using nsqlookupd for discovery is optional, so client libraries should support a configuration where a client connects directly to one or more nsqd instances or where it is configured to poll one or more nsqlookupd instances. When a client is configured to poll nsqlookupd, the polling interval should be configurable. Additionally, because typical deployments of NSQ are in distributed environments with many producers and consumers, the client library should automatically add jitter based on a random % of the configured value. This will help avoid a thundering herd of connections.
For more detail see Discovery.
An important performance knob for clients is the number of messages it can receive before nsqd expects a response. This pipelining facilitates buffered, batched, and asynchronous message handling. By convention this value is called max_in_flight and it affects how RDY state is managed. For more detail see RDY State.
Being a system that is designed to gracefully handle failure, client libraries are expected to implement retry handling for failed messages and provide options for bounding that behavior in terms of number of attempts per message. For more detail see Message Handling.
Relatedly, when message processing fails, the client library is expected to automatically handle
re-queueing the message. NSQ supports sending a delay along with the
REQ command. Client libraries
are expected to provide options for what this delay should be set to initially (for the first
failure) and how it should change for subsequent failures. For more detail see Backoff.
Most importantly, the client library should support some method of configuring callback handlers for message processing. The signature of these callbacks should be simple, typically accepting a single parameter (an instance of a “message object”).
An important component of NSQ is
nsqlookupd, which provides a discovery service for consumers to
locate producers of a given topic at runtime.
Although optional, using
nsqlookupd greatly reduces the amount of configuration required to
maintain and scale a large distributed NSQ cluster.
When a client uses nsqlookupd for discovery, the client library should manage the process of polling nsqlookupd instances for an up-to-date set of nsqd producers, and should manage the connections to those producers.
Querying an nsqlookupd instance is straightforward. Perform an HTTP request to the lookup endpoint with a query parameter of the topic the client is attempting to discover (i.e. /lookup?topic=clicks). The response format is JSON:
The broadcast_address and tcp_port should be used to connect to an nsqd producer. Because, by default, nsqlookupd instances don’t coordinate their lists of producers, the client library should union the lists it received from all nsqlookupd queries to build the final list of producers to connect to. The broadcast_address:tcp_port combination should be used as the unique key for this union.
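A sketch of that union in Python, assuming responses shaped like the example above:

def union_producers(lookupd_responses):
    """Merge producer lists from every nsqlookupd queried, keyed on
    broadcast_address:tcp_port so duplicates collapse."""
    producers = {}
    for response in lookupd_responses:
        for p in response['data']['producers']:
            key = '%s:%s' % (p['broadcast_address'], p['tcp_port'])
            producers[key] = p
    return list(producers.values())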
A periodic timer should be used to repeatedly poll the configured nsqlookupd instances so that clients will automatically discover new producers. The client library should automatically initiate connections to all newly found producers.

When client library execution begins it should bootstrap this polling process by kicking off an initial set of requests to the configured nsqlookupd instances.
Once a client has an nsqd producer to connect to (via discovery or manual configuration), it should open a TCP connection to broadcast_address:port. A separate TCP connection should be made to nsqd for each topic the client wants to consume.
When connecting to an
nsqd instance, the client library should send the following data, in order:
- the magic identifier
- an IDENTIFY command (and payload) and read/verify response
- a SUB command (specifying desired topic) and read/verify response
- an initial RDY count of 1 (see RDY State)
(low-level details on the protocol are available in the spec)
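A bare-bones Python sketch of that ordered exchange (response reading and verification elided; the spec is authoritative):

import json
import socket
import struct

def connect_and_subscribe(host, port, topic, channel):
    sock = socket.create_connection((host, port))
    sock.sendall(b'  V2')  # the magic identifier (4 bytes)
    body = json.dumps({'short_id': 'myhost', 'long_id': 'myhost.example.com',
                       'feature_negotiation': True}).encode('utf-8')
    # commands with a body are: name, newline, 4-byte big endian size, body
    sock.sendall(b'IDENTIFY\n' + struct.pack('>I', len(body)) + body)
    # ... read/verify IDENTIFY response ...
    sock.sendall(('SUB %s %s\n' % (topic, channel)).encode('utf-8'))
    # ... read/verify SUB response ...
    sock.sendall(b'RDY 1\n')
    return sock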
Client libraries should automatically handle reconnection as follows:
- If the client is configured with a specific list of nsqd instances, reconnection should be handled by delaying the retry attempt in an exponential backoff manner (i.e. try to reconnect in 8s, 16s, 32s, etc., up to a max).
- If the client is configured to discover instances via nsqlookupd, reconnection should be handled automatically based on the polling interval (i.e. if a client disconnects from an nsqd, the client library should only attempt to reconnect if that instance is discovered by a subsequent nsqlookupd polling round). This ensures that clients can learn about producers that are introduced to the topology and ones that are removed (or failed).
The IDENTIFY command can be used to set nsqd side metadata, modify client settings, and negotiate features. It satisfies two needs:
- In certain cases a client would like to modify how nsqd interacts with it (currently this is limited to modifying a client’s heartbeat interval, but you can imagine this could evolve to include enabling compression, TLS, output buffering, etc.)
- nsqd responds to the IDENTIFY command with a JSON payload that includes important server side configuration values that the client should respect while interacting with the instance.
After connecting, based on the user’s configuration, a client library should send an IDENTIFY command (the body of an IDENTIFY command is a JSON payload), e.g.:
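(A representative payload; the field names match the discussion below and the values are illustrative:)

{
    "short_id": "app01",
    "long_id": "app01.example.com",
    "feature_negotiation": true,
    "heartbeat_interval": 30000
}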
The feature_negotiation field indicates that the client can accept a JSON payload in return. The short_id and long_id are arbitrary text fields that are used by nsqd (and nsqadmin) to identify clients. heartbeat_interval configures the interval between heartbeats on a per-client basis. nsqd will respond OK if it does not support feature negotiation (introduced in nsqd v0.2.20+). More detail on the use of the max_rdy_count field is in the RDY State section.
Once a client is in a subscribed state, data flow in the NSQ protocol is asynchronous. For consumers, this means that in order to build truly robust and performant client libraries they should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used to represent both OS-level threads and userland threads, like coroutines).
Additionally, clients are expected to respond to periodic heartbeats from the nsqd instances they’re connected to. By default this happens at 30 second intervals. The client can respond with any command but, by convention, it’s easiest to simply respond with a NOP whenever a heartbeat is received. See the protocol spec for specifics on how to identify heartbeats.
A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the frame, and performing the multiplexing logic to route the data as appropriate. This is also conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol involves the following sequential steps:
- read 4 byte big endian uint32 size
- read size bytes data
- unpack data
- goto 1
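A Python sketch of one pass through that loop, including the conventional heartbeat reply (frame-type constants per the protocol spec):

import struct

FRAME_TYPE_RESPONSE, FRAME_TYPE_ERROR, FRAME_TYPE_MESSAGE = 0, 1, 2

def read_exactly(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise IOError('socket closed')
        buf += chunk
    return buf

def read_frame(sock):
    size = struct.unpack('>I', read_exactly(sock, 4))[0]  # 1. 4 byte big endian uint32
    data = read_exactly(sock, size)                       # 2. size bytes of data
    frame_type = struct.unpack('>i', data[:4])[0]         # 3. unpack
    payload = data[4:]
    if frame_type == FRAME_TYPE_RESPONSE and payload == b'_heartbeat_':
        sock.sendall(b'NOP\n')  # the conventional heartbeat reply
        return None
    return frame_type, payload  # 4. caller loops (goto 1)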
A Brief Interlude on Errors
Due to their asynchronous nature, it would take a bit of extra state tracking in order to
correlate protocol errors with the commands that generated them. Instead, we took the “fail fast”
approach so the overwhelming majority of protocol-level error handling is fatal. This means that if
the client sends an invalid command (or gets itself into an invalid state) the
nsqd instance it’s
connected to will protect itself (and the system) by forcibly closing the connection (and, if
possible, sending an error to the client). This, coupled with the connection handling mentioned
above, makes for a more robust and stable system.
The only errors that are not fatal are:
- E_FIN_FAILED - the client tried to send a FIN command for an invalid message ID.
- E_REQ_FAILED - the client tried to send a REQ command for an invalid message ID.
- E_TOUCH_FAILED - the client tried to send a TOUCH command for an invalid message ID.
Because these errors are most often timing issues, they are not considered fatal. These situations
typically occur when a message times out on the
nsqd side and is re-queued and delivered to
another client. The original recipient is no longer allowed to respond on behalf of that message.
When the IO loop unpacks a data frame containing a message, it should route that message to the configured handler for processing.
The nsqd producer expects to receive a reply within its configured message timeout (default: 60 seconds). There are a few possible scenarios:
- The handler indicates that the message was processed successfully.
- The handler indicates that the message processing was unsuccessful.
- The handler decides that it needs more time to process the message.
- The in-flight timeout expires and nsqd automatically re-queues the message.
In the first 3 cases, the client library should send the appropriate command on the client’s behalf
The FIN command is the simplest of the bunch. It tells nsqd that it can safely discard the message. FIN can also be used to discard a message that you do not want to process or retry.
The REQ command tells
nsqd that the message should be re-queued (with an optional parameter
specifying the amount of time to defer additional attempts). If the optional parameter is not
specified by the client, the client library should automatically calculate the duration in relation
to the number of attempts to process the message (a multiple is typically sufficient). The client
library should discard messages that exceed the configured max attempts. When this occurs, a
user-supplied callback should be executed to notify and enable special handling.
If the message handler requires more time than the configured message timeout, the TOUCH command can be used to reset the timer on the nsqd side. This can be done repeatedly until the message is FIN or REQ, up to the nsqd producer’s configured max timeout. Client libraries should never automatically TOUCH on behalf of the client.

If the nsqd instance receives no response, the message will time out and be automatically re-queued for delivery to an available client.
Finally, a property of each message is the number of attempts. Client libraries should compare this value against the configured max and discard messages that have exceeded it. When a message is discarded there should be a callback fired. Typical default implementations of this callback might include writing to a directory on disk, logging, etc. The user should be able to override the default handling.
Because messages are pushed from nsqd to clients we needed a way to manage the flow of data in userland rather than relying on low-level TCP semantics. A client’s RDY state is NSQ’s flow control mechanism.
As outlined in the configuration section, a consumer is configured with a max_in_flight. This is a concurrency and performance knob, e.g. some downstream systems are able to more-easily batch process messages and benefit greatly from a higher max_in_flight.

When a client connects to nsqd (and subscribes) it is placed in an initial RDY state of 0. This means that, initially, no messages will be sent to the client.
Client libraries have a few responsibilities:
- bootstrap and evenly distribute the configured max_in_flight to all connections.
- never allow the aggregate sum of RDY counts for all connections (total_rdy_count) to exceed the configured max_in_flight.
- never exceed the per connection nsqd configured max RDY count.
- expose an API method to reliably indicate message flow starvation
There are a few considerations when choosing an appropriate RDY count for a connection (in order to evenly distribute max_in_flight):
- the # of connections is dynamic, often times not even known in advance (i.e. when discovering producers via nsqlookupd)
- max_in_flight may be lower than your number of connections
To kickstart message flow a client library needs to send an initial RDY count. Because the eventual number of connections is often not known ahead of time, it should start with a value of 1 so that the client library does not unfairly favor the first connection(s).
Additionally, after each message is processed, the client library should evaluate whether or not
it’s time to update
RDY state. An update should be triggered if the current value is
0 or if it
is below ~25% of the last value sent.
The client library should always attempt to evenly distribute
RDY count across all connections.
Typically, this is implemented as
max_in_flight / num_conns.
When max_in_flight < num_conns this simple formula isn’t sufficient. In this state, client libraries should perform a dynamic runtime evaluation of producer “liveness” by measuring the duration of time since it last received a message for a connection. After a configurable expiration, it should re-distribute whatever RDY count is available to a new (random) set of producers. By doing this, you guarantee that you’ll (eventually) find producers with messages. Clearly this has a latency cost.
The client library should maintain a ceiling for the maximum number of messages in flight for a given consumer. Specifically, the aggregate sum of each connection’s RDY count should never exceed the configured max_in_flight.
Below is example code in Python to determine whether or not the proposed RDY count is valid for a given connection:
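(The original snippet isn't preserved here; a sketch enforcing both invariants, with illustrative attribute names, would be:)

def can_send_rdy(conn, proposed, total_rdy_count, max_in_flight):
    """conn.rdy_count is this connection's current RDY; total_rdy_count is
    the aggregate across all connections; conn.max_rdy_count was negotiated
    with nsqd via feature negotiation."""
    if proposed > conn.max_rdy_count:
        return False  # would exceed the per-connection producer limit
    # what the aggregate would become if we sent this RDY count
    if (total_rdy_count - conn.rdy_count) + proposed > max_in_flight:
        return False  # would exceed the consumer's configured max_in_flight
    return True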
3. Producer Max RDY Count
nsqd is configurable with a --max-rdy-count (see feature negotiation for more information on the handshake a client can perform to ascertain this value). If the client sends a RDY count that is outside of the acceptable range, its connection will be forcefully closed. For backwards compatibility, this value should be assumed to be 2500 if the nsqd instance does not support feature negotiation.
4. Message Flow Starvation
Finally, the client library should provide an API method to indicate message flow starvation. It is
insufficient for clients (in their message handlers) to simply compare the number of messages they
have in-flight vs. their configured
max_in_flight in order to decide to “process a batch”. There
are two cases when this is problematic:
- When clients configure max_in_flight > 1, due to variable num_conns, there are cases where max_in_flight is not evenly divisible by num_conns. Because the contract states that you should never exceed max_in_flight, you must round down, and you end up with cases where the sum of all RDY counts is less than max_in_flight.
- Consider the case where only a subset of producers have messages. Because of the expected even distribution of RDY count, those active producers only have a fraction of the configured max_in_flight.
In both cases, a client will never actually receive
max_in_flight # of messages. Therefore, the
client library should expose a method
is_starved that will evaluate whether any of the
connections are starved, as follows:
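(A sketch modeled on the reference implementations; attribute names and the ~85% threshold are illustrative:)

def is_starved(connections):
    for conn in connections:
        # a connection is "starved" when most (here ~85%) of its last RDY
        # count has been consumed and those messages are still in flight
        if conn.in_flight > 0 and conn.in_flight >= (conn.last_rdy * 0.85):
            return True
    return False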
The is_starved method should be used by message handlers to reliably identify when to process a batch of messages.
The question of what to do when message processing fails is a complicated one to answer. The message handling section detailed client library behavior that would defer the processing of failed messages for some (increasing) duration of time. The other piece of the puzzle is whether or not to reduce throughput. The interplay between these two pieces of functionality is crucial for overall system stability.
By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system to recover from transient failure. However, this behavior should be configurable as it isn’t always desirable, such as situations where latency is prioritized.
Backoff should be implemented by sending
RDY 0 to the appropriate producers, stopping message
flow. The duration of time to remain in this state should be calculated based on the number of
repeated failures (exponential). Similarly, successful processing should reduce this duration until
the reader is no longer in a backoff state.
While a reader is in a backoff state, after the timeout expires, the client library should only ever
send RDY 1 regardless of
max_in_flight. This effectively “tests the waters” before returning to
full throttle. Additionally, during a backoff timeout, the client library should ignore any success
or failure results with respect to calculating backoff duration (i.e. it should only take into
account one result per backoff timeout).
Bringing It All Together
Distributed systems are fun.
The interactions between the various components of an NSQ cluster work in concert to provide a platform on which to build robust, performant, and stable infrastructure. We hope this guide shed some light as to how important the client’s role is.
A typical client library is structured around three core components:
- Message - a high-level message object, which exposes stateful methods for responding to the nsqd producer (FIN, REQ, TOUCH, etc.) as well as metadata such as attempts and timestamp.
- Connection - a high-level wrapper around a TCP connection to a specific nsqd producer, which has knowledge of in flight messages, its RDY state, negotiated features, and various timings.
- Reader - the front-facing API a user interacts with, which handles discovery, creates connections (and subscribes), bootstraps and manages RDY state, parses raw incoming data, creates Message objects, and dispatches messages to handlers.
We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking for contributors to continue to expand our language support as well as flesh out functionality in existing libraries. The community has already open sourced client libraries for 7 languages:
- go-nsq Go (official)
- pynsq Python (official)
- libnsq C
- nsq-java Java
- TrendrrNSQClient Java
- nsqjava Java
- nsq-client Node.js
- nodensq Node.js
- nsqphp PHP
- ruby_nsq Ruby
If you have any specific questions, feel free to ping me on twitter @imsnakes.
Recently we’ve started to experiment with using Redshift, Amazon’s new data warehousing service. More specifically, we’re using it to speed up and expand our ad hoc data analysis.
bitly sees billions of clicks and shortens each month. Often we have various questions about the data generated from this activity. Sometimes these questions are driven by business needs (how much traffic do we see from a potential enterprise customer), sometimes they are more technically driven (how much traffic will a new sub-system need to deal with), and sometimes we like to just have fun (what are the top trashy celeb stories this week).
Unfortunately, when working with that volume of data it can be pretty difficult to do much of anything quickly. Pre-Redshift, all of these questions were answered by writing map-reduce jobs to be run on our Hadoop cluster or on Amazon’s EMR.
Whenever we wanted to answer a question with our data, the process would look something like this:
- Write map-reduce job in Python
- Run it on some local test data
- Fix bugs.
- Run it on the Hadoop cluster
- Wait 20-30 minutes for results
- Get an error back from Hadoop
- Dig through the logs to find the error.
This is clearly not ideal when all you want to do is get a simple count.
For a lot of the work we do Hadoop + Python make for an awesome combination, but for these ad hoc aggregation queries they’re very blunt instruments. In both cases, they are general purpose tools that are super flexible, but slow and difficult to use for this specific use case.
Redshift, on the other hand, is specifically built and optimized for doing aggregation queries over large sets of data. When we want to answer a question with Redshift, we just write a SQL query and get an answer within a few minutes—if not seconds.
Overall, our experience with Redshift has been a positive one but we have run into some gotchas that we’ll get into below.
The Good News
From a user perspective, we’re really happy with Redshift. Any one of our developers or data scientists just needs to write a SQL query and they have an answer to their question in less than 5 minutes. Moving from our old hadoop-based workflow to an interactive console session with Redshift is a major improvement.
Additionally, since much of the user facing bits of Redshift are based on PostgreSQL there is a large ecosystem of mature, well-documented tools and libraries for us to take advantage of.
Finally, while it can be a bit slow at times, we’ve been very impressed with the web management console Amazon provides with Redshift. For a 1.0 product, the console is comprehensive and offers much more information than we expected it to.
For our current use case of ad hoc research queries, Redshift’s performance is adequate. Most queries return a response in less than five minutes and we rarely have many users executing queries concurrently.
That being said, we have done some experimentation with competing products (e.g. Vertica) and have seen better performance out of those tools. This is especially true for more complex queries that benefit from projections/secondary indexes and situations where the cluster’s resources are under contention.
Just like the rest of AWS, Amazon provides reasonably comprehensive and thorough documentation. For everything that is directly exposed to us as users (e.g. loading operations, configuration params, etc) we are very happy with the documentation. The only places where we felt we wanted more information were those where Amazon makes things “just work”. Most significantly we would like to see more details about what exactly happens when a node in the cluster fails and how the cluster is expected to behave in that state.
We wouldn’t go so far as to call Redshift cheap, but compared to many competitors it is pretty cost effective. The biggest gotcha here is that while the simple model for scaling Redshift clusters and tuning performance within a cluster is nice as a user, it does mean that you have a bit of a one-size-fits-all situation.
In our case we are computationally and I/O constrained so we’re paying for a bunch of storage capacity and memory that we don’t use. At our current scale, things work out okay but as we continue to grow it may make sense to take advantage of something else that is more flexible in terms of both hardware and tuning.
The Bad News
We had to spend a lot of time getting our data into Redshift. This is partially our fault since our dataset is not the cleanest in the world, but overall this is the place that we felt the most pain from an immature tool chain.
The majority of bitly’s at-rest data is stored in line-oriented (i.e. one JSON blob per line) JSON files. The primary way to load data into Redshift is to use the COPY command to point the cluster at a pile of CSV or TSV files stored on S3. Clearly we needed to build out some kind of tool chain to transform our JSON logs into flat files.
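The transformation itself is conceptually simple; a minimal sketch of the JSON-to-TSV flattening (the column names are invented):

import json
import sys

COLUMNS = ['ts', 'user', 'country', 'url']  # illustrative schema

def flatten(line):
    record = json.loads(line)
    # real code also needs to escape tabs/newlines inside values
    return '\t'.join(str(record.get(col, '')) for col in COLUMNS)

for line in sys.stdin:
    try:
        print(flatten(line))
    except ValueError:
        continue  # skip malformed lines rather than failing the whole batch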
Initially, we just wrote a simple Python script that would do the transformation. Unfortunately, we quickly discovered that this simple approach would be too slow. We estimated that it would have taken a month to process and load all the data we wanted in Redshift.
Next, we realized that we had a tool for easily doing highly parallelized, distributed text processing: Hadoop. Accordingly, we re-worked our quick Python script into a hadoop job to transform our logs in a big batch. Since we already keep a copy of our raw logs in S3, EMR proved to be a great tool for this job.
Overall this process worked well but we did still run into a few gotchas loading the flattened data into Redshift:
- Redshift only supports a subset of UTF-8, specifically characters can only be 3 bytes long. Amazon does mention this in their docs, but it still bit us a few times.
- varchar field lengths are in terms of bytes, not characters. In Python, byte strings expose these as one and the same; unicode strings do not. It took us a little while to realize this was happening and to get our code set up to do byte-length truncation without truncating mid-character in unicode strings. To get an idea of how we went about doing unicode-aware truncation, check out this relevant stack overflow thread, and see the sketch after this list.
- Moving floats between different systems always has some issues. In Python the default string output of small float values is scientific notation (e.g. 3.32e-09). Unfortunately Redshift doesn’t recognize this format, so we needed to force our data prep job to always output floats in decimal format (also sketched below).
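Two sketches of the workarounds described above (simplified, not our production code):

def truncate_utf8(s, max_bytes):
    """Byte-length truncation that never splits a multi-byte character:
    encode, slice, and decode with errors='ignore' to drop any partial
    trailing sequence."""
    return s.encode('utf-8')[:max_bytes].decode('utf-8', 'ignore')

def decimal_str(value, places=10):
    """Force decimal notation; str(3.32e-09) yields scientific notation,
    which Redshift's COPY rejects."""
    return '{0:.{1}f}'.format(value, places)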
Now that we have a large body of data loaded into Redshift, we’re working on building out tooling based on NSQ to do our data prep work in a streaming fashion that should allow us to easily do smaller incremental updates.
In the end, we worked through our data loading issues with Redshift but it was one of the more acute pain points we encountered. From our conversations with Amazon, they’re definitely aware of this and we’re interested to see what they’ll come out with, but for now the provided tooling is pretty limited.
Long term, there are a number of periodic and online tasks that we’re thinking about using a tool like Redshift for. Unfortunately, as things stand today we would not be comfortable relying on Redshift as a highly available system.
Currently, if any node within a Redshift cluster fails, all outstanding queries will fail and Amazon will automatically start replacing the failed node. In theory this recovery should happen very quickly. The cluster will be available for querying as soon as the replacement node is added, but performance on the cluster will be degraded while data is restored on to the new node.
At this point, we have no data on how well this recovery process works in the real world. Additionally, we have concerns about how well this process will work when there are larger issues happening within an availability zone or region. Historically there are a number of cases where issues within one Amazon service (e.g. EBS) have cascaded into other services leading to long periods of unavailability or degradation.
Until there’s a significant track record behind a system like this, we’re hesitant to trust anything that will “automatically recover”.
There is the option of running a mirrored Redshift cluster in a different AZ or region, but that gets expensive fast. Additionally, we’d have to build out even more tooling to make sure those two clusters stay in sync with each other.
Limited Feature Set
Redshift is very impressive feature-wise for a 1.0 product. That being said, a number of the competing products have been around for a while and offer some major features that Redshift lacks.
The biggest missing feature for us with Redshift is some kind of secondary indexing or projections. Right now, if you sort or filter by any column other than the SORTKEY, Redshift will do a full table scan. Technically you could create a copy of that table with a different sort key but then it would become your problem to keep those tables in sync and to query the right table at the right time.
Some other “missing” features include tools for working with time series data, geospatial query tools, advanced HA features, and more mature data loading tools.
The Bottom Line
Redshift is great for our needs today, but we’ll see if Amazon’s development keeps up as our needs change going forward. Given Amazon’s impressive track record for quickly iterating on and improving products we’re hopeful, but we do have our eyes on competing products as our use of data warehousing tools matures.
A common infrastructure problem is managing access to internal applications. Some patterns for solving that problem include: fronting internal apps with HTTP Basic or HTTP Digest authentication through Apache or nginx, running the apps on secret ports, and bundling an authentication system as part of each application. Many off-the-shelf applications include no built-in authentication, and count on upstream proxies/systems for access control which limits options for securing them. Google Auth Proxy is a tool we have developed to give us a better ability to secure internal applications by using Google Account authentication (like many other startups, we rely on Google Apps).
At bitly, we like fronting HTTP applications with Nginx and use that approach for nearly all of our stack, from Tornado apps handling our core APIs to common internal tools (e.g. Nagios, Munin, Graphite) to homegrown tools like NSQ and our deploy system. For many of those systems, we took advantage of this by configuring HTTP Basic authentication to restrict access. Using Basic Auth however led to situations where the authentication information would be stored in configuration files and scripts, leaking it throughout the repository.
Some of our homegrown tools initially relied on bitly’s OAuth 2 service for authentication, which meant that sign in was managed through bitly accounts by the OAuth flow. This left each application to handle authorization by maintaining a list of bitly accounts that actually had permission to access the app. This was an improvement over HTTP Basic Auth as each application did not need to accept or store passwords, but it was less than ideal because each app still needed to store and manage its own list of authorized users. Over time, this led to a large number of disparate systems with separate authorization lists. This approach was only feasible for internal tools and thus couldn’t apply to other open source applications we were using.
We developed Google Auth Proxy as a new tool to use in securing internal applications. It is an HTTP reverse proxy that provides authentication using Google’s OAuth2 API, with the flexibility to authorize individual Google Accounts (by email address) or a whole Google apps domain.
For internal applications, this is convenient because we can now allow our whole
@bit.ly Google apps domain without
separately managing accounts or passwords.
Google Auth Proxy requires a limited set of privileges in order to authenticate users (asking only for minimal OAuth2 scopes). This authentication information is then passed to the upstream application as HTTP Basic Auth (with an empty value for the password), and in an HTTP X-Forwarded-User header for applications that need that context.
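On the application side, consuming that identity is simple; a hypothetical upstream handler (Flask is just for illustration):

from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def index():
    # trust the identity header injected by the proxy; the app itself
    # never sees or stores a password
    user = request.headers.get('X-Forwarded-User', 'unknown')
    return 'Hello, %s' % user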
Google Auth Proxy has been in production for 6 months and has helped reduce the overhead of managing internal applications (no setup time for new or removed accounts), and it has made it easier for us to open up access to important tools for the entire company. It’s also worth mentioning that if you use two-factor authentication with your Google accounts, that security carries over to authentication through Google Auth Proxy.
We chose to write Google Auth Proxy in Go because the language has built-in support for writing a concurrent reverse proxy in the net/http/httputil package. This meant that little work was needed to proxy requests, and we could focus on just writing the authentication layer.
If you find this useful, we’d love to hear about it @bitly.
What does this mean? Why is this useful? What makes this the most important thing since sliced bread?! Read on!
Storing distributions is crucial for doing any sort of statistics on a dataset. Normally this is as easy as keeping counters of how many times we’ve seen certain quantities, but when dealing with data streams that constantly deliver new data, these simple counters no longer do the job. They fail because data we saw weeks ago carries the same weight as data we have just seen, even though the fundamental distribution the data describes may be changing. It also poses an engineering challenge, since the counters grow without bound and will soon use all the resources available to them. This is why we created Forget Table.
A categorical distribution describes the probability of seeing each event out of a set of possible events. For example, every click coming through bitly’s system comes from one of a fixed set of country codes (there are about 260). We would like to maintain a categorical distribution over countries that describes how likely any given click is to come from each country. With bitly’s data, this will give a high weight to the US and lower weights to, say, Japan and Brazil.
It’s very useful for bitly to store distributions like this, as it gives us a good idea of what’s considered ‘normal’. Most of the time we produce analytics that show, for example, how many clicks came from a given country or referrer. While this gives our clients a sense of where their traffic is coming from, it doesn’t directly express how surprising that traffic is. This can be remedied by maintaining a distribution over countries for all of bitly, so that we can identify anomalous traffic, i.e. when a particular link is getting disproportionately more clicks from Oregon than we would normally expect.
The difficulty that Forget Table deals with comes from the fact that what bitly considers normal changes constantly. At 8am on the East Coast of the US, we’d be surprised to see a lot of traffic coming from San Francisco (they’re all still asleep over there), but at 11am EST we should expect to see tons of traffic coming from San Francisco. So unless we allow our understanding of what’s considered normal to change over time, we are going to have a skewed idea of normality.
This is a general problem over longer time scales, too. The behavior of the social web is different in December than it is in July, and it’s different in 2012 than it was in 2011. We need to make sure that our understanding of normality changes with time. One way of achieving this is to forget old data that’s no longer relevant to our current understanding. This is where Forget Table comes in.
Why forget in the first place?
The basic problem is that the fundamental distribution (i.e. the distribution the data is actually drawn from) is changing; this property is called non-stationarity. Simply storing counts to represent your categorical distribution makes the problem worse, because it gives data seen weeks ago the same weight as data that was just observed. As a result, the total-count method simply shows the combination of every distribution the data was drawn from (which will approach a Gaussian by the central limit theorem).
A naive solution would be to keep multiple categorical distributions in rotation. Similar to the way log files get rotated, we would have different distributions that get updated at different times. This approach, however, can lead to many problems.
When a distribution first gets rotated in, it has no information about the current state of the world. Similarly, at the end of a distribution’s rotation it is just as affected by events from the beginning of its rotation as it is by recent events. This creates artifacts and dynamics in the data which depend only on the timing of, and time between, rotations. These effects are similar to the various pathologies that come from binning data or from simply keeping a total count.
If, on the other hand, we forget things smoothly using rates, the distribution always describes a continuous window of time. The further back in time an event is, the less of an effect it has on the distribution’s current state.
Forgetting Data Responsibly
Forget Table takes a principled approach to forgetting old data by defining the rate at which old data should decay. Each bin of a categorical distribution forgets its counts depending on how many counts it currently has and a user-specified rate. Under this rule, bins that are more dominant decay faster than bins without many counts. The method has the benefit of being a very simple process (one inspired by the decay of radioactive particles) that can be calculated very quickly. In addition, since bins with high counts decay faster than bins with low counts, this process helps smooth out sudden spikes in the data automatically.
If the data suddenly stopped flowing into the Forget Table, then all the categorical distributions would eventually decay to the uniform distribution - each bin would have a count of 1 (at which point we stop decaying), and z would be equal to the number of bins (see a visualization of this happening). This captures the fact that we no longer have any information about the distribution of the variables in Forget Table.
The result of this approach is that the counts in each bin, in each categorical distribution, decay exponentially. Each time we decrement the count in a bin, we also decrement the normalising constant z. When using Forget Table, we can choose the rate at which things should decay, depending on the dominant time constants of the system.
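As a rough sketch (the names here are ours, not Forget Table’s internals), the expected effect of this rule on a stored distribution looks like:

import math

def decay_bin(count, rate, elapsed):
    # A bin decays at a rate proportional to its current count, so its
    # expected value after `elapsed` seconds is count * e^(-rate * elapsed).
    # We stop decaying once a bin reaches 1.
    return max(1.0, count * math.exp(-rate * elapsed))

def decay_distribution(bins, z, rate, elapsed):
    # Every count removed from a bin also comes out of the
    # normalising constant z.
    for name, count in bins.items():
        new_count = decay_bin(count, rate, elapsed)
        z -= count - new_count
        bins[name] = new_count
    return bins, z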
Building Categorical Distributions in Redis
We store the categorical distribution as a set of event counts, along with a ‘normalising constant’, which is simply the total number of events we’ve stored. In the country example, we have ~260 bins, one per country, and in each bin we store the number of clicks we’ve seen from that country. Alongside them, our normalising constant stores the total number of clicks we’ve seen across all countries.
All this lives in a Redis sorted set, where the key describes the variable (in this case, simply bitly_country) and the value is the categorical distribution. Each element in the set is a country, and the score of each element is the number of clicks from that country. We store a separate element in the set (traditionally called z) that records the total number of clicks stored in the set. When we want to report the categorical distribution, we extract the whole sorted set, divide each count by z, and report the result.
Storing the categorical distribution in this way allows us to make very rapid writes (simply increment the score of two elements of the sorted set) and means we can store millions of categorical distributions in memory. Storing a large number of these is important, as we’d often like to know the normal behavior of a particular key phrase, or the normal behavior of a topic, or a bundle, and so on.
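With redis-py, the write and read paths might look something like the following sketch (the key layout is illustrative; Forget Table’s actual schema may differ):

import redis

r = redis.StrictRedis()

def observe(variable, category):
    # A write is just two increments: the category's bin and the
    # normalising constant z. (redis-py >= 3.0: zincrby(name, amount, value))
    r.zincrby(variable, 1, category)
    r.zincrby(variable, 1, "z")

def distribution(variable):
    # Pull the whole sorted set and divide each count by z.
    counts = dict(r.zrange(variable, 0, -1, withscores=True))
    z = counts.pop(b"z")
    return dict((name, count / z) for name, count in counts.items())

observe("bitly_country", "US")
print(distribution("bitly_country"))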
Forgetting Data Under Strenuous Circumstances
This seems simple enough, but we have two problems. The first is that we have potentially millions of categorical distributions. Bitly maintains information for over a million separate key phrases at any given time, and (for some super secret future projects) it is necessary to store a few distributions per key phrase. We are therefore unable to iterate through every key of our Redis table to perform decays, so cron-like decay (decaying every distribution in the database every few minutes) isn’t feasible.
The second problem is that data is constantly flowing into multiple distributions: we sometimes see spikes of up to 3,000 clicks per second, which can correspond to dozens of increments per second. At this sort of high volume, there is simply too much contention between the decay process and the incoming increments to safely do both.
So the real contribution of Forget Table is an approach to forgetting data at read time. When we are interested in the current distribution of a particular variable, we extract whatever sorted set is stored against that variable’s key and decrement the counts at that time.
It turns out that, using the simple rate-based model of decay from above, we can decrement each bin by sampling an integer from a Poisson distribution whose rate is proportional to the current count of the bin and the length of time since we last decayed that bin. So, by storing one more piece of information, the time at which we last successfully decayed the distribution, we can calculate the number of counts to discard very cheaply (this algorithm is an approximation to Gillespie’s algorithm, used to simulate stochastic systems).
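A sketch of that sampling step (the names are ours, and the real code differs in detail):

import time
import numpy

def sample_decay(count, rate, last_decayed_at):
    # Draw the number of counts to discard from a Poisson distribution
    # whose rate is proportional to the bin's current count and the time
    # elapsed since this distribution was last decayed.
    elapsed = time.time() - last_decayed_at
    discard = numpy.random.poisson(rate * count * elapsed)
    return min(discard, int(count) - 1)  # never decay a bin below 1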
In Redis we implement this using pipelines. Using a pipeline, we read the sorted set, form the distribution, calculate the amount of decay for each bin and then attempt to perform the decay. Assuming nothing’s written into the sorted set in that time, we decay each bin and update the time since last decayed. If the pipeline detects a collision (either another process has decayed the set or a new event has arrived), we abandon the update. The algorithm we’ve chosen means that it’s not terribly important to actually store the decayed version of the distribution, so long as we know the time between the read and the last decay.
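With redis-py, that optimistic read-decay-write cycle might look roughly like this, building on sample_decay above (we assume the last-decay timestamp lives in a sibling key, which is our own layout, not necessarily Forget Table’s):

import time
import redis

def get_and_decay(r, variable, rate):
    ts_key = variable + ":last_decay"
    with r.pipeline() as pipe:
        try:
            # WATCH makes EXEC fail if anything touches the set (a new
            # event or another decay) between our read and our write.
            pipe.watch(variable, ts_key)
            counts = dict(pipe.zrange(variable, 0, -1, withscores=True))
            last = float(pipe.get(ts_key) or time.time())
            pipe.multi()
            for name, count in counts.items():
                if name == b"z" or count <= 1:
                    continue
                discard = sample_decay(count, rate, last)
                pipe.zincrby(variable, -discard, name)
                pipe.zincrby(variable, -discard, "z")
            pipe.set(ts_key, time.time())
            pipe.execute()
        except redis.WatchError:
            # Collision: abandon the decay and just report what we read.
            pass
    return counts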
Get the Code and We’re Hiring!
The result is a wrapper on top of Redis that runs as a little HTTP service. Its API has an increment handler on /incr used for incrementing counts based on new events, and a get handler on /get used for retrieving a distribution. In addition, there is an /nmostprobable endpoint that returns the n categories which have the highest probability of occurring.
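For instance, with an instance running locally, an interaction might look like this (the port and query parameter names are illustrative; check the README for the real ones):

import requests

BASE = "http://127.0.0.1:8080"  # hypothetical port

# Record one click from the US against the bitly_country distribution.
requests.get(BASE + "/incr", params={"distribution": "bitly_country", "field": "US"})

# Fetch the full (decayed) distribution.
print(requests.get(BASE + "/get", params={"distribution": "bitly_country"}).json())

# The five most probable categories.
print(requests.get(BASE + "/nmostprobable",
                   params={"distribution": "bitly_country", "N": 5}).json())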
There are two versions, one in Go (for super speed) and the other in Python. The code is open source and up on GitHub, available at http://bitly.github.com/forgettable.
As always, if you like what you see (or feel the need to make improvements), don’t forget that bitly is hiring!
Get ready for more posts on science at bitly, and if you have any specific questions, feel free to ping me on twitter @mynameisfiber.
We released NSQ on October 9th 2012. Supported by 3 talks and a blog post, it’s already the 4th most watched Go project on GitHub. There are client libraries in 7 languages and we continue to talk with folks experimenting with and transitioning to the platform.
This post aims to kickstart the documentation available for “getting started” and describe some NSQ patterns that solve a variety of common problems.
DISCLAIMER: this post makes some obvious technology suggestions to the reader but it generally ignores the deeply personal details of choosing proper tools, getting software installed on production machines, managing what service is running where, service configuration, and managing running processes (daemontools, supervisord, init.d, etc.).
Regardless of the type of web service you’re building, in most cases you’re going to want to collect some form of metrics in order to understand your infrastructure, your users, or your business.
For a web service, most often these metrics are produced by events that happen via HTTP requests, like an API. The naive approach would be to structure this synchronously, writing to your metrics system directly in the API request handler.
- What happens when your metrics system goes down?
- Do your API requests hang and/or fail?
- How will you handle the scaling challenge of increasing API request volume or breadth of metrics collection?
One way to resolve all of these issues is to somehow perform the work of writing into your metrics system asynchronously - that is, place the data in some sort of local queue and write into your downstream system via some other process (consuming that queue). This separation of concerns allows the system to be more robust and fault tolerant. At bitly, we use NSQ to achieve this.
Brief tangent: NSQ has the concept of topics and channels. Basically, think of a topic as a unique stream of messages (like our stream of API events above). Think of a channel as a copy of that stream of messages for a given set of consumers. Topics and channels are both independent queues, too. These properties enable NSQ to support both multicast (a topic copying each message to N channels) and distributed (a channel equally dividing its messages among N consumers) message delivery.
Integrating NSQ is straightforward; let’s take the simple case:
- Run an instance of nsqd on the same host that runs your API application.
- Update your API application to write to the local nsqd instance to queue events, instead of directly into the metrics system. To be able to easily introspect and manipulate the stream, we generally format this type of data in line-oriented JSON. Writing into nsqd can be as simple as performing an HTTP POST request (see the sketch after this list).
- Create a consumer in your preferred language using one of our client libraries. This “worker” will subscribe to the stream of data and process the events, writing into your metrics system. It can also run locally on the host running both your API application and nsqd.
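As a sketch of the producer side (current nsqd versions expose an HTTP publish endpoint at /pub, and nsqd listens for HTTP on port 4151 by default; the event shape here is illustrative):

import json
import requests

def record_event(event):
    # One line-oriented JSON event per message, published to the local nsqd.
    requests.post("http://127.0.0.1:4151/pub",
                  params={"topic": "api_requests"},
                  data=json.dumps(event))

record_event({"endpoint": "/shorten", "status": 200})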
Here’s what an example worker might look like using our official Python client library, pynsq (a minimal sketch; consult the library docs for full options):
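import json
import nsq

def process_message(message):
    # Each message body is one line-oriented JSON API event.
    event = json.loads(message.body)
    # ... write the event into your metrics system here ...
    return True  # True finishes (FINs) the message; False re-queues it

r = nsq.Reader(message_handler=process_message,
               nsqd_tcp_addresses=['127.0.0.1:4150'],
               topic='api_requests', channel='metrics')
nsq.run()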
In addition to decoupling, using one of our official client libraries means consumers degrade gracefully when message processing fails. Our libraries have two key features that help with this:
- Retries - when your message handler indicates failure, that information is sent back to nsqd in the form of a REQ (re-queue) command. Also, nsqd will automatically time out (and re-queue) a message if it hasn’t been responded to within a configurable time window. These two properties are critical to providing a delivery guarantee.
- Exponential Backoff - when message processing fails, the reader library will delay the receipt of additional messages for a duration that scales exponentially with the number of consecutive failures. The opposite sequence happens when a reader in a backoff state begins to process successfully, until the delay returns to zero.
In concert, these two features allow the system to respond gracefully to downstream failure, automagically.
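The shape of that delay is easy to picture (a sketch of the idea, not pynsq’s exact implementation):

def backoff_duration(base_seconds, consecutive_failures, max_seconds=120):
    # Each consecutive failure doubles the delay before the reader asks
    # for more messages, up to a cap; successes walk the count back down.
    return min(max_seconds, base_seconds * 2 ** consecutive_failures)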
Ok, great, now you have the ability to withstand a situation where your metrics system is unavailable with no data loss and no degraded API service to other endpoints. You also have the ability to scale the processing of this stream horizontally by adding more worker instances to consume from the same channel.
But it’s kinda hard to think of, ahead of time, all the types of metrics you might want to collect for a given API event.
Wouldn’t it be nice to have an archived log of this data stream for any future operation to leverage? Logs tend to be relatively easy to redundantly back up, making them a “plan z” of sorts in the event of catastrophic downstream data loss. But would you want this same consumer to also have the responsibility of archiving the message data? Probably not, because of that whole “separation of concerns” thing.
Archiving an NSQ topic is such a common pattern that we built a utility, nsq_to_file, packaged with NSQ, that does exactly what you need.
Remember, in NSQ, each channel of a topic is independent and receives a copy of all the messages. You can use this to your advantage when archiving the stream by doing so over a new channel, archive. Practically, this means that if your metrics system is having issues and the metrics channel gets backed up, it won’t affect the separate archive channel you’ll be using to persist messages to disk.
So, add an instance of nsq_to_file to the same host and use a command line like the following:
/usr/local/bin/nsq_to_file --nsqd-tcp-address=127.0.0.1:4150 --topic=api_requests --channel=archive
You’ll notice that the system has not yet evolved beyond a single production host, which is a glaring single point of failure.
Unfortunately, building a distributed system is hard. Fortunately, NSQ can help. The following changes demonstrate how NSQ alleviates some of the pain points of building distributed systems as well as how its design helps achieve high availability and fault tolerance.
Let’s assume for a second that this event stream is really important. You want to be able to tolerate host failures and continue to ensure that messages are at least archived, so you add another host.
Assuming you have some sort of load balancer in front of these two hosts, you can now tolerate any single host failure.
Now, let’s say the process of persisting, compressing, and transferring these logs is affecting performance. How about splitting that responsibility off to a tier of hosts that have higher IO capacity?
This topology and configuration can easily scale to double-digit hosts, but you’re still managing configuration of these services manually, which does not scale. Specifically, this setup hard-codes, in each consumer, the addresses where nsqd instances live, which is a pain. What you really want is for the configuration to evolve and be accessed at runtime based on the state of the NSQ cluster. This is exactly what we built nsqlookupd to address.
nsqlookupd is a daemon that records and disseminates the state of an NSQ cluster at runtime. nsqd instances maintain persistent TCP connections to nsqlookupd and push state changes across the wire. Specifically, an nsqd registers itself as a producer for a given topic as well as all channels it knows about. This allows consumers to query an nsqlookupd to determine who the producers are for a topic of interest, rather than hard-coding that configuration. Over time, they will learn about the existence of new producers and be able to route around failures.
The only changes you need to make are to point your existing consumer instances at nsqlookupd (everyone explicitly knows where nsqlookupd instances are, but consumers don’t explicitly know where producers are, and vice versa). The topology now looks like this:
At first glance this may look more complicated. It’s deceptive though, as the effect this has on a growing infrastructure is hard to communicate visually. You’ve effectively decoupled producers from consumers because nsqlookupd is now acting as a directory service in between. Adding additional downstream services that depend on a given stream is trivial: just specify the topic you’re interested in (producers will be discovered by querying nsqlookupd).
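In pynsq, pointing an existing consumer at nsqlookupd is a one-parameter change: swap the hard-coded nsqd addresses for lookupd_http_addresses (4161 is nsqlookupd’s default HTTP port; the hostnames here are hypothetical):

import nsq

def process_message(message):
    return True  # same handler as before

r = nsq.Reader(message_handler=process_message,
               lookupd_http_addresses=['http://lookupd-1.example.com:4161',
                                       'http://lookupd-2.example.com:4161'],
               topic='api_requests', channel='metrics')
nsq.run()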
But what about availability and consistency of the lookup data? We generally recommend basing how many nsqlookupd instances you run on your desired availability requirements. nsqlookupd is not resource intensive and can easily be co-located with other services. Also, nsqlookupd instances do not need to coordinate or otherwise be consistent with each other. Consumers will generally only require one nsqlookupd to be available with the information they need (and they will union the responses from all of the nsqlookupd instances they know about). Operationally, this makes it easy to migrate to a new set of nsqlookupd instances.
Using these same strategies, we’re now peaking at 65k messages per second delivered through our production NSQ cluster. Although applicable to many use cases, we’re only scratching the surface with the configurations described above.
Stay tuned for more posts on leveraging NSQ to build distributed systems. If you have any specific questions, feel free to ping me on twitter @imsnakes.