亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

通用MapReduce程序復(fù)制HBase表數(shù)據(jù)

 更新時(shí)間:2018年12月26日 15:03:20   作者:Angelababy_huan  
這篇文章主要為大家詳細(xì)介紹了通用MapReduce程序復(fù)制HBase表數(shù)據(jù),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

編寫MR程序,讓其可以適合大部分的HBase表數(shù)據(jù)導(dǎo)入到HBase表數(shù)據(jù)。其中包括可以設(shè)置版本數(shù)、可以設(shè)置輸入表的列導(dǎo)入設(shè)置(選取其中某幾列)、可以設(shè)置輸出表的列導(dǎo)出設(shè)置(選取其中某幾列)。

原始表test1數(shù)據(jù)如下:

每個(gè)row key都有兩個(gè)版本的數(shù)據(jù),這里只顯示了row key為1的數(shù)據(jù)

 在hbase shell 中創(chuàng)建數(shù)據(jù)表:

create 'test2',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test3',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test4',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test5',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test6',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test7',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置的數(shù)據(jù)
create 'test8',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置的數(shù)據(jù)

main函數(shù)入口:

package GeneralHBaseToHBase;
import org.apache.hadoop.util.ToolRunner;
public class DriverTest {
 public static void main(String[] args) throws Exception {
 // 無版本設(shè)置、無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
 String[] myArgs1= new String[]{
 "test1", // 輸入表
 "test2", // 輸出表
 "0",  // 版本大小數(shù),如果值為0,則為默認(rèn)從輸入表導(dǎo)出最新的數(shù)據(jù)到輸出表
 "-1", // 列導(dǎo)入設(shè)置,如果為-1 ,則沒有設(shè)置列導(dǎo)入
 "-1" // 列導(dǎo)出設(shè)置,如果為-1,則沒有設(shè)置列導(dǎo)出
 }; 
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs1);
 // 無版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
 String[] myArgs2= new String[]{
 "test1",
 "test3",
 "0",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs2);
 // 無版本設(shè)置,無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
 String[] myArgs3= new String[]{
 "test1",
 "test4",
 "0",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs3);
 // 有版本設(shè)置,無列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
 String[] myArgs4= new String[]{
 "test1",
 "test5",
 "2",
 "-1",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs4);
 // 有版本設(shè)置、有列導(dǎo)入設(shè)置,無列導(dǎo)出設(shè)置
 String[] myArgs5= new String[]{
 "test1",
 "test6",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs5);
 
 // 有版本設(shè)置、無列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
 String[] myArgs6= new String[]{
 "test1",
 "test7",
 "2",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs6);
 // 有版本設(shè)置、有列導(dǎo)入設(shè)置,有列導(dǎo)出設(shè)置
 String[] myArgs7= new String[]{
 "test1",
 "test8",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(), 
 new HBaseDriver(),
 myArgs7);
 }
 
}

driver:

package GeneralHBaseToHBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import util.JarUtil;
 
 
public class HBaseDriver extends Configured implements Tool{
 public static String FROMTABLE=""; //導(dǎo)入表
 public static String TOTABLE=""; //導(dǎo)出表
 public static String SETVERSION=""; //是否設(shè)置版本
 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}
 @Override
 public int run(String[] args) throws Exception {
 if(args.length!=5){
 System.err.println("Usage:\n demo.job.HBaseDriver <input> <inputTable> "
  + "<output> <outputTable>"
  +"< versions >"
  + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> "
  + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>");
 return -1;
 }
 Configuration conf = getConf();
 FROMTABLE = args[0];
 TOTABLE = args[1];
 SETVERSION = args[2];
 conf.set("SETVERSION", SETVERSION);
 if(!args[3].equals("-1")){
 conf.set("COLUMNFROMTABLE", args[3]);
 }
 if(!args[4].equals("-1")){
 conf.set("COLUMNTOTABLE", args[4]);
 }
 String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;
 Job job = Job.getInstance(conf, jobName);
 job.setJarByClass(HBaseDriver.class);
 Scan scan = new Scan();
 // 判斷是否需要設(shè)置版本
 if(SETVERSION != "0" || SETVERSION != "1"){
 scan.setMaxVersions(Integer.parseInt(SETVERSION));
 }
 // 設(shè)置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型
 TableMapReduceUtil.initTableMapperJob(
 FROMTABLE, 
 scan, 
 HBaseToHBaseMapper.class, 
 ImmutableBytesWritable.class, 
 Put.class, 
 job);
 // 設(shè)置HBase表輸出:表名,reducer類
 TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);
 // 沒有 reducers, 直接寫入到 輸出文件
  job.setNumReduceTasks(0);
 
  return job.waitForCompletion(true) ? 0 : 1;
  
 }
 private static Configuration configuration;
 public static Configuration getConfiguration(){
 if(configuration==null){
 /**
 * TODO 了解如何直接從Windows提交代碼到Hadoop集群
 *  并修改其中的配置為實(shí)際配置
 */
 configuration = new Configuration();
 configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺(tái)提交任務(wù)
 configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode
 configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager
 configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器
 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver
 configuration.set("hbase.master", "master:16000");
 configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");
 configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
 configuration.set("hbase.zookeeper.property.clientPort", "2181");
 //TODO 需export->jar file ; 設(shè)置正確的jar包所在位置
 configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設(shè)置jar包路徑
 }
 
 return configuration;
 }
 
 
}

mapper:

package GeneralHBaseToHBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
 Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);
 private static int versionNum = 0;
 private static String[] columnFromTable = null;
 private static String[] columnToTable = null;
 private static String column1 = null;
 private static String column2 = null;
 @Override
 protected void setup(Context context)
 throws IOException, InterruptedException {
 Configuration conf = context.getConfiguration();
 versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));
 column1 = conf.get("COLUMNFROMTABLE",null);
 if(!(column1 == null)){
 columnFromTable = column1.split(",");
 }
 column2 = conf.get("COLUMNTOTABLE",null); 
 if(!(column2 == null)){
 columnToTable = column2.split(",");
 }
 }
 @Override
 protected void map(ImmutableBytesWritable key, Result value,
 Context context)
 throws IOException, InterruptedException {
 context.write(key, resultToPut(key,value));
 } 
 /***
 * 把key,value轉(zhuǎn)換為Put
 * @param key
 * @param value
 * @return
 * @throws IOException
 */
 private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {
 HashMap<String, String> fTableMap = new HashMap<>();
 HashMap<String, String> tTableMap = new HashMap<>();
 Put put = new Put(key.get());
 if(! (columnFromTable == null || columnFromTable.length == 0)){
 fTableMap = getFamilyAndColumn(columnFromTable);
 }
 if(! (columnToTable == null || columnToTable.length == 0)){
 tTableMap = getFamilyAndColumn(columnToTable);
 }
 if(versionNum==0){      
 if(fTableMap.size() == 0){   
 if(tTableMap.size() == 0){ 
  for (Cell kv : value.rawCells()) {
  put.add(kv); // 沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,沒有設(shè)置列導(dǎo)出
  }
  return put;
 } else{
  return getPut(put, value, tTableMap); // 無版本、無列導(dǎo)入、有列導(dǎo)出
 }
 } else {
 if(tTableMap.size() == 0){
  return getPut(put, value, fTableMap);// 無版本、有列導(dǎo)入、無列導(dǎo)出
 } else {
  return getPut(put, value, tTableMap);// 無版本、有列導(dǎo)入、有列導(dǎo)出
 }
 }
 } else{
 if(fTableMap.size() == 0){
 if(tTableMap.size() == 0){
  return getPut1(put, value); // 有版本,無列導(dǎo)入,無列導(dǎo)出
 }else{
  return getPut2(put, value, tTableMap); //有版本,無列導(dǎo)入,有列導(dǎo)出
 }
 }else{
 if(tTableMap.size() == 0){
  return getPut2(put,value,fTableMap);// 有版本,有列導(dǎo)入,無列導(dǎo)出
 }else{
  return getPut2(put,value,tTableMap); // 有版本,有列導(dǎo)入,有列導(dǎo)出
 }
 }
 }
 }
 /***
 * 無版本設(shè)置的情況下,對(duì)于有列導(dǎo)入或者列導(dǎo)出
 * @param put
 * @param value
 * @param tableMap
 * @return
 * @throws IOException
 */
 
 private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{
 for(Cell kv : value.rawCells()){
 byte[] family = kv.getFamily();
 if(tableMap.containsKey(new String(family))){
 String columnStr = tableMap.get(new String(family));
 ArrayList<String> columnBy = toByte(columnStr);
 if(columnBy.contains(new String(kv.getQualifier()))){
  put.add(kv); //沒有設(shè)置版本,沒有設(shè)置列導(dǎo)入,有設(shè)置列導(dǎo)出
 }
 }
 }
 return put;
 }
 /***
 * (有版本,無列導(dǎo)入,有列導(dǎo)出)或者(有版本,有列導(dǎo)入,無列導(dǎo)出)
 * @param put
 * @param value
 * @param tTableMap
 * @return
 */
 private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){
   if(tableMap.containsKey(new String(family))){
   String columnStr = tableMap.get(new String(family));
   log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);
 ArrayList<String> columnBy = toByte(columnStr);
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù)
    for(byte[] column:familyMap.keySet()){        //根據(jù)列名循壞
     log.info("!!!!!!!!!!!"+new String(column));
     if(columnBy.contains(new String(column))){
     NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
      for(Entry<Long, byte[]> s:valuesMap.entrySet()){//獲取列對(duì)應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個(gè)
      System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));
      put.addColumn(family, column, s.getKey(),s.getValue());
      }
     }
    }
   }
   
  }
 return put; 
 }
 /***
 * 有版本、無列導(dǎo)入、無列導(dǎo)出
 * @param put
 * @param value
 * @return
 */
 private Put getPut1(Put put,Result value){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){ 
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關(guān)數(shù)據(jù)
   for(byte[] column:familyMap.keySet()){        //根據(jù)列名循壞
    NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
    for(Entry<Long, byte[]> s:valuesMap.entrySet()){    //獲取列對(duì)應(yīng)的不同版本數(shù)據(jù),默認(rèn)最新的一個(gè)
     put.addColumn(family, column, s.getKey(),s.getValue());
    }
   }
  }
  return put;
 }
 // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 /***
 * 得到列簇名與列名的k,v形式的map
 * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 * @return map => {"cf1" => "c1,c2,c10,c11,c14"}
 */
 private static HashMap<String, String> getFamilyAndColumn(String[] str){
 HashMap<String, String> map = new HashMap<>();
 HashSet<String> set = new HashSet<>();
 for(String s : str){
 set.add(s.split(":")[0]);
 }
 Object[] ob = set.toArray();
 for(int i=0; i<ob.length;i++){
 String family = String.valueOf(ob[i]);
 String columns = "";
 for(int j=0;j < str.length;j++){
 if(family.equals(str[j].split(":")[0])){
  columns += str[j].split(":")[1]+",";
 }
 }
 map.put(family, columns.substring(0, columns.length()-1));
 }
 return map; 
 }
 
 private static ArrayList<String> toByte(String s){
 ArrayList<String> b = new ArrayList<>();
 String[] sarr = s.split(",");
 for(int i=0;i<sarr.length;i++){
 b.add(sarr[i]);
 }
 return b;
 }
}

程序運(yùn)行完之后,在hbase shell中查看每個(gè)表,看是否數(shù)據(jù)導(dǎo)入正確:

test2:(無版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)

test3 (無版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)

test4(無版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))

test5(有版本、無列導(dǎo)入設(shè)置、無列導(dǎo)出設(shè)置)

test6(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導(dǎo)出設(shè)置)

test7(有版本、無列導(dǎo)入設(shè)置、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))

