Thursday, April 3, 2008

MapReducing at Google

MapReduce is a programming paradigm/framework developed by engineers at Google that lets people write distributed programs easily. It is pretty much the backbone of most of the huge data-crunching applications at Google that need to process data in the terabyte/petabyte range, such as query logs, the web index, Orkut data, map tiles, the advertiser database and many more. It abstracts out the whole rigmarole of distributing my program across thousands of machines, which involves non-trivial steps like dividing the input data, handling machine/program failures, combining the output from all the copies and so on. The framework takes care of all this while I just write my main data-processing logic, albeit in a special way. I will explain that a bit and then discuss my experiences using it at Google.

So in MapReduce, as the name suggests, the whole process consists of two phases: Map and Reduce. There is also an intermediate phase called the shuffle, which is basically an orchestrator that directs the data output by the Map phase to the Reduce phase in the proper way. The Map phase consists of many Mappers running across a large number of machines. Each Mapper processes a fragment of the input, one record at a time. A record can be anything: a line in a file, a node in a graph, a row in a database, etc. But the limitation is that a Mapper can only examine one record at a time, and while processing that record it has no knowledge of any other record already seen or yet to be seen. For each record that the Mapper sees, it can output a bunch of (Key, Value) pairs. For example, if your record is a line and you are writing a word-count application, you would output a (word, count) pair for each word in that line (in the simplest version, each count is just 1). So your map code is generally an input-processing phase which splits/converts/filters each record in a way that allows us to later combine the records in the Reduce phase, but it does not do any combining itself.
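To make this concrete, here is a rough sketch of a word-count Mapper in Python. This is not the actual Google API (which is C++); emit is just a hypothetical stand-in for whatever call the framework provides to output a key/value pair.

    # Toy word-count Mapper: the record is one line of text, and for each
    # word in it we emit a (word, 1) pair. Summing these in the Reducer
    # later gives the total count per word.
    def word_count_map(line, emit):
        for word in line.split():
            emit(word, 1)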

Now the output generated by the Map phase acts as the input to the Reduce phase. Again you have multiple Reducers running across some machines, and each Reducer processes a fragment of the Map output. But the important distinction here is that a Reducer gets to see, at one time, all the pairs which have the same key, even if they were generated by different Mappers. The Reducer can then combine all those pairs to generate one or more output records. For example, in our counting application, a single Reducer gets to see all the pairs which have the word "the" as their key, and it can then add up all the values to output the total count for the word "the" in our input corpus.
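The matching Reducer in the same toy Python sketch just sums whatever values arrive under one key; again, emit stands in for the framework's output call.

    # Toy word-count Reducer: the framework hands us one key (a word) and
    # all the values emitted for it by every Mapper; we add them up and
    # output a single (word, total_count) record.
    def word_count_reduce(word, counts, emit):
        emit(word, sum(counts))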

That is all there is to MapReduce, essentially. There are other details I have skipped, but this is the basic paradigm. The framework takes care of moving the data around across machines, running multiple copies of Map/Reduce, restarting a Map or Reduce if it fails, etc. All a programmer has to do is write the Map and the Reduce logic. This simple paradigm has found a surprisingly large number of uses at Google, with the number of MapReduce programs checked into the code base continuously growing. You can find the nitty-gritty details and usage statistics at http://labs.google.com/papers/mapreduce-osdi04.pdf.
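To make the data flow concrete, here is a single-machine toy (continuing the hypothetical Python sketch above) that mimics what the framework does logically: run the Map on every record, group the output by key, then hand each group to the Reduce. The real system does this across thousands of machines with fault tolerance, all of which this sketch ignores.

    from collections import defaultdict

    def run_toy_mapreduce(records, mapper, reducer):
        # Map phase: call the mapper on every record and collect its pairs.
        grouped = defaultdict(list)
        for record in records:
            mapper(record, lambda k, v: grouped[k].append(v))
        # Shuffle + Reduce phase: all values for one key go to one reduce call.
        output = []
        for key, values in grouped.items():
            reducer(key, values, lambda k, v: output.append((k, v)))
        return output

    # run_toy_mapreduce(["the cat", "the dog"], word_count_map, word_count_reduce)
    # -> [('the', 2), ('cat', 1), ('dog', 1)]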

I want to touch upon two practical aspects of programming with this paradigm:
1. Most of the applications at Google using MapReduce actually have multiple MapReduces running, with the output of one step feeding into the next as input. It is a pipeline where each step moulds the input slightly to take it closer to the desired output. This is necessary because of the simplicity of the paradigm, which means you can't do overly complicated things in a single pass. For example, let's say you want to generate a histogram of word counts in your input, with the desired output being the number of words that occur a particular number of times. You will need two MapReduce passes for this. The first one outputs the count of each word, as in our earlier example. In the second pass, the input record is now a (word, count) pair. Our Mapper takes this pair and inverts it, outputting the pair (count, word). So count is our key now, which means that all the words with the same count go to the same Reducer. Our Reducer just has to count the number of records it gets and output (count, number_of_words_with_that_count), and we are done. There are various tools at Google to manage such pipelines, which keep track of which step needs to run after which and, on failure, don't restart the whole pipeline but continue from the step that failed.
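Going back to the histogram example, here is what that second pass might look like in the same toy Python style (again just a sketch, not the real API):

    # Second-pass Mapper: the input record is a (word, count) pair produced
    # by the first pass; we invert it so that all words with the same count
    # end up at the same Reducer.
    def histogram_map(record, emit):
        word, count = record
        emit(count, word)

    # Second-pass Reducer: for one count value, the incoming values are all
    # the words occurring exactly that many times; we only need how many.
    def histogram_reduce(count, words, emit):
        emit(count, sum(1 for _ in words))

Feeding the output of the word-count pass through this gives records of the form (count, number_of_words_with_that_count), which is exactly the histogram we wanted.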

2. Most of the time, writing your Mapper and Reducer is straightforward, but in some cases it requires great care. One thing to keep in mind is that you should try to distribute your Map output to the Reducers as evenly as possible, because if one Reducer ends up getting a large number of records, it can become a bottleneck, especially if it is doing something non-trivial with them. This is something I faced personally, and I had to do some clever hacks, suggested by a fellow Googler, to keep things tractable. For example, let's say that for each country I want a count of every search query occurring in that country. My input is the query logs, where each record contains a single query and auxiliary information about it, such as the country it originated from. One way of doing this might be to output (country, query) in my Mapper, so that all the queries occurring in a particular country go to a single Reducer which can then compute the count for each query. But there are two problems with this. Firstly, the number of Reducers is limited to a couple of hundred, and secondly, some Reducers might get a disproportionately larger number of pairs than others, because the query distribution over geography is highly skewed. So the Reducer for a country like the US might become a bottleneck. To avoid this, what I can do is use Country_FirstWordInQuery as the key in my Mapper. This works because duplicate queries obviously share the first word and therefore still go to the same Reducer. Instead of the first word I could also use the longest word in the query, as the query distribution across longest words is more balanced. Even after doing this your Reducer might be slow, so you might have to optimise it further. In fact, I had to build an inverted index inside a Reducer I wrote to make it efficient.
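A toy version of that trick, in the same hypothetical Python style as the earlier sketches (the record fields and names are made up for illustration): the Mapper builds a composite key from the country and the query's first word, so the load spreads over many more Reducers while duplicate queries still meet at the same one.

    from collections import Counter

    # Mapper: each log record carries the query text plus auxiliary info
    # such as the country it came from. Keying on country alone funnels all
    # of a country's queries into one Reducer; (country, first word) spreads
    # the load while keeping identical queries together.
    def query_count_map(record, emit):
        words = record["query"].split()
        first_word = words[0] if words else ""
        emit((record["country"], first_word), record["query"])

    # Reducer: all queries from one country that share a first word arrive
    # here; count the duplicates and emit a (country, query, count) record
    # for each distinct query.
    def query_count_reduce(key, queries, emit):
        country, _first_word = key
        for query, count in Counter(queries).items():
            emit((country, query), count)

The same idea works with the longest word instead of the first one; the only requirement is that duplicate queries always map to the same key, so they can never be split across Reducers.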

What would be interesting is to see some research on this model, formalising things like what can and cannot be done in a single MapReduce pass.

3 comments:

Saurabh Joshi said...

Wow, this really explains why Google is leading on many frontiers. I guess the terms "Map" and "Reduce" are borrowed from the functional programming paradigm.

Nice post indeed!

viked said...

Yeah, you are right, the terms and the concept are borrowed from functional programming. Reduce is the same as foldr in Haskell.

Nishit said...

Nice post with good practical examples. Keep posting your experiences at Google.