Hadoop MapReduce多輸出詳細(xì)介紹
Hadoop MapReduce多輸出
FileOutputFormat及其子類產(chǎn)生的文件放在輸出目錄下。每個(gè)reducer一個(gè)文件并且文件由分區(qū)號(hào)命名:part-r-00000,part-r-00001,等等。有時(shí)可能要對(duì)輸出的文件名進(jìn)行控制或讓每個(gè)reducer輸出多個(gè)文件。MapReduce為此提供了MultipleOutputFormat類。
MultipleOutputFormat類可以將數(shù)據(jù)寫到多個(gè)文件,這些文件的名稱源于輸出的鍵和值或者任意字符串。這允許每個(gè)reducer(或者只有map作業(yè)的mapper)創(chuàng)建多個(gè)文件。采用name-r-nnnnn形式的文件名用于map輸出,name-r-nnnnn形式的文件名用于reduce輸出,其中name是由程序設(shè)定的任意名字,nnnnn是一個(gè)指名塊號(hào)的整數(shù)(從0開始)。塊號(hào)保證從不同塊(mapper或者reducer)寫的輸出在相同名字情況下不會(huì)沖突。
1. 重定義輸出文件名
我們可以對(duì)輸出的文件名進(jìn)行控制??紤]這樣一個(gè)需求:按男女性別來區(qū)分度假訂單數(shù)據(jù)。這需要運(yùn)行一個(gè)作業(yè),作業(yè)的輸出是男女各一個(gè)文件,此文件包含男女性別的所有數(shù)據(jù)記錄。
這個(gè)需求可以使用MultipleOutputs來實(shí)現(xiàn):
package com.sjf.open.test; import java.io.IOException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.sjf.open.utils.ConfigUtil; /** * Created by xiaosi on 16-11-7. */ public class VacationOrderBySex extends Configured implements Tool { public static void main(String[] args) throws Exception { int status = ToolRunner.run(new VacationOrderBySex(), args); System.exit(status); } public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> { public String fInputPath = ""; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if(fInputPath.contains("vacation_hot_country_order")){ String[] params = line.split("\t"); String sex = params[2]; if(StringUtils.isBlank(sex)){ return; } context.write(new Text(sex.toLowerCase()), value); } } } public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { multipleOutputs.write(NullWritable.get(), value, key.toString()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("./run <input> <output>"); System.exit(1); } String inputPath = args[0]; String outputPath = args[1]; int numReduceTasks = 16; Configuration conf = this.getConf(); conf.setBoolean("mapred.output.compress", true); conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); Job job = Job.getInstance(conf); job.setJobName("vacation_order_by_jifeng.si"); job.setJarByClass(VacationOrderBySex.class); job.setMapperClass(VacationOrderBySexMapper.class); job.setReducerClass(VacationOrderBySexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.setNumReduceTasks(numReduceTasks); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } }
在生成輸出的reduce中,在setup()方法中構(gòu)造一個(gè)MultipleOutputs的實(shí)例并將它賦予一個(gè)實(shí)例變量。在reduce()方法中使用MultipleOutputs實(shí)例來寫輸出,而不是context。write()方法作用于鍵,值和名字。這里使用的是性別作為名字,因此最后產(chǎn)生的輸出名稱的形式為sex-r-nnnnn:
-rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz
我們可以看到在輸出文件中不僅有我們想要的輸出文件類型,還有part-r-nnnnn形式的文件,但是文件內(nèi)沒有信息,這是程序默認(rèn)的輸出文件。所以我們?cè)谥付ㄝ敵鑫募Q時(shí)(name-r-nnnnn),不要指定name為part,因?yàn)樗呀?jīng)被使用為默認(rèn)值了。
2. 多目錄輸出
在MultipleOutputs的write()方法中指定的基本路徑相對(duì)于輸出路徑進(jìn)行解釋,因?yàn)樗梢园募窂椒指舴?),創(chuàng)建任意深度的子目錄。例如,我們改動(dòng)上面的需求:按男女性別來區(qū)分度假訂單數(shù)據(jù),不同性別數(shù)據(jù)位于不同子目錄(例如:sex=f/part-r-00000)。
public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { String basePath = String.format("sex=%s/part", key.toString()); multipleOutputs.write(NullWritable.get(), value, basePath); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } }
后產(chǎn)生的輸出名稱的形式為sex=f/part-r-nnnnn或者sex=m/part-r-nnnnn:
-rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m
3. 延遲輸出
FileOutputFormat的子類會(huì)產(chǎn)生輸出文件(part-r-nnnnn),即使文件是空的,也會(huì)產(chǎn)生。我們有時(shí)候不想要這些空的文件,我們可以使用LazyOutputFormat進(jìn)行處理。它是一個(gè)封裝輸出格式,可以指定分區(qū)第一條記錄輸出時(shí)才真正創(chuàng)建文件。要使用它,用JobConf和相關(guān)輸出格式作為參數(shù)來調(diào)用setOutputFormatClass()方法即可:
Configuration conf = this.getConf(); Job job = Job.getInstance(conf); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
再次檢查一下我們的輸出文件(第一個(gè)例子):
sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/ Found 3 items -rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!
相關(guān)文章
rsync相對(duì)于其他同步工具的優(yōu)缺點(diǎn)介紹
rsync是linux與windows下常用的同步工具,以前的版本都是免費(fèi)的,配置也不復(fù)雜,主要是當(dāng)前確實(shí)沒有多好的替代工具,我自己也是一直用的這個(gè)工具,剛好看到這篇文字特分享下2013-11-11SSH端口轉(zhuǎn)發(fā),本地端口轉(zhuǎn)發(fā),遠(yuǎn)程端口轉(zhuǎn)發(fā),動(dòng)態(tài)端口轉(zhuǎn)發(fā)詳解
本文為大家詳細(xì)介紹了SSH端口轉(zhuǎn)發(fā),本地端口轉(zhuǎn)發(fā),遠(yuǎn)程端口轉(zhuǎn)發(fā),動(dòng)態(tài)端口轉(zhuǎn)發(fā)等相關(guān)知識(shí)2018-10-10通過IBM 3650 M2服務(wù)器的ServerGuide工具配置RAID圖文教程
這篇文章主要介紹了通過IBM 3650 M2服務(wù)器的ServerGuide工具配置RAID圖文教程,需要的朋友可以參考下2018-05-05數(shù)據(jù)庫數(shù)據(jù)同步常用的5種實(shí)施方案
本文將探討幾種常見的數(shù)據(jù)同步方案,涵蓋了數(shù)據(jù)庫主從同步、數(shù)據(jù)遷移同步和數(shù)據(jù)實(shí)時(shí)同步,通過深入了解各種方案的特點(diǎn)、優(yōu)勢(shì)和局限性,我們可以更好地選擇和定制適合特定業(yè)務(wù)場(chǎng)景的數(shù)據(jù)同步策略,為構(gòu)建高效、穩(wěn)定、可擴(kuò)展的系統(tǒng)奠定基礎(chǔ)2024-06-06