Tuesday, January 26, 2010

Some useful knowledge about Hadoop

1. Visual Report

Hadoop Core may be configured to report detailed information about the activities on the cluster. It can report to a local file or via Ganglia (http://ganglia.info/), a framework for displaying graphical reports that summarize the activities of large clusters of machines. The file hadoop-metrics.properties controls this reporting; the default is to not report.
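For example, a minimal hadoop-metrics.properties might look like the following sketch. The context class names are the ones shipped with the 0.20 line of Hadoop, so check your own distribution before copying them:

# Default: do not report any metrics.
dfs.class=org.apache.hadoop.metrics.spi.NullContext
mapred.class=org.apache.hadoop.metrics.spi.NullContext
jvm.class=org.apache.hadoop.metrics.spi.NullContext

# To report the DFS metrics to a local file every 10 seconds instead:
# dfs.class=org.apache.hadoop.metrics.file.FileContext
# dfs.period=10
# dfs.fileName=/tmp/dfsmetrics.log

# To report via Ganglia, point the context at the gmond collector:
# dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
# dfs.period=10
# dfs.servers=localhost:8649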

2. JVM options for the task virtual machines

The mapred.child.java.opts parameter is commonly used to set a default maximum heap size for tasks. The default value is -Xmx200m. Most installation administrators immediately change this value to -Xmx500m. A significant and often unexpected contributor to the heap requirement is io.sort.mb, which by default reserves 100MB of the heap for sorting.

During the run phase of a job, there may be up to mapred.tasktracker.map.tasks.maximum map tasks and mapred.tasktracker.reduce.tasks.maximum reduce tasks running simultaneously on each TaskTracker node, as well as the TaskTracker JVM. The node must have sufficient virtual memory to meet the memory requirements of all of the JVMs. JVMs have non-heap memory requirements; for simplicity, 20MB is assumed.

A cluster that sets the map task maximum to 1, the reduce task maximum to 8, and the JVM heap size to 500MB would need a minimum of (1 + 8 + 1) * (500+20) = 10 * 520 = 5200MB, or 5GB, of virtual memory available for the JVMs on each TaskTracker host. This 5GB value does not include memory for other processes or servers that may be running on the node.

The mapred.child.java.opts parameter can also be configured on a per-job basis.
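For example, a per-job override might look like the following fragment in the job driver (the -Xmx value is only an example; the sort buffer is kept well below the heap):

// Per-job override of the task JVM heap (values are only examples).
JobConf conf = new JobConf();
conf.set("mapred.child.java.opts", "-Xmx512m");  // each map/reduce task JVM gets a 512MB heap
conf.setInt("io.sort.mb", 100);                  // the sort buffer must fit comfortably inside that heap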

3. Enable Job Control Options on the Web Interfaces

Both the JobTracker and the NameNode provide a web interface for monitoring and control. By default, the JobTracker serves its web interface at http://JobtrackerHost:50030 and the NameNode at http://NamenodeHost:50070. If the webinterface.private.actions parameter is set to true, the JobTracker's per-job detail page gains Kill This Job and Change Job Priority options. These options appear by default in the bottom-left corner of the page, so you usually need to scroll down to see them.
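I believe this is enabled with a property like the one below in the cluster configuration that the JobTracker reads (for example mapred-site.xml on the 0.20 line); the default is false:

<property>
  <name>webinterface.private.actions</name>
  <value>true</value>
</property>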

A Hadoop code efficiency hint

The pattern of creating a new key object in the mapper for every output record is not the most efficient one. Most key classes provide a set() method, which sets the current value of the key. The context.write() method uses the current value of the key, and once write() returns, the key object and the value object are free to be reused.
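For example, here is a minimal sketch of the reuse pattern in a new-API mapper (the class name and the particular key/value transformation are just for illustration):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A hypothetical mapper illustrating object reuse: the output key and value are
// created once as member fields and refilled with set() before each write.
public class ReuseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  private final Text outKey = new Text();                   // reused for every record
  private final LongWritable outValue = new LongWritable(); // reused for every record

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    outKey.set(value.toString());    // refill the existing objects instead of calling new Text(...)
    outValue.set(key.get());
    context.write(outKey, outValue); // safe: the framework serializes the values before map() continues
  }
}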

If the job is configured to multithread the map method, via conf.setMapRunnerClass(MultithreadedMapRunner.class), the map method will be called by multiple threads. Extreme care must be taken when using mapper class member variables. A ThreadLocal LongWritable object could be used to ensure thread safety (a sketch follows the snippet below). The following snippet demonstrates a common pattern for per-job management of map task parallelism. The thread count of 100 was chosen for demonstration purposes and is not suitable for a CPU-intensive map task.

// If each TaskTracker runs only one map task at a time, run that task with many threads.
if (conf.getInt("mapred.tasktracker.map.tasks.maximum", 2) == 1) {
    conf.setMapRunnerClass(MultithreadedMapRunner.class);
    conf.setInt("mapred.map.multithreadedrunner.threads", 100);
}
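When the multithreaded runner is in use, a ThreadLocal keeps a per-thread copy of any reusable output object. This is only a sketch against the old (org.apache.hadoop.mapred) API that MultithreadedMapRunner belongs to; the class name is mine:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// A hypothetical mapper whose map() may be called by many threads at once.
// Each thread gets its own reusable output value via ThreadLocal, so the
// threads never overwrite each other's object.
public class ThreadSafeMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  private static final ThreadLocal<LongWritable> OUT_VALUE =
      new ThreadLocal<LongWritable>() {
        @Override
        protected LongWritable initialValue() {
          return new LongWritable();
        }
      };

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    LongWritable v = OUT_VALUE.get(); // this thread's private, reusable instance
    v.set(key.get());
    output.collect(value, v);
  }
}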



Object churn is a significant performance issue in a map method and, to a lesser extent, in the reduce method. Object reuse can provide a significant performance gain.

Saturday, January 23, 2010

Iterative process

Recently, I have been working on some problems that can be solved by iterative processes. Since I am not very familiar with this type of logic, I have run into many problems understanding it. And when a problem comes up, I often don't know the key reason behind it, much less the solution.

Actually, several of the iterative processes I am working on these days are very powerful: computing Google's PageRank, community detection in social networks, random walks for building YouTube's suggestion list, non-negative matrix factorization applied to face identification, text mining, graph clustering, and so on.

Take PageRank as an example. There is an excellent introduction and explanation of the PageRank technique here.

PR(A) = (1-d) + d(PR(t1)/C(t1) + ... + PR(tn)/C(tn))

That is the equation for computing page A's PageRank. PR(t1) is the PageRank of page t1, and C(t1) is t1's total number of outbound links. t1...tn are the pages that link to page A. d is a damping factor, usually set to 0.85.

So, how can we get the PageRank? Consider two pages, A and B, that link to each other. PR(A) depends on PR(B), and on the other hand, PR(B) depends on PR(A) too.

PR(A) = (1-d) + d(PR(B)/C(B))
PR(B) = (1-d) + d(PR(A)/C(A))

This is a catch-22 situation: the PageRank of A depends on an accurate PageRank of B, and the PageRank of B depends on an accurate PageRank of A, but initially we only have inaccurate values for both. That is why an iterative process comes in. We recompute A and B many times until their values converge. By convergence I mean that the difference between this iteration's result and the last iteration's result is small; we can always define some threshold.
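Just to convince myself, here is a tiny sketch in plain Java of the two-page example, iterating until the change between two consecutive iterations falls below a threshold. The starting value 0.5, the damping factor 0.85, and the threshold 0.0001 are only example numbers.

public class TwoPagePageRank {
  public static void main(String[] args) {
    double d = 0.85;              // damping factor
    double prA = 0.5, prB = 0.5;  // inaccurate initial pageranks
    double cA = 1, cB = 1;        // A and B each have one outbound link, to the other page
    double threshold = 0.0001;    // convergence threshold

    double change = Double.MAX_VALUE;
    while (change >= threshold) {
      double newA = (1 - d) + d * (prB / cB);
      double newB = (1 - d) + d * (prA / cA);
      change = Math.abs(newA - prA) + Math.abs(newB - prB);  // how much this iteration moved the values
      prA = newA;
      prB = newB;
    }
    System.out.println("PR(A) = " + prA + ", PR(B) = " + prB);
  }
}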

I have been thinking about how to compute PageRank with MapReduce.

For example, consider a simple link graph with an initial pagerank for each node: A links to B and C, B links to A, and C links to A and B. First, we use MapReduce to parse the link graph and output the total number of outbound links of each node. Then we compute the pageranks iteratively until convergence. The details are as follows; the key/value format is <key, value>.

Map1
Input: the raw link graph data
parsing
Output: <source, destination> for each link

Reduce1
Input: Map1's output
for each key, counting outbound links
Output: <source, (destination list, C(source))>

repeat:
Map2
Input: Reduce1's output plus each source's current pagerank
combining the information: link destination, link source, source's pagerank, number of source's outbound links
Output: <destination, (source, PR(source), C(source))>

Reduce2
Input: Map2's output
updating the pagerank for each key
PR(B) = PR(A)/2 + PR(C)/2, PR(C) = PR(A)/2, PR(A) = PR(B)/1 + PR(C)/2
Output: <node, updated PR(node)>
until convergence
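Below is a rough sketch in Java of the Map2/Reduce2 step for one iteration, written against the new (org.apache.hadoop.mapreduce) API. Everything here is illustrative: the class names are mine, and I assume each input line looks like "node <tab> pagerank <tab> comma-separated outbound links" so that a single record carries both the current pagerank and the link structure.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// One PageRank iteration. Assumed input line: "node \t pagerank \t out1,out2,..."
public class PageRankIteration {

  public static class RankMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split("\t");
      String node = parts[0];
      double pr = Double.parseDouble(parts[1]);
      String[] outLinks = parts.length > 2 ? parts[2].split(",") : new String[0];

      // Pass the link structure through so the reducer can rebuild the record.
      context.write(new Text(node), new Text("LINKS\t" + (parts.length > 2 ? parts[2] : "")));

      // Send each destination its share of this node's pagerank: PR(node)/C(node).
      for (String dest : outLinks) {
        context.write(new Text(dest), new Text("RANK\t" + (pr / outLinks.length)));
      }
    }
  }

  public static class RankReducer extends Reducer<Text, Text, Text, Text> {
    private static final double D = 0.85;  // damping factor

    @Override
    protected void reduce(Text node, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      double sum = 0.0;
      String links = "";
      for (Text value : values) {
        String[] parts = value.toString().split("\t", 2);
        if (parts[0].equals("LINKS")) {
          links = parts.length > 1 ? parts[1] : "";
        } else {
          sum += Double.parseDouble(parts[1]);  // a contribution PR(source)/C(source)
        }
      }
      double newRank = (1 - D) + D * sum;       // PR(node) = (1-d) + d * sum of contributions
      context.write(node, new Text(newRank + "\t" + links));
    }
  }
}

For clarity, the sketch creates new Text objects inside map(); in a real job the object-reuse pattern from the post above would apply, and a driver would keep chaining these jobs until the pageranks stop changing by more than some threshold.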

There is also an existing MapReduce implementation of PageRank; please see here.

Let me say something off topic. Yesterday, my advisor in China wanted to apply MapReduce to a fairly simple information management system with very simple computation logic (so simple that all the data fits on a single machine, and all the computation for a request can be done in milliseconds). Instead of handling jobs with heavy computation, they just want to improve the system's scalability when a huge number of connections arrive.

At the beginning, I thought this was not feasible, and that even if it were possible, the performance would suffer. But another guy gave me a hint: build a relay server and design a scheduler to distribute these connections to other servers. I guess this is MapReduce's power; actually, many problems can be handled by MapReduce.

In this particular case, we can set 1 mapper and n reducers, with the only mapper acting as the relay server. We can parse the incoming connection session information and output each request session's initialization time as the key. Then we implement our own Partitioner class to distribute these sessions based on the initialization time. Instead of the usual hashing, where elements with the same key go to the same destination, we need a different scheme, where elements with the same or nearly the same initialization time go to different destinations. The reducers then act as the processing workers that handle the requests. In this way, when a large number of requests arrive in a short period, they are spread across multiple worker nodes (a rough sketch follows below).
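A rough sketch of such a Partitioner, assuming the map output key is the session's initialization time in milliseconds (the class name and the key/value types are hypothetical):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// A hypothetical partitioner: the key is a session's initialization time (ms).
// Instead of sending equal keys to the same reducer, it spreads keys with the
// same or nearly the same time across different reducers, so a burst fans out.
public class TimeSpreadPartitioner extends Partitioner<LongWritable, Text> {
  @Override
  public int getPartition(LongWritable initTime, Text session, int numReduceTasks) {
    // Neighboring timestamps land on consecutive reducers (round-robin by time).
    return (int) (initTime.get() % numReduceTasks);
  }
}

The job would register it with job.setPartitionerClass(TimeSpreadPartitioner.class); the point is only that requests arriving at nearly the same moment land on different reducers instead of piling onto one.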

In fact, MapReduce will not improve the overall performance beyond what a simple relay server with a scheduler could achieve. But this widened my understanding of what MapReduce is.



Thursday, January 21, 2010

My first blog

Hi, everybody. I would like to make this my personal study place. I will post my study and research observations here and share them with people around the world.

This is my first post. Actually, I didn't prepare anything to post, so I think I can say something about my current research and work. My current work is to gain a deep understanding of MapReduce, a parallel programming model, and also to look for improvements or enhancements to this model. All my work is based on Hadoop, an open-source MapReduce implementation, which also includes HDFS (an implementation of GFS, the Google File System, a distributed file system) and HBase (an implementation of BigTable, a distributed database). My most recent work is on accelerating update functions on Hadoop; these update functions usually involve many matrix multiplications and many iterations, and MapReduce can handle this type of problem in a reasonable time when the input data is really large.

Unfortunately, there is no meaningful, publishable result so far. Anyway, I just want to share what I am learning and what I have observed, whatever it is. And I expect this blog will push me to learn more.

Another thing is that I would like to practice my English here. Even though I am now in the U.S., the people around me are mostly Chinese, so I have little chance to speak English and little chance to improve it.

So this is a good place: a place where I can blog, where I can learn, where I can speak, and where I can watch myself grow in the future.