亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

PySpark中RDD的數(shù)據(jù)輸出問題詳解

 更新時間:2023年01月15日 09:46:02   作者:陽862  
RDD是 Spark 中最基礎的抽象,它表示了一個可以并行操作的、不可變得、被分區(qū)了的元素集合,這篇文章主要介紹了PySpark中RDD的數(shù)據(jù)輸出詳解,需要的朋友可以參考下

RDD概念

RDD(resilient distributed dataset ,彈性分布式數(shù)據(jù)集),是 Spark 中最基礎的抽象。它表示了一個可以并行操作的、不可變得、被分區(qū)了的元素集合。用戶不需要關心底層復雜的抽象處理,直接使用方便的算子處理和計算就可以了。

RDD的特點

1) . 分布式 RDD是一個抽象的概念,RDD在spark driver中,通過RDD來引用數(shù)據(jù),數(shù)據(jù)真正存儲在節(jié)點機的partition上。

2). 只讀 在Spark中RDD一旦生成了,就不能修改。 那么為什么要設置為只讀,設置為只讀的話,因為不存在修改,并發(fā)的吞吐量就上來了。

3). 血緣關系 我們需要對RDD進行一系列的操作,因為RDD是只讀的,我們只能不斷的生產新的RDD,這樣,新的RDD與原來的RDD就會存在一些血緣關系。

Spark會記錄這些血緣關系,在后期的容錯上會有很大的益處。

4). 緩存 當一個 RDD 需要被重復使用時,或者當任務失敗重新計算的時候,這時如果將 RDD 緩存起來,就可以避免重新計算,保證程序運行的性能。

一. 回顧

數(shù)據(jù)輸入:

  • sc.parallelize
  • sc.textFile

數(shù)據(jù)計算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.輸出為python對象

數(shù)據(jù)輸出可用的方法是很多的,這里簡單介紹常會用到的4個

  • collect:將RDD內容轉換為list
  • reduce:對RDD內容進行自定義聚合
  • take:取出RDD的前N個元素組成list
  • count:統(tǒng)計RDD元素個數(shù)

collect算子

功能:將RDD各個分區(qū)內的數(shù)據(jù),統(tǒng)一收集到Driver中,形成一個List對象
用法:
rdd.collect()
返回值是一個list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

結果是

 單獨輸出rdd,輸出的是rdd的類名而非內容

reduce算子

功能:對RDD數(shù)據(jù)集按照你傳入的邏輯進行聚合

語法:

代碼

 返回值等于計算函數(shù)的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

結果是

 take算子

功能:取RDD的前N個元素,組合成list返回給你
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個元素,組成list返回
take_list=rdd.take(3)
print(take_list)

結果是

 count算子

功能:計算RDD有多少條數(shù)據(jù),返回值是一個數(shù)字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個元素,組成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,統(tǒng)計rdd中有多少條數(shù)據(jù),返回值為數(shù)字
num_count=rdd.count()
print(num_count)
#關閉鏈接
sc.stop()

結果是

小結

1.Spark的編程流程就是:

  • 將數(shù)據(jù)加載為RDD(數(shù)據(jù)輸入)對RDD進行計算(數(shù)據(jù)計算)
  • 將RDD轉換為Python對象(數(shù)據(jù)輸出)

 2.數(shù)據(jù)輸出的方法

  • collect:將RDD內容轉換為list
  • reduce:對RDD內容進行自定義聚合
  • take:取出RDD的前N個元素組成list
  • count:統(tǒng)計RDD元素個數(shù)

數(shù)據(jù)輸出可用的方法是很多的,這里只是簡單介紹4個

三.輸出到文件中

savaAsTextFile算子

功能:將RDD的數(shù)據(jù)寫入文本文件中支持本地寫出, hdfs等文件系統(tǒng).
代碼:

 演示

 這是因為這個方法本質上依賴大數(shù)據(jù)的Hadoop框架,需要配置Hadoop 依賴.

配置Hadoop依賴

調用保存文件的算子,需要配置Hadoop依賴。

  • 下載Hadoop安裝包解壓到電腦任意位置
  • 在Python代碼中使用os模塊配置: os.environ['HADOOP_HOME']='HADOOP解壓文件夾路徑′。
  • 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內
  • 下載hadoop.dll,并放入:C:/Windows/System32文件夾內

配置完成之后,執(zhí)行下面的代碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結果是

 輸出的文件夾中有這么8文件,是因為RDD被默認為分成8個分區(qū)
SaveAsTextFile算子輸出文件的個數(shù)是根據(jù)RDD的分區(qū)來決定的,有多少分區(qū)就會輸出多少個文件,RDD在本電腦中默認是8(該電腦CPU核心數(shù)是8核)

 打開設備管理器就可以查看處理器個數(shù),這里是有8個邏輯CPU
