Spark中的數(shù)據(jù)讀取保存和累加器實(shí)例詳解
數(shù)據(jù)讀取與保存
Text文件
對(duì)于 Text文件的讀取和保存 ,其語(yǔ)法和實(shí)現(xiàn)是最簡(jiǎn)單的,因此我只是簡(jiǎn)單敘述一下這部分相關(guān)知識(shí)點(diǎn),大家可以結(jié)合demo具體分析記憶。
1)基本語(yǔ)法
(1)數(shù)據(jù)讀取:textFile(String)
(2)數(shù)據(jù)保存:saveAsTextFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Text { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 讀取輸入文件 val inputRDD: RDD[String] = sc.textFile("input/demo.txt") //3.2 保存數(shù)據(jù) inputRDD.saveAsTextFile("textFile") //4.關(guān)閉連接 sc.stop() } }
Sequence文件
SequenceFile文件 是Hadoop中用來(lái)存儲(chǔ)二進(jìn)制形式的 key-value對(duì) 的一種平面文件(Flat File)。在SparkContext中,可以通過(guò)調(diào)用 sequenceFile[ keyClass,valueClass ] (path) 來(lái)調(diào)用。
1)基本語(yǔ)法
- (1)數(shù)據(jù)讀?。簊equenceFile[ keyClass, valueClass ] (path)
- (2)數(shù)據(jù)保存:saveAsSequenceFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Sequence { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創(chuàng)建rdd val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9))) //3.2 保存數(shù)據(jù)為SequenceFile dataRDD.saveAsSequenceFile("seqFile") //3.3 讀取SequenceFile文件 sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println) //4.關(guān)閉連接 sc.stop() } }
Object對(duì)象文件
對(duì)象文件是將對(duì)象序列化后保存的文件,采用Hadoop的序列化機(jī)制??梢酝ㄟ^(guò) objectFile[ k , v ] (path) 函數(shù)接收一個(gè)路徑,讀取對(duì)象文件,返回對(duì)應(yīng)的RDD,也可以通過(guò)調(diào)用 saveAsObjectFile() 實(shí)現(xiàn)對(duì)對(duì)象文件的輸出。因?yàn)橐蛄谢砸付愋汀?/p>
1)基本語(yǔ)法
- (1)數(shù)據(jù)讀取:objectFile[ k , v ] (path)
- (2)數(shù)據(jù)保存:saveAsObjectFile(String)
2)實(shí)現(xiàn)代碼demo如下:
object Operate_Object { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkConf并設(shè)置App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.創(chuàng)建SparkContext,該對(duì)象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 創(chuàng)建RDD val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2) //3.2 保存數(shù)據(jù) dataRDD.saveAsObjectFile("objFile") //3.3 讀取數(shù)據(jù) sc.objectFile[Int]("objFile").collect().foreach(println) //4.關(guān)閉連接 sc.stop() } }
累加器
累加器概念
累加器,是一種變量---分布式共享只寫(xiě)變量。僅支持“add”,支持并發(fā),但Executor和Executor之間不能讀數(shù)據(jù),可實(shí)現(xiàn)所有分片處理時(shí)更新共享變量的功能。
累加器用來(lái)把Executor端變量信息聚合到Driver端。在Driver中定義的一個(gè)變量,在Executor端的每個(gè)task都會(huì)得到這個(gè)變量的一份新的副本,每個(gè)task更新這些副本的值后,傳回Driver端進(jìn)行合并計(jì)算。
系統(tǒng)累加器
1)累加器定義(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器添加數(shù)據(jù)(累加器.add方法)
sum.add(count)
3)累加器獲取數(shù)據(jù)(累加器.value)
sum.value
注意:Executor端的任務(wù)不能讀取累加器的值(例如:在Executor端調(diào)用sum.value,獲取的值不是累加器最終的值)。因此我們說(shuō),累加器是一個(gè)分布式共享只寫(xiě)變量。
4)累加器要放在行動(dòng)算子中
因?yàn)檗D(zhuǎn)換算子執(zhí)行的次數(shù)取決于job的數(shù)量,如果一個(gè) spark應(yīng)用 有多個(gè)行動(dòng)算子,那么轉(zhuǎn)換算子中的累加器可能會(huì)發(fā)生不止一次更新,導(dǎo)致結(jié)果錯(cuò)誤。所以,如果想要一個(gè)無(wú)論在失敗還是重復(fù)計(jì)算時(shí)都絕對(duì)可靠的累加器,必須把它放在foreach()這樣的行動(dòng)算子中。
5) 代碼實(shí)現(xiàn):
object accumulator_system { package com.atguigu.cache import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object accumulator_system { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc = new SparkContext(conf) val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //需求:統(tǒng)計(jì)a出現(xiàn)的所有次數(shù) ("a",10) //普通算子實(shí)現(xiàn) reduceByKey 代碼會(huì)走shuffle 效率低 val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _) //累加器實(shí)現(xiàn) //1 聲明累加器 val accSum: LongAccumulator = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) => { //2 使用累加器累加 累加器.add() accSum.add(count) // 4 不在executor端獲取累加器的值,因?yàn)榈玫降闹挡粶?zhǔn)確,所以累加器叫分布式共享只寫(xiě)變量 //println("sum = " + accSum.value) } } //3 獲取累加器的值 累加器.value println(("a",accSum.value)) sc.stop() } }
以上就是Spark中的數(shù)據(jù)讀取保存和累加器實(shí)例詳解的詳細(xì)內(nèi)容,更多關(guān)于Spark數(shù)據(jù)讀取保存累加器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
linux系統(tǒng)使用vscode進(jìn)行qt開(kāi)發(fā)的過(guò)程分享
最近在Linux上搞Qt,搞的一頭霧水,小編把整個(gè)過(guò)程記錄下,分享需要的朋友,如果大家對(duì)linux系統(tǒng)使用vscode進(jìn)行qt開(kāi)發(fā)相關(guān)知識(shí)感興趣的朋友跟隨小編一起看看吧2021-12-12Mac如何給應(yīng)用單獨(dú)設(shè)置語(yǔ)言
這篇文章主要介紹了Mac如何給應(yīng)用單獨(dú)設(shè)置語(yǔ)言,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10解決Unable to access ''https://gitee.com/自己的項(xiàng)目/'': Could not r
這篇文章主要介紹了解決Unable to access 'https://gitee.com/自己的項(xiàng)目/': Could not resolve host: gitee.com問(wèn)題,需要的朋友可以參考下2020-09-09git的遠(yuǎn)程分支的作用和本地分支有什么區(qū)別
這篇文章主要介紹了git的遠(yuǎn)程分支的作用和本地的有什么區(qū)別,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10textarea 在IE和FF下?lián)Q行無(wú)法正常顯示的解決方法
今天在做項(xiàng)目時(shí)用到textarea 用戶輸入信息后顯示不換行在IE下測(cè)試成功在FF沒(méi)反應(yīng)2010-07-07GitLab使用外部提供的Redis緩存數(shù)據(jù)庫(kù)的方法詳解
這篇文章主要介紹了GitLab: 如何使用外部提供的Redis緩存數(shù)據(jù)庫(kù),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09云開(kāi)發(fā) VSCode 插件 Cloudbase Toolkit 的正確打開(kāi)方式及應(yīng)用場(chǎng)景分析
Tencent CloudBase Toolkit 是云開(kāi)發(fā)的 VS Code(Visual Studio Code)插件。這篇文章主要介紹了云開(kāi)發(fā) VSCode 插件 Cloudbase Toolkit 的正確打開(kāi)方式,需要的朋友可以參考下2020-07-07十進(jìn)制負(fù)數(shù)轉(zhuǎn)換為二進(jìn)制、八進(jìn)制、十六進(jìn)制的知識(shí)分享
這篇文章主要介紹了十進(jìn)制負(fù)數(shù)轉(zhuǎn)換為二進(jìn)制、八進(jìn)制、十六進(jìn)制的知識(shí)分享,需要的朋友可以參考下2014-02-02風(fēng)中葉老師講述的學(xué)習(xí)方法(學(xué)習(xí)編程的朋友需要看)
風(fēng)中葉老師講述的學(xué)習(xí)方法(學(xué)習(xí)編程的朋友需要看),希望大家能按照說(shuō)明的那樣,自己多動(dòng)手動(dòng)腦2008-10-10