Adventures in Optimizing Text Processing

Lessons learned while post-processing 1.75 billion lines of Hadoop output.

The Problem

Recently, I encountered a problem. I had a nightly Hadoop job running on EMR that churned over the past 30 days’ worth of Bitly redirect data to run reach analysis for about 1000 of our paid accounts. This job produced 175 gzipped “part” files, each containing at least 10 million lines of data. I needed to collate that data after the Hadoop job ran.

> ls part-*.gz
part-0000.gz
...
part-0174.gz

> zcat part-0000.gz | wc -l
10000000

The Hadoop output data inside the part files consisted of things like this:

"3,g,05-02,12SIMV6" 329
"175,geo,05,US,GA,Atlanta"  9987
"10,phrase,05,egg foo young"    1093
"11,n_clicks,05"    393999

Those were comma-delimited keys with the following structure and a count:

"[ACCOUNT_ID],[METRIC_TYPE],[DATE],[VALUE]"    COUNT

The challenge was this: How do I efficiently separate out this data by ACCOUNT_ID and METRIC_TYPE? That is, I wanted one file per ACCOUNT_ID-METRIC_TYPE combination.
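
To make that concrete, each line already carries everything needed to route it. Here is a quick illustration of how one of the sample lines breaks apart (throwaway code, not part of the pipeline; the target file name is just one possible naming scheme):

line = '"175,geo,05,US,GA,Atlanta"  9987'
key, count = line.rsplit(None, 1)                # '"175,geo,05,US,GA,Atlanta"', '9987'
fields = key.strip('"').split(',')
account_id, metric_type = fields[0], fields[1]   # '175', 'geo'
out_file = '%s-%s.txt' % (account_id, metric_type)   # e.g. '175-geo.txt'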

First, Look on the Shelf

Like many people churning through volumes of data, we use the mrjob python package for our Hadoop processing. At first I thought this was a no-brainer: “I’ll use the oddjob plugin. Yay, a solution already exists!” The plugin’s description was tailor-made for me:

"oddjob.MultipleJSONOutputFormat - Writes to the directories specified by the first element in the key" — https://github.com/jblomo/oddjob

Wrong.

  • The oddjob plugin wouldn’t run at all on our Hadoop cluster.
  • The oddjob plugin wouldn’t run consistently on EMR.
  • This approach resulted in 890 x 175 x 5 = ~800K part files, and scp-ing 800K files off of EMR takes a nightmarishly long time.

Secondary Hadoop Jobs

After days of struggling with oddjob, I cut bait on it and looked at running a set of secondary Hadoop jobs, using the output from the first Hadoop job as input to the second ones. Something like this:

for account_id in $account_ids
do
    run_emr_job_to_extract $account_id
done
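
Each call to run_emr_job_to_extract would have wrapped a trivial per-account filter job, something like this mrjob sketch (hypothetical; the class name and the hard-coded account id are stand-ins for whatever the real job would use):

from mrjob.job import MRJob

ACCOUNT_ID = "175"  # stand-in; in practice this would be passed in per run

class ExtractAccount(MRJob):

    def mapper(self, _, line):
        # keep only the lines whose key starts with this account id
        if line.startswith('"%s,' % ACCOUNT_ID):
            yield line, None

if __name__ == '__main__':
    ExtractAccount.run()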

Even if each job took only one minute (which it wouldn’t), 890 minutes is almost 15 hours. That was no good.

On to Text Processing

Here’s where I started putting the bash time command and python’s timeit to work, trying to whittle my way down to a viable approach.

zgrep

First I tried the most straightforward approach I could think of: zgrep.

for account_id in $account_ids
do
    zgrep '^"$account_id,' part-*gz > $account_id.txt
done

That took 11.5 mins per account. 11.5 mins * 890 accounts = 170 hours

Blah. No.

zcat | awk

Next, I played around with zcat piped to awk. My final version of that was something akin to this:

zcat part-*.gz | awk -F'[,"]' '
{
    print >> ($2"-"$3".txt")
}'

This approach seemed reasonably concise but it still took 15 hours to run. So yeah, no.

The problem with this approach is that it results in far more syscalls than necessary. If you think about it, that append operator (>>) opens, writes to, and closes the file being appended to every time you call it. Yikes, that’s 5.2 billion syscalls, three for each line of data!
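
To make that cost concrete, a per-line open/append/close loop looks like this in Python (a deliberately naive sketch, written only to show the pattern):

import sys

for line in sys.stdin:
    key = line.split(',')
    account_id = key[0][1:]   # strip the leading quote, e.g. '"3' -> '3'
    metric_type = key[1]      # e.g. 'g'
    # open, write, close: a fresh round of file syscalls for every single line
    with open('%s-%s.txt' % (account_id, metric_type), 'a') as f:
        f.write(line)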

Onward.

Python-Based Solutions

At this point, it was seeming like a bash solution wasn’t going to do it, so I ventured back into python. This was the concept:

import gzip
for part_file in part_files:
    with gzip.open(part_file, 'rb') as f:
        for line in f:
            # 1. parse line for account_id and metric_type
            # 2. write to appropriate file for account_id and metric_type

Before I even get into that second for loop, let’s consider gzip.open(). Let’s consider it, and then let’s ditch it.

Python gzip — Oh the Pain!

gzip.open() on those 10-million-line files was as slow as molasses in January. My first pass at an all-python solution took 15 hours. Much of that was spent in gzip.open(). Look at this:

> cat test-gzip.py
import gzip
f = gzip.open('10-million-line-file.gz')
for line in f:
    pass
f.close()

> time python test-gzip.py;  # on an m1.large
real    3m7.687s
user    3m6.844s
sys     0m0.068s

That’s 3.1 minutes per file. 3.1 mins * 175 files = 9 hours. JUST TO READ THE FILES!
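
One commonly suggested mitigation, which I’ll mention but won’t vouch for here, is to wrap the GzipFile in an io.BufferedReader so that line iteration goes through the buffered reader rather than GzipFile.readline():

import gzip
import io

# untested sketch: io.BufferedReader does the line splitting over larger buffered reads
f = io.BufferedReader(gzip.open('10-million-line-file.gz', 'rb'))
for line in f:
    pass
f.close()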

zcat is much faster, so I quickly switched to a zcat-python combo solution. This approach also happens to leverage 2 cores, one for the zcat, the other for the python script.

> cat test-zcat.py
import sys
for line in sys.stdin:
    pass

> time zcat 10-million-line-file.gz | python test-zcat.py
real    0m3.642s
user    0m5.056s
sys     0m0.508s

3.6 SECONDS per file is much nicer, no? 3.6 seconds * 175 files = 10 minutes spent reading files. Definitely acceptable.
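
As an aside, if you’d rather keep the loop over part files in Python (as in the earlier concept sketch) while still getting zcat’s speed, you can spawn zcat per file with subprocess. A minimal version of that idea (zcat_lines is just an illustrative helper):

import subprocess

def zcat_lines(part_file):
    # read a gzipped part file through an external zcat process
    p = subprocess.Popen(['zcat', part_file], stdout=subprocess.PIPE)
    for line in p.stdout:
        yield line
    p.stdout.close()
    p.wait()

The earlier concept loop then iterates over zcat_lines(part_file) instead of the GzipFile object.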

Dive into the python for loop

So now we’re here:

zcat part-*.gz | python parse_accounts.py

What shall that parse_accounts.py look like? Something like this now:

import sys
for line in sys.stdin:
    # 1. parse line for account_id and metric_type
    # 2. write to appropriate file for account_id and metric_type

Parsing the lines

Recall that we want to extract ACCOUNT_ID and METRIC_TYPE from each line, and each line takes this form:

"[ACCOUNT_ID],[METRIC_TYPE],[DATE],[VALUE]"    COUNT

I’ll save us some pain and tell you to forget doing regular expression group matches. Using re.match() was 2X slower than line.split(). The fastest way I found was this:

# 1. parse line for account_id and metric_type
key = line.split(',')
account_id = key[ACCOUNT_ID_INDEX][1:] # strip the leading quote (")
metric_type = key[METRIC_TYPE_INDEX]

Note: I’m putting account_id and metric_type in variables here for clarity’s sake, but let me say this: If you’re going to be running a piece of code 1.75 billion times, it’s time to abandon clarity and embrace efficiency. If you’re going to access a value only once, don’t bother assigning it to a variable; if you’ll use it more than once, do. You’ll see what I mean when it all comes together below.
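
If you want to check the split-versus-regex claim on your own data, a rough timeit comparison along these lines is enough (a sketch; the exact ratio will vary with machine and line shape):

import re
import timeit

line = '"175,geo,05,US,GA,Atlanta"  9987'

split_time = timeit.timeit(lambda: line.split(','), number=1000000)
match_time = timeit.timeit(lambda: re.match(r'"(\d+),([^,]+),', line), number=1000000)

print(split_time)
print(match_time)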

Writing the files

At first, I tried to be clever and only open the files as needed, like this:

import os
import sys
from collections import defaultdict

OUT_FILES = defaultdict(dict)
for line in sys.stdin:
    # 1. parse line for account_id and metric_type
    key = line.split(',')
    account_id = key[ACCOUNT_ID_INDEX][1:] # strip leading quote
    metric_type = key[METRIC_TYPE_INDEX]

    # 2. write to appropriate file for account_id and metric_type
    if metric_type not in OUT_FILES[account_id]:
        OUT_FILES[account_id][metric_type] = \
            open(os.path.join(account_id, metric_type), "wb")

    OUT_FILES[account_id][metric_type].write(line)

close_outfiles()  # close all the files we opened

Here I was at about 5 hours of processing time. Not bad considering where I started, but more paring could be done.

Again, I invoke the admonition to optimize the hell out of things when you’re performing an operation so many times.

So I should eliminate the extraneous if statement, right? Why did I need to open the files conditionally? So I didn’t open files unnecessarily? Who cares if a few extra files are opened and not used?

I ended up at essentially something like this:

import sys
from collections import defaultdict
OUT_FILES = defaultdict(dict)

open_outfiles()  # open all files I could possibly need

for line in sys.stdin:
    # 1. parse line for account_id and metric_type
    key = line.split(',')
    account_id = key[ACCOUNT_ID_INDEX][1:] # strip leading quote

    # 2. write to appropriate file for account_id and metric_type
    OUT_FILES[account_id][key[METRIC_TYPE_INDEX]].write(line)

close_outfiles()  # close all the files we opened
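
For completeness, open_outfiles() and close_outfiles() amount to something like this (a sketch that assumes the OUT_FILES defaultdict from the script above; the hard-coded lists and directory are stand-ins, since in the real job the account ids and metric types were known before the run started):

import os

ACCOUNT_IDS = ['3', '10', '11', '175']             # stand-in list of account ids
METRIC_TYPES = ['g', 'geo', 'phrase', 'n_clicks']  # stand-in list of metric types
OUT_DIR = 'out'                                    # stand-in output directory

def open_outfiles():
    # open every file we could possibly need, once, up front
    for account_id in ACCOUNT_IDS:
        account_dir = os.path.join(OUT_DIR, account_id)
        if not os.path.isdir(account_dir):
            os.makedirs(account_dir)
        for metric_type in METRIC_TYPES:
            OUT_FILES[account_id][metric_type] = \
                open(os.path.join(account_dir, metric_type), 'wb')

def close_outfiles():
    # close everything open_outfiles() opened
    for metrics in OUT_FILES.values():
        for f in metrics.values():
            f.close()

Keep in mind that opening every account-metric combination up front means thousands of simultaneously open file handles, so the process’s open-file limit (ulimit -n) has to be high enough to accommodate them.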

Final execution time: 1 hour 50 minutes.

try/finally

One last takeaway here is that sometimes you need to invest a good bit of development time in order to save yourself what might be an unacceptable amount of processing time. It requires patience and can be painful, sometimes with days’ worth of work ultimately abandoned.

But if you find yourself thinking, “There’s got to be a way to make this faster,” then it’s at least worth trying. I stopped short of rewriting it in C. Now that would have been faster!

If you’re interested in working on the next set of problems at Bitly, we’re actively hiring.