shuffle过程中的分区,排序和Combiner

微信扫一扫,分享到朋友圈

shuffle过程中的分区,排序和Combiner

Partition分区

map端的输出会进行分区,hadoop默认根据 HashPartitioner
分区。默认的分区方式是:key的hashCode % ReduceTask的个数。

可以自定义分区,

CustomPartitioner.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner extends Partitioner<IntWritable, Text> {
@Override
public int getPartition(IntWritable key, Text value, int numReduceTasks) {
if (key.get() % 2 == 0)
return 0;
else
return 1;
}
}

PMapper.java

public class PMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(Integer.parseInt(value.toString())), NullWritable.get());
}
}

PReducer.java

public class PReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
@Override
protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}

hadoop的分区有以下特点,

  1. 如果numReduceTasks>分区数,输出文件(part-r-000xx)个数等于numReduceTasks,但是会有空文件;
  2. 如果1<numReduceTasks<分区数,则会报错;
  3. 如果1==numReduceTasks,则只有一个输出结果文件;
  4. 如果numReduceTasks==0,则输出文件的个数有MapTasks的个数而定,输出文件中的结果为MapTask中的输出结果。
  5. 分区号必须从0开始,逐一累加。

case1

job.setNumReduceTasks(2)
,分区数等于numReduceTasks。输出文件夹下会包含part-r-00000和part-r-000001

wzy@s100:~$ hdfs dfs -ls /user/wzy/hadoop/poutput/
Found 3 items
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:33 /user/wzy/hadoop/poutput/_SUCCESS
-rw-r--r--   3 wzy supergroup         25 2020-05-21 21:33 /user/wzy/hadoop/poutput/part-r-00000
-rw-r--r--   3 wzy supergroup         25 2020-05-21 21:33 /user/wzy/hadoop/poutput/part-r-00001
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput/part-r-00000
2
4
6
8
10
12
14
16
18
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput/part-r-00001
1
3
5
7
9
11
13
15
17
19

case2

job.setNumReduceTasks(2)
,分区数小于numReduceTasks。输出文件夹poutput1下会包含两个空文件part-r-00002和part-r-000003

wzy@s100:~$ hdfs dfs -ls /user/wzy/hadoop/poutput1/
Found 5 items
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:37 /user/wzy/hadoop/poutput1/_SUCCESS
-rw-r--r--   3 wzy supergroup         25 2020-05-21 21:37 /user/wzy/hadoop/poutput1/part-r-00000
-rw-r--r--   3 wzy supergroup         25 2020-05-21 21:37 /user/wzy/hadoop/poutput1/part-r-00001
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:37 /user/wzy/hadoop/poutput1/part-r-00002
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:37 /user/wzy/hadoop/poutput1/part-r-00003
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput1/part-r-00002
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput1/part-r-00003

case3

如果将分区数变为3且 job.setNumReduceTasks(2)

public int getPartition(IntWritable key, NullWritable value, int numReduceTasks) {
if (key.get() % 3 == 0)
return 0;
else if (key.get() % 3 == 0)
return 1;
else
return 2;
}

1<numReduceTasks<分区数,则会报异常 java.lang.Exception: java.io.IOException: Illegal partition for 1 (2)

case4

如果将分区数变为3且 job.setNumReduceTasks(1)
,则输出结果文件只有part-r-00000,包含数字1-19。

wzy@s100:~$ hdfs dfs -ls /user/wzy/hadoop/poutput2/
Found 2 items
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:45 /user/wzy/hadoop/poutput2/_SUCCESS
-rw-r--r--   3 wzy supergroup         50 2020-05-21 21:45 /user/wzy/hadoop/poutput2/part-r-00000

case5

如果将分区数变为3且 job.setNumReduceTasks(0)
,输入文件包含test.txt(数字0-19)和test1.txt(数字20-39),则输出结果文件夹包含part-m-00000和part-m-000001,分别为2个MapTask的输出。

