Spark自定義累加器的使用實(shí)例詳解
累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對(duì)作業(yè)執(zhí)行過(guò)程中的事件進(jìn)行計(jì)數(shù)。
累加器簡(jiǎn)單使用
Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個(gè)簡(jiǎn)單的使用示例,在這個(gè)例子中我們?cè)谶^(guò)濾掉RDD中奇數(shù)的同時(shí)進(jìn)行計(jì)數(shù),最后計(jì)算剩下整數(shù)的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //統(tǒng)計(jì)奇數(shù)的個(gè)數(shù) val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
結(jié)果為:
sum: 20
accum: 5
這是結(jié)果正常的情況,但是在使用累加器的過(guò)程中如果對(duì)于spark的執(zhí)行過(guò)程理解的不夠深入就會(huì)遇到兩類典型的錯(cuò)誤:少加(或者沒(méi)加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來(lái)比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個(gè)新的抽象類:AccumulatorV2來(lái)提供更加友好的自定義類型累加器的實(shí)現(xiàn)方式。官方同時(shí)給出了一個(gè)實(shí)現(xiàn)的示例:CollectionAccumulator類,這個(gè)類允許以集合的形式收集spark應(yīng)用執(zhí)行過(guò)程中的一些信息。例如,我們可以用這個(gè)類收集Spark處理數(shù)據(jù)時(shí)的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問(wèn)題,需要對(duì)收集的信息的規(guī)模要加以控制,不宜過(guò)大。
繼承AccumulatorV2類,并復(fù)寫它的所有方法
package spark import constant.Constant import org.apache.spark.util.AccumulatorV2 import util.getFieldFromConcatString import util.setFieldFromConcatString open class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this.result } /** * 合并數(shù)據(jù) */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString("|", it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString("|", it, newValue.toString()) } //問(wèn)題就在于這里,自定義沒(méi)有寫錯(cuò),合并錯(cuò)了 newResult = newResult.setFieldFromConcatString("|", it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this.result return sessionAccmulator } override fun add(p0: String?) { val v1 = this.result val v2 = p0 if (v2.isNullOrEmpty()){ return }else{ var newResult = "" val oldValue = v1.getFieldFromConcatString("|", v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString("|", v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" return this.result == newResult } }
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對(duì),這個(gè)方法是各個(gè)task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執(zhí)行流程:
首先有幾個(gè)task,spark engine就調(diào)用copy方法拷貝幾個(gè)累加器(不注冊(cè)的),然后在各個(gè)task中進(jìn)行累加(注意在此過(guò)程中,被最初注冊(cè)的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個(gè)task的結(jié)果累計(jì)器進(jìn)行合并(此時(shí)被注冊(cè)的累加器是初始值)
總結(jié)
以上就是本文關(guān)于Spark自定義累加器的使用實(shí)例詳解的全部?jī)?nèi)容,希望對(duì)大家有所幫助。有什么問(wèn)題可以隨時(shí)留言,小編會(huì)及時(shí)回復(fù)大家的。
相關(guān)文章
rsync同步時(shí)出現(xiàn)rsync: failed to set times on “xxxx”: Operation no
今天在同步數(shù)據(jù)的時(shí)候提示rsync: failed to set times on “xxxx”: Operation not permitted,一般來(lái)說(shuō)要不是服務(wù)器時(shí)間不對(duì)或者權(quán)限沒(méi)有設(shè)置好2016-12-12用phpMyadmin創(chuàng)建Mysql數(shù)據(jù)庫(kù)及獨(dú)立數(shù)據(jù)庫(kù)帳號(hào)的圖文教程
在一個(gè)服務(wù)器上一般來(lái)講都不止一個(gè)站點(diǎn),更不止一個(gè)MySQL(和PHP搭配之最佳組合)數(shù)據(jù)庫(kù)。2010-03-03詳解如何將本地JAR包添加到本地Maven倉(cāng)庫(kù)中
這篇文章主要介紹了詳解如何將本地JAR包添加到本地Maven倉(cāng)庫(kù)中的相關(guān)資料,希望通過(guò)本文能幫助到大家來(lái)實(shí)現(xiàn)這樣的功能,需要的朋友可以參考下2017-09-09TeamCenter12登陸報(bào)404/503問(wèn)題解決方案
這篇文章主要介紹了TeamCenter12登陸報(bào)404/503問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器的過(guò)程(內(nèi)網(wǎng)穿透)
這篇文章主要介紹了公網(wǎng)使用SSH遠(yuǎn)程登錄macOS服務(wù)器【內(nèi)網(wǎng)穿透】,本次教程,我們將使用cpolar內(nèi)網(wǎng)穿透工具,映射ssh服務(wù)默認(rèn)端口:22端口,獲取公網(wǎng)地址,實(shí)現(xiàn)在公網(wǎng)環(huán)境下的ssh遠(yuǎn)程登錄,無(wú)需公網(wǎng)IP,也無(wú)需設(shè)置路由器,需要的朋友可以參考下2023-04-04VScode連接遠(yuǎn)程服務(wù)器踩坑實(shí)戰(zhàn)記錄(新版離線vscode-server安裝)
本文主要介紹了如何使用VScode連接遠(yuǎn)程服務(wù)器,并對(duì)離線安裝vscode-server進(jìn)行了詳細(xì)的操作步驟說(shuō)明,其中包括VScode擴(kuò)展的安裝與配置,vscode-server的離線下載,文件的解壓縮和移動(dòng),以及VScode的一些更新設(shè)置,能夠幫助讀者更好地理解和掌握VScode連接遠(yuǎn)程服務(wù)器的方法2024-10-10