或者打開任務管理器就可以看到是4核8個邏輯CPU

 修改rdd分區(qū)為1個

方式1, SparkConf對象設置屬性全局并行度為1:

 方式2,創(chuàng)建RDD的時候設置( parallelize方法傳入numSlices參數(shù)為1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#準備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結果是

 小結

1.RDD輸出到文件的方法

  • rdd.saveAsTextFile(路徑)
  • 輸出的結果是一個文件夾
  • 有幾個分區(qū)就輸出多少個結果文件

2.如何修改RDD分區(qū)

  • SparkConf對象設置conf.set("spark.default.parallelism", "7")
  • 創(chuàng)建RDD的時候,sc.parallelize方法傳入numSlices參數(shù)為1

四.練習案例

需求: 

讀取文件轉換成RDD,并完成:

  • 打印輸出:熱門搜索時間段(小時精度)Top3
  • 打印輸出:熱門搜索詞Top3
  • 打印輸出:統(tǒng)計黑馬程序員關鍵字在哪個時段被搜索最多
  • 將數(shù)據(jù)轉換為JSON格式,寫出為文件

代碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印輸出:熱門搜索時間段(小時精度)Top3
# 取出全部的時間并轉換為小時
# 轉換為(小時,1)的二元元組
# Key分組聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/'是換行的意思,當一行代碼太長時就可以這樣用
print(result1)
#需求2 打印輸出:熱門搜索詞Top3
# 取出全部的搜索詞
# (詞,1)二元元組
# 分組聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印輸出:統(tǒng)計黑馬程序員關鍵字在哪個時段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑馬程序員")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 將數(shù)據(jù)轉換為JSON格式,寫出為文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

結果是

到此這篇關于PySpark中RDD的數(shù)據(jù)輸出詳解的文章就介紹到這了,更多相關PySpark RDD數(shù)據(jù)輸出內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Python實現(xiàn)簡易信息分類存儲軟件

    Python實現(xiàn)簡易信息分類存儲軟件

    這篇文章主要介紹的是通過Python制作一個簡易的文件分類存儲文件,可以實現(xiàn)信息的增刪改查以及內容的導出和回復,文中的示例代碼對我們的學習有一定的價值,感興趣的同學可以了解一下
    2021-12-12
  • Python常用內置函數(shù)和關鍵字使用詳解

    Python常用內置函數(shù)和關鍵字使用詳解

    在Python中有許許多多的內置函數(shù)和關鍵字,它們是我們日常中經常可以使用的到的一些基礎的工具,可以方便我們的工作。本文將詳細講解他們的使用方法,需要的可以參考一下
    2022-05-05
  • python?super()函數(shù)的詳解

    python?super()函數(shù)的詳解

    這篇文章主要為大家介紹了python?super()函數(shù),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2021-11-11
  • Python實現(xiàn)解析參數(shù)的三種方法詳解

    Python實現(xiàn)解析參數(shù)的三種方法詳解

    這篇文章主要介紹了python解析參數(shù)的三種方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2022-07-07
  • python將excel轉換為csv的代碼方法總結

    python將excel轉換為csv的代碼方法總結

    在本篇文章里小編給大家分享了關于python如何將excel轉換為csv的實例方法和代碼內容,需要的朋友們學習下。
    2019-07-07
  • Python 將json序列化后的字符串轉換成字典(推薦)

    Python 將json序列化后的字符串轉換成字典(推薦)

    這篇文章主要介紹了Python 將json序列化后的字符串轉換成字典,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-01-01
  • Python可視化Tkinter進階grid布局詳情

    Python可視化Tkinter進階grid布局詳情

    這篇文章主要介紹了Python可視化Tkinter進階grid布局詳情,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-07-07
  • Python 恐龍跑跑小游戲實現(xiàn)流程

    Python 恐龍跑跑小游戲實現(xiàn)流程

    大家好,本篇文章主要講的是用python實現(xiàn)谷歌小恐龍小游戲,看看這是你斷網(wǎng)時的樣子么,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-02-02
  • Python生成rsa密鑰對操作示例

    Python生成rsa密鑰對操作示例

    這篇文章主要介紹了Python生成rsa密鑰對操作,涉及Python rsa加密與密鑰生成相關操作技巧,需要的朋友可以參考下
    2019-04-04
  • python閉包的實例詳解

    python閉包的實例詳解

    在本篇文章里小編給大家整理的是一篇關于python閉包的實例詳解內容,有興趣的朋友們可以學習下。
    2021-10-10

最新評論