Spark自定義累加器的使用實(shí)例詳解
累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對(duì)作業(yè)執(zhí)行過程中的事件進(jìn)行計(jì)數(shù)。
累加器簡(jiǎn)單使用
Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個(gè)簡(jiǎn)單的使用示例,在這個(gè)例子中我們?cè)谶^濾掉RDD中奇數(shù)的同時(shí)進(jìn)行計(jì)數(shù),最后計(jì)算剩下整數(shù)的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val accum = sc.longAccumulator("longAccum") //統(tǒng)計(jì)奇數(shù)的個(gè)數(shù)
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
if(n%2!=0) accum.add(1L)
n%2==0
}).reduce(_+_)
println("sum: "+sum)
println("accum: "+accum.value)
sc.stop()
結(jié)果為:
sum: 20
accum: 5
這是結(jié)果正常的情況,但是在使用累加器的過程中如果對(duì)于spark的執(zhí)行過程理解的不夠深入就會(huì)遇到兩類典型的錯(cuò)誤:少加(或者沒加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個(gè)新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實(shí)現(xiàn)方式。官方同時(shí)給出了一個(gè)實(shí)現(xiàn)的示例:CollectionAccumulator類,這個(gè)類允許以集合的形式收集spark應(yīng)用執(zhí)行過程中的一些信息。例如,我們可以用這個(gè)類收集Spark處理數(shù)據(jù)時(shí)的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對(duì)收集的信息的規(guī)模要加以控制,不宜過大。
繼承AccumulatorV2類,并復(fù)寫它的所有方法
package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
open class SessionAccmulator : AccumulatorV2<String, String>() {
private var result = Constant.SESSION_COUNT + "=0|"+
Constant.TIME_PERIOD_1s_3s + "=0|"+
Constant.TIME_PERIOD_4s_6s + "=0|"+
Constant.TIME_PERIOD_7s_9s + "=0|"+
Constant.TIME_PERIOD_10s_30s + "=0|"+
Constant.TIME_PERIOD_30s_60s + "=0|"+
Constant.TIME_PERIOD_1m_3m + "=0|"+
Constant.TIME_PERIOD_3m_10m + "=0|"+
Constant.TIME_PERIOD_10m_30m + "=0|"+
Constant.TIME_PERIOD_30m + "=0|"+
Constant.STEP_PERIOD_1_3 + "=0|"+
Constant.STEP_PERIOD_4_6 + "=0|"+
Constant.STEP_PERIOD_7_9 + "=0|"+
Constant.STEP_PERIOD_10_30 + "=0|"+
Constant.STEP_PERIOD_30_60 + "=0|"+
Constant.STEP_PERIOD_60 + "=0"
override fun value(): String {
return this.result
}
/**
* 合并數(shù)據(jù)
*/
override fun merge(other: AccumulatorV2<String, String>?) {
if (other == null) return else {
if (other is SessionAccmulator) {
var newResult = ""
val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
resultArray.forEach {
val oldValue = other.result.getFieldFromConcatString("|", it)
if (oldValue.isNotEmpty()) {
val newValue = oldValue.toInt() + 1
//找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩
if (newResult.isEmpty()){
newResult = result.setFieldFromConcatString("|", it, newValue.toString())
}
//問題就在于這里,自定義沒有寫錯(cuò),合并錯(cuò)了
newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())
}
}
result = newResult
}
}
}
override fun copy(): AccumulatorV2<String, String> {
val sessionAccmulator = SessionAccmulator()
sessionAccmulator.result = this.result
return sessionAccmulator
}
override fun add(p0: String?) {
val v1 = this.result
val v2 = p0
if (v2.isNullOrEmpty()){
return
}else{
var newResult = ""
val oldValue = v1.getFieldFromConcatString("|", v2!!)
if (oldValue.isNotEmpty()){
val newValue = oldValue.toInt() + 1
newResult = result.setFieldFromConcatString("|", v2, newValue.toString())
}
result = newResult
}
}
override fun reset() {
val newResult = Constant.SESSION_COUNT + "=0|"+
Constant.TIME_PERIOD_1s_3s + "=0|"+
Constant.TIME_PERIOD_4s_6s + "=0|"+
Constant.TIME_PERIOD_7s_9s + "=0|"+
Constant.TIME_PERIOD_10s_30s + "=0|"+
Constant.TIME_PERIOD_30s_60s + "=0|"+
Constant.TIME_PERIOD_1m_3m + "=0|"+
Constant.TIME_PERIOD_3m_10m + "=0|"+
Constant.TIME_PERIOD_10m_30m + "=0|"+
Constant.TIME_PERIOD_30m + "=0|"+
Constant.STEP_PERIOD_1_3 + "=0|"+
Constant.STEP_PERIOD_4_6 + "=0|"+
Constant.STEP_PERIOD_7_9 + "=0|"+
Constant.STEP_PERIOD_10_30 + "=0|"+
Constant.STEP_PERIOD_30_60 + "=0|"+
Constant.STEP_PERIOD_60 + "=0"
result = newResult
}
override fun isZero(): Boolean {
val newResult = Constant.SESSION_COUNT + "=0|"+
Constant.TIME_PERIOD_1s_3s + "=0|"+
Constant.TIME_PERIOD_4s_6s + "=0|"+
Constant.TIME_PERIOD_7s_9s + "=0|"+
Constant.TIME_PERIOD_10s_30s + "=0|"+
Constant.TIME_PERIOD_30s_60s + "=0|"+
Constant.TIME_PERIOD_1m_3m + "=0|"+
Constant.TIME_PERIOD_3m_10m + "=0|"+
Constant.TIME_PERIOD_10m_30m + "=0|"+
Constant.TIME_PERIOD_30m + "=0|"+
Constant.STEP_PERIOD_1_3 + "=0|"+
Constant.STEP_PERIOD_4_6 + "=0|"+
Constant.STEP_PERIOD_7_9 + "=0|"+
Constant.STEP_PERIOD_10_30 + "=0|"+
Constant.STEP_PERIOD_30_60 + "=0|"+
Constant.STEP_PERIOD_60 + "=0"
return this.result == newResult
}
}
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對(duì),這個(gè)方法是各個(gè)task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執(zhí)行流程:
首先有幾個(gè)task,spark engine就調(diào)用copy方法拷貝幾個(gè)累加器(不注冊(cè)的),然后在各個(gè)task中進(jìn)行累加(注意在此過程中,被最初注冊(cè)的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個(gè)task的結(jié)果累計(jì)器進(jìn)行合并(此時(shí)被注冊(cè)的累加器是初始值)
總結(jié)
以上就是本文關(guān)于Spark自定義累加器的使用實(shí)例詳解的全部?jī)?nèi)容,希望對(duì)大家有所幫助。有什么問題可以隨時(shí)留言,小編會(huì)及時(shí)回復(fù)大家的。
相關(guān)文章
rsync同步時(shí)出現(xiàn)rsync: failed to set times on “xxxx”: Operation no
今天在同步數(shù)據(jù)的時(shí)候提示rsync: failed to set times on “xxxx”: Operation not permitted,一般來說要不是服務(wù)器時(shí)間不對(duì)或者權(quán)限沒有設(shè)置好2016-12-12
用phpMyadmin創(chuàng)建Mysql數(shù)據(jù)庫及獨(dú)立數(shù)據(jù)庫帳號(hào)的圖文教程
在一個(gè)服務(wù)器上一般來講都不止一個(gè)站點(diǎn),更不止一個(gè)MySQL(和PHP搭配之最佳組合)數(shù)據(jù)庫。2010-03-03
TeamCenter12登陸報(bào)404/503問題解決方案
這篇文章主要介紹了TeamCenter12登陸報(bào)404/503問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器的過程(內(nèi)網(wǎng)穿透)
這篇文章主要介紹了公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器【內(nèi)網(wǎng)穿透】,本次教程,我們將使用cpolar內(nèi)網(wǎng)穿透工具,映射ssh服務(wù)默認(rèn)端口:22端口,獲取公網(wǎng)地址,實(shí)現(xiàn)在公網(wǎng)環(huán)境下的ssh遠(yuǎn)程登錄,無需公網(wǎng)IP,也無需設(shè)置路由器,需要的朋友可以參考下2023-04-04
VScode連接遠(yuǎn)程服務(wù)器踩坑實(shí)戰(zhàn)記錄(新版離線vscode-server安裝)
本文主要介紹了如何使用VScode連接遠(yuǎn)程服務(wù)器,并對(duì)離線安裝vscode-server進(jìn)行了詳細(xì)的操作步驟說明,其中包括VScode擴(kuò)展的安裝與配置,vscode-server的離線下載,文件的解壓縮和移動(dòng),以及VScode的一些更新設(shè)置,能夠幫助讀者更好地理解和掌握VScode連接遠(yuǎn)程服務(wù)器的方法2024-10-10

