通用MapReduce程序復(fù)制HBase表數(shù)據(jù)
編寫MR程序,讓其可以適合大部分的HBase表數(shù)據(jù)導(dǎo)入到HBase表數(shù)據(jù)。其中包括可以設(shè)置版本數(shù)、可以設(shè)置輸入表的列導(dǎo)入設(shè)置(選取其中某幾列)、可以設(shè)置輸出表的列導(dǎo)出設(shè)置(選取其中某幾列)。
原始表test1數(shù)據(jù)如下:
每個(gè)row key都有兩個(gè)版本的數(shù)據(jù),這里只顯示了row key為1的數(shù)據(jù)
在hbase shell 中創(chuàng)建數(shù)據(jù)表:
create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù) create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
main函數(shù)入口:
package GeneralHBaseToHBase; import org.apache.hadoop.util.ToolRunner; public class DriverTest { public static void main(String[] args) throws Exception { // 無版本設(shè)置、無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置 String[] myArgs1= new String[]{ "test1", // 輸入表 "test2", // 輸出表 "0", // 版本大小數(shù),如果值為0,則為默認(rèn)從輸入表導(dǎo)出最新的數(shù)據(jù)到輸出表 "-1", // 列導(dǎo)入設(shè)置,如果為-1 ,則沒有設(shè)置列導(dǎo)入 "-1" // 列導(dǎo)出設(shè)置,如果為-1,則沒有設(shè)置列導(dǎo)出 }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs1); // 無版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置 String[] myArgs2= new String[]{ "test1", "test3", "0", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs2); // 無版本設(shè)置,無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置 String[] myArgs3= new String[]{ "test1", "test4", "0", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs3); // 有版本設(shè)置,無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置 String[] myArgs4= new String[]{ "test1", "test5", "2", "-1", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs4); // 有版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置 String[] myArgs5= new String[]{ "test1", "test6", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs5); // 有版本設(shè)置、無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置 String[] myArgs6= new String[]{ "test1", "test7", "2", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs6); // 有版本設(shè)置、有列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置 String[] myArgs7= new String[]{ "test1", "test8", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(), new HBaseDriver(), myArgs7); } }
driver:
package GeneralHBaseToHBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import util.JarUtil; public class HBaseDriver extends Configured implements Tool{ public static String FROMTABLE=""; //導(dǎo)入表 public static String TOTABLE=""; //導(dǎo)出表 public static String SETVERSION=""; //是否設(shè)置版本 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable} @Override public int run(String[] args) throws Exception { if(args.length!=5){ System.err.println("Usage:\n demo.job.HBaseDriver <input> <inputTable> " + "<output> <outputTable>" +"< versions >" + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> " + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>"); return -1; } Configuration conf = getConf(); FROMTABLE = args[0]; TOTABLE = args[1]; SETVERSION = args[2]; conf.set("SETVERSION", SETVERSION); if(!args[3].equals("-1")){ conf.set("COLUMNFROMTABLE", args[3]); } if(!args[4].equals("-1")){ conf.set("COLUMNTOTABLE", args[4]); } String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE; Job job = Job.getInstance(conf, jobName); job.setJarByClass(HBaseDriver.class); Scan scan = new Scan(); // 判斷是否需要設(shè)置版本 if(SETVERSION != "0" || SETVERSION != "1"){ scan.setMaxVersions(Integer.parseInt(SETVERSION)); } // 設(shè)置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型 TableMapReduceUtil.initTableMapperJob( FROMTABLE, scan, HBaseToHBaseMapper.class, ImmutableBytesWritable.class, Put.class, job); // 設(shè)置HBase表輸出:表名,reducer類 TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job); // 沒有 reducers, 直接寫入到 輸出文件 job.setNumReduceTasks(0); return job.waitForCompletion(true) ? 0 : 1; } private static Configuration configuration; public static Configuration getConfiguration(){ if(configuration==null){ /** * TODO 了解如何直接從Windows提交代碼到Hadoop集群 * 并修改其中的配置為實(shí)際配置 */ configuration = new Configuration(); configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺(tái)提交任務(wù) configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver configuration.set("hbase.master", "master:16000"); configuration.set("hbase.rootdir", "hdfs://master:8020/hbase"); configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); //TODO 需export->jar file ; 設(shè)置正確的jar包所在位置 configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設(shè)置jar包路徑 } return configuration; } }
mapper:
package GeneralHBaseToHBase; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; import java.util.NavigableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> { Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class); private static int versionNum = 0; private static String[] columnFromTable = null; private static String[] columnToTable = null; private static String column1 = null; private static String column2 = null; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); versionNum = Integer.parseInt(conf.get("SETVERSION", "0")); column1 = conf.get("COLUMNFROMTABLE",null); if(!(column1 == null)){ columnFromTable = column1.split(","); } column2 = conf.get("COLUMNTOTABLE",null); if(!(column2 == null)){ columnToTable = column2.split(","); } } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { context.write(key, resultToPut(key,value)); } /*** * 把key,value轉(zhuǎn)換為Put * @param key * @param value * @return * @throws IOException */ private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException { HashMap<String, String> fTableMap = new HashMap<>(); HashMap<String, String> tTableMap = new HashMap<>(); Put put = new Put(key.get()); if(! (columnFromTable == null || columnFromTable.length == 0)){ fTableMap = getFamilyAndColumn(columnFromTable); } if(! (columnToTable == null || columnToTable.length == 0)){ tTableMap = getFamilyAndColumn(columnToTable); } if(versionNum==0){ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ for (Cell kv : value.rawCells()) { put.add(kv); // 沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,沒有設(shè)置列導(dǎo)出 } return put; } else{ return getPut(put, value, tTableMap); // 無版本、無列導(dǎo)入、有列導(dǎo)出 } } else { if(tTableMap.size() == 0){ return getPut(put, value, fTableMap);// 無版本、有列導(dǎo)入、無列導(dǎo)出 } else { return getPut(put, value, tTableMap);// 無版本、有列導(dǎo)入、有列導(dǎo)出 } } } else{ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){ return getPut1(put, value); // 有版本,無列導(dǎo)入,無列導(dǎo)出 }else{ return getPut2(put, value, tTableMap); //有版本,無列導(dǎo)入,有列導(dǎo)出 } }else{ if(tTableMap.size() == 0){ return getPut2(put,value,fTableMap);// 有版本,有列導(dǎo)入,無列導(dǎo)出 }else{ return getPut2(put,value,tTableMap); // 有版本,有列導(dǎo)入,有列導(dǎo)出 } } } } /*** * 無版本設(shè)置的情況下,對(duì)于有列導(dǎo)入或者列導(dǎo)出 * @param put * @param value * @param tableMap * @return * @throws IOException */ private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{ for(Cell kv : value.rawCells()){ byte[] family = kv.getFamily(); if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); ArrayList<String> columnBy = toByte(columnStr); if(columnBy.contains(new String(kv.getQualifier()))){ put.add(kv); //沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,有設(shè)置列導(dǎo)出 } } } return put; } /*** * (有版本,無列導(dǎo)入,有列導(dǎo)出)或者(有版本,有列導(dǎo)入,無列導(dǎo)出) * @param put * @param value * @param tTableMap * @return */ private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap(); for(byte[] family:map.keySet()){ if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr); ArrayList<String> columnBy = toByte(columnStr); NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù) for(byte[] column:familyMap.keySet()){ //根據(jù)列名循壞 log.info("!!!!!!!!!!!"+new String(column)); if(columnBy.contains(new String(column))){ NavigableMap<Long, byte[]> valuesMap = familyMap.get(column); for(Entry<Long, byte[]> s:valuesMap.entrySet()){//獲取列對(duì)應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個(gè) System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue())); put.addColumn(family, column, s.getKey(),s.getValue()); } } } } } return put; } /*** * 有版本、無列導(dǎo)入、無列導(dǎo)出 * @param put * @param value * @return */ private Put getPut1(Put put,Result value){ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap(); for(byte[] family:map.keySet()){ NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù) for(byte[] column:familyMap.keySet()){ //根據(jù)列名循壞 NavigableMap<Long, byte[]> valuesMap = familyMap.get(column); for(Entry<Long, byte[]> s:valuesMap.entrySet()){ //獲取列對(duì)應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個(gè) put.addColumn(family, column, s.getKey(),s.getValue()); } } } return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名與列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static HashMap<String, String> getFamilyAndColumn(String[] str){ HashMap<String, String> map = new HashMap<>(); HashSet<String> set = new HashSet<>(); for(String s : str){ set.add(s.split(":")[0]); } Object[] ob = set.toArray(); for(int i=0; i<ob.length;i++){ String family = String.valueOf(ob[i]); String columns = ""; for(int j=0;j < str.length;j++){ if(family.equals(str[j].split(":")[0])){ columns += str[j].split(":")[1]+","; } } map.put(family, columns.substring(0, columns.length()-1)); } return map; } private static ArrayList<String> toByte(String s){ ArrayList<String> b = new ArrayList<>(); String[] sarr = s.split(","); for(int i=0;i<sarr.length;i++){ b.add(sarr[i]); } return b; } }
程序運(yùn)行完之后,在hbase shell中查看每個(gè)表,看是否數(shù)據(jù)導(dǎo)入正確:
test2:(無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)
test3 (無版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)
test4(無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
test5(有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)
test6(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)
test7(有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
test8(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
mongodb官方的golang驅(qū)動(dòng)基礎(chǔ)使用教程分享
這篇文章主要給大家介紹了關(guān)于mongodb官方的golang驅(qū)動(dòng)基礎(chǔ)使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用mongodb具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12Mongodb 3.2.9開啟用戶權(quán)限認(rèn)證問題的步驟詳解
這篇文章主要給大家介紹了關(guān)于Mongodb 3.2.9開啟用戶權(quán)限認(rèn)證問題的詳細(xì)步驟,通過開啟權(quán)限認(rèn)證,會(huì)對(duì)大家的Mongodb更加保護(hù)的安全些,文中將步驟介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-08-08MongoDB添加secondary節(jié)點(diǎn)的2種方法詳解
這篇文章主要給大家總結(jié)介紹了關(guān)于MongoDB添加secondary節(jié)點(diǎn)的2種方法,以及MongoDB secondary節(jié)點(diǎn)出現(xiàn)recovering狀態(tài)的解決方法,文中介紹的非常詳細(xì),需要的朋友可以參考下2018-10-10利用mongodb查詢某坐標(biāo)是否在規(guī)定多邊形區(qū)域內(nèi)的方法
這篇文章主要介紹了利用mongodb如何查詢某坐標(biāo)是否在固定多邊形區(qū)域內(nèi)的方法,文中給出了詳細(xì)示例代碼,相信對(duì)大家具有一定的參考價(jià)值,需要的朋友們下面來一起看看吧。2017-02-02MongoDB快速入門筆記(二)之MongoDB的概念及簡(jiǎn)單操作
MongoDB是面向集合的文檔式數(shù)據(jù)庫(kù),不像關(guān)系數(shù)據(jù)庫(kù)那樣,有表,列、行,mongoDB數(shù)據(jù)庫(kù)則是由一系列的文檔組成。接下來通過本文給大家介紹MongoDB的概念及簡(jiǎn)單操作,一起看看吧2016-06-06解決MongoDB?位置查詢報(bào)錯(cuò)planner?returned?error:?unable?to?find
這篇文章主要介紹了MongoDB位置查詢報(bào)錯(cuò)planner?returned?error:?unable?to?find?index?for?$geoNear?query的解決方案,需要的朋友可以參考下2023-08-08