什么是MapReduce
MapReduce是hadoop进行多节点计算时采用的计算模型,说白了就是hadoop拆分任务的一套方法论,刚接触MapReduce这个概念时,一时很难理解,也查了很多资料,因为每个人理解不一样,反而看的越多越糊涂,其实本质是很简单的东西,这里举一个例子帮助理解,因为网上大部分是hadoop官方计算单词(wordcount)的例子,这里就换一个场景举例。
假设有以下一份成绩单
1,张三,78,87,69 2,李四,56,76,91 3,王五,65,46,84 4,赵六,89,56,98 ...
各列分别是 编号,学生姓名,语文成绩,数学成绩,英语成绩
,现在要求统计各科成绩最高分,假设这份成绩单非常非常的长,有上千万行,在没有任何计算机系统的帮助下,要怎么靠人工解决这个问题?
- 单人统计
专门派一个人进行统计工作,优点是简单,缺点也很明显,需要非常长的时间,甚至数据量达到一定程度,一个人一辈子可能也统计不完
- 多人统计
如果有足够的人可以进行统计工作,要怎么去协调这些人?假设成绩单有100w行并且有1000人可以进行统计
-
- 设一个管理员,管理员把成绩单平均拆分成1000份给1000个人,每个人需要统计1000行数据
-
- 管理员制作一个表格,要求每个人把自己统计的结果填入该表格,表格格式如下
科目 | 人员1结果 | 人员2结果 | … | 人员1000结果 |
---|---|---|---|---|
语文 | ||||
数学 | ||||
英语 |
-
- 管理员最终得到了如下数据
科目| 人员1结果|人员2结果|…|人员1000结果
语文 | 80 | 85 | … | 76 |
数学 | 89 | 90 | … | 88 |
英语 | 94 | 85 | … | 90 |
-
- 各科各有1000个结果,管理员又把这个表格拆成了100个小表格分给100个人进行统计,这样每个小表格各有10个数据,小表格格式如下
第一个人领到的小表格
科目 | 人员1结果 | 人员2结果 | … | 人员10结果 |
---|---|---|---|---|
语文 | 80 | 85 | … | 76 |
数学 | 89 | 90 | … | 88 |
英语 | 94 | 85 | … | 90 |
第二个领到的小表格
科目 | 人员11结果 | 人员12结果 | … | 人员20结果 |
---|---|---|---|---|
语文 | 83 | 75 | … | 88 |
数学 | 79 | 95 | … | 58 |
英语 | 94 | 85 | … | 90 |
-
- 管理员再次把每个人的结果收集上来,又得到了100份数据,如果管理员愿意又可以把这个数据进行拆分交给多个人进行统计,如此反复最终得到一个最大值结果,管理员也可以自己完成最后的统计,因为数据量不大。
那么在这个过程中,我们看到了,一份庞大的成绩单经过以下几个步骤后,最终我们获得了我们想要的结果
- 成绩单拆分多份
- 每一份进行单独统计
- 对结果进行登记
- 对统计的结果可以再次进行拆分,也可以直接进行统计
- 如此反复之后最终得到了结果
那么把这个过程用MapReduce语言进行描述就可以是以下过程:
- 成绩单拆分多份- 分片(split)
- 每一份进行单独统计 – map
- 并且对结果进行登记 – shuffle
- 对统计的结果可以再次进行拆分- combiner
- 也可以直接进行统计 – reduce
另外在管理员的表格中,三个科目后面记录
开发
我们用实际java代码解决上面的问题,假设你已经按照上一篇教程安装好了hadoop集群环境
- 创建工程
你可以用你熟悉的ide创建一个普通java工程,建议用maven进行包管理,并加入以下包依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.5.1</version> </dependency>
- 创建Mapper
Mapper对应是MapReduce中的map过程,在以下mapper代码:
StudentMapper.java
public class StudentMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { String[] ss = text.toString().split(","); outputCollector.collect(new Text("语文"), new IntWritable(Integer.parseInt(ss[2]))); outputCollector.collect(new Text("数学"), new IntWritable(Integer.parseInt(ss[3]))); outputCollector.collect(new Text("英语"), new IntWritable(Integer.parseInt(ss[4]))); } }
StudentMapper实现了 Mapper<LongWritable, Text, Text, IntWritable>
接口,这里有四个参数,四个参数含义如下
1,张三,78,87,69
方法 map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
的几个参数和上面含义一样,注意到outputCollector是一个数组,说明这里可以写入多个结果,reporter可以向hadoop汇报任务进度。在这个mapper里面,我们并没有做什么计算,我们只是把文本里面的成绩解析出来,并且按科目放到outputCollector中,相当于大家第一次都没干活,只是把数据整理好。经过mapper后,数据从
1,张三,78,87,69 2,李四,56,76,91 3,王五,65,46,84 4,赵六,89,56,98 ...
变成了
– | – | – | |||
---|---|---|---|---|---|
语文 | 78 | 56 | 65 | 89 | … |
数学 | 87 | 76 | 46 | 56 | … |
英语 | 69 | 91 | 84 | 98 | … |
StudentReducer.java
public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { StringBuffer str = new StringBuffer(); Integer max = -1; while (iterator.hasNext()) { Integer score = iterator.next().get(); if (score > max) { max = score; } } outputCollector.collect(new Text(text.toString()), new IntWritable(max)); } }
Reducer就开始真正执行计算了,reducer函数 reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter)
参数含义如下:
- text:就是Mapper的第三个参数
- iterator:就是Mapper中写入outputCollector的数据,和第一参数组合起来就是mapper中的outputCollector
-
outputCollector:reducer计算后的结果需要写入到该参数中,这里我们写入的内容是类似
key:语文 value:90
结构的数据,所以类型为<Text, IntWritable>
前面提到过mapper会把数据整理好,并且按科目将成绩写入的outputCollector中,那么到了reducer这一步,hadoop就会把mapper写入的数据按照key进行汇总(也就是科目),并且交付给reducer,reducer负责计算里面最高分,并且也将结果写入outputCollector。
StudentProcessor
public class StudentProcessor { public static void main(String args[]) throws Exception { JobConf conf = new JobConf(StudentProcessor.class); conf.setJobName("max_scroe_poc1"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(StudentMapper.class); conf.setReducerClass(StudentReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
我们还需要一个包含main函数的启动类,执行 mvn package
命令进行打包,我们假设包名为 hadoop-score-job.jar
,将jar包通过ftp等工具上传到服务器目录下。
- 上传数据
hadoop借助hdfs分布式文件系统,能够将大文件存储在多个节点,通过hdfs cli工具,我们感觉在操作本地文件一样,在上面的代码中 FileInputFormat.setInputPaths(conf, new Path(args[0]));
设置了MapReduce的数据来源,用户指定目录,该目录下文件作为数据来源,这里的目录就是hdfs中的目录,并且该目录必须存在,而且数据需要上传到该目录下,执行以下命令创建目录
hadoop fs -mkdir poc01_input
执行以下命令将数据导入到hdfs中
hadoop fs -put score.txt poc01_input
score.txt
内容为
1,张三,78,87,69 2,李四,56,76,91 3,王五,65,46,84 4,赵六,89,56,98
通过 ls
命令可以查看文件是否上传成功
$ hadoop fs -ls poc01_input Found 1 items -rw-r--r-- 1 hadoop supergroup 72 2020-12-13 15:43 poc01_input/score.txt
- 执行job
执行以下命令开始运行job
$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output 20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040 20/12/13 16:01:33 INFO client.RMProxy: Connecting to ResourceManager at master/172.16.8.42:18040 20/12/13 16:01:34 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1 20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2 20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005 20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found 20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE 20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE 20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005 20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/ 20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005 20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false 20/12/13 16:01:43 INFO mapreduce.Job: map 0% reduce 0% 20/12/13 16:01:51 INFO mapreduce.Job: map 100% reduce 0% 20/12/13 16:01:57 INFO mapreduce.Job: map 100% reduce 100% 20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully 20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=84 FILE: Number of bytes written=625805 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=316 HDFS: Number of bytes written=30 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=12036 Total time spent by all reduces in occupied slots (ms)=3311 Total time spent by all map tasks (ms)=12036 Total time spent by all reduce tasks (ms)=3311 Total vcore-milliseconds taken by all map tasks=12036 Total vcore-milliseconds taken by all reduce tasks=3311 Total megabyte-milliseconds taken by all map tasks=12324864 Total megabyte-milliseconds taken by all reduce tasks=3390464 Map-Reduce Framework Map input records=4 Map output records=12 Map output bytes=132 Map output materialized bytes=90 Input split bytes=208 Combine input records=12 Combine output records=6 Reduce input groups=3 Reduce shuffle bytes=90 Reduce input records=6 Reduce output records=3 Spilled Records=12 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=395 CPU time spent (ms)=1790 Physical memory (bytes) snapshot=794595328 Virtual memory (bytes) snapshot=5784080384 Total committed heap usage (bytes)=533200896 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=108 File Output Format Counters Bytes Written=30
- hadoop-score-job.jar为上面打包的jar包,需要cd到jar包目录下执行命令
- com.hadoop.poc.StudentProcessor 包含main函数的类
- poc01_input 数据来源目录
- poc01_output 数据输出目录
job执行完后,结果会保存在 poc01_output
目录下
$ hadoop fs -ls poc01_output2 Found 2 items -rw-r--r-- 1 hadoop supergroup 0 2020-12-13 16:01 poc01_output2/_SUCCESS -rw-r--r-- 1 hadoop supergroup 30 2020-12-13 16:01 poc01_output2/part-00000 $ hadoop fs -cat poc01_output2/part-00000 数学 87 英语 98 语文 89