亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

pyspark創(chuàng)建DataFrame的幾種方法

 更新時(shí)間:2021年05月17日 14:17:14   作者:Nick_Spider  
為了便于操作,使用pyspark時(shí)我們通常將數(shù)據(jù)轉(zhuǎn)為DataFrame的形式來(lái)完成清洗和分析動(dòng)作。那么你知道pyspark創(chuàng)建DataFrame有幾種方法嗎,下面就一起來(lái)了解一下

pyspark創(chuàng)建DataFrame

為了便于操作,使用pyspark時(shí)我們通常將數(shù)據(jù)轉(zhuǎn)為DataFrame的形式來(lái)完成清洗和分析動(dòng)作。

RDD和DataFrame

在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式數(shù)據(jù)對(duì)象。

這里簡(jiǎn)單看一下RDD和DataFrame的類型。

print(type(rdd))  # <class 'pyspark.rdd.RDD'>
print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>

翻閱了一下源碼的定義,可以看到他們之間并沒(méi)有繼承關(guān)系。

class RDD(object):

    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

class DataFrame(object):
    """A distributed collection of data grouped into named columns.

    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
    and can be created using various functions in :class:`SparkSession`::
 ...
    """

RDD是一種彈性分布式數(shù)據(jù)集,Spark中的基本抽象。表示一種不可變的、分區(qū)儲(chǔ)存的集合,可以進(jìn)行并行操作。
DataFrame是一種以列對(duì)數(shù)據(jù)進(jìn)行分組表達(dá)的分布式集合, DataFrame等同于Spark SQL中的關(guān)系表。相同點(diǎn)是,他們都是為了支持分布式計(jì)算而設(shè)計(jì)。

但是RDD只是元素的集合,但是DataFrame以列進(jìn)行分組,類似于MySQL的表或pandas中的DataFrame。

實(shí)際工作中,我們用的更多的還是DataFrame。

使用二元組創(chuàng)建DataFrame

嘗試第一種情形發(fā)現(xiàn),僅僅傳入二元組,結(jié)果是沒(méi)有列名稱的。
于是我們嘗試第二種,同時(shí)傳入二元組和列名稱。

a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]

output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]

這里collect()是按行展示數(shù)據(jù)表,也可以使用show()對(duì)數(shù)據(jù)表進(jìn)行展示。

spark.createDataFrame(a).show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# |Alice|  1|
# +-----+---+

spark.createDataFrame(a, ['name', 'age']).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice|  1|
# +-----+---+

使用鍵值對(duì)創(chuàng)建DataFrame

d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)

# [Row(age=1, name='Alice')]

使用rdd創(chuàng)建DataFrame

a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)

# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]

基于rdd和ROW創(chuàng)建DataFrame

from pyspark.sql import Row


a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)

# [Row(name='Alice', age=1)]

基于rdd和StructType創(chuàng)建DataFrame

from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)

# [Row(name='Alice', age=1)]

基于pandas DataFrame創(chuàng)建pyspark DataFrame

df.toPandas()可以把pyspark DataFrame轉(zhuǎn)換為pandas DataFrame。

df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)  # DataFrame[name: string, age: bigint]

print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>

# 傳入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)

# [Row(name='Alice', age=1)]

創(chuàng)建有序的DataFrame

output = spark.range(1, 7, 2).collect()
print(output)
# [Row(id=1), Row(id=3), Row(id=5)]

output = spark.range(3).collect()
print(output)
# [Row(id=0), Row(id=1), Row(id=2)]

通過(guò)臨時(shí)表得到DataFrame

spark.registerDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True

配置DataFrame和臨時(shí)表

創(chuàng)建DataFrame時(shí)指定列類型

在createDataFrame中可以指定列類型,只保留滿足數(shù)據(jù)類型的列,如果沒(méi)有滿足的列,會(huì)拋出錯(cuò)誤。

a = [('Alice', 1)]
rdd = sc.parallelize(a)

# 指定類型于預(yù)期數(shù)據(jù)對(duì)應(yīng)時(shí),正常創(chuàng)建
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output)  # [Row(a='Alice', b=1)]
rdd = rdd.map(lambda row: row[1])
print(rdd)  # PythonRDD[7] at RDD at PythonRDD.scala:53

