亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

flink?RichFunction之坑及解決

 更新時間:2022年12月17日 08:51:57   投稿:jingxian  
這篇文章主要介紹了flink?RichFunction之坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

flink RichFunction之坑

flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一個數(shù)據(jù)庫連接。

在有些情況下他會一直創(chuàng)建然后銷毀,創(chuàng)建銷毀。

舉例: 重點在第三行的注釋

  val value = env.socketTextStream("192.168.13.11", 9090)
    val value2 = value.filter(x => {
      try {
        var a = 1 / 0   //此處若沒有異常處理,任務(wù)不會斷,但是會重復(fù)打開數(shù)據(jù)庫連接
      } catch {
        case e: Exception =>
      }
      isInter(x)
    }).map(fun = x => {
      x.toLong
    })
    val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
      override def extractTimestamp(element: Long): Long = {
        println(element + "***************")
        element
      }
    })

    try {
      var a = 1 / 0
    } catch {
      case e: Exception =>
    }
    value1.map(new mymap)
    env.execute("test")

  }

  def isInter(input: String): Boolean = {
    val matcher = Pattern.compile("^[0-9]+$").matcher(input)
    matcher.find()
  }
}


class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] {
  var conn: Connection = _
  var pst: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql")
    println(conn)
    pst = conn.prepareStatement("insert into testa (str) values (?)")
  }

  override def close(): Unit = {
    conn.close()
    pst.close()
  }

  override def map(in: ListBuffer[String]): Unit = {
    pst.setString(1, in.head)
    pst.execute()
  }
}

所以你是不是覺得那就價格異常處理不就得了?

NO

再看:

在這里插入圖片描述

這個時候,如果傳進來line不是數(shù)字或者格式不對,就會觸發(fā)異常,然而此時就不會像上面那樣幫你解決問題,而是一遍遍創(chuàng)建對象銷毀對象,一條消息創(chuàng)建一個連接,我就問你慌不慌,

原因

據(jù)觀察是因為,輸入的數(shù)據(jù)有問題,直接導致

 val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
      override def extractTimestamp(element: Long): Long = {
        println(element + "***************")
        element
      }
    })

這個崩潰了,不走這行代碼了,沒有獲得eventime,然后估計。。。 剩下的我也沒詳細測。。。

解決方案

先fiiter過濾任何可能導致異常的臟數(shù)據(jù)確保數(shù)據(jù)都沒問題就可以了。 

flink中RichFunction的一點小作用

①傳遞參數(shù)

所有需要用戶定義的函數(shù)都可以轉(zhuǎn)換成richfunction,例如實現(xiàn)map operator中你需要實現(xiàn)一個內(nèi)部類,并實現(xiàn)它的map方法:

data.map (new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

然后我們可以將其轉(zhuǎn)換為RichMapFunction:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

當然,RichFuction除了提供原來MapFuction的方法之外,還提供open, close, getRuntimeContext 和setRuntimeContext方法,這些功能可用于參數(shù)化函數(shù)(傳遞參數(shù)),創(chuàng)建和完成本地狀態(tài),訪問廣播變量以及訪問運行時信息以及有關(guān)迭代中的信息。

下面我們來看看RichFuction中傳遞參數(shù)的例子,以下代碼是測試RichFilterFuction的例子,基于DataSet而非DataStream。

由代碼可見,可以將Configuration中的limit參數(shù)的值傳遞進RichFuction里面,通過后面withParameters方法傳遞進去,最后的結(jié)果是

由此可見,我從configuration中獲取了limit的值,并設(shè)定了fliter的閾值是2,從而過濾了1,2。

②傳遞廣播變量

原理和上面差不多,下面我直接把代碼貼出來:

這是目前我學習到的RichFunction的用法,和大家分享一下。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論