Joining Bitly Engineering

First post! (aka Introduction)

Hello everyone, my name is Peter Herndon. I recently started working at Bitly as an application engineer on Bitly’s backend systems (which are legion). My recent experience is with a series of smaller start-ups, preceded by a long stint in a much larger and more conservative enterprise setting. I bring to the table expertise in Python, systems administration (both cloudy and bare metal), databases and systems architecture.

I’ve been interested in Bitly for quite a while, and wanted to work here for much of that time. Since its beginning, Bitly has had a reputation for technical excellence. The engineers here have demonstrated that excellence both by solving engineering challenges and by the ingenuity of how they approach those solutions. Bitly’s former chief scientist, Hilary Mason, single-handedly popularized the concepts of Big Data and Data Science, and legitimized them as engineering disciplines. Her talks and blog posts created my own awareness of and interest in the field. So when I had the opportunity to work here, I gladly leapt into it head first.

What I Found

A Company in the Process of Renewing Itself

Bitly is a unique place to work, even among tech businesses. The company employs about 60 people, about 25 of them technical, and has been in existence for 4-5 years now. That said, Bitly is in many ways a very new company. Recently the company underwent a shift in management, resulting in a new focus on the business. The new CEO, Mark Josephson, brings laser-sharp clarity to our mission: helping Bitly’s customers succeed by giving them insight into how their brands are performing, while continuing the company’s technical leadership. We began the new year here with a renewed sense of purpose that is reflected in the number of new hires and the number of open positions.

I’ve experienced the process of watching an ailing small business shed employees and management, in a downward spiral of despair, including my own exit from that company. This is the first time I’ve experienced the rebirth of a company, the upward swell of pride and energy that comes from active leadership and direction. I’m very happy to see that Bitly has retained a great deal of its technical team, thus providing good institutional memory and continuity. That retention speaks well of the new leadership and the amount of pride in what the folks here have previously built. And what they’ve built is tremendous.

A Remarkable Technical Architecture

Bitly’s business is insight: providing customers with information that helps them make better decisions regarding their business by analyzing shortlink creation (referred to as encodes internally) and link click data (internally, decodes). To that end, our infrastructure must handle accumulating and manipulating around 6 billion decodes per month. That’s a lot of incoming HTTP requests. Not Google scale, but not pocket change by a long shot. To handle that volume, we use a stream-based architecture, rather than batch processing. That is, instead of accumulating incoming data in a data store and periodically processing it to reveal insights, we have a very deep, very long chain of processing steps. Each step, each link in the chain (and chain is an oversimplification since the structure is more of a directed graph, mostly acyclic) is an asynchronous processor that accepts incoming event data and performs a single logical transformation on the data. That transformation may be as simple as writing the datum to a file, or it may involve comparing it to other aggregated data for building recommendations, or for detecting spam and abuse. Frequently, the processed datum is then emitted back into the queue system for consumption further down the chain. The processed data are then made available via a service-oriented API, which is used to power the dashboards and reports we present to our customers. If any given step in the chain requires more processing power to handle the load of incoming events, we can spin up additional servers to run that particular step.

The advantage of stream-based processing over a traditional batch processing system is that it is a great deal more resilient to spikes in incoming data. Since each processing step is asynchronous and has a built-in capacity limit, messages remain in the queue for that step until the processor is ready to handle them. The result is that every step in the chain has its own, independent capacity for handling data, and while backlogs occur (and we do monitor for them), a backlog in a given step is by no means a system-wide problem. It may signify a failure in a particular subsystem, but the rest of the Bitly world usually remains unaffected. Of course, once the problem is corrected, the result is usually a backlog in the next steps of the chain, but that is fine and expected. Each step of the chain will chew through its allotted tasks and move on.

This stream processing system is powered by NSQ (documentation), about which much has been written and said, both on this very blog (here, here, and here) and elsewhere. I won’t add more, as I’m far from an expert (yet!), but I will say that I am impressed with how useful NSQ is for building large distributed systems that are remarkably resilient.
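To make the shape of a single processing step concrete, here is a minimal sketch of what one link in the chain might look like using the pynsq client library. The topic and channel names, and the transformation itself, are made up for illustration; the real processors are considerably more involved.

import json

import nsq  # pynsq client library

writer = nsq.Writer(['127.0.0.1:4150'])  # hypothetical local nsqd

def handle(message):
    # One logical transformation: annotate a decode event and re-emit it
    # downstream for the next step in the chain to consume.
    event = json.loads(message.body)
    event['annotated'] = True  # stand-in for the real transformation
    writer.pub('decodes_annotated', json.dumps(event).encode('utf-8'),
               lambda conn, data: None)
    return True  # returning True acknowledges (FINs) the message

reader = nsq.Reader(
    topic='decodes', channel='annotate',
    lookupd_http_addresses=['http://127.0.0.1:4161'],
    message_handler=handle, max_in_flight=100)

nsq.run()  # each step runs as its own small daemon like this one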

A Fanatical Attention to Code Quality

Another aspect of Bitly that has made a great impression on me is the devotion to code quality embodied in the code review process. Bitly experienced enormous growth at a time before modern configuration management tools became popular, and as a result wound up building its own system for managing server configuration. There is a certain amount of cruft in the system (how could there not be?), but Bitly’s engineers have paid a great deal of attention over time to making the deployment system as streamlined as possible. After all, maintaining the fleets of servers necessary to keep Bitly running is no small task. And that attention to operational maintainability spills over to the code that runs on those servers. Bitly has a code review process where equal emphasis is placed on functional correctness and test coverage, and on operational ease and maintainability. I’ve never had my code gone over with such a fine-toothed comb as I have here, and going through the review process made me a better programmer overnight. In previous positions, I’ve quickly produced code that works; here at Bitly, I produce code that works, is aesthetically and semantically appropriate (i.e., consistent naming, following a reasonable style guide), and fits conceptually within the greater whole that is our code base. The review process can be frustrating at times, as I attempt to figure out the most efficient way to get my changes merged, but overall it is a huge benefit, contributing greatly to the quality of the Bitly product.

A colleague asked me to comment on whether rigorous code review is better or worse than pair programming at improving code quality, since pair programming is something he has not done. My experience with pair programming is limited, but in that experience, pair programming does not provide a huge benefit to code quality. Instead, it is much more useful for design quality, hashing out architectural issues, and transferring knowledge. The kinds of issues I’ve caught in pair programming, or been caught creating, are typically typos or minor logic bugs (brainos). These are the kinds of bugs that pop up immediately the first time you run your code or its tests. (Tests are a given, right? Everybody writes tests nowadays.) So while there might be a tiny bit of added productivity from pair programming on the code quality front, that benefit is offset by consuming double the programmer hours. The trade-off is that rigorous code review improves code quality a great deal, but tends to lose sight of architecture and design issues: it encourages deep focus on the code itself, without considering the design. I think code review is necessary (or at least more beneficial) for code quality, while pair programming is not. Pair programming can be swapped for design meetings, thus reducing the total time spent by multiple developers on a single task.

A New (to Bitly) Approach to Teams

A major change we’ve instituted recently is to create what are being called “feature teams”. These feature teams are composed of a cross-functional slice of Bitly, including back-end developers, front-end developers, product and project management, and most importantly, business stakeholders from our Customer Success team. Each feature team is tasked with making improvements to our products, starting with different sections of the Bitly Brand Tools. I think this is the number one change toward directing Bitly’s amazing technical talent at creating something useful for our customers, rather than just yet another neat technical tool. With our Customer Success team getting feedback on our proposed improvements directly from our customers, we are now in a perfect position to make Bitly the best source of insight it can be. And that is our ultimate goal: to provide our customers with better insight into the world around them.

In my previous experience, I’ve never seen “improvements” actually improve anything without feedback from customers. Near-misses, yes, but not actual hits. The inspiration should often come from within, as we are in the best position to improve existing features for all our customers, rather than just taking the opinion of one. But without business-side involvement, and without customer feedback, I’ve never seen a tech-driven improvement result in success for the actual end-user, unless the intended end-user is in fact technical. That is why a large percentage of start-ups focus on tools for other engineers: it’s easier to get started.

10 Things We Forgot to Monitor

There is always a set of standard metrics that are universally monitored (Disk Usage, Memory Usage, Load, Pings, etc). Beyond that, there are a lot of lessons that we’ve learned from operating our production systems that have helped shape the breadth of monitoring that we perform at bitly.

One of my favorite all-time tweets is from @DevOps_Borat

"Law of Murphy for devops: if thing can able go wrong, is mean is already wrong but you not have Nagios alert of it yet."

What follows is a small list of things we monitor at bitly that have grown out of those (sometimes painful!) experiences, and, where possible, little snippets of the stories behind them.

1 - Fork Rate

We once had a problem where IPv6 was intentionally disabled on a box via options ipv6 disable=1 and alias ipv6 off in /etc/modprobe.conf. This caused a large issue for us: each time a new curl object was created, modprobe would spawn, checking net-pf-10 to evaluate IPv6 status. This fork bombed the box, and we eventually tracked it down by noticing that the process counter in /proc/stat was increasing by several hundred a second. Normally you would only expect a fork rate of 1-10/sec on a production box with steady traffic.

check_fork_rate.sh
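The check_fork_rate.sh script itself isn’t reproduced here, but the idea is simple: sample the processes counter in /proc/stat twice and alert if the per-second delta is above a threshold. A rough Python equivalent (the interval and threshold values are illustrative):

import time

def forks_since_boot():
    # Total number of forks since boot, from the 'processes' line of /proc/stat.
    with open('/proc/stat') as f:
        for line in f:
            if line.startswith('processes '):
                return int(line.split()[1])
    raise RuntimeError('no processes line in /proc/stat')

def fork_rate(interval=5.0):
    start = forks_since_boot()
    time.sleep(interval)
    return (forks_since_boot() - start) / interval

if __name__ == '__main__':
    rate = fork_rate()
    # 1-10/sec is typical on a steady-traffic box; several hundred means trouble.
    print('CRITICAL' if rate > 100 else 'OK', 'fork rate %.1f/sec' % rate)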

2 - flow control packets

TL;DR: If your network configuration honors flow control packets and isn’t configured to disable them, they can temporarily cause dropped traffic. (If this doesn’t sound like an outage, you need your head checked.)

$ /usr/sbin/ethtool -S eth0 | grep flow_control
rx_flow_control_xon: 0
rx_flow_control_xoff: 0
tx_flow_control_xon: 0
tx_flow_control_xoff: 0

Note: Read this to understand how these flow control frames can cascade to switch-wide loss of connectivity if you use certain Broadcom NICs. You should also trend these metrics on your switch gear. While you’re at it, watch your dropped frames.

3 - Swap In/Out Rate

It’s common to check for swap usage above a threshold, but even if you have a small quantity of memory swapped, it’s actually the rate it’s swapped in/out that can impact performance, not the quantity. This is a much more direct check for that state.

check_swap_paging_rate.sh
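Again, check_swap_paging_rate.sh isn’t included here, but the gist is to watch the pswpin/pswpout counters in /proc/vmstat rather than total swap usage. A rough sketch (the threshold is illustrative):

import time

def swap_counters():
    # Pages swapped in/out since boot, from /proc/vmstat.
    counters = {}
    with open('/proc/vmstat') as f:
        for line in f:
            name, value = line.split()
            if name in ('pswpin', 'pswpout'):
                counters[name] = int(value)
    return counters

def paging_rate(interval=5.0):
    before = swap_counters()
    time.sleep(interval)
    after = swap_counters()
    return sum(after[k] - before[k] for k in after) / interval

if __name__ == '__main__':
    rate = paging_rate()
    print('CRITICAL' if rate > 500 else 'OK', 'swap paging %.1f pages/sec' % rate)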

4 - Server Boot Notification

Unexpected reboots are part of life. Do you know when they happen on your hosts? Most people don’t. We use a simple init script that triggers an ops email on system boot. This is valuable to communicate provisioning of new servers, and helps capture state change even if services handle the failure gracefully without alerting.

notify.sh
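notify.sh is just a tiny script run once at boot; a Python equivalent might look like the following (the mail relay and addresses are placeholders):

import smtplib
import socket
import time
from email.mime.text import MIMEText

def notify_boot(relay='localhost', to_addr='ops@example.com'):
    host = socket.gethostname()
    msg = MIMEText('%s booted at %s' % (host, time.ctime()))
    msg['Subject'] = '[boot] %s' % host
    msg['From'] = 'root@%s' % host
    msg['To'] = to_addr
    server = smtplib.SMTP(relay)
    server.sendmail(msg['From'], [to_addr], msg.as_string())
    server.quit()

if __name__ == '__main__':
    notify_boot()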

5 - NTP Clock Offset

If not monitored, yes, one of your servers is probably off. If you’ve never thought about clock skew you might not even be running ntpd on your servers. Generally there are 3 things to check for. 1) That ntpd is running, 2) Clock skew inside your datacenter, 3) Clock skew from your master time servers to an external source.

We use check_ntp_time for this check.
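If you’re not in Nagios-land, the same check is easy to approximate with the third-party ntplib package (the server and thresholds here are illustrative):

import ntplib  # third-party: pip install ntplib

def check_offset(server='pool.ntp.org', warn=0.1, crit=0.5):
    response = ntplib.NTPClient().request(server, version=3)
    offset = abs(response.offset)  # seconds between the local clock and the server
    if offset > crit:
        return 'CRITICAL offset %.3fs' % offset
    if offset > warn:
        return 'WARNING offset %.3fs' % offset
    return 'OK offset %.3fs' % offset

if __name__ == '__main__':
    print(check_offset())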

6 - DNS Resolutions

Internal DNS - It’s a hidden part of your infrastructure that you rely on more than you realize. The things to check for are 1) Local resolutions from each server, 2) If you have local DNS servers in your datacenter, you want to check resolution, and quantity of queries, 3) Check availability of each upstream DNS resolver you use.

External DNS - It’s good to verify your external domains resolve correctly against each of your published external nameservers. At bitly we also rely on several ccTLDs and we monitor those authoritative servers directly as well (yes, it’s happened that all authoritative nameservers for a TLD have been offline).
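A bare-bones version of the “does this name resolve from this box” check needs nothing beyond the standard library (the domains are placeholders); checking a specific upstream resolver or an authoritative nameserver requires a resolver library such as dnspython:

import socket

def check_resolution(names=('example.com', 'internal-service.example')):
    failures = []
    for name in names:
        try:
            socket.getaddrinfo(name, None)
        except socket.gaierror as e:
            failures.append('%s (%s)' % (name, e))
    if failures:
        return 'CRITICAL cannot resolve: %s' % ', '.join(failures)
    return 'OK'

if __name__ == '__main__':
    print(check_resolution())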

7 - SSL Expiration

It’s the thing everyone forgets about because it happens so infrequently. The fix is easy: just check it and get alerted with enough lead time to renew your SSL certificates.

define command{
    command_name    check_ssl_expire
    command_line    $USER1$/check_http --ssl -C 14 -H $ARG1$
}
define service{
    host_name               virtual
    service_description     bitly_com_ssl_expiration
    use                     generic-service
    check_command           check_ssl_expire!bitly.com
    contact_groups          email_only
    normal_check_interval   720
    retry_check_interval    10
    notification_interval   720
}
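The same check without Nagios is only a few lines with the standard library ssl module; this sketch mirrors the 14-day window in the config above:

import datetime
import socket
import ssl

def days_until_expiry(host, port=443):
    context = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            cert = tls.getpeercert()
    expires = datetime.datetime.utcfromtimestamp(
        ssl.cert_time_to_seconds(cert['notAfter']))
    return (expires - datetime.datetime.utcnow()).days

if __name__ == '__main__':
    remaining = days_until_expiry('bitly.com')
    print('CRITICAL' if remaining < 14 else 'OK',
          '%d days until certificate expiry' % remaining)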

8 - DELL OpenManage Server Administrator (OMSA)

We run bitly split across two data centers: one is a managed environment with DELL hardware, and the second is Amazon EC2. For our DELL hardware it’s important for us to monitor the outputs from OMSA. This alerts us to RAID status, failed disks (predictive or hard failures), RAM issues, power supply states and more.

9 - Connection Limits

You probably run things like memcached and mysql with connection limits, but do you monitor how close you are to those limits as you scale out application tiers?

Related to this is addressing the issue of processes running into file descriptor limits. We make a regular practice of running services with ulimit -n 65535 in our run scripts to minimize this. We also set Nginx worker_rlimit_nofile.
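Checking how close a process is to its descriptor limit is just a matter of comparing /proc/&lt;pid&gt;/fd against /proc/&lt;pid&gt;/limits. A sketch (not the exact check we run):

import os
import re
import sys

def fd_usage(pid):
    # Return (open_fds, soft_limit) for a given process.
    open_fds = len(os.listdir('/proc/%d/fd' % pid))
    with open('/proc/%d/limits' % pid) as f:
        for line in f:
            if line.startswith('Max open files'):
                soft_limit = int(re.split(r'\s{2,}', line.strip())[1])
                return open_fds, soft_limit
    raise RuntimeError('no "Max open files" line for pid %d' % pid)

if __name__ == '__main__':
    used, limit = fd_usage(int(sys.argv[1]))
    pct = 100.0 * used / limit
    print('CRITICAL' if pct > 80 else 'OK', '%d/%d fds (%.0f%%)' % (used, limit, pct))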

10 - Load Balancer Status

We configure our load balancers with a health check which we can easily force to fail in order to have any given server removed from rotation. We’ve found it important to have visibility into the health check state, so we monitor and alert based on the same health check. (If you use EC2 Load Balancers you can monitor the ELB state from the Amazon APIs.)
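The “easily force to fail” part can be as simple as a health endpoint that checks for a sentinel file; touch the file and the load balancer drains the box. A sketch (paths and port are illustrative, not our actual health check):

import os
from http.server import BaseHTTPRequestHandler, HTTPServer

DISABLE_FILE = '/var/tmp/health_check_disabled'  # touch this to drain the host

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        healthy = not os.path.exists(DISABLE_FILE)
        self.send_response(200 if healthy else 503)
        self.end_headers()
        self.wfile.write(b'OK\n' if healthy else b'DISABLED\n')

if __name__ == '__main__':
    HTTPServer(('0.0.0.0', 8080), HealthHandler).serve_forever()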

Various Other things to watch

New entries written to Nginx Error Logs, service restarts (assuming you have something in place to auto-restart them on failure), numa stats, new process core dumps (great if you run any C code).

EOL

This scratches the surface of how we keep bitly stable, but if that’s an itch you like scratching, we’re hiring.

Adventures in Optimizing Text Processing

Lessons learned while post-processing 1.75 billion lines of Hadoop output.

The Problem

Recently, I encountered a problem. I had a nightly Hadoop job running on EMR that churned over the past 30 days’ worth of Bitly redirect data in order to run reach analysis pertaining to about 1000 of our paid accounts. This job resulted in 175 gzipped “part” files, each containing at least 10 million lines of data. I needed to collate that data after the Hadoop job ran.

> ls part-*.gz
part-0000.gz
...
part-0174.gz

> zcat part-0000.gz | wc -l
10000000

The Hadoop output data inside the part files consisted of things like this:

"3,g,05-02,12SIMV6" 329
"175,geo,05,US,GA,Atlanta"  9987
"10,phrase,05,egg foo young"    1093
"11,n_clicks,05"    393999

Those were comma-delimited keys with the following structure and a count:

"[ACCOUNT_ID],[METRIC_TYPE],[DATE],[VALUE]"    COUNT

The challenge was this: How do I efficiently separate out this data by ACCOUNT_ID and METRIC_TYPE? That is, I wanted one file per ACCOUNT_ID-METRIC_TYPE combination.

First, Look on the Shelf

Like many people churning through volumes of data, we make use of the mrjob python package for our Hadoop processing. At first I thought this was a no-brainer: “I’ll use the oddjob plugin. Yay, a solution already exists!” The plugin’s description was tailor-made for me:

"oddjob.MultipleJSONOutputFormat - Writes to the directories specified by the first element in the key" — https://github.com/jblomo/oddjob

Wrong.

  • The oddjob plugin wouldn’t run at all on our Hadoop cluster.
  • The oddjob plugin wouldn’t run consistently on EMR.
  • This approach resulted in 890 x 175 x 5 = ~800K part files. Copying 800K files off EMR via scp takes a nightmarishly long time.

Secondary Hadoop Jobs

After days of struggling with oddjob, I cut bait on it and looked at running a set of secondary Hadoop jobs, using the output from the first Hadoop job as input to the second ones. Something like this:

for account_id in $account_ids
do
    run_emr_job_to_extract $account_id
done

Even if each job only took one minute (which it wouldn’t), 890 mins == 14 hours. That was no good.

On to Text Processing

Here’s where I started putting the bash time command and python timeit to work to try to whittle down to an approach that was viable.

zgrep

First I tried the most straightforward approach I could think of — zgrep.

for account_id in $account_ids
do
    zgrep "^\"$account_id," part-*gz > $account_id.txt
done

That took 11.5 mins per account. 11.5 mins * 890 accounts = 170 hours

Blah. No.

zcat | awk

Next, I played around with zcat piped to awk. My final version of that was something akin to this:

zcat part-*gz | awk -F'[,\"]' '
{
    print >> ($2"-"$3".txt")
}'

This approach seemed reasonably concise but it still took 15 hours to run. So yeah, no.

The problem with this approach is that it results in way more syscalls than is necessary. If you think about it, that append operator (>>) opens, writes, and closes the file being appended to every time you call it. Yikes, that’s 5.2 billion syscalls — three for each line of data!

Onward.

Python-Based Solutions

At this point, it was seeming like a bash solution wasn’t going to do it, so I ventured back into python. This was the concept:

import gzip
for part_file in part_files:
    with gzip.open(part_file, 'rb') as f:
        for line in f:
           # 1. parse line for account_id and metric_type
           # 2. write to appropriate file for account_id and metric_type

Before I even get into that second for loop, let’s consider gzip.open(). Let’s consider it, and then let’s ditch it.

Python gzip — Oh the Pain!

gzip.open() on those 10-million-line files was as slow as molasses in January. My first pass at an all-python solution took 15 hours. Much of that was spent in gzip.open(). Look at this:

> cat test-gzip.py
import gzip
f = gzip.open('10-million-line-file.gz')
for line in f:
    pass
f.close()

> time python test-gzip.py;  # on an m1.large
real    3m7.687s
user    3m6.844s
sys     0m0.068s

That’s 3.1 minutes per file. 3.1 mins * 175 files = 9 hours. JUST TO READ THE FILES!

zcat is much faster, so I quickly switched to a zcat-python combo solution. This approach also happens to leverage 2 cores, one for the zcat, the other for the python script.

> cat test-zcat.py
import sys
for line in sys.stdin:
    pass

> time (zcat 10-million-lines.gz | python test-zcat.py)
real    0m3.642s
user    0m5.056s
sys     0m0.508s

3.6 SECONDS per file is much nicer, no? 3.6 seconds * 175 files = 10 minutes spent reading files. Definitely acceptable.

Dive into the python for loop

So now we’re here:

zcat part-*.gz | parse_accounts.py

What shall that parse_accounts.py look like? Something like this now:

import sys
for line in sys.stdin:
   # 1. parse line for account_id and metric_type
   # 2. write to appropriate file for account_id and metric_type

Parsing the lines

Recall that we want to extract ACCOUNT_ID and METRIC_TYPE from each line, and each line takes this form:

"[ACCOUNT_ID],[METRIC_TYPE],[DATE],[VALUE]"    COUNT

I’ll save us some pain and tell you to forget doing regular expression group matches. Using re.match() was 2X slower than line.split(). The fastest way I found was this:

# 1. parse line for account_id and metric_type
key = line.split(',')
account_id = key[ACCOUNT_ID_INDEX][1:] # strip the leading quote (")
metric_type = key[METRIC_TYPE_INDEX]

Note: I’m putting account_id and metric_type in variables here for clarity’s sake, but let me say this: if you’re going to be running a piece of code 1.75 billion times, it’s time to abandon clarity and embrace efficiency. If you’re only going to access a value once, don’t bother putting it in a variable; if you’ll access it more than once, do. You’ll see what I mean when it all comes together below.

Writing the files

At first, I tried to be clever and only open the files as needed, like this:

import os
import sys
from collections import defaultdict
OUT_FILES = defaultdict(dict)
for line in sys.stdin:
    # 1. parse line for account_id and metric_type
    key = line.split(',')
    account_id = key[ACCOUNT_ID_INDEX][1:] # strip leading quote
    metric_type = key[METRIC_TYPE_INDEX]

    # 2. write to appropriate file for account_id and metric_type
    if metric_type not in OUT_FILES[account_id]:
         OUT_FILES[account_id][metric_type] = \
                 open(os.path.join(account_id, key[METRIC_TYPE_INDEX]), "wb")

    OUT_FILES[account_id][metric_type].write(line)

close_outfiles()  # close all the files we opened

Here I was at about 5 hours of processing time. Not bad considering where I started, but more paring could be done.

Again, I invoke the admonition to optimize the hell out of things when you’re performing an operation so many times.

So I should eliminate the extraneous if statement, right? Why did I need to open the files conditionally? So I didn’t open files unnecessarily? Who cares if a few extra files are opened and not used?

I ended up at essentially something like this:

import sys
from collections import defaultdict
OUT_FILES = defaultdict(dict)

open_outfiles()  # open all files I could possibly need

for line in sys.stdin:
    # 1. parse line for account_id and metric_type
    key = line.split(',')
    account_id = key[ACCOUNT_ID_INDEX][1:] # strip leading quote

    # 2. write to appropriate file for account_id and metric_type
    OUT_FILES[account_id][key[METRIC_TYPE_INDEX]].write(line)

close_outfiles()  # close all the files we opened

Final execution time: 1 hour 50 minutes.
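For completeness, open_outfiles() and close_outfiles() aren’t shown in the post. They might look roughly like this, slotting in alongside the script above; the metric type list, the account id file, and the output layout are assumptions:

import os

# Assumes the OUT_FILES defaultdict(dict) defined in the script above.
ACCOUNT_IDS = [l.strip() for l in open('account_ids.txt')]  # hypothetical list of ~890 ids
METRIC_TYPES = ['g', 'geo', 'phrase', 'n_clicks']            # plus whatever else we emit

def open_outfiles():
    for account_id in ACCOUNT_IDS:
        if not os.path.isdir(account_id):
            os.makedirs(account_id)
        for metric_type in METRIC_TYPES:
            OUT_FILES[account_id][metric_type] = \
                open(os.path.join(account_id, metric_type), 'wb')

def close_outfiles():
    for by_type in OUT_FILES.values():
        for f in by_type.values():
            f.close()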

try/finally

One last takeaway here is that sometimes you need to invest a good bit of development time in order to save yourself what might be an unacceptable amount of processing time. It requires patience and can be painful, sometimes with days’ worth of work ultimately abandoned.

But if you find yourself thinking, “There’s got to be a way to make this faster,” then it’s at least worth trying. I stopped short of rewriting it in C. Now that would have been faster!

If you’re interested in working on the next set of problems at Bitly, we’re actively hiring.

Getting more clues from Python’s logging module

During development of Python web applications, there are a lot of tools that can help provide clues about what caused a bug or exception. For example, Django’s default debug error page prints out local variables at every stack frame when there’s an unhandled exception. The popular django_debugtoolbar adds more information. In a similar vein there are things like Pyramid’s pyramid_debugtoolbar and its predecessor WebError.

But when troubleshooting production issues, those tools aren’t available, for good reason: we need the useful information to be exposed to developers without overwhelming our end users or exposing sensitive information to malicious eyes.

So, we turn to logging instead. But that provides a lot less information out of the box. So at bitly, we often found ourselves in an irritating cycle that went something like this:

  • Sometime soon after a deploy, we notice an unusual number of exceptions being logged. (Let’s say that old Python chestnut, 'NoneType' object is unsubscriptable.) So we know that we have some code that is expecting a dict or list and getting None instead.

  • We may decide the error is not critical enough to roll back without first trying to diagnose the underlying cause, but of course we want to fix it quickly. The pressure is on.

  • We find the traceback in our logs. It looks like:

Python traceback example

  • But where’s the None? Is it response['data'] or is it one of the entry values? We have no way to see. (Tip: We can actually tell that it’s not response['data']['subtotal']. Do you know why? Would you remember that while under pressure to fix a production bug?)

  • We try, and fail, to provoke the error on our local development or staging environments.

  • Then we add some logging code.

  • We get a quick code review and deploy the logging code.

  • We wait for the error to happen again.

  • We find the log message and realize we forgot to log everything we wanted to see, or we need more contextual information. Go back 3 steps and repeat as needed.

  • Finally we can see what the bad value was and we finally have enough clues that we can understand the cause of the error and fix it.

This is a frustrating amount of overhead when you’re trying to diagnose a production issue as quickly as possible, even with multiple people helping out.

Fortunately, if you make liberal use of logging.exception() — for example, if your web framework calls it on any unhandled exception, and if you take care to call it on unknown exceptions that you deliberately catch — then it’s actually easy to change your logging configuration so that a lot more useful information gets logged. And you can do this globally without sprinkling logging changes throughout your code.

Here’s one possible way to hack your logging config to do this:

Python log handler example
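The embedded snippet isn’t reproduced here, but the core idea is a logging.Formatter subclass that, when formatting an exception, walks to the innermost traceback frame and appends its local variables, truncated by MAX_LINE_LENGTH and MAX_VARS_LINES. A minimal sketch along those lines (not our exact formatter):

import logging

MAX_LINE_LENGTH = 100
MAX_VARS_LINES = 30

class VerboseExceptionFormatter(logging.Formatter):
    def __init__(self, log_locals_on_exception=True, *args, **kwargs):
        self._log_locals = log_locals_on_exception
        super(VerboseExceptionFormatter, self).__init__(*args, **kwargs)

    def formatException(self, exc_info):
        formatted = super(VerboseExceptionFormatter, self).formatException(exc_info)
        if not self._log_locals or exc_info[2] is None:
            return formatted
        # Walk to the innermost frame of the traceback.
        tb = exc_info[2]
        while tb.tb_next:
            tb = tb.tb_next
        lines = ['Locals at innermost frame:']
        for name, value in sorted(tb.tb_frame.f_locals.items()):
            lines.append(('%s = %r' % (name, value))[:MAX_LINE_LENGTH])
            if len(lines) > MAX_VARS_LINES:
                lines.append('...')
                break
        return formatted + '\n' + '\n'.join(lines)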

Then, in whatever code sets up your logging configuration, you can add this to the relevant handler like so:

handler.setFormatter(VerboseExceptionFormatter(log_locals_on_exception=True))

The MAX_LINE_LENGTH and MAX_VARS_LINES values are used to limit the amount of data we dump out. And so far we’re only logging the local variables from the innermost frame, on the assumption that those are the most likely to be useful.

Having deployed this logging configuration, our hypothetical traceback would look more like this:

Python traceback example, with locals

Aha. Now we can clearly see that some upstream service failed, and our code is failing to handle that. And we can take action immediately.

This technique is not a magic crystal ball, certainly; sometimes the important clues are elsewhere in the stack, or they’re in the current global scope. If we wanted to, it would be trivial to log locals and globals at every frame, just by grabbing the values of tb.tb_frame.f_locals and tb.tb_frame.f_globals during the while tb.tb_next loop. But that could get very verbose, and some of our services get hit very hard and log a lot, so we haven’t gone that far yet.

We have already used these enhanced log entries to quickly diagnose a couple of bugs in production, so we’re happy with this little tweak. Hopefully you will find it useful too.

If you’d like to get your hands directly on our code, we’re hiring.

Networking: Using Linux Traffic Control for Fun and Profit Loss Prevention

Here at bitly, we are big fans of data, tubes and especially tubes that carry data.

This is a story about asking tubes to carry too much data.

A physical hadoop cluster has been a significant part of bitly’s infrastructure and a core tool of the bitly Data Science and Ops/Infra teams for some time. Long enough that we needed to cycle in a new cluster, copy data, and fold the old into the new.

Branches were opened, work was done, servers provisioned and the new cluster was stood up. Time to take a step back from the story and flesh out some technical details:

  • bitly operates at a consequential scale of data: at the time of this migration, the hadoop cluster held just over 150TB (consumed disk space) of compressed stream data, that is, data that is the valuable output of our various applications after having been manipulated and expanded on by other applications.

  • bitly’s physical presence is collocated with our data center partner. There are three physical chassis classes (application, storage and database) racked together in contiguous cabinets in rows. At the time of this story each chassis had three physical 1Gb Ethernet connections (each logically isolated by VLANs), frontlink, backlink and lights-out (for out of band management of the server chassis). Each connection, after a series of cabinet specific patch panels and switches, connects to our core switches over 10Gb glass in a hub and spoke topology.

  • While bitly also operates at a consequential physical scale (hundreds of physical server chassis), we depend on our data center partner for network infrastructure and topology. This means that within most levels of the physical networking stack, we have severely limited control and visibility.

Back to the story:

The distcp tool bundled with hadoop allowed us to quickly copy data from one cluster to the other. Put simply, distcp creates a mapreduce job to shuffle data from one hdfs cluster to another in a many-to-many node copy. Distcp was fast, which was good.

bitly broke, and the Ops/Infra team was very sad.

Errors and latent responses were being returned to website users and api clients. We discovered that services were getting errors from other services, database calls were timing out, and even DNS queries internal to our network were failing. We determined that the copy had caused unforeseen duress on the network tubes, particularly the tubes that carried traffic across physical cabinets. However, the information given to us by our data center partner only added to the confusion: no connection, not even cabinet to core, showed signs of saturation, congestion or errors.

We now had two conflicting problems: a need to continue making headway with the hadoop migration as well as troubleshooting and understanding our network issues.

We limited the number of mapreduce mappers that the distcp tool used to copy data between clusters, which artificially throttled throughput for the copies, allowing us to resume moving forward with the migration. Eventually the copies completed, and we were able to swap the new cluster in for the old.

The new cluster had more nodes, which meant that hadoop was faster.

The Data Science team was happy.

Unfortunately, with hadoop being larger and faster, more data was getting shipped around to more nodes during mapreduce workloads, which had the unintended effect of:

bitly broke, a lot. The Ops/Infra team was very sad.

The first response action was to turn hadoop off.

The Data Science team was sad.

A turned off hadoop cluster is bad (just not as bad as breaking bitly), so we time warped the cluster back to 1995 by forcing all NICs to re-negotiate at 100Mbps (as opposed to 1Gbps) using ethtool -s eth1 speed 100 duplex full autoneg on. Now we could safely turn hadoop on, but it was painfully slow.

The Data Science team was still sad.

In fact it was so slow and congested that data ingestion and scheduled ETL/reporting jobs began to fail frequently, triggering alarms that woke Ops/Infra team members up in the middle of the night.

The Ops/Infra team was still sad.

Because of our lack of sufficient visibility into the state of the network, working through triage and troubleshooting with our data center partner was going to be an involved and lengthy task. Something had to be done to get hadoop into a usable state, while protecting bitly from breaking.

Time to take another step back:

Some tools we have at bitly:

  • roles.json : A list of servers (app01, app02, userdb01, hadoop01 etc), roles (userdb, app, web, monitoring, hadoop_node etc), and the mapping of servers into roles (app01,02 -> app, hadoop01,02 -> hadoop_node etc).

  • $datacenter/jsons/* : A directory containing a json file per logical server, with attributes describing the server such as ip address, names, provisioning information and most importantly for this story; cabinet location.

  • Linux : Linux.

Since we could easily identify which servers do which things and where each server is racked, and could leverage all the benefits of Linux, this was a solvable problem, and we got to work.

And the Ops/Infra team was sad.

Because Linux’s networking Traffic Control (tc) syntax was clunky and awkward and its documentation intimidating. After much swearing and keyboard smashing, perseverance paid off and working examples of tc magic surfaced. Branches were opened, scripts were written, deploys done, benchmarks run and finally some test nodes were left with the following:

$ tc class show dev eth1
class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b
    cburst 1561b
class htb 1:10 root prio 0 rate 819200Kbit ceil 819200Kbit burst 1433b 
    cburst 1433b
class htb 1:20 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b 
    cburst 1561b

$ tc filter show dev eth1
filter parent 1: protocol ip pref 49128 u32 
filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1 
filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key 
    ht 818 bkt 0 flowid 1:20 
    match 7f000001/ffffffff at 16
filter parent 1: protocol ip pref 49129 u32 
filter parent 1: protocol ip pref 49129 u32 fh 817: ht divisor 1 
filter parent 1: protocol ip pref 49129 u32 fh 817::800 order 2048 key 
    ht 817 bkt 0 flowid 1:10 
    match 7f000002/ffffffff at 16
filter parent 1: protocol ip pref 49130 u32 
filter parent 1: protocol ip pref 49130 u32 fh 816: ht divisor 1 
filter parent 1: protocol ip pref 49130 u32 fh 816::800 order 2048 key 
    ht 816 bkt 0 flowid 1:20 
    match 7f000003/ffffffff at 16
<snipped>

$ tc qdisc show
qdisc mq 0: dev eth2 root 
qdisc mq 0: dev eth0 root 
qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100 
    direct_packets_stat 24

In plain English, there are three traffic control classes. Each class represents a logical group, to which a filter can be subscribed, such as:

class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit burst 1561b cburst 1561b

Each class represents a ceiling or throughput limit of outgoing traffic aggregated across all filters subscribed to that class.

Each filter is a specific rule for a specific IP (unfortunately each IP is printed in hex), so the filter:

filter parent 1: protocol ip pref 49128 u32 
filter parent 1: protocol ip pref 49128 u32 fh 818: ht divisor 1 
filter parent 1: protocol ip pref 49128 u32 fh 818::800 order 2048 key 
    ht 818 bkt 0 flowid 1:20 
    match 7f000001/ffffffff at 16

can be read as “subscribe hadoop14 to the class 1:20”, where “7f000001” is the hex-encoded IP for hadoop14 and “flowid 1:20” is the class being subscribed to.

Finally there is a qdisc, which is more or less the active queue for the eth1 device. That queue defaults to placing any host that is not otherwise defined in a filter for a class, into the 1:100 class.

qdisc htb 1: dev eth1 root refcnt 9 r2q 10 default 100 direct_packets_stat 24

With this configuration, any host, hadoop or not, that is in the same cabinet as the host being configured gets a filter that is assigned to the “1:10” class, which allows up to ~800Mbps for the class as a whole. Similarly, there is a predefined list of roles that are deemed “roles of priority hosts”, which get a filter created on the same “1:10” rule. These are hosts that do uniquely important things, like running the hadoop namenode or jobtracker services, and also our monitoring hosts.

Any other hadoop host that is not in the same cabinet is attached to the “1:20” class, which is limited to a more conservative ~200Mbps class.

As mentioned before, any host not specified by a filter gets caught by the default class for the eth1 qdisc, which is “1:100”.
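Purely to illustrate the shape of the scripts, here is a sketch of how such classes and filters might be generated from roles.json and the per-server cabinet metadata. The role names, rates, device and exact tc invocations are simplified assumptions; the real deploy scripts handle much more:

import subprocess

def tc(*args):
    subprocess.check_call(['/sbin/tc'] + list(args))

def apply_rules(my_cabinet, servers, priority_roles=('hadoop_master', 'monitoring')):
    # servers: dicts with 'ip', 'cabinet' and 'roles' keys, derived from
    # roles.json and $datacenter/jsons/* (shape assumed for this sketch).
    # Root qdisc: anything without an explicit filter falls into class 1:100.
    tc('qdisc', 'add', 'dev', 'eth1', 'root', 'handle', '1:', 'htb', 'default', '100')
    tc('class', 'add', 'dev', 'eth1', 'parent', '1:', 'classid', '1:10',
       'htb', 'rate', '800mbit')  # same-cabinet and priority hosts
    tc('class', 'add', 'dev', 'eth1', 'parent', '1:', 'classid', '1:20',
       'htb', 'rate', '200mbit')  # cross-cabinet hadoop traffic
    tc('class', 'add', 'dev', 'eth1', 'parent', '1:', 'classid', '1:100',
       'htb', 'rate', '200mbit')  # the catch-all default
    for server in servers:
        if server['cabinet'] == my_cabinet or set(server['roles']) & set(priority_roles):
            flowid = '1:10'
        elif 'hadoop_node' in server['roles']:
            flowid = '1:20'
        else:
            continue  # let the default class catch it
        tc('filter', 'add', 'dev', 'eth1', 'parent', '1:', 'protocol', 'ip',
           'u32', 'match', 'ip', 'dst', server['ip'] + '/32', 'flowid', flowid)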

What does this actually look like? Here is a host that is caught by the catch all “1:100” rule:

[root@hadoop27 ~]# iperf -t 30 -c NONHADOOPHOST
------------------------------------------------------------
Client connecting to NONHADOOPHOST, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[  3] local hadoop27 port 35897 connected with NONHADOOPHOST port 5001
[ ID] Interval       Transfer     Bandwidth
[  3]  0.0-30.1 sec   735 MBytes   205 Mbits/sec

Now when connecting to another host in the same cabinet, or the “1:10” rule :

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER
------------------------------------------------------------
Client connecting to CABINETPEER, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[  3] local hadoop27 port 39016 connected with CABINETPEER port 5001
[ ID] Interval       Transfer     Bandwidth
[  3]  0.0-30.0 sec  2.86 GBytes   820 Mbits/sec

Now what happens when connecting to two servers that match the “1:10” rule?

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER1
------------------------------------------------------------
Client connecting to CABINETPEER1, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[  3] local hadoop27 port 39648 connected with CABINETPEER1 port 5001
[ ID] Interval       Transfer     Bandwidth
[  3]  0.0-30.0 sec  1.47 GBytes   421 Mbits/sec

[root@hadoop27 ~]# iperf -t 30 -c CABINETPEER2
------------------------------------------------------------
Client connecting to 10.241.28.160, TCP port 5001
TCP window size: 23.2 KByte (default)
------------------------------------------------------------
[  3] local hadoop27 port 38218 connected with CABINETPEER2 port 5001
[ ID] Interval       Transfer     Bandwidth
[  3]  0.0-30.0 sec  1.43 GBytes   408 Mbits/sec

So the traffic got halved? Sounds about right.

Even better, trending the data was relatively easy by mangling the stats output to our trending services:

$ /sbin/tc -s class show dev eth1 classid 1:100
class htb 1:100 root prio 0 rate 204800Kbit ceil 204800Kbit 
    burst 1561b cburst 1561b 
Sent 5876292240 bytes 41184081 pkt (dropped 0, overlimits 0 requeues 0) 
rate 3456bit 2pps backlog 0b 0p requeues 0 
lended: 40130273 borrowed: 0 giants: 0
tokens: 906 ctokens: 906

After testing, we cycled through the hadoop hosts, re-enabling their links to 1Gb after applying the traffic control rules. With deploys done, hadoop was usably performant.

The Data Science team was happy.

The Ops/Infra team could begin tackling longer term troubleshooting and solutions while being able to sleep at night, knowing that bitly was not being broken.

The Ops/Infra team was happy.

Takeaways:

  • In dire moments: your toolset for managing your environment is as important as the environment itself. Because we already had the toolset available to holistically control the environment, we were able to dig ourselves out of the hole almost as quickly as we had fallen into it.

  • Don’t get into dire moments: Understand the environment that you live in. In this case, we should have had a better understanding and appreciation for the scope of the hadoop migration and its possible impacts.

  • Linux TC is a high cost, high reward tool. It was almost certainly written by people with the very longest of beards, and requires time and patience to implement. However we found it to be an incredibly powerful tool that helped save us from ourselves.

  • Linux: Linux

EOL

This story is a good reminder of the "Law of Murphy for devops". Temporary solutions like those in this story afforded us the time to complete troubleshooting of our network and implement permanent fixes. We have since unthrottled hadoop and moved it to its own dedicated network, worked around shoddy network hardware to harden our primary network and much more. Stay tuned.

Deploying all day - Sean O Connor.mp4 from devopsdays on Vimeo.

Recently we gave a talk at Devopsdays Portland about how we deploy all day at bitly without breaking the Internet. Some of the topics we covered include automation, state management, systems architecture, distributed messaging, and peer review. Check it out!

Tenets for Working Remotely

For the past 15 years, I’ve worked as a remote employee. I currently live in Denver and work for Manhattan-based Bitly. Before that, I was on a Virginia-based team at AOL for 11 years, working from Philly, Dallas, Philly again, New York and then Denver. During that time, I built up some personal tenets for working remotely which have served me well. They are in some ways simple, but when applied consistently, I have found them to foster quite a positive experience for those working with me as a remote individual.

Here they are, captured for others who might benefit from them:

1) Make sure your online presence is accurate. If you’re online on IM, you should really be there. If you’re away from your desk, put up your away message or sign off. You should be almost as easy to reach as people in the office.

2) Call in 5 minutes early to every meeting. When people join a conference call, you should always already be there, on the call, ready to go. You want people to react this way about you, “Of course Michael’s there. He’s always there.” With Skype, this translates simply to, “Answer the moment they call.” With Google Hangouts, it applies directly — be the first one to join.

3) Speak up on calls. Make your presence known and heard. If you never have anything to say, why are you there?

4) If you can’t hear, tell them you can’t hear. If what people are saying has any importance, you should be able to hear it. Same goes for seeing on video calls. If you can’t see what’s being discussed, try to get it remedied.

5) Be productive. If there’s ever a question that you’re not getting your work done as expected, you’ll be the low-hanging fruit precisely because you’re remote. I survived layoff rounds at AOL practically every 6 months for years. I was never concerned, because I knew I always delivered.

6) Make regular trips to the mothership. In-person contact is still needed periodically. I have found about every 6 to 8 weeks to be a pretty good interval.

7) Make your core hours match the company’s core hours. This isn’t always possible, depending on the time difference, but you should strive to achieve at least 3 or 4 hours of overlap. When I worked with a QA group in India for a time, where the time difference was 12.5 hours, we all adjusted our work hours so we could achieve a 3-hour overlap. Before we did that, question-and-answer cycles often took literally 2 days. To answer one question!

8) Figure out what you need others to do differently. You may need to make people aware that working with a remote individual may take some work on their part as well. They may need to be reminded to take extra time to think, “Does Michael need to know about this decision we just made while brewing coffee?” If you are invited to a meeting, is there a call-in number set up? If Skype, who is calling whom?

I find that it doesn’t much matter where I’m working — which tends either to be home or Starbucks — as long as I am able to follow the above tenets. That means if connectivity is important for a given day (which it usually is), it’s my responsibility to make sure I have access to reliable connectivity. If I’m supposed to do a 4-hour planning session, home is the right choice. Solid coding session? Starbucks affords just the right amount of ambient noise, random distraction and caffeine.

It’s worth mentioning that being a remote employee is certainly not for everyone. It takes discipline. You also definitely miss out on at least some of the camaraderie of the in-office experience, which at Bitly is saying something. I haven’t once, for instance, partaken in Drink Cart Friday. This is a problem. Perhaps I need to adjust my travel times….

Oh, and we’re hiring. (Are we hiring remotes? I guess that depends on your tenets for working remotely.)

Z Proxy

Here is a tale of how we leverage redundant datacenters, redundant code, and multi-tiered fallbacks in the quest for uptime.

But Why?

High availability is important for any site operating at scale, but at bitly it is particularly important; people expect bitly links to work, no matter what. We have enterprise customers who rely on them for key metrics, users who share them on social networks, and websites with custom short domains that trust us to serve requests with their name on them. A bitly link not working in any of these scenarios would make our users look bad, so it is something we take very seriously.

No matter how redundant, distributed, and fault tolerant your main infrastructure is, things can always go wrong. Recently Google Apps and Search, probably the most distributed infrastructure in existence, went down. There are unknowns everywhere, and ultimately you have to plan for any part of your infrastructure breaking for unknown reasons. Under failure, a distributed system should degrade gracefully, not suddenly. This is why we created Z Proxy.

So What is Z Proxy?

Z Proxy is an application that serves decodes (this is what we call redirecting from a short bitly link to its long URL, and what happens every time you click on a bitly link) without relying on any other part of the bitly infrastructure. This means that it does not use our primary database of urls, or any of our other servers, to do lookups. So how does it work?

How it Works

Z Proxy is essentially a self contained wrapper around S3, written in Go. When all of bitly is running properly, every time a link is shortened, a message is put on NSQ, which a queuereader later grabs. A queuereader then writes the short and long urls into S3 so that Z Proxy can perform lookups against S3 by short url, get the long url, and serve a 301 or 302 redirect. To the browser, nothing different happened.

There are multiple hosts running Z Proxy in EC2. This location provides proximity to S3, high availability, and most importantly availability that is independent of the main decode infrastructure, which exists outside of AWS. EC2 and S3 can have problems, but the chance of them failing at the same time as our other datacenter is extremely low, and the separation gives us flexibility.

Each host has a local memcached instance used to cache the slow S3 lookups. Usually there are many more steps involved with decodes, but Z Proxy skips most that are not strictly essential, such as spam checking. Because it has fewer features than the main decode path, and because it is written in optimized Go, this is a lightweight way to serve our decodes (thousands a second) in a failure scenario. We keep sufficient capacity on these systems to be ready for a failure at any time.

Metrics & Background Processing

Because we use NSQ, even if the primary infrastructure is down, hosts running Z Proxy (we call these “lastresort” hosts) can create and queue messages corresponding to each decode request. That means when everything is back up and running, the primary infrastructure will process messages from these hosts. Info+ pages will be updated with clicks that happened when everything was down, ratelimits will be adjusted, realtime will find new trends based on these clicks, and more.

Z Proxy also records metrics for internal use. It sends data to graphite recording response times, types of requests, etc., but of course since it makes no assumptions about anything in our infrastructure working, graphite included, it also aggregates some stats locally.

Failover Plan

Normally our DNS points to our load balancers, which send requests off to frontend webservers. Nginx on each frontend webserver is configured to handle local timeouts and failures by transparently retrying the request against a lastresort host. Nginx on each lastresort host then sends the request to one of a few local Z Proxy instances. This is great because it allows failovers on a per-request basis, but if our frontend servers or load balancers are taken out (i.e., we lose datacenter connectivity), it doesn’t help. In this case, we can point DNS for all of our domains directly at the lastresort hosts.

But What if Z Proxy Doesn’t Work?

The trust-nobody approach of Z Proxy makes it very stable, but ultimately it could still break, so even the Go app isn’t enough.

To have an additional level of safety, the S3 key is the short link, but the value isn’t actually the long url itself. The S3 value is an HTML blob containing a meta refresh to the destination url. This allows Z Proxy to parse out the long url, but also allows nginx on lastresort hosts to proxy the 200 responses directly from S3 if Z Proxy goes down. This multi-tier approach to failures gives us increasing levels of availability with decreasing levels of features, metrics, performance, and consistency.
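To make that concrete, the value stored in S3 for a given short link might be built something like this; the exact markup is illustrative, the point being that it is both parseable by Z Proxy and directly servable by nginx:

def s3_value(long_url):
    # An HTML blob whose meta refresh sends the browser to the long URL.
    # Z Proxy parses the URL back out to serve a proper 301/302; nginx can
    # proxy the blob as-is (a 200 from S3) if Z Proxy itself is down.
    return ('<html><head>'
            '<meta http-equiv="refresh" content="0; url=%s">'
            '</head></html>' % long_url)

# key: the short link, value: the HTML blob
# s3.put('1a2B3c', s3_value('https://example.com/some/long/url'))  # hypothetical client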

EOL

This system gives us confidence that we can serve decodes with high availability, and in the event of an outage or failure, it gives us options for where to send traffic. Because our link resolution dataset is immutable, S3 is an invaluable tool. While we might take slightly different approaches with dynamic data, providing layers of fallbacks from S3 and transparent retrying across datacenters is simple and effective at providing high availability.

Fun Facts

  • Because we have tens of billions of links, this makes our Z Proxy bucket about 1.25% of all S3 objects (about 4TB in size).
  • The main decode path is written in Tcl, a strange and interesting language that I had never seen before working at bitly.

Building NSQ Client Libraries

Brace yourself, this is a long one.

The following guide was originally intended for client library developers to describe in detail all the important features and functionality we expected in an NSQ client library.

While writing it we began to realize that it had value beyond client library developers. It incorporates a comprehensive analysis of most of the capabilities of NSQ (both client and server) and is therefore interesting and useful for end-users as well (or anyone using or interested in infrastructure messaging platforms).

If you need some background on NSQ please see our original blog post or its follow up, spray some NSQ on it.

Intro

NSQ’s design pushes a lot of responsibility onto client libraries in order to maintain overall cluster robustness and performance.

This guide attempts to outline the various responsibilities well-behaved client libraries need to fulfill. Because publishing to nsqd is trivial (just an HTTP POST to the /put endpoint), this document focuses on consumers.

By setting these expectations we hope to provide a foundation for achieving consistency across languages for NSQ users.

Overview

  1. Configuration
  2. Discovery (optional)
  3. Connection Handling
  4. Feature Negotiation
  5. Data Flow / Heartbeats
  6. Message Handling
  7. RDY State
  8. Backoff

Configuration

At a high level, our philosophy with respect to configuration is to design the system to have the flexibility to support different workloads, use sane defaults that run well “out of the box”, and minimize the number of dials.

A client subscribes to a topic on a channel over a TCP connection to nsqd instance(s). You can only subscribe to one topic per connection so multiple topic consumption needs to be structured accordingly.

Using nsqlookupd for discovery is optional so client libraries should support a configuration where a client connects directly to one or more nsqd instances or where it is configured to poll one or more nsqlookupd instances. When a client is configured to poll nsqlookupd the polling interval should be configurable. Additionally, because typical deployments of NSQ are in distributed environments with many producers and consumers, the client library should automatically add jitter based on a random % of the configured value. This will help avoid a thundering herd of connections. For more detail see Discovery.

An important performance knob for clients is the number of messages a client can receive before nsqd expects a response. This pipelining facilitates buffered, batched, and asynchronous message handling. By convention this value is called max_in_flight and it affects how RDY state is managed. For more detail see RDY State.

Being a system that is designed to gracefully handle failure, client libraries are expected to implement retry handling for failed messages and provide options for bounding that behavior in terms of number of attempts per message. For more detail see Message Handling.

Relatedly, when message processing fails, the client library is expected to automatically handle re-queueing the message. NSQ supports sending a delay along with the REQ command. Client libraries are expected to provide options for what this delay should be set to initially (for the first failure) and how it should change for subsequent failures. For more detail see Backoff.

Most importantly, the client library should support some method of configuring callback handlers for message processing. The signature of these callbacks should be simple, typically accepting a single parameter (an instance of a “message object”).

Discovery

An important component of NSQ is nsqlookupd, which provides a discovery service for consumers to locate producers of a given topic at runtime.

Although optional, using nsqlookupd greatly reduces the amount of configuration required to maintain and scale a large distributed NSQ cluster.

When a client uses nsqlookupd for discovery, the client library should manage the process of polling all nsqlookupd instances for an up-to-date set of nsqd producers and should manage the connections to those producers.

Querying an nsqlookupd instance is straightforward. Perform an HTTP request to the lookup endpoint with a query parameter of the topic the client is attempting to discover (i.e. /lookup?topic=clicks). The response format is JSON:

Example JSON Response From nsqlookupd

The broadcast_address and tcp_port should be used to connect to an nsqd producer. Because, by design, nsqlookupd instances don’t coordinate their lists of producers, the client library should union the lists it received from all nsqlookupd queries to build the final list of nsqd producers to connect to. The broadcast_address:tcp_port combination should be used as the unique key for this union.

A periodic timer should be used to repeatedly poll the configured nsqlookupd so that clients will automatically discover new producers. The client library should automatically initiate connections to all newly found nsqd producers.

When client library execution begins it should bootstrap this polling process by kicking off an initial set of requests to the configured nsqlookupd instances.
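A sketch of that poll-and-union logic follows. The JSON envelope has varied across nsqlookupd versions, so the response parsing here is an assumption; the only fields that matter are broadcast_address and tcp_port:

import json
import random
import time
import urllib.request

def poll_lookupd(lookupd_http_addresses, topic):
    # Return the union of nsqd producers across all configured nsqlookupd instances.
    producers = {}
    for addr in lookupd_http_addresses:
        url = 'http://%s/lookup?topic=%s' % (addr, topic)
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                body = json.load(resp)
        except Exception:
            continue  # one unreachable nsqlookupd shouldn't stop discovery
        data = body.get('data', body)  # some versions wrap the payload in 'data'
        for producer in data.get('producers', []):
            key = (producer['broadcast_address'], producer['tcp_port'])
            producers[key] = producer  # keyed for the union
    return producers

def poll_loop(lookupd_http_addresses, topic, interval=60, connect=lambda p: None):
    while True:
        for producer in poll_lookupd(lookupd_http_addresses, topic).values():
            connect(producer)  # connection handling dedupes already-connected producers
        # Jitter the interval so a fleet of consumers doesn't poll in lockstep.
        time.sleep(interval + random.random() * interval * 0.3)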

Connection Handling

Once a client has an nsqd producer to connect to (via discovery or manual configuration), it should open a TCP connection to broadcast_address:tcp_port. A separate TCP connection should be made to each nsqd for each topic the client wants to consume.

When connecting to an nsqd instance, the client library should send the following data, in order:

  1. the magic identifier
  2. an IDENTIFY command (and payload) and read/verify response
  3. a SUB command (specifying desired topic) and read/verify response
  4. an initial RDY count of 1 (see RDY State).

(low-level details on the protocol are available in the spec)
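A bare-bones version of that sequence over a raw socket might look like the following. The framing here paraphrases the protocol spec (consult the spec for the authoritative details), and a real client would read and verify each response rather than skipping past it:

import json
import socket
import struct

def connect(broadcast_address, tcp_port, topic, channel, identify_body):
    sock = socket.create_connection((broadcast_address, tcp_port))
    sock.sendall(b'  V2')  # 1. the magic identifier

    # 2. IDENTIFY: command line, then a size-prefixed JSON body
    body = json.dumps(identify_body).encode('utf-8')
    sock.sendall(b'IDENTIFY\n' + struct.pack('>I', len(body)) + body)
    # ... read/verify the IDENTIFY response here ...

    # 3. SUB: subscribe to the desired topic/channel
    sock.sendall(('SUB %s %s\n' % (topic, channel)).encode('utf-8'))
    # ... read/verify the SUB response here ...

    # 4. an initial RDY count of 1
    sock.sendall(b'RDY 1\n')
    return sock

# e.g. connect('nsqd01', 4150, 'clicks', 'archive',
#              {'feature_negotiation': True, 'short_id': 'myhost',
#               'long_id': 'myhost.example.com'})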

Reconnection

Client libraries should automatically handle reconnection as follows:

  • If the client is configured with a specific list of nsqd instances, reconnection should be handled by delaying the retry attempt in an exponential backoff manner (i.e. try to reconnect in 8s, 16s, 32s, etc., up to a max).

  • If the client is configured to discover instances via nsqlookupd, reconnection should be handled automatically based on the polling interval (i.e. if a client disconnects from an nsqd, the client library should only attempt to reconnect if that instance is discovered by a subsequent nsqlookupd polling round). This ensures that clients can learn about producers that are introduced to the topology and ones that are removed (or failed).

Feature Negotiation

The IDENTIFY command can be used to set nsqd side metadata, modify client settings, and negotiate features. It satisfies two needs:

  1. In certain cases a client would like to modify how nsqd interacts with it (currently this is limited to modifying a client’s heartbeat interval but you can imagine this could evolve to include enabling compression, TLS, output buffering, etc.)
  2. nsqd responds to the IDENTIFY command with a JSON payload that includes important server side configuration values that the client should respect while interacting with the instance.

After connecting, based on the user’s configuration, a client library should send an IDENTIFY command (the body of an IDENTIFY command is a JSON payload), e.g.:

Example IDENTIFY Command JSON Payload

The feature_negotiation field indicates that the client can accept a JSON payload in return. The short_id and long_id are arbitrary text fields that are used by nsqd (and nsqadmin) to identify clients. heartbeat_interval configures the interval between heartbeats on a per-client basis.
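Put together, the body of such an IDENTIFY command might look like this (expressed as a Python dict; the values are examples, not defaults):

    identify_body = {
        "feature_negotiation": True,        # ask nsqd to reply with a JSON payload
        "short_id": "worker-1",             # illustrative identifiers for nsqd/nsqadmin
        "long_id": "worker-1.example.com",
        "heartbeat_interval": 30000,        # per-client heartbeat interval, in milliseconds
    }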

The nsqd will respond OK if it does not support feature negotiation (introduced in nsqd v0.2.20+), otherwise:

Example IDENTIFY Response JSON Payload

More detail on the use of the max_rdy_count field is in the RDY State section.

Data Flow and Heartbeats

Once a client is in a subscribed state, data flow in the NSQ protocol is asynchronous. For consumers, this means that in order to build truly robust and performant client libraries they should be structured using asynchronous network IO loops and/or “threads” (the scare quotes are used to represent both OS-level threads and userland threads, like coroutines).

Additionally clients are expected to respond to periodic heartbeats from the nsqd instances they’re connected to. By default this happens at 30 second intervals. The client can respond with any command but, by convention, it’s easiest to simply respond with a NOP whenever a heartbeat is received. See the protocol spec for specifics on how to identify heartbeats.

A “thread” should be dedicated to reading data off the TCP socket, unpacking the data from the frame, and performing the multiplexing logic to route the data as appropriate. This is also conveniently the best spot to handle heartbeats. At the lowest level, reading the protocol involves the following sequential steps:

  1. read 4 byte big endian uint32 size
  2. read size bytes data
  3. unpack data
  4. profit
  5. goto 1
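A sketch of those steps as an IO loop in Python (the frame-type constants and the _heartbeat_ body follow the protocol spec; route_message and handle_error are placeholder hooks):

    import struct

    FRAME_TYPE_RESPONSE = 0
    FRAME_TYPE_ERROR = 1
    FRAME_TYPE_MESSAGE = 2

    def read_exactly(sock, n):
        # recv() may return fewer bytes than asked for, so loop until we have n
        buf = b""
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise IOError("connection closed")
            buf += chunk
        return buf

    def route_message(frame_data):
        pass  # placeholder: unpack the message and dispatch it to the configured handler

    def handle_error(frame_data):
        raise RuntimeError(frame_data)  # most protocol errors are fatal (see below)

    def read_loop(sock):
        while True:
            size = struct.unpack(">I", read_exactly(sock, 4))[0]  # 1. read size
            data = read_exactly(sock, size)                       # 2. read size bytes
            frame_type = struct.unpack(">I", data[:4])[0]         # 3. unpack
            frame_data = data[4:]
            if frame_type == FRAME_TYPE_RESPONSE and frame_data == b"_heartbeat_":
                sock.sendall(b"NOP\n")     # answer heartbeats right here in the IO loop
            elif frame_type == FRAME_TYPE_MESSAGE:
                route_message(frame_data)  # 4. profit
            elif frame_type == FRAME_TYPE_ERROR:
                handle_error(frame_data)
            # 5. goto 1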

A Brief Interlude on Errors

Due to their asynchronous nature, it would take a bit of extra state tracking in order to correlate protocol errors with the commands that generated them. Instead, we took the “fail fast” approach so the overwhelming majority of protocol-level error handling is fatal. This means that if the client sends an invalid command (or gets itself into an invalid state) the nsqd instance it’s connected to will protect itself (and the system) by forcibly closing the connection (and, if possible, sending an error to the client). This, coupled with the connection handling mentioned above, makes for a more robust and stable system.

The only errors that are not fatal are:

  • E_FIN_FAILED - The client tried to send a FIN command for an invalid message ID.
  • E_REQ_FAILED - The client tried to send a REQ command for an invalid message ID.
  • E_TOUCH_FAILED - The client tried to send a TOUCH command for an invalid message ID.

Because these errors are most often timing issues, they are not considered fatal. These situations typically occur when a message times out on the nsqd side and is re-queued and delivered to another client. The original recipient is no longer allowed to respond on behalf of that message.

Message Handling

When the IO loop unpacks a data frame containing a message, it should route that message to the configured handler for processing.

The nsqd producer expects to receive a reply within its configured message timeout (default: 60 seconds). There are a few possible scenarios:

  1. The handler indicates that the message was processed successfully.
  2. The handler indicates that the message processing was unsuccessful.
  3. The handler decides that it needs more time to process the message.
  4. The in-flight timeout expires and nsqd automatically re-queues the message.

In the first 3 cases, the client library should send the appropriate command on the client’s behalf (FIN, REQ, and TOUCH respectively).

The FIN command is the simplest of the bunch. It tells nsqd that it can safely discard the message. FIN can also be used to discard a message that you do not want to process or retry.

The REQ command tells nsqd that the message should be re-queued (with an optional parameter specifying the amount of time to defer additional attempts). If the optional parameter is not specified by the client, the client library should automatically calculate the duration in relation to the number of attempts to process the message (a multiple is typically sufficient). The client library should discard messages that exceed the configured max attempts. When this occurs, a user-supplied callback should be executed to notify and enable special handling.

If the message handler requires more time than the configured message timeout, the TOUCH command can be used to reset the timer on the nsqd side. This can be done repeatedly until the message is either FIN or REQ, up to the nsqd producer’s configured max timeout. Client libraries should never automatically TOUCH on behalf of the client.

If the nsqd instance receives no response, the message will time out and be automatically re-queued for delivery to an available client.

Finally, a property of each message is the number of attempts. Client libraries should compare this value against the configured max and discard messages that have exceeded it. When a message is discarded there should be a callback fired. Typical default implementations of this callback might include writing to a directory on disk, logging, etc. The user should be able to override the default handling.
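A sketch of this post-handler bookkeeping (the Message stub, the delay multiple, the max attempts, and the callback name are all illustrative; conn is assumed to be any object whose send() writes a command to the nsqd socket; TOUCH is omitted since, as noted above, it should only be sent at the user's request):

    REQUEUE_DELAY_MULTIPLE_MS = 90 * 1000  # illustrative: 90s per prior attempt
    MAX_ATTEMPTS = 5                       # illustrative configured max

    class Message(object):
        def __init__(self, id, attempts):
            self.id = id              # opaque message ID from the frame
            self.attempts = attempts  # delivery attempt count carried with each message

    def respond(conn, message, success, giving_up_callback):
        if message.attempts > MAX_ATTEMPTS:
            # exceeded max attempts: notify the user, then discard via FIN
            giving_up_callback(message)
            conn.send("FIN %s\n" % message.id)
        elif success:
            conn.send("FIN %s\n" % message.id)
        else:
            # scale the requeue delay by the number of attempts so far
            delay_ms = message.attempts * REQUEUE_DELAY_MULTIPLE_MS
            conn.send("REQ %s %d\n" % (message.id, delay_ms))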

RDY State

Because messages are pushed from nsqd to clients we needed a way to manage the flow of data in userland rather than relying on low-level TCP semantics. A client’s RDY state is NSQ’s flow control mechanism.

As outlined in the configuration section, a consumer is configured with a max_in_flight. This is a concurrency and performance knob, e.g. some downstream systems are able to more easily batch process messages and benefit greatly from a higher max_in_flight.

When a client connects to nsqd (and subscribes) it is placed in an initial RDY state of 0. No messages will be sent to the client.

Client libraries have a few responsibilities:

  1. bootstrap and evenly distribute the configured max_in_flight to all connections.
  2. never allow the aggregate sum of RDY counts for all connections (total_rdy_count) to exceed the configured max_in_flight.
  3. never exceed the per-connection max_rdy_count configured on nsqd.
  4. expose an API method to reliably indicate message flow starvation.

1. Bootstrap and Distribution

There are a few considerations when choosing an appropriate RDY count for a connection (in order to evenly distribute max_in_flight):

  • the # of connections is dynamic, and often not even known in advance (i.e. when discovering producers via nsqlookupd).
  • max_in_flight may be lower than your number of connections

To kickstart message flow a client library needs to send an initial RDY count. Because the eventual number of connections is often not known ahead of time it should start with a value of 1 so that the client library does not unfairly favor the first connection(s).

Additionally, after each message is processed, the client library should evaluate whether or not it’s time to update RDY state. An update should be triggered if the current value is 0 or if it is below ~25% of the last value sent.

The client library should always attempt to evenly distribute RDY count across all connections. Typically, this is implemented as max_in_flight / num_conns.

However, when max_in_flight < num_conns this simple formula isn’t sufficient. In this state, client libraries should perform a dynamic runtime evaluation of producer “liveness” by measuring the time since a connection last received a message. After a configurable expiration, the library should re-distribute whatever RDY count is available to a new (random) set of producers. By doing this, you guarantee that you’ll (eventually) find producers with messages. Clearly this has a latency impact.
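A simplified sketch of this distribution logic (send_rdy stands in for whatever actually issues the RDY command; a real implementation also tracks per-connection “liveness” as described above):

    import random

    def redistribute_rdy(conns, max_in_flight, send_rdy):
        if not conns:
            return
        if max_in_flight >= len(conns):
            # typical case: split max_in_flight evenly across connections
            per_conn = max_in_flight // len(conns)
            for conn in conns:
                send_rdy(conn, per_conn)
        else:
            # max_in_flight < num_conns: not every connection can have RDY >= 1,
            # so give RDY 1 to a random subset and RDY 0 to the rest; re-run this
            # periodically so every producer eventually gets a turn
            chosen = set(random.sample(range(len(conns)), max_in_flight))
            for i, conn in enumerate(conns):
                send_rdy(conn, 1 if i in chosen else 0)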

2. Maintaining max_in_flight

The client library should maintain a ceiling for the maximum number of messages in flight for a given consumer. Specifically, the aggregate sum of each connection’s RDY count should never exceed the configured max_in_flight.

Below is example code in Python to determine whether or not the proposed RDY count is valid for a given connection:

send_ready.py
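The embedded example isn't reproduced here, but the core of it is a check along these lines (modeled loosely on pynsq; total_rdy_count is the aggregate RDY across all of the consumer's connections):

    def send_rdy(self, conn, count):
        # never exceed the per-connection maximum negotiated with nsqd
        count = min(count, conn.max_rdy_count)

        # the proposed aggregate is the current total, minus this connection's
        # outstanding RDY, plus the new value; refuse anything that would
        # push the consumer past its configured max_in_flight
        proposed_total = self.total_rdy_count - conn.rdy_count + count
        if proposed_total > self.max_in_flight:
            return False

        self.total_rdy_count = proposed_total
        conn.rdy_count = count
        conn.send("RDY %d\n" % count)
        return True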

3. Producer Max RDY Count

Each nsqd is configurable with a --max-rdy-count (see feature negotiation for more information on the handshake a client can perform to ascertain this value). If the client sends a RDY count that is outside of the acceptable range its connection will be forcefully closed. For backwards compatibility, this value should be assumed to be 2500 if the nsqd instance does not support feature negotiation.

4. Message Flow Starvation

Finally, the client library should provide an API method to indicate message flow starvation. It is insufficient for clients (in their message handlers) to simply compare the number of messages they have in-flight vs. their configured max_in_flight in order to decide to “process a batch”. There are two cases when this is problematic:

  1. When clients configure max_in_flight > 1, due to variable num_conns, there are cases where max_in_flight is not evenly divisible by num_conns. Because the contract states that you should never exceed max_in_flight, you must round down, and you end up with cases where the sum of all RDY counts is less than max_in_flight.
  2. Consider the case where only a subset of producers have messages. Because of the expected even distribution of RDY count, those active producers only have a fraction of the configured max_in_flight.

In both cases, a client will never actually receive max_in_flight # of messages. Therefore, the client library should expose a method is_starved that will evaluate whether any of the connections are starved, as follows:

is_starved.py
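Again, the embedded code isn't reproduced here, but the idea is roughly the following (the ~85% threshold mirrors the reference clients; treat it as a tunable rather than a magic number):

    def is_starved(self):
        for conn in self.conns.values():
            # a connection is "starved" when nearly all of the RDY count it was
            # last given is tied up in in-flight messages, i.e. a full batch
            if conn.in_flight > 0 and conn.in_flight >= (conn.last_rdy * 0.85):
                return True
        return False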

The is_starved method should be used by message handlers to reliably identify when to process a batch of messages.

Backoff

The question of what to do when message processing fails is a complicated one to answer. The message handling section detailed client library behavior that would defer the processing of failed messages for some (increasing) duration of time. The other piece of the puzzle is whether or not to reduce throughput. The interplay between these two pieces of functionality is crucial for overall system stability.

By slowing down the rate of processing, or “backing off”, the consumer allows the downstream system to recover from transient failure. However, this behavior should be configurable as it isn’t always desirable, such as situations where latency is prioritized.

Backoff should be implemented by sending RDY 0 to the appropriate producers, stopping message flow. The duration of time to remain in this state should be calculated based on the number of repeated failures (exponentially). Conversely, successful processing should reduce this duration until the reader is no longer in a backoff state.

While a reader is in a backoff state, after the timeout expires, the client library should only ever send RDY 1 regardless of max_in_flight. This effectively “tests the waters” before returning to full throttle. Additionally, during a backoff timeout, the client library should ignore any success or failure results with respect to calculating backoff duration (i.e. it should only take into account one result per backoff timeout).
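A sketch of the backoff bookkeeping described above (the base interval, the cap, and the reader methods are illustrative stand-ins, not a prescribed API):

    BACKOFF_BASE_SECS = 1.0   # illustrative starting interval
    BACKOFF_MAX_SECS = 120.0  # illustrative cap

    class BackoffState(object):
        def __init__(self):
            self.failures = 0

        def failure(self):
            self.failures += 1

        def success(self):
            self.failures = max(self.failures - 1, 0)

        def in_backoff(self):
            return self.failures > 0

        def interval(self):
            # exponential in the number of accumulated failures, capped
            return min(BACKOFF_BASE_SECS * (2 ** self.failures), BACKOFF_MAX_SECS)

    def on_message_result(reader, backoff, success):
        if reader.waiting_out_backoff_timeout:
            return  # only one result counts per backoff timeout
        if success:
            backoff.success()
        else:
            backoff.failure()
        if backoff.in_backoff():
            reader.send_rdy_to_all(0)  # stop message flow
            # after interval() elapses, send RDY 1 to a single connection to test the waters
            reader.schedule(backoff.interval(), reader.test_the_waters)
        else:
            reader.resume_full_throttle()  # redistribute max_in_flight as usual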

(diagram: client flow)

Bringing It All Together

Distributed systems are fun.

The interactions between the various components of an NSQ cluster work in concert to provide a platform on which to build robust, performant, and stable infrastructure. We hope this guide shed some light on how important the client’s role is.

In terms of actually implementing all of this, we treat pynsq and go-nsq as our reference codebases. The structure of pynsq can be broken down into three core components:

  • Message - a high-level message object, which exposes stateful methods for responding to the nsqd producer (FIN, REQ, TOUCH, etc.) as well as metadata such as attempts and timestamp.
  • Connection - a high-level wrapper around a TCP connection to a specific nsqd producer, which has knowledge of in-flight messages, its RDY state, negotiated features, and various timings.
  • Reader - the front-facing API a user interacts with, which handles discovery, creates connections (and subscribes), bootstraps and manages RDY state, parses raw incoming data, creates Message objects, and dispatches messages to handlers.

We’re happy to help support anyone interested in building client libraries for NSQ. We’re looking for contributors to continue to expand our language support as well as flesh out functionality in existing libraries. The community has already open-sourced client libraries for 7 languages.

If you’re interested in hacking on NSQ, check out the GitHub repo. We also publish binary distributions for Linux and Darwin to make it easy to get started.

If you have any specific questions, feel free to ping me on twitter @imsnakes.

Finally, a huge thank you to the crew at bitly: @mccutchen and @ploxiln for their tireless review, and @jehiah, my partner in crime on NSQ.

Speeding things up with Redshift

Recently we’ve started to experiment with using Redshift, Amazon’s new data warehousing service. More specifically, we’re using it to speed up and expand our ad hoc data analysis.

The Challenge

bitly sees billions of clicks and shortens (new short links) each month. Often we have various questions about the data generated from this activity. Sometimes these questions are driven by business needs (how much traffic do we see from a potential enterprise customer?), sometimes they are more technically driven (how much traffic will a new sub-system need to deal with?), and sometimes we like to just have fun (what are the top trashy celeb stories this week?).

Unfortunately, when working with that volume of data it can be pretty difficult to do much of anything quickly. Pre-Redshift, all of these questions were answered by writing map-reduce jobs to be run on our Hadoop cluster or on Amazon’s EMR.

Whenever we wanted to answer a question with our data, the process would look something like this:

  1. Write map-reduce job in Python
  2. Run it on some local test data
  3. Fix bugs
  4. Run it on the Hadoop cluster
  5. Wait 20-30 minutes for results
  6. Get an error back from Hadoop
  7. Dig through the logs to find the error
  8. GOTO 3

This is clearly not ideal when all you want to do is get a simple count.

For a lot of the work we do, Hadoop + Python make for an awesome combination, but for these ad hoc aggregation queries they’re very blunt instruments. In both cases, they are general purpose tools that are super flexible, but slow and difficult to use for this specific use case.

Redshift, on the other hand, is specifically built and optimized for doing aggregation queries over large sets of data. When we want to answer a question with Redshift, we just write a SQL query and get an answer within a few minutes—if not seconds.

Overall, our experience with Redshift has been a positive one but we have run into some gotchas that we’ll get into below.

The Good News

User Experience

From a user perspective, we’re really happy with Redshift. Any one of our developers or data scientists just needs to write a SQL query and they have an answer to their question in less than 5 minutes. Moving from our old Hadoop-based workflow to an interactive console session with Redshift is a major improvement.

Additionally, since many of the user-facing bits of Redshift are based on PostgreSQL, there is a large ecosystem of mature, well-documented tools and libraries for us to take advantage of.

Finally, while it can be a bit slow at times, we’ve been very impressed with the web management console Amazon provides with Redshift. For a 1.0 product, the console is comprehensive and offers much more information than we expected it to.

Performance

For our current use case of ad hoc research queries, Redshift’s performance is adequate. Most queries return a response in less than five minutes and we rarely have many users executing queries concurrently.

That being said, we have done some experimentation with competing products (e.g. Vertica) and have seen better performance out of those tools. This is especially true for more complex queries that benefit from projections/secondary indexes and situations where the cluster’s resources are under contention.

Documentation

Just like the rest of AWS, Amazon provides reasonably comprehensive and thorough documentation. For everything that is directly exposed to us as users (e.g. loading operations, configuration params, etc) we are very happy with the documentation. The only places where we felt we wanted more information were those where Amazon makes things “just work”. Most significantly we would like to see more details about what exactly happens when a node in the cluster fails and how the cluster is expected to behave in that state.

Cost

We wouldn’t go so far as to call Redshift cheap, but compared to many competitors it is pretty cost effective. The biggest gotcha here is that while the simple model for scaling Redshift clusters and tuning performance within a cluster is nice as a user, it does mean that you have a bit of a one-size-fits-all situation.

In our case we are computationally and I/O constrained so we’re paying for a bunch of storage capacity and memory that we don’t use. At our current scale, things work out okay but as we continue to grow it may make sense to take advantage of something else that is more flexible in terms of both hardware and tuning.

The Bad News

Data Loading

We had to spend a lot of time getting our data into Redshift. This is partially our fault since our dataset is not the cleanest in the world, but overall this is the place that we felt the most pain from an immature tool chain.

The majority of bitly’s at-rest data is stored in line-oriented (i.e. one JSON blob per line) JSON files. The primary way to load data into Redshift is to use the COPY command to point the cluster at a pile of CSV or TSV files stored on S3. Clearly we needed to build out some kind of tool chain to transform our JSON logs into flat files.

Initially, we just wrote a simple Python script that would do the transformation. Unfortunately, we quickly discovered that this simple approach would be too slow. We estimated that it would have taken a month to process and load all the data we wanted in Redshift.
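The original script isn't shown here, but conceptually it was nothing fancy; something along these lines (the field names and escaping rules are illustrative, not our actual schema):

    import json
    import sys

    FIELDS = ["timestamp", "user", "url"]  # illustrative subset of the log schema

    def flatten(line):
        record = json.loads(line)
        # pull out the columns we care about, defaulting missing keys to empty strings
        values = [str(record.get(field, "")) for field in FIELDS]
        # strip characters that would break a TSV row
        return "\t".join(v.replace("\t", " ").replace("\n", " ") for v in values)

    for line in sys.stdin:
        line = line.strip()
        if line:
            print(flatten(line))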

Next, we realized that we had a tool for easily doing highly parallelized, distributed text processing: Hadoop. Accordingly, we re-worked our quick Python script into a Hadoop job to transform our logs in a big batch. Since we already keep a copy of our raw logs in S3, EMR proved to be a great tool for this.

Overall this process worked well but we did still run into a few gotchas loading the flattened data into Redshift:

  • Redshift only supports a subset of UTF-8; specifically, characters can be at most 3 bytes long. Amazon does mention this in their docs, but it still bit us a few times.
  • varchar field lengths are in terms of bytes, not characters. For Python byte strings the two are one and the same; for unicode strings they are not. It took us a little while to realize this was happening and to set up our code to do byte-length truncation without cutting a unicode string mid-character (see the sketch after this list). To get an idea of how we went about doing unicode-aware truncation, check out this relevant stack overflow thread.
  • Moving floats between different systems always has some issues. In Python, the default string output of small float values is scientific notation (e.g. 3.32e-08). Unfortunately Redshift doesn’t recognize this format, so we needed to force our data prep job to always output floats in decimal format.
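To make the last two gotchas concrete, here's the sort of helper code we're describing (the helper names are ours, and the fixed precision in format_float is an illustrative choice):

    def truncate_utf8_bytes(text, max_bytes):
        # truncate a unicode string so its UTF-8 encoding fits within max_bytes,
        # without cutting a multi-byte character in half
        encoded = text.encode("utf-8")[:max_bytes]
        # decoding with errors="ignore" drops any partial trailing character
        return encoded.decode("utf-8", errors="ignore")

    def format_float(value):
        # force plain decimal output; str() would produce scientific notation
        # for very small values, which Redshift's COPY won't accept
        return "{0:.10f}".format(value)

    print(truncate_utf8_bytes(u"caf\u00e9s", 5))  # -> 'café' (cut at a character boundary)
    print(format_float(3.32e-08))                 # -> '0.0000000332'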

Now that we have a large body of data loaded into Redshift, we’re working on building out tooling based on NSQ to do our data prep work in a streaming fashion that should allow us to easily do smaller incremental updates.

In the end, we worked through our data loading issues with Redshift but it was one of the more acute pain points we encountered. From our conversations with Amazon, they’re definitely aware of this and we’re interested to see what they’ll come out with, but for now the provided tooling is pretty limited.

Availability

Long term, there are a number of periodic and online tasks that we’re thinking about using a tool like Redshift for. Unfortunately, as things stand today we would not be comfortable relying on Redshift as a highly available system.

Currently, if any node within a Redshift cluster fails, all outstanding queries will fail and Amazon will automatically start replacing the failed node. In theory this recovery should happen very quickly. The cluster will be available for querying as soon as the replacement node is added, but performance on the cluster will be degraded while data is restored on to the new node.

At this point, we have no data on how well this recovery process works in the real world. Additionally, we have concerns about how well this process will work when there are larger issues happening within an availability zone or region. Historically, there have been a number of cases where issues within one Amazon service (e.g. EBS) have cascaded into other services, leading to long periods of unavailability or degradation.

Until there’s a significant track record behind a system like this, we’re hesitant to trust anything that will “automatically recover”.

There is the option of running a mirrored Redshift cluster in a different AZ or region, but that gets expensive fast. Additionally, we’d have to build out even more tooling to make sure those two clusters stay in sync with each other.

Limited Feature Set

Redshift is very impressive feature-wise for a 1.0 product. That being said, a number of the competing products have been around for a while and offer some major features that Redshift lacks.

The biggest missing feature for us with Redshift is some kind of secondary indexing or projections. Right now, if you sort or filter by any column other than the SORTKEY, Redshift will do a full table scan. Technically you could create a copy of that table with a different sort key but then it would become your problem to keep those tables in sync and to query the right table at the right time.

Some other “missing” features include tools for working with time series data, geospatial query tools, advanced HA features, and more mature data loading tools.

The Bottom Line

Redshift is great for our needs today, but we’ll see if Amazon’s development keeps up as our needs change going forward. Given Amazon’s impressive track record for quickly iterating on and improving products we’re hopeful, but we do have our eyes on competing products as our use of data warehousing tools matures.

by Sean O’Connor