MapReduce at Rackspace
January 23rd, 2008 – Mailtrust
Posted by Stu Hood, Software Engineer, Mailtrust
The Hadoop project has been getting a lot of press recently, and for good reason: the project aims to create an open-source framework for commodity parallel processing. The two primary components of Hadoop are its distributed file system and MapReduce, which allow anyone with a cluster of computers to write simple code to perform tasks on massive datasets quickly and reliably. Although Hadoop’s version number (0.15.2 at the time of writing) indicates that it is beta software, it is quite stable, and already capable of scaling to thousands of machines.
Here at Mailtrust, Rackspace’s mail division, we are taking advantage of Hadoop to help us wrangle several hundred gigabytes of email log data that our mail servers generate each day. We’ve built a great tool for our support team that lets them search mail logs in order to troubleshoot problems for customers. Until recently, this log search and storage system was centered around a traditional relational database, which worked fine until the exponential growth in the volume of our dataset overcame what a single machine could cope with. The new logging backend we’ve developed based on Hadoop gives us virtually unlimited scalability.
The way it works is that raw logs get streamed from hundreds of mail servers to the Hadoop Distributed File System (“HDFS”) in real time, and scheduled MapReduce jobs run to index the new data using Apache Lucene and Solr. Once the indexes have been built, they are compressed and stored away in HDFS. Each Hadoop datanode also runs a Tomcat servlet container, which hosts a number of Solr instances that pull and merge the new indexes, and provide really fast search results to our support team.
Additionally, using MapReduce we are now able to look at our log data in all sorts of interesting ways. For example, we run nightly MapReduce jobs to collect statistics about our mail system, such as spam counts by domain, bytes transferred and number of logins. Now whenever we think of a complex question about our customers’ usage patterns, we can pull the answer from our logs within hours via MapReduce. This is powerful stuff.
A few weeks ago we were curious about which regions in the world our customers are logging in from most frequently. So I wrote a MapReduce job that associates user logins with a geographical location (using MaxMind’s GeoIP data), and within a few hours we had the answer. This data was so useful that we’ve scheduled the MapReduce job to run monthly and we will be using this data to help us decide which Rackspace data centers to place new mail servers in as we grow. Below are the results from the MapReduce job if you are curious…
> Percentage of POP/IMAP logins by country for January 2008
>
> 83.2245 % United States
> 6.3361 % Canada
> 2.1974 % India
> 1.7406 % Germany
> 1.1941 % Mexico
> 1.0838 % United Kingdom
> 0.3149 % Italy
> 0.2590 % Australia
> 0.2504 % Malaysia
> 0.2006 % China
> 0.1699 % South Africa
> 0.1373 % Brazil
> 0.1321 % France
> 0.1302 % Europe
> 0.1289 % Israel
> 0.1259 % Spain
> 0.1161 % Dominican Republic
> 0.1056 % United Arab Emirates
> 0.0968 % Argentina
> 0.0928 % Philippines
> 0.0899 % Singapore
> 0.0877 % Greece
> 0.0834 % Guatemala
> 0.0711 % Jordan
> 0.0698 % Panama
> 0.0554 % Costa Rica
> 0.0520 % Belgium
> 0.0519 % Vietnam
> 0.0499 % Switzerland
> 0.0477 % Kazakstan
> 0.0468 % New Zealand
> 0.0456 % Japan
> 0.0453 % Ghana
> 0.0417 % Thailand
> 0.0409 % Qatar
> 0.0381 % Netherlands
> 0.0376 % Egypt
> 0.0373 % Afghanistan
> 0.0353 % Romania
> 0.0350 % Hong Kong
> 0.0344 % Indonesia
> 0.0323 % Peru
> 0.0299 % Taiwan
> 0.0284 % Sri Lanka
> 0.0276 % Russian Federation
> 0.0276 % Norway
> 0.0264 % Venezuela
> 0.0264 % Ireland
> 0.0252 % Satellite Provider
> 0.0248 % Colombia
> 0.0238 % Pakistan
> 0.0230 % Cyprus
> 0.0229 % Puerto Rico
> 0.0229 % Kuwait
> 0.0225 % Austria
> 0.0221 % Portugal
> 0.0221 % Trinidad and Tobago
> 0.0220 % El Salvador
> 0.0203 % Poland
> 0.0200 % Czech Republic
> 0.0166 % Turkey
> 0.0165 % Nigeria
> 0.0155 % Iran, Islamic Republic of
> 0.0147 % Chile
> 0.0142 % Ecuador
> 0.0131 % Virgin Islands, U.S.
> 0.0130 % Finland
> 0.0127 % Jamaica
> 0.0126 % Denmark
> 0.0120 % Lebanon
> 0.0117 % Kenya
> 0.0117 % Netherlands Antilles
> 0.0116 % Honduras
> 0.0113 % Korea, Republic of
> 0.0112 % Bangladesh
> 0.0108 % Saudi Arabia
> 0.0096 % Turks and Caicos Islands
> 0.0083 % Ukraine
> 0.0078 % Sweden
> 0.0075 % Uruguay
> 0.0071 % Bahamas
> 0.0063 % Bulgaria
> 0.0057 % Morocco
> 0.0050 % Uzbekistan
> 0.0047 % Mauritius
> 0.0046 % Kyrgyzstan
> 0.0045 % Hungary
> 0.0041 % Cuba
> 0.0041 % Tanzania, United Republic of
> 0.0040 % Cayman Islands
> 0.0038 % Anonymous Proxy
> 0.0038 % Bahrain
> 0.0038 % Bosnia and Herzegovina
> 0.0036 % Belarus
> 0.0035 % Iceland
> 0.0032 % Mauritania
> 0.0032 % Tajikistan
> 0.0032 % Saint Lucia
> 0.0031 % Uganda
> 0.0030 % Oman
> 0.0030 % Bermuda
> 0.0027 % Paraguay
> 0.0025 % Latvia
> 0.0025 % Nepal
> 0.0025 % Tunisia
> 0.0024 % Cameroon
> 0.0024 % Syrian Arab Republic
> 0.0021 % Brunei Darussalam
> 0.0020 % Slovenia
> 0.0020 % Burkina Faso
> 0.0020 % Slovakia
> 0.0019 % Maldives
> 0.0019 % Nicaragua
> 0.0019 % Estonia
> 0.0019 % Iraq
> 0.0018 % Barbados
> 0.0018 % Sierra Leone
> 0.0017 % Asia/Pacific Region
> 0.0016 % Virgin Islands, British
> 0.0016 % Bolivia
> 0.0016 % Monaco
> 0.0015 % Saint Kitts and Nevis
> 0.0015 % Lithuania
> 0.0014 % Antigua and Barbuda
> 0.0014 % Suriname
> 0.0011 % Aruba
> 0.0010 % Libyan Arab Jamahiriya
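The job that produced the table above is not shown in this post. As a rough illustration of its shape only, here is a minimal sketch in plain Python (not Hadoop code): a map step that emits one count per login, a reduce step that sums by country, and a final pass that turns counts into percentages. The log line format, the `lookup_country` helper and the tiny prefix table are all assumptions made up for the example; the real job used MaxMind's GeoIP data.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical stand-in for a GeoIP database (documentation-only IP ranges).
FAKE_GEOIP = {"192.0.2.": "United States", "198.51.100.": "Canada"}


def lookup_country(ip: str) -> str:
    """Hypothetical lookup; the real job used MaxMind's GeoIP data."""
    for prefix, country in FAKE_GEOIP.items():
        if ip.startswith(prefix):
            return country
    return "Unknown"


def map_logins(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Map step: emit (country, 1) for every login line.

    Assumed line format: '<timestamp> LOGIN <user> <ip>'.
    """
    for line in lines:
        fields = line.split()
        if len(fields) == 4 and fields[1] == "LOGIN":
            yield lookup_country(fields[3]), 1


def reduce_counts(pairs: Iterable[Tuple[str, int]]) -> Counter:
    """Reduce step: sum the counts emitted for each country."""
    counts: Counter = Counter()
    for country, n in pairs:
        counts[country] += n
    return counts


def percentages(counts: Counter) -> Dict[str, float]:
    """Convert per-country counts into percentages of all logins."""
    total = sum(counts.values())
    if total == 0:
        return {}
    return {country: 100.0 * n / total for country, n in counts.items()}
```

In a real cluster the map and reduce steps would run as separate tasks over many files; here they are chained as `percentages(reduce_counts(map_logins(lines)))`.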
Comments
23 Comments
2. Daria Werbowy Pic… – April 5th, 2008 at 9:14 am
I Googled for something completely different, but found your page…and have to say thanks. nice read….
3. Max… – August 2nd, 2008 at 9:26 pm
I’m curious how you guys are loading data into Hadoop? I don’t quite follow the “streaming” thing…
Are you guys doing real time indexing? Giving support the ability to search logs as they come in?
4. Stu Hood… – August 25th, 2008 at 8:12 am
Re: Max:
The loading of data is streaming, but the indexing is not. We write to a file in Hadoop until it reaches a size just below the block size, or until it times out, and then we close it and move it to where it will be processed.
Our processing jobs run every 10 minutes or so, meaning that the logs become available for Customer Care after about 15 minutes. We’ve executed around 150K jobs on this cluster with 3 restarts.
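The "write until nearly full or timed out, then close and move" behaviour described above can be sketched roughly as follows. This is not the actual loader: it uses local directories in place of HDFS, and the class name, file naming scheme, and `max_bytes` / `max_age_seconds` parameters are assumptions chosen for illustration.

```python
import os
import time
from typing import BinaryIO, Callable, Optional


class RollingLogWriter:
    """Append records to one open file; roll it when it gets big or old.

    Local directories stand in for HDFS paths in this sketch.
    """

    def __init__(self, incoming_dir: str, ready_dir: str, max_bytes: int,
                 max_age_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.incoming_dir = incoming_dir
        self.ready_dir = ready_dir
        self.max_bytes = max_bytes          # e.g. slightly below the block size
        self.max_age_seconds = max_age_seconds
        self.clock = clock
        os.makedirs(incoming_dir, exist_ok=True)
        os.makedirs(ready_dir, exist_ok=True)
        self._file: Optional[BinaryIO] = None
        self._path = ""
        self._opened_at = 0.0
        self._bytes = 0
        self._seq = 0

    def _open(self) -> None:
        self._seq += 1
        self._path = os.path.join(self.incoming_dir, f"part-{self._seq:06d}.log")
        self._file = open(self._path, "wb")
        self._opened_at = self.clock()
        self._bytes = 0

    def write(self, record: bytes) -> None:
        # Roll first if this record would push the file past the size limit.
        if self._file is not None and self._bytes + len(record) > self.max_bytes:
            self.roll()
        if self._file is None:
            self._open()
        self._file.write(record)
        self._bytes += len(record)

    def tick(self) -> None:
        """Call periodically: roll the open file once it has timed out."""
        if self._file is not None and \
                self.clock() - self._opened_at >= self.max_age_seconds:
            self.roll()

    def roll(self) -> None:
        """Close the open file and move it to where jobs will pick it up."""
        if self._file is None:
            return
        self._file.close()
        target = os.path.join(self.ready_dir, os.path.basename(self._path))
        os.replace(self._path, target)
        self._file = None
```

Because a file is only moved after it is closed, a processing job that scans the "ready" directory never sees a partially written file.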
5. Sourav Mazumder… – November 28th, 2008 at 11:09 pm
Hi Stu,
Couple of Qs on indexing.
1. What is the indexing rate you could achieve using Solr on Hadoop? How many Hadoop nodes have you used for that? I have a requirement of indexing 3-6M documents/hr. How many Hadoop nodes might I need?
2. After the index is created, do you cp the index files to the local file system for the Solr searcher?
3. For indexing documents in HDFS, do you essentially write a SolrJ client and run it in parallel using MapReduce?
Regards,
Sourav
6. Stu Hood… – December 2nd, 2008 at 9:33 am
> 1. What is the indexing rate…
The documents we are indexing are really small (500B average), and each node can index around 6 million of them per hour.
> 2. After the index is created do you cp the index files to local file system…
We create the indexes on local disk in our reducer, and compress them into HDFS after they are complete.
When we pull the index to make it available for search, we decompress it to local disk and merge it using the Lucene IndexWriter.addIndexes method before calling /commit on the Solr instance. The Nutch project created an IndexReader that can do read-only access on HDFS, but for speed reasons, we decided not to take that approach.
> 3. For indexing documents in HDFS do you essentially write a SolrJ client…
Since we are indexing to local disk, we use an embedded SolrCore, in the same JVM as the reducer.
Thanks!
7. Sourav Mazumder… – December 19th, 2008 at 8:04 pm
Thanks Stu for your explanations.
I’m also curious to know:
a) how many data nodes you use to store 6.3 TB of compressed data
b) what is the daily volume of data you index (in Bytes) and how many days data you store in HDFS at a given point of time
Regards,
Sourav
8. Stu Hood… – December 22nd, 2008 at 4:40 pm
> a) how many data nodes you use to store 6.3 TB of compressed data
We have 10 Hadoop data nodes, with three 500GB hard drives each.
> b) what is the daily volume of data you index (in Bytes)
We managed to eliminate a lot of excess log volume since this was written, so we are currently indexing an average of 140,000,000,000 Bytes per day.
> … and how many days data you store in HDFS at a given point of time
We archive 6 months’ worth of compressed indexes in Hadoop.
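For a rough sense of scale, the figures quoted in this thread can be combined in a few lines of arithmetic. This is a back-of-the-envelope sketch only: it assumes all 10 nodes index in parallel at the quoted per-node rate, that the 500-byte average document size applies to the quoted daily volume, and that "GB" means 10^9 bytes. None of those assumptions is confirmed in the thread.

```python
# Figures quoted in the comments above.
DOCS_PER_NODE_PER_HOUR = 6_000_000
AVG_DOC_BYTES = 500
NODES = 10
DRIVES_PER_NODE = 3
DRIVE_BYTES = 500 * 10**9            # assumes decimal gigabytes
DAILY_INDEXED_BYTES = 140_000_000_000

# Per-node and cluster-wide indexing throughput in bytes per hour.
per_node_bytes_per_hour = DOCS_PER_NODE_PER_HOUR * AVG_DOC_BYTES   # 3 GB/hour
cluster_bytes_per_hour = per_node_bytes_per_hour * NODES           # 30 GB/hour

# Hours of cluster time needed to index one day's volume (about 4.7).
hours_to_index_one_day = DAILY_INDEXED_BYTES / cluster_bytes_per_hour

# Raw disk across the cluster, before any replication or compression.
raw_disk_bytes = NODES * DRIVES_PER_NODE * DRIVE_BYTES              # 15 TB

print(f"{hours_to_index_one_day:.1f} hours of indexing per day of logs")
print(f"{raw_disk_bytes / 10**12:.0f} TB of raw disk")
```

Under those assumptions, a day's logs need well under a day of indexing time across the cluster.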
9. Sourav Mazumder… – December 23rd, 2008 at 10:38 am
Stu,
Thanks a lot for these detailed inputs. This looks really cool. You are processing such a huge volume of data using 10 data nodes. However, I presume, each node must be powerful enough in terms of number of processors and RAM - each must have at least around 12GB of RAM and 8 processors.
Another point I’m not clear on is how are you achieving failover at Solr’s side. Are you always replicating the merged indexes (in a local file server) to other boxes, so that if any one of the data nodes goes down, search queries can be served from the remaining boxes?
10. Stu Hood… – December 23rd, 2008 at 11:04 am
> However, I presume, each node must be powerful enough in terms of number of processors and RAM…
They are definitely just commodity boxes, with 8GB of RAM, and dual core processors. We are mostly storage bound at this point, since the system can perform the processing in plenty of time.
> Another point I’m not clear on is how are you achieving failover at Solr’s side.
The merged indexes are not replicated at all… only one Solr node has a copy of each index, so failover involves a brief downtime for queries. If we lose a node, other nodes (consistent hashing) become responsible and merge the indexes from the copies we always have in Hadoop.
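The reply above mentions consistent hashing for deciding which Solr node is responsible for an index. The post does not show how that was implemented, so the following is only a generic sketch of the technique: a hash ring with virtual nodes, where the node names, the index names, and the choice of MD5 are all assumptions for illustration.

```python
import bisect
import hashlib
from typing import Iterable, List, Tuple


def _hash(key: str) -> int:
    """Map a string to a point on the ring (first 8 bytes of its MD5)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Consistent-hash ring mapping index names to node names."""

    def __init__(self, nodes: Iterable[str], vnodes: int = 64) -> None:
        # Each node is placed at several points so load spreads evenly.
        self._points: List[Tuple[int, str]] = sorted(
            (_hash(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self._keys = [h for h, _ in self._points]

    def owner(self, index_name: str) -> str:
        """Return the node at the first ring point after the index's hash."""
        if not self._points:
            raise ValueError("ring has no nodes")
        pos = bisect.bisect_right(self._keys, _hash(index_name))
        return self._points[pos % len(self._points)][1]
```

The useful property is that building the ring without a lost node only reassigns the indexes that node owned; every other index keeps its owner, so only the orphaned indexes have to be pulled and merged again from the copies in Hadoop.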
11. seymour zhang… – January 19th, 2009 at 3:58 am
Hello Stu,
This is really cool.
Would you please share what progress you have made on your Logging 3.1 and 4.0? Are these versions currently in production?
Thank you very much.
12. schubert zhang… – February 14th, 2009 at 10:31 am
Thanks to the author and Rackspace for sharing such a great post.
We are also using Hadoop (MapReduce, HDFS) and Lucene/Solr to build our distributed indexing and querying system, just like in this post. We also support ad-hoc querying with some MapReduce jobs.
We now also provide a JDBC/SQL interface to applications. And at the platform level, we also use Bigtable (Hypertable or HBase) to manage a global index. Bigtable can solve the problems of index merging and provide global access. I think it is better than shard solutions.
Could you please share any idea of using Bigtable in your solution?
13. Stu Hood… – February 16th, 2009 at 2:49 pm
> …I think it is better than shard solutions.
> Could you please share any idea of using Bigtable in your solution.
We definitely looked at Bigtable when we were designing our solution, but it was not mature enough at the time. Also, we would have needed to create an inverted index on top of the tables, and shard it by time in order to support expiring old data.
How do you use Bigtable to host Lucene/Solr indexes?
14. schubert zhang… – February 16th, 2009 at 10:44 pm
Hi Stu,
Thank you very much.
Your questions are great. We have also met such problems and are looking for an appropriate solution.
1. …but it was not mature enough at the time.
– Yes, we have tried Hypertable and HBase, and found their performance and reliability to be problems, especially HBase. Our new design (with Bigtable) is still experimental.
2. we would have needed to create an inverted index on top of the tables, and shard it by time in order to support expiring old data.
– Now, we just create a simple inverted index in Bigtable (by selecting an appropriate rowkey).
How to expire the old data is a hard issue. We now just shard tables by time (creating a new table each month).
3. How do you use Bigtable to host Lucene/Solr indexes?
– Now we do not use Lucene/Solr in the experiment. Just a simple index for userid and time (rowkey=userid+time).
I appreciate talking with you about these solutions and issues, and look forward to learning good ideas from you.
Thanks,
Schubert
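The `rowkey=userid+time` scheme with one table per month, as described in the comment above, can be illustrated with a small in-memory sketch. Sorted Python lists stand in for a Bigtable-style store here, and the key delimiter, timestamp format and table naming are assumptions for the example, not details given by the commenter.

```python
import bisect
from datetime import datetime
from typing import Dict, List, Tuple


def row_key(userid: str, ts: datetime) -> str:
    """Fixed-width timestamp, so lexicographic order is chronological.

    Assumes userids never contain the '|' delimiter.
    """
    return f"{userid}|{ts:%Y%m%d%H%M%S}"


def table_for(ts: datetime) -> str:
    """One table per calendar month (hypothetical naming scheme)."""
    return f"logs_{ts:%Y%m}"


class MonthlyTables:
    """Sorted (rowkey, logline) rows per monthly table."""

    def __init__(self) -> None:
        self._tables: Dict[str, List[Tuple[str, str]]] = {}

    def put(self, userid: str, ts: datetime, line: str) -> None:
        rows = self._tables.setdefault(table_for(ts), [])
        bisect.insort(rows, (row_key(userid, ts), line))

    def scan_user(self, table: str, userid: str) -> List[str]:
        """Prefix scan: all of one user's lines in a table, oldest first."""
        rows = self._tables.get(table, [])
        prefix = f"{userid}|"
        start = bisect.bisect_left(rows, (prefix,))
        result = []
        for key, line in rows[start:]:
            if not key.startswith(prefix):
                break
            result.append(line)
        return result

    def drop_table(self, table: str) -> None:
        """Expire a whole month of data by dropping its table."""
        self._tables.pop(table, None)
```

Sharding by month makes expiry a single table drop rather than a row-by-row delete, at the cost of querying several tables when a search spans months.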
15. Joe… – February 26th, 2009 at 3:38 am
This project will come in handy for our sister company, which offers hosting. We are faced with the task of speeding up the processing of massive mail logs. Thanks for explaining how it works. I am forwarding this info to the team.
16. akonkol… – March 14th, 2009 at 10:26 pm
Thanks for the great post, I have a few questions (naturally)..
1) Are you not leveraging Hadoop for distributed search? It sounds like you are using Solr’s distributed shard search. If so, have you encountered any problems with sorting results from different shards?
2) Have you developed your own syslog server in Java in order to stream logs directly to HDFS?
3) It sounds like you pull fresh indexes off of HDFS and merge those indexes into a pre-existing local index… does this mean that each datanode (which runs Solr/Lucene) has the same index?
Thanks,
-a
17. schubert zhang… – April 6th, 2009 at 11:58 pm
Hi Stu,
I am interested in indexing and the sharding of indexes.
1. What is the index sharded by?
“The processing jobs run every 10 minutes.” Does it mean the index is first sharded by time (10 minutes)?
And in a processing job (for indexing), what is the index sharded by (reduce partitioned by)? I think it should be userid, right?
2. For querying, does each Solr instance hold some index shards or the complete index?
Thank you.
18. schubert… – May 10th, 2009 at 2:49 pm
Hi Stu,
I have another question.
Using Lucene/Solr, how do you deal with sorting the search results? How do you ensure the results are in the right order by time?
Schubert
19. don don… – June 13th, 2009 at 11:44 pm
Hi,
Is it possible to get detailed technical docs on how you integrate Hadoop and Solr ?
Also is there any plan to open-source your system?
20. Neel… – July 29th, 2009 at 3:17 am
Hi,
Country-wide login percentages can now be obtained with simple Pig Latin queries.
Did Rackspace evaluate Pig Latin instead of custom MapReduce? It would be good if you could publish a comparison between the two approaches.
Thank you
-NP
21. Phil Whelan (MailChannels… – August 4th, 2009 at 5:59 pm
Hi,
We’re also using Hadoop + Lucene to index email logs from our customers, so I was very interested to read your article. We also make heavy use of ActiveMQ.
We keep only a few billion loglines available for search at any one time, so I think we’re on a much smaller scale than you.
“The processing jobs run every 10 minutes.” Are your customers able to wait this long? We had to re-architect, as we found our customers wanted to search for delivery problems as soon as they happened and could not afford to keep support staff on the phone for 10 minutes. We’ve managed to reduce the time from an email delivery attempt to its appearance in the log search results to 30-60 seconds, and we are hoping we can maintain that as we scale out further. I would very much like to know what problems you’ve encountered as you’ve scaled up this system.
Great article!
Thanks,
Phil Whelan
MailChannels
22. farphadet… – December 19th, 2009 at 12:51 pm
Hi,
Do you think you will contribute the code back to the open source community?
23. Siddhant… – December 24th, 2009 at 6:20 am
Is there any performance benefit that you obtained by fitting Solr inside the Hadoop/HDFS architecture (compared against the regular Solr distributed search support)? I would guess that the performance should degrade if Solr has to read indexes from inside the HDFS, simply because of the loss of structure inside it.
1. sameer maggon's blog… – February 1st, 2008 at 10:59 am
Hadoop comes to rescue…
Here are two posts describing a solution to problems faced when you have to process large amount of data streaming in from a single or multiple sources.
How Rackspace Now Uses MapReduce and Hadoop to Query Terabytes of Data
MapReduce at Rack…