PySpark中RDD的數(shù)據(jù)輸出問題詳解
RDD概念
RDD(resilient distributed dataset ,彈性分布式數(shù)據(jù)集),是 Spark 中最基礎的抽象。它表示了一個可以并行操作的、不可變得、被分區(qū)了的元素集合。用戶不需要關心底層復雜的抽象處理,直接使用方便的算子處理和計算就可以了。
RDD的特點
1) . 分布式 RDD是一個抽象的概念,RDD在spark driver中,通過RDD來引用數(shù)據(jù),數(shù)據(jù)真正存儲在節(jié)點機的partition上。
2). 只讀 在Spark中RDD一旦生成了,就不能修改。 那么為什么要設置為只讀,設置為只讀的話,因為不存在修改,并發(fā)的吞吐量就上來了。
3). 血緣關系 我們需要對RDD進行一系列的操作,因為RDD是只讀的,我們只能不斷的生產新的RDD,這樣,新的RDD與原來的RDD就會存在一些血緣關系。
Spark會記錄這些血緣關系,在后期的容錯上會有很大的益處。
4). 緩存 當一個 RDD 需要被重復使用時,或者當任務失敗重新計算的時候,這時如果將 RDD 緩存起來,就可以避免重新計算,保證程序運行的性能。
一. 回顧
數(shù)據(jù)輸入:
- sc.parallelize
- sc.textFile
數(shù)據(jù)計算:
- rdd.map
- rdd.flatMap
- rdd.reduceByKey
- .…
二.輸出為python對象
數(shù)據(jù)輸出可用的方法是很多的,這里簡單介紹常會用到的4個
- collect:將RDD內容轉換為list
- reduce:對RDD內容進行自定義聚合
- take:取出RDD的前N個元素組成list
- count:統(tǒng)計RDD元素個數(shù)
collect算子
功能:將RDD各個分區(qū)內的數(shù)據(jù),統(tǒng)一收集到Driver中,形成一個List對象
用法:
rdd.collect()
返回值是一個list
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect())
結果是
單獨輸出rdd,輸出的是rdd的類名而非內容
reduce算子
功能:對RDD數(shù)據(jù)集按照你傳入的邏輯進行聚合
語法:
代碼
返回值等于計算函數(shù)的返回值
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num)
結果是
take算子
功能:取RDD的前N個元素,組合成list返回給你
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take算子,取出RDD前n個元素,組成list返回 take_list=rdd.take(3) print(take_list)
結果是
count算子
功能:計算RDD有多少條數(shù)據(jù),返回值是一個數(shù)字
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect算子,輸出RDD為list對象 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的類型是:",type(rdd.collect())) #reduce算子,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take算子,取出RDD前n個元素,組成list返回 take_list=rdd.take(3) print(take_list) #count算子,統(tǒng)計rdd中有多少條數(shù)據(jù),返回值為數(shù)字 num_count=rdd.count() print(num_count) #關閉鏈接 sc.stop()
結果是
小結
1.Spark的編程流程就是:
- 將數(shù)據(jù)加載為RDD(數(shù)據(jù)輸入)對RDD進行計算(數(shù)據(jù)計算)
- 將RDD轉換為Python對象(數(shù)據(jù)輸出)
2.數(shù)據(jù)輸出的方法
- collect:將RDD內容轉換為list
- reduce:對RDD內容進行自定義聚合
- take:取出RDD的前N個元素組成list
- count:統(tǒng)計RDD元素個數(shù)
數(shù)據(jù)輸出可用的方法是很多的,這里只是簡單介紹4個
三.輸出到文件中
savaAsTextFile算子
功能:將RDD的數(shù)據(jù)寫入文本文件中支持本地寫出, hdfs等文件系統(tǒng).
代碼:
演示
這是因為這個方法本質上依賴大數(shù)據(jù)的Hadoop框架,需要配置Hadoop 依賴.
配置Hadoop依賴
調用保存文件的算子,需要配置Hadoop依賴。
- 下載Hadoop安裝包解壓到電腦任意位置
- 在Python代碼中使用os模塊配置: os.environ['HADOOP_HOME']='HADOOP解壓文件夾路徑′。
- 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內
- 下載hadoop.dll,并放入:C:/Windows/System32文件夾內
配置完成之后,執(zhí)行下面的代碼
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結果是
輸出的文件夾中有這么8文件,是因為RDD被默認為分成8個分區(qū)
SaveAsTextFile算子輸出文件的個數(shù)是根據(jù)RDD的分區(qū)來決定的,有多少分區(qū)就會輸出多少個文件,RDD在本電腦中默認是8(該電腦CPU核心數(shù)是8核)
打開設備管理器就可以查看處理器個數(shù),這里是有8個邏輯CPU
或者打開任務管理器就可以看到是4核8個邏輯CPU
修改rdd分區(qū)為1個
方式1, SparkConf對象設置屬性全局并行度為1:
方式2,創(chuàng)建RDD的時候設置( parallelize方法傳入numSlices參數(shù)為1)
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分區(qū)設置為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) #準備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結果是
小結
1.RDD輸出到文件的方法
- rdd.saveAsTextFile(路徑)
- 輸出的結果是一個文件夾
- 有幾個分區(qū)就輸出多少個結果文件
2.如何修改RDD分區(qū)
- SparkConf對象設置conf.set("spark.default.parallelism", "7")
- 創(chuàng)建RDD的時候,sc.parallelize方法傳入numSlices參數(shù)為1
四.練習案例
需求:
讀取文件轉換成RDD,并完成:
- 打印輸出:熱門搜索時間段(小時精度)Top3
- 打印輸出:熱門搜索詞Top3
- 打印輸出:統(tǒng)計黑馬程序員關鍵字在哪個時段被搜索最多
- 將數(shù)據(jù)轉換為JSON格式,寫出為文件
代碼
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分區(qū)設置為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) rdd=sc.textFile("D:/search_log.txt") #需求1 打印輸出:熱門搜索時間段(小時精度)Top3 # 取出全部的時間并轉換為小時 # 轉換為(小時,1)的二元元組 # Key分組聚合Value # 排序(降序) # 取前3 result1=rdd.map(lambda x:x.split("\t")).\ map(lambda x:x[0][:2]).\ map(lambda x:(x,1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3)#上面用的‘/'是換行的意思,當一行代碼太長時就可以這樣用 print(result1) #需求2 打印輸出:熱門搜索詞Top3 # 取出全部的搜索詞 # (詞,1)二元元組 # 分組聚合 # 排序 # Top3 result2=rdd.map(lambda x:x.split("\t")).\ map(lambda x:x[2])\ .map(lambda x:(x,1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3) print(result2) #需求3 打印輸出:統(tǒng)計黑馬程序員關鍵字在哪個時段被搜索最多 result3=rdd.map(lambda x:x.split("\t")).\ filter((lambda x:x[2]=="黑馬程序員")).\ map(lambda x:(x[0][:2],1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False,numPartitions=1).\ take(3) print(result3) #需求4 將數(shù)據(jù)轉換為JSON格式,寫出為文件 rdd.map(lambda x:x.split("\t")).\ map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\ .saveAsTextFile("D:/out_json")
結果是
到此這篇關于PySpark中RDD的數(shù)據(jù)輸出詳解的文章就介紹到這了,更多相關PySpark RDD數(shù)據(jù)輸出內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Python實現(xiàn)解析參數(shù)的三種方法詳解
這篇文章主要介紹了python解析參數(shù)的三種方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2022-07-07