MapReduce-多路径输出

MapReduce中实现多路径输出主要使用MultipleOutputs类

通过两个例子可以掌握

输入样例 mulitipleInput.txt

file1 001
file2 002
file3 003
file2 004
file1 005
file1 006
file3 007
file2 008
复制代码

输出:

file1和file3开头的记录归到一个文件下

file2和file3开头的记录归到另一个文件下

代码

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Map-only style example that routes each input record to one or two named
 * outputs ("file1" / "file2") using {@link MultipleOutputs}.
 *
 * <p>Routing rule, based on the text before the first space of each line:
 * <ul>
 *   <li>{@code file1} -> named output "file1"</li>
 *   <li>{@code file2} -> named output "file2"</li>
 *   <li>anything else -> both named outputs</li>
 * </ul>
 *
 * <p>Run through {@link ToolRunner} so that generic Hadoop options are
 * applied to the job configuration.
 */
public class MultipleOutputsExample extends Configured implements Tool {

    /** Named output that receives "file1" records and all unclassified records. */
    private static final String OUTPUT_FILE1 = "file1";

    /** Named output that receives "file2" records and all unclassified records. */
    private static final String OUTPUT_FILE2 = "file2";

    /**
     * Mapper that writes every input line, unchanged, to the named output(s)
     * selected by the line's leading token.
     */
    public static class MultipleMapper extends Mapper<Object, Text, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> mos;

        /** Creates the {@link MultipleOutputs} helper bound to this task's context. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, NullWritable>(context);
        }

        /**
         * Routes one input line to the matching named output(s).
         *
         * @param key     input key (unused)
         * @param value   the full input line; written out unchanged
         * @param context task context (unused; output goes through {@code mos})
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take the text before the first space. Using indexOf instead of
            // split(" ")[0] avoids an ArrayIndexOutOfBoundsException on lines
            // consisting only of spaces (split drops trailing empty strings
            // and returns an empty array for such input).
            String line = value.toString();
            int separator = line.indexOf(' ');
            String prefix = separator < 0 ? line : line.substring(0, separator);

            if (OUTPUT_FILE1.equals(prefix)) {
                mos.write(OUTPUT_FILE1, value, NullWritable.get());
            } else if (OUTPUT_FILE2.equals(prefix)) {
                mos.write(OUTPUT_FILE2, value, NullWritable.get());
            } else {
                // Any other prefix (e.g. "file3") is duplicated into both outputs.
                mos.write(OUTPUT_FILE1, value, NullWritable.get());
                mos.write(OUTPUT_FILE2, value, NullWritable.get());
            }
        }

        /** Closes the named outputs so that their record writers are flushed. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (mos != null) {
                mos.close();
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args remaining command-line arguments: {@code <in> <out>}
     * @return 0 on success, 1 if the job failed, 2 on a usage error
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: MultipleOutputsExample <in> <out>");
            // Return instead of System.exit so callers of run() keep control;
            // main() still turns this into the process exit code.
            return 2;
        }

        // Use the configuration injected by ToolRunner so that generic
        // options given on the command line take effect. Fall back to a
        // fresh one when run() is invoked without ToolRunner.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(MultipleOutputsExample.class);
        job.setMapperClass(MultipleMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Every name passed to mos.write(name, ...) must be registered here.
        MultipleOutputs.addNamedOutput(job, OUTPUT_FILE1, TextOutputFormat.class,
                Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, OUTPUT_FILE2, TextOutputFormat.class,
                Text.class, NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MultipleOutputsExample(), args));
    }
}
复制代码

结果

$ hadoop fs -cat /user/test/wordTest/mulitipleOutput/file1-m-00000
file1 001
file3 003
file1 005
file1 006
file3 007
$ hadoop fs -cat /user/test/wordTest/mulitipleOutput/file2-m-00000
file2 002
file3 003
file2 004
file3 007
file2 008
复制代码

如果想把file1和file2的内容放入不同的目录下,可以通过指定baseOutputPath,将file1开头的文件放在同一个目录中管理。
将
mos.write("file1", value, NullWritable.get());
mos.write("file2", value, NullWritable.get());
改为
mos.write("file1", value, NullWritable.get(), "file1/part");
mos.write("file2", value, NullWritable.get(), "file2/part");
或者不使用命名输出,直接指定baseOutputPath:
mos.write(value, NullWritable.get(), "file1/part");
mos.write(value, NullWritable.get(), "file2/part");
可以看到输出结果

$ hadoop fs -ls /user/test/wordTest/mulitipleOutput
Found 4 items
-rw-r--r--   3 hdfs hdfs          0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/_SUCCESS
drwxr-xr-x   - hdfs hdfs          0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/file1
drwxr-xr-x   - hdfs hdfs          0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/file2
-rw-r--r--   3 hdfs hdfs          0 2018-06-30 16:18 /user/test/wordTest/mulitipleOutput/part-r-00000
复制代码

指定baseOutputPath后,输出路径和输出文件名直接以baseOutputPath为准,但文件名末尾仍会自动追加任务类型和编号后缀:在map端写出时为-m-00000,在reduce端写出时为-r-00000。如果想更改,可以继承FileOutputFormat并重写RecordWriter实现。

稀土掘金
我还没有学会写个人说明!
上一篇

K55是一款 Payload注入工具,该工具可以向正在运行的进程注入x86_64 shellcode Payload。

下一篇

使用docker部署简单的项目

你也可能喜欢

评论已经被关闭。

插入图片