Showing posts with label software. Show all posts

Wednesday, May 28, 2008

Thoughts on twitter scalability problem

After I read the original Twitter post on their architecture woes, I searched the blogosphere a bit more to get more insight into what Twitter's architecture might be and wherein the problem lies. Most of what I read agrees that the problem is due to the large number of people that a person follows or is followed by. Because of this, during special events a large number of tweets are posted, which results in heavy load on the databases. Where the opinions differ is on whether this heavy database load is a read load or a write load. To explain this, let me quote from here:

"Nothing is as easy as it looks. When Robert Scoble writes a simple “I’m hanging out with…” message, Twitter has about two choices of how they can dispatch that message:

  1. PUSH the message to the queue’s of each of his 6,864 followers, or
  2. Wait for the 6,864 followers to log in, then PULL the message."

Doing 1 means that writes will be expensive, but when a user logs in, he can be shown the tweets of all the users he is following just by reading his own local queue. So reads would be pretty fast. This is like email.
Doing 2 would mean that writes are fast, as a tweet just needs to be written to one place. But on a read, if a user is following a large number of users, Twitter will first have to look up all the users that this user is following, then look up the messages of all those users, then combine them and generate the page for the logged-in user. All these users would typically reside on different databases, so a read would hit many databases and the data would need to be combined across all the different machines. Reads here would be quite expensive.
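To make the trade-off concrete, here is a minimal in-memory sketch of the two options. The class names, the `writes` and `lookups` counters and the dictionaries standing in for databases are all my own assumptions for illustration; this is not how Twitter actually implements either option.

```python
from collections import defaultdict


class PushTimeline:
    """Option 1: copy each tweet into every follower's queue at write time."""

    def __init__(self):
        self.followers = defaultdict(set)  # author -> users following them
        self.inbox = defaultdict(list)     # user -> tweets delivered to them
        self.writes = 0                    # queue writes performed so far

    def follow(self, follower, author):
        self.followers[author].add(follower)

    def post(self, author, text):
        # One write per follower: expensive for popular authors.
        for follower in self.followers[author]:
            self.inbox[follower].append((author, text))
            self.writes += 1

    def read(self, user):
        # A single local read, like opening a mailbox.
        return list(self.inbox[user])


class PullTimeline:
    """Option 2: store each tweet once and assemble the page at read time."""

    def __init__(self):
        self.following = defaultdict(set)  # user -> authors they follow
        self.tweets = defaultdict(list)    # author -> (seq, author, text)
        self.writes = 0                    # tweet writes performed so far
        self.lookups = 0                   # per-author lookups done by reads
        self._seq = 0                      # global posting order

    def follow(self, follower, author):
        self.following[follower].add(author)

    def post(self, author, text):
        # Exactly one write, no matter how many followers there are.
        self._seq += 1
        self.tweets[author].append((self._seq, author, text))
        self.writes += 1

    def read(self, user):
        # One lookup per followed author, then combine in posting order.
        merged = []
        for author in self.following[user]:
            self.lookups += 1
            merged.extend(self.tweets[author])
        merged.sort()
        return [(author, text) for _, author, text in merged]
```

With 6,864 followers registered, a single `post` costs 6,864 queue writes in `PushTimeline` and one write in `PullTimeline`. In return, a `PullTimeline.read` costs one lookup per followed author, where `PushTimeline.read` touches a single queue.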

So what does Twitter do? This guy says that Twitter is optimising writes, while here it is mentioned that Twitter is optimising reads. My personal hunch is that Twitter stores each tweet just once and pays on the read, because you can use caching to make reads much faster, but with writes you are stuck: they have to go to the database. Also, isn't this similar to what, let's say, Google would do when returning results for your search? It would get matching pages from various machines (because the index is split across multiple machines), then combine them, rank them and generate the final results page. Of course, one advantage Google has is that a large number of queries are probably repeated within a day, so their results can be served from the cache instantly. On Twitter, each user follows a different set of people, so it would not make sense to cache the page of an individual user. Rather, the tweets need to be cached and the combination needs to be done on the fly, as is pointed out here.
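A rough sketch of that hunch, caching tweets per author and combining them on the fly for each reader, could look like the following. `TweetCache`, `timeline` and the `store` dictionary standing in for the database are hypothetical names of mine, and the sketch deliberately ignores cache invalidation when new tweets arrive.

```python
import heapq
from itertools import islice


class TweetCache:
    """Caches each author's tweets, newest first, after one database read."""

    def __init__(self, store):
        self.store = store   # author -> list of (timestamp, text); the "database"
        self.cache = {}      # author -> cached tweets, newest first
        self.db_reads = 0    # how many times we had to go to the database

    def recent(self, author):
        if author not in self.cache:
            self.db_reads += 1
            self.cache[author] = sorted(self.store.get(author, []), reverse=True)
        return self.cache[author]


def timeline(cache, following, limit=20):
    """Merge the cached tweets of every followed author, newest first."""
    streams = (
        [(ts, author, text) for ts, text in cache.recent(author)]
        for author in following
    )
    # Each stream is already sorted newest first, so a k-way merge is enough.
    merged = heapq.merge(*streams, reverse=True)
    return list(islice(merged, limit))
```

The per-author lists are shared between all readers, so two users following the same popular author hit the database for that author only once; only the cheap merge step is repeated per reader.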

Thursday, April 3, 2008

MapReducing at Google

Map Reduce is a programming paradigm/framework developed by engineers at Google that allows people to write distributed programs easily. It is pretty much the backbone of most of the huge data-crunching applications at Google that need to process terabytes or petabytes of data, such as query logs, the web index, Orkut data, map tiles, the advertiser database and many more. It basically abstracts out the whole rigmarole of distributing my program across thousands of machines, which involves non-trivial steps like dividing the input data, taking care of machine/program failures, combining the output from all the copies and so on. The framework takes care of all this, while I just have to write my main data processing logic, albeit in a special way. I will explain that a bit and then discuss my experiences using it at Google.

In Map Reduce, as the name suggests, your whole process consists of two phases: Map and Reduce. There is also an intermediate phase called shuffle, which is basically an orchestrator that directs the data output by the Map phase to the Reduce phase in the proper way. The Map phase consists of many mappers running across a large number of machines. Each mapper processes a fragment of the input, one record at a time. A record can be anything: a line in a file, a node in a graph, a row in a database and so on. The limitation is that a mapper can only examine one record at a time, and while processing that record it has no knowledge of any other record already seen or yet to be seen. For each record that a mapper sees, it can output a bunch of (Key, Value) pairs. For example, if your record is a line and you are writing a word count application, you would output a (word, count) pair for each word in that line. So your map code is generally an input processing phase, which usually splits, converts or filters each record in a way that allows us to combine the records later in the Reduce phase, but it does not do any combining itself.

The output generated by the Map phase acts as the input to the Reduce phase. Again you have multiple reducers running across some machines, and each reducer processes a fragment of the Map output. The important distinction here is that a reducer gets to see, at one time, all the pairs that have the same Key, even if they were generated by different mappers. The reducer can then combine all those pairs to generate one or more output records. For example, in our counting application, a single reducer gets to see all the pairs that have the word "the" as their key, and it can then add up all the values to output the total count for the word "the" in our input corpus.
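The word count example can be sketched in a few lines of Python. This is only a single-process toy: `run_mapreduce` is a hypothetical stand-in of mine for the framework (the shuffle is just a dictionary grouping values by key), and emitting a count of 1 per word occurrence is one reasonable reading of the (word, count) pairs described above. It is not Google's actual API.

```python
from collections import defaultdict


def map_word_count(line):
    """Mapper: sees one line at a time and emits (word, 1) for each word."""
    for word in line.lower().split():
        yield word, 1


def reduce_word_count(word, counts):
    """Reducer: sees every count emitted for one word and adds them up."""
    yield word, sum(counts)


def run_mapreduce(records, mapper, reducer):
    """Toy framework: map every record, shuffle by key, reduce each key."""
    groups = defaultdict(list)
    for record in records:                    # Map phase
        for key, value in mapper(record):
            groups[key].append(value)         # shuffle: group values by key
    output = []
    for key in sorted(groups):                # Reduce phase, one key at a time
        output.extend(reducer(key, groups[key]))
    return output
```

Calling `run_mapreduce(lines, map_word_count, reduce_word_count)` on a list of lines returns a list of (word, total) pairs sorted by word. Note that the mapper never looks at more than one line, and the reducer never looks at more than one word.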

That is essentially all there is to Map Reduce. There are other details I have skipped, but this is the basic paradigm. The framework takes care of moving the data around across machines, running multiple copies of Map/Reduce, restarting a Map or Reduce if it fails, and so on. All a programmer has to do is write the Map and the Reduce logic. This simple paradigm has found a surprisingly large number of uses at Google, with the amount of Map Reduce code checked into the code base continuously growing. You can find the nitty-gritty details and usage statistics at http://labs.google.com/papers/mapreduce-osdi04.pdf.

I want to touch upon two practical aspects of programming with this paradigm:
1. Most of the applications at Google using Map Reduce actually have multiple Map Reduces running, with the output of one step feeding into the next as input. It is a pipeline where each step moulds the input slightly to take it closer to the desired output. This is necessary because the paradigm is so simple that you can't do overly complicated things in a single pass. For example, let's say you want to generate a histogram of word counts in your input, with the desired output being the number of words that occur a particular number of times in the input. You will require two Map Reduce passes for this. The first one outputs the count of each word, as in our previous example. In the second pass, the input record is now a (word, count) pair. Our mapper takes this pair and inverts it, outputting the pair (count, word). So count is our key now, which means that all the words with the same count will go to the same reducer. Our reducer just has to count the number of records it gets and output (count, number_of_words_with_that_count), and we are done. There are various tools at Google to manage such pipelines. They keep track of which step needs to run after which, and on failure they don't start the whole pipeline again but continue from the step that failed.

2. Most of the time writing your Mapper and Reducer is straightforward, but in some cases it requires great care. One thing to keep in mind is that you should try to distribute your Map output to the reducers as evenly as possible. If one reducer ends up getting a large number of records, it can become a bottleneck, especially if it is doing something non-trivial with them. This is something I faced personally, and I had to do some clever hacks, suggested by a fellow Googler, to keep things tractable. For example, let's say that for each country I want a count of each search query occurring in that country. My input is the query logs, where each record contains a single query and auxiliary information about it, such as the country it originated from. One way of doing this might be to output (country, query) in my mapper. All the queries occurring in a particular country then go to a single reducer, which can compute the count for each query. But there are two problems with this. Firstly, the number of reducers is limited to a couple of hundred, since there is only one key per country. Secondly, some reducers might get a disproportionately large number of pairs compared to others, because the distribution of queries over geography is highly skewed. So the reducer for a country like the US might become a bottleneck. To avoid this, I can use Country_FirstWordInQuery as the key in my mapper. This works because duplicate queries obviously share the same first word and therefore go to the same reducer. Instead of the first word I can also use the longest word in the query, as the query distribution across longest words would be more balanced. Even after doing this your reducer might be slow, so you might have to optimise it. In fact, I had to create an inverted index inside a reducer I wrote to make it efficient.
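Both points can be sketched with the same kind of single-process toy runner. Everything here is illustrative: `run_mapreduce` and the mapper/reducer names are hypothetical, the query log is assumed to be a list of (country, query) tuples, and the real pipeline tools and log formats at Google are not shown.

```python
from collections import defaultdict


def run_mapreduce(records, mapper, reducer):
    """Toy framework: map every record, group values by key, reduce each key."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    output = []
    for key in sorted(groups):
        output.extend(reducer(key, groups[key]))
    return output


# Point 1: a two-pass pipeline producing a histogram of word counts.
def map_words(line):
    for word in line.split():
        yield word, 1


def reduce_sum(word, ones):
    yield word, sum(ones)


def map_invert(pair):
    word, count = pair
    yield count, word              # count becomes the key in the second pass


def reduce_count_words(count, words):
    yield count, len(words)        # (count, number_of_words_with_that_count)


def word_count_histogram(lines):
    counts = run_mapreduce(lines, map_words, reduce_sum)           # pass 1
    return run_mapreduce(counts, map_invert, reduce_count_words)   # pass 2


# Point 2: a composite key so that one country is spread over many reducers.
def map_country_query(record):
    country, query = record
    words = query.split()
    first_word = words[0] if words else ""
    yield (country, first_word), query     # Country_FirstWordInQuery


def reduce_query_counts(key, queries):
    country, _first_word = key
    counts = defaultdict(int)
    for query in queries:
        counts[query] += 1
    for query in sorted(counts):
        yield country, query, counts[query]
```

In the second part, identical queries always share a first word, so they still meet in the same reducer call, while a busy country's queries are now split across as many keys as there are distinct first words. Swapping `words[0]` for `max(words, key=len)` gives the longest-word variant.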

It would be interesting to see people do some research on this model and formalise things like what can and cannot be done in a single MapReduce pass.