# 只有int類型對(duì)應(yīng)上,過(guò)濾掉其他列。
output = spark.createDataFrame(rdd, "int").collect()
print(output)   # [Row(value=1)]

# 沒(méi)有列能對(duì)應(yīng)上,會(huì)拋出錯(cuò)誤。
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

注冊(cè)DataFrame為臨時(shí)表

spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")

獲取和修改配置

print(spark.getConf("spark.sql.shuffle.partitions"))  # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

注冊(cè)自定義函數(shù)

spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]

spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]

spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]

查看臨時(shí)表列表

可以查看所有臨時(shí)表名稱和對(duì)象。

spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames())  # ['table1']
print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames())  # True
print("table1" in spark.tableNames("default"))  # True

spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]

從其他數(shù)據(jù)源創(chuàng)建DataFrame

MySQL

前提是需要下載jar包。
Mysql-connector-java.jar

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()

參考

RDD和DataFrame的區(qū)別
spark官方文檔 翻譯 之pyspark.sql.SQLContext

到此這篇關(guān)于pyspark創(chuàng)建DataFrame的幾種方法的文章就介紹到這了,更多相關(guān)pyspark創(chuàng)建DataFrame 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python3監(jiān)控疫情的完整代碼

    Python3監(jiān)控疫情的完整代碼

    這篇文章主要介紹了Python3監(jiān)控疫情的完整代碼,代碼簡(jiǎn)單易懂,非常不錯(cuò)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-02-02
  • 關(guān)于numpy.where()函數(shù) 返回值的解釋

    關(guān)于numpy.where()函數(shù) 返回值的解釋

    今天小編就為大家分享一篇關(guān)于numpy.where()函數(shù) 返回值的解釋,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-12-12
  • Python解析多幀dicom數(shù)據(jù)詳解

    Python解析多幀dicom數(shù)據(jù)詳解

    今天小編就為大家分享一篇Python解析多幀dicom數(shù)據(jù)詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-01-01
  • Python?statistics模塊示例詳解

    Python?statistics模塊示例詳解

    這篇文章主要介紹了Python?statistics模塊示例詳解,本文總結(jié)了 statistics 模塊的常規(guī)操作,對(duì)于數(shù)據(jù)分析還是非常有益處的,需要的朋友可以參考下
    2023-05-05
  • Pandas中shift庫(kù)的具體使用

    Pandas中shift庫(kù)的具體使用

    shift函數(shù)是Pandas庫(kù)中用于數(shù)據(jù)位移的函數(shù),常用于時(shí)間序列數(shù)據(jù)的處理,本文主要介紹了Pandas中shift庫(kù)的具體使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-06-06
  • Python自帶的IDE在哪里

    Python自帶的IDE在哪里

    在本篇內(nèi)容里小編給大家分享的是關(guān)于如何找到Python自帶的IDE的相關(guān)內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。
    2020-07-07
  • 淺談PyTorch的數(shù)據(jù)讀取機(jī)制Dataloader與Dataset

    淺談PyTorch的數(shù)據(jù)讀取機(jī)制Dataloader與Dataset

    這篇文章主要介紹了淺談PyTorch的數(shù)據(jù)讀取機(jī)制Dataloader與Dataset,DataLoader的作用是構(gòu)建一個(gè)可迭代的數(shù)據(jù)裝載器,每次執(zhí)行循環(huán)的時(shí)候,就從中讀取一批Batchsize大小的樣本進(jìn)行訓(xùn)練,需要的朋友可以參考下
    2023-07-07
  • Pycharm安裝python庫(kù)的方法

    Pycharm安裝python庫(kù)的方法

    這篇文章主要介紹了Pycharm安裝python庫(kù)的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-11-11
  • python 數(shù)據(jù)類(dataclass)的具體使用

    python 數(shù)據(jù)類(dataclass)的具體使用

    本文主要介紹了python 數(shù)據(jù)類(dataclass)的具體使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • pycharm遠(yuǎn)程linux開(kāi)發(fā)和調(diào)試代碼的方法

    pycharm遠(yuǎn)程linux開(kāi)發(fā)和調(diào)試代碼的方法

    這篇文章主要介紹了pycharm遠(yuǎn)程linux開(kāi)發(fā)和調(diào)試代碼的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07

最新評(píng)論