test8(有版本、有列導(dǎo)入設(shè)置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導(dǎo)出設(shè)置("cf1:c1,cf1:c10,cf1:c14"))

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • mongodb官方的golang驅(qū)動(dòng)基礎(chǔ)使用教程分享

    mongodb官方的golang驅(qū)動(dòng)基礎(chǔ)使用教程分享

    這篇文章主要給大家介紹了關(guān)于mongodb官方的golang驅(qū)動(dòng)基礎(chǔ)使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用mongodb具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-12-12
  • Mongodb 3.2.9開啟用戶權(quán)限認(rèn)證問題的步驟詳解

    Mongodb 3.2.9開啟用戶權(quán)限認(rèn)證問題的步驟詳解

    這篇文章主要給大家介紹了關(guān)于Mongodb 3.2.9開啟用戶權(quán)限認(rèn)證問題的詳細(xì)步驟,通過開啟權(quán)限認(rèn)證,會(huì)對(duì)大家的Mongodb更加保護(hù)的安全些,文中將步驟介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-08-08
  • MongoDB添加secondary節(jié)點(diǎn)的2種方法詳解

    MongoDB添加secondary節(jié)點(diǎn)的2種方法詳解

    這篇文章主要給大家總結(jié)介紹了關(guān)于MongoDB添加secondary節(jié)點(diǎn)的2種方法,以及MongoDB secondary節(jié)點(diǎn)出現(xiàn)recovering狀態(tài)的解決方法,文中介紹的非常詳細(xì),需要的朋友可以參考下
    2018-10-10
  • Windows下mongodb安裝與配置三步走

    Windows下mongodb安裝與配置三步走

    大家應(yīng)該都知道m(xù)ongodb是當(dāng)下流行的非關(guān)系型數(shù)據(jù)庫(kù),特別是配合node使用,下面示范一下在Windows系統(tǒng)下如何安裝和配置的,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-03-03
  • 利用mongodb查詢某坐標(biāo)是否在規(guī)定多邊形區(qū)域內(nèi)的方法

    利用mongodb查詢某坐標(biāo)是否在規(guī)定多邊形區(qū)域內(nèi)的方法

    這篇文章主要介紹了利用mongodb如何查詢某坐標(biāo)是否在固定多邊形區(qū)域內(nèi)的方法,文中給出了詳細(xì)示例代碼,相信對(duì)大家具有一定的參考價(jià)值,需要的朋友們下面來一起看看吧。
    2017-02-02
  • MongoDB使用profile分析慢查詢的步驟

    MongoDB使用profile分析慢查詢的步驟

    這篇文章主要介紹了MongoDB profile分析慢查詢的示例,幫助大家更好的理解和學(xué)習(xí)使用MongoDB數(shù)據(jù)庫(kù),感興趣的朋友可以了解下
    2021-04-04
  • mongodb BSON的基本使用教程

    mongodb BSON的基本使用教程

    這篇文章主要給大家介紹了關(guān)于mongodb BSON的基本使用教程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-12-12
  • MongoDB聚合運(yùn)算符$divide詳解

    MongoDB聚合運(yùn)算符$divide詳解

    $divide聚合運(yùn)算符返回兩個(gè)表達(dá)式相除的結(jié)果,參數(shù)通過數(shù)組傳遞給$divide運(yùn)算符,這篇文章介紹了MongoDB聚合運(yùn)算符$divide的相關(guān)知識(shí),感興趣的朋友跟隨小編一起看看吧
    2024-03-03
  • MongoDB快速入門筆記(二)之MongoDB的概念及簡(jiǎn)單操作

    MongoDB快速入門筆記(二)之MongoDB的概念及簡(jiǎn)單操作

    MongoDB是面向集合的文檔式數(shù)據(jù)庫(kù),不像關(guān)系數(shù)據(jù)庫(kù)那樣,有表,列、行,mongoDB數(shù)據(jù)庫(kù)則是由一系列的文檔組成。接下來通過本文給大家介紹MongoDB的概念及簡(jiǎn)單操作,一起看看吧
    2016-06-06
  • 解決MongoDB?位置查詢報(bào)錯(cuò)planner?returned?error:?unable?to?find?index?for?$geoNear?query的問題

    解決MongoDB?位置查詢報(bào)錯(cuò)planner?returned?error:?unable?to?find

    這篇文章主要介紹了MongoDB位置查詢報(bào)錯(cuò)planner?returned?error:?unable?to?find?index?for?$geoNear?query的解決方案,需要的朋友可以參考下
    2023-08-08

最新評(píng)論