wzy@s100:~$ hdfs dfs -ls /user/wzy/hadoop/poutput3
Found 3 items
-rw-r--r--   3 wzy supergroup          0 2020-05-21 21:50 /user/wzy/hadoop/poutput3/_SUCCESS
-rw-r--r--   3 wzy supergroup         60 2020-05-21 21:50 /user/wzy/hadoop/poutput3/part-m-00000
-rw-r--r--   3 wzy supergroup         50 2020-05-21 21:50 /user/wzy/hadoop/poutput3/part-m-00001
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput3/part-r-00000
cat: `/user/wzy/hadoop/poutput3/part-r-00000': No such file or directory
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput3/part-m-00000
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/poutput3/part-m-00001
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

排序

shuffle过程会将数据按照key排序。在map端,环形内存缓冲区达到一定阈值时,会对数据进行全排序,溢出到临时文件,MapTask结束时,会将所有的磁盘文件进行归并排序。在reduce端,ReduceTask会统一对内存和磁盘上的数据进行归并排序。

因此,如果使用自定义的key,需要实现 WritableComparable
接口

串行化和Comparable

序列化就是把内存中的对象,转换为字节序列(或其它数据传输协议),以便存储到磁盘(持久化)或是网络传输。反序列化就是将收到的字节序列(或其它数据传输协议)或是磁盘的持久化数据,转换成内存中的对象。hadoop的序列化需要以下几步,

实现 Writable
接口

import org.apache.hadoop.io.Writable;
public class xxx implements Writable {}

反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public xxx() {
super();
}

重写序列化方法

import java.io.DataOutput;
@Override
public void write(DataOutput out) throws IOException {}

重写反序列化方法, 注意反序列化的顺序和序列化的顺序完全一致

import java.io.DataInput;
@Override
public void readFields(DataInput in) throws IOException {}

如果需要将自定义的class放在key中传输,则需要实现 WritableComparable
接口,因为MapReduce的Shuffle过程要求key必须能排序。实现 WritableComparable
接口,不仅要重写序列化和反序列化方法,还要重写 compareTo
方法,

public class xxx implements WritableComparable {
@Override
public int compareTo(Object o) {
return 0;
}
}

case

自定义类Bean实现WritableComparable接口,完成二次排序

public class Bean implements WritableComparable<Bean> {
private int a;
private int b;
@Override
public int compareTo(Bean o) {
int res = Integer.valueOf(a).compareTo(o.getA());
if (res == 0)
return Integer.valueOf(b).compareTo(o.getB());
return res;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(a);
out.writeInt(b);
}
@Override
public void readFields(DataInput in) throws IOException {
a = in.readInt();
b = in.readInt();
}
@Override
public String toString() {
return getA() + " " + getB();
}
}

查看输出,

wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/output4/part-r-00000
1 2
1 3
1 5
2 2
2 3
3 1
3 5
4 1
4 2
4 5
4 7

Combiner

Combiner
继承自 Reducer
类,它的作用是对每一个MapTask的输出进行局部汇总,从而减小网络传输量。Combiner的使用不能改变最终的业务逻辑,且Combiner的输出key-value要对应Reducer的输入key-value。

自定义Combiner

public class xxxx extends Reducer<xx, xx, xx, xx>{
@Override
protected void reduce(xx key, Iterable<xx> values,Context context) throws IOException, InterruptedException {
}
}

使用Combiner, job.setCombinerClass(xxx.class)

GroupingComparator

CroupingComparator
可以对reduce端的数据按key分组。

定义分组规则为:按照Bean的a值分组

public class BeanSortGroupingComparator extends WritableComparator {
public  BeanSortGroupingComparator() {
super(Bean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) { // 根据Bean的a分组
Bean aBean = (Bean)a;
Bean bBean = (Bean)b;
if (aBean.getA() > aBean.getA())
return 1;
else if (aBean.getA() < bBean.getA())
return -1;
else
return 0;
}
}

使用GroupingComparator, job.setGroupingComparatorClass(BeanSortGroupingComparator.class)

输出为,

wzy@s100:~$ hdfs dfs -cat /user/wzy/hadoop/output5/part-r-00000
1 2
2 2
3 1
4 1

微信扫一扫,分享到朋友圈

shuffle过程中的分区,排序和Combiner

外媒:亚马逊在美仓库爆发大规模疫情,超100名员工感染

上一篇

解决K8s “kernel:unregister_netdevice:waiting for vethxxx to become free. Usage count=1” 的问...

下一篇

你也可能喜欢

shuffle过程中的分区,排序和Combiner

长按储存图像,分享给朋友