亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解

 更新時間:2023年12月15日 08:21:24   作者:冷月半明  
在大數(shù)據(jù)處理中,PySpark?提供了強大的工具來處理海量數(shù)據(jù),特別是在數(shù)據(jù)清洗和轉(zhuǎn)換方面,本文將介紹如何使用?PySpark?進行數(shù)據(jù)清洗,并將數(shù)據(jù)格式轉(zhuǎn)換為?JSON?格式的實踐,感興趣的可以了解下

簡介

PySpark 是 Apache Spark 的 Python API,可用于處理大規(guī)模數(shù)據(jù)集。它提供了豐富的功能和庫,使得數(shù)據(jù)清洗和轉(zhuǎn)換變得更加高效和便捷。

代碼實踐

本文將以一個示例數(shù)據(jù)集為例,演示如何使用 PySpark 對數(shù)據(jù)進行清洗和轉(zhuǎn)換。以下是代碼實現(xiàn)的主要步驟:

步驟 1:連接到遠程 Spark 服務器

# Author: 冷月半明
# Date: 2023/12/14
# Description: This script does XYZ.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("RemoteSparkConnection") \
    .master("yarn") \
    .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python") \
    .config("spark.sql.warehouse.dir", "/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://node01:9083") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .enableHiveSupport() \
    .getOrCreate()

當使用 PySpark 進行大數(shù)據(jù)處理時,首先需要建立與 Spark 集群的連接。在這段代碼中,我們使用了 SparkSession 類來創(chuàng)建一個與遠程 Spark 服務器的連接,并設置了連接所需的參數(shù)。

導入必要的庫: 我們首先導入了 SparkSession 類,這是 PySpark 中用于管理 Spark 應用程序的入口點。

建立連接: 在接下來的代碼中,我們使用 SparkSession.builder 來創(chuàng)建一個 SparkSession 對象。這個對象允許我們設置應用程序的名稱、集群的主節(jié)點、配置項等參數(shù)。在這個例子中:

  • .appName("RemoteSparkConnection"):為我們的 Spark 應用程序設置了一個名稱,這有助于在集群中識別應用程序。
  • .master("yarn"):指定 Spark 應用程序的主節(jié)點,這里使用的是 YARN 資源管理器,用于分配和管理集群資源。
  • .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python"):設置了 PySpark 使用的 Python 解釋器路徑,確保在集群中使用正確的 Python 環(huán)境。(因為所使用的環(huán)境為anaconda創(chuàng)建的虛擬環(huán)境,先進入虛擬環(huán)境,然后使用which python查看解釋器位置)
  • .config("spark.sql.warehouse.dir", "/hive/warehouse"):指定了 Spark SQL 的倉庫目錄,這對于數(shù)據(jù)存儲和管理非常重要。如果不指定的話使用sparksql獲取hive的時候可能會出現(xiàn)問題。
  • .config("hive.metastore.uris", "thrift://node01:9083"):配置 Hive 元數(shù)據(jù)存儲的 URI,Hive 是 Hadoop 生態(tài)系統(tǒng)中的一部分,用于管理數(shù)據(jù)倉庫。如果不指定的話使用sparksql獲取hive的時候可能會出只能獲取define默認倉庫的情況。
  • .config("spark.sql.parquet.writeLegacyFormat", "true"):設置了寫入 Parquet 格式數(shù)據(jù)時使用傳統(tǒng)格式,確保兼容性和向后兼容性。因為spark寫入和hive不同,使用該配置可保證spark寫入hive的數(shù)據(jù),hive能正常訪問。
  • .enableHiveSupport():啟用了對 Hive 的支持,允許在 Spark 中使用 Hive 的功能和特性。
  • .getOrCreate():最后使用 .getOrCreate() 方法創(chuàng)建或獲取 SparkSession 實例。

總而言之,這段代碼建立了與遠程 Spark 服務器的連接,并配置了各種參數(shù)以確保應用程序能夠正確地運行和訪問集群資源。這是使用 PySpark 開展大數(shù)據(jù)處理工作的第一步,為后續(xù)的數(shù)據(jù)處理和分析創(chuàng)建了必要的環(huán)境和基礎設施。

步驟 2:加載數(shù)據(jù)

df = spark.sql("SELECT * FROM cjw_data.xiecheng;")

使用 PySpark 的 spark.sql() 函數(shù)執(zhí)行 SQL 查詢,將查詢結(jié)果加載到 DataFrame 中,為后續(xù)的數(shù)據(jù)操作和分析做好準備。這種靈活性和強大的數(shù)據(jù)處理能力是 PySpark 在大數(shù)據(jù)處理中的關鍵優(yōu)勢之一。

步驟 3:數(shù)據(jù)清洗與 JSON 格式轉(zhuǎn)換

from pyspark.sql.functions import udf
import json

def json_clean(commentlist):
    try:
        jsonstr = str(commentlist)
        s = jsonstr.replace("'", '"')
        s = '[' + s.replace('}{', '},{') + ']'
        python_obj = json.loads(s, strict=False)
        json_str = json.dumps(python_obj)
        return json_str
    except:
        return None

json_clean_udf = udf(json_clean, StringType())

df = df.withColumn("new_commentlist", json_clean_udf(df["commentlist"]))
newdf = df.withColumn("commentlist", df["new_commentlist"])
newdf = newdf.drop("new_commentlist")

在 PySpark 中定義并應用一個用戶自定義函數(shù)(UDF)來對數(shù)據(jù)進行清洗和轉(zhuǎn)換。

定義數(shù)據(jù)清洗函數(shù): json_clean() 函數(shù)接收一個名為 commentlist 的參數(shù),這個函數(shù)用于將從數(shù)據(jù)庫中檢索到的評論數(shù)據(jù)進行清洗。具體來說:

  • jsonstr = str(commentlist):將傳入的 commentlist 轉(zhuǎn)換為字符串格式。
  • s = jsonstr.replace("'", '"'):將字符串中的單引號替換為雙引號,以滿足 JSON 格式的要求。
  • s = '[' + s.replace('}{', '},{') + ']':在字符串中的每個對象之間添加逗號,并將整個字符串包含在一個數(shù)組中,以滿足 JSON 格式。
  • python_obj = json.loads(s, strict=False):將字符串解析為 Python 對象。
  • json_str = json.dumps(python_obj):將 Python 對象轉(zhuǎn)換回 JSON 字符串格式。
  • return json_str:返回清洗后的 JSON 字符串,如果清洗失敗則返回 None。

創(chuàng)建用戶定義函數(shù)(UDF): 使用 udf() 函數(shù)將 Python 函數(shù) json_clean() 封裝為 PySpark 的用戶定義函數(shù)(UDF),以便在 Spark 中使用。

應用函數(shù)到 DataFrame: df.withColumn() 函數(shù)將定義的 UDF 應用于 DataFrame 中的 commentlist 列,并將處理后的結(jié)果存儲到名為 new_commentlist 的新列中。

更新 DataFrame: 創(chuàng)建了一個新的 DataFrame newdf,通過在原始 DataFrame df 的基礎上添加了經(jīng)過清洗的 commentlist 列,并刪除了原始的 new_commentlist 列。

步驟 4:保存清洗后的數(shù)據(jù)

newdf.write.mode("overwrite").saveAsTable("cjw_data.xiechengsentiment")
  • 使用 write 方法: write 方法用于將 DataFrame 中的數(shù)據(jù)寫入外部存儲,可以是文件系統(tǒng)或數(shù)據(jù)庫。
  • 指定保存模式(Mode): 在這個例子中,.mode("overwrite") 指定了保存模式為 "overwrite",即如果目標位置已存在同名的表或數(shù)據(jù),將覆蓋(重寫)已有的內(nèi)容。
  • 保存為表(saveAsTable): saveAsTable() 方法將 DataFrame 中的數(shù)據(jù)保存為一個新的表,名為 cjw_data.xiechengsentiment。這意味著,如果該表不存在,它將會被創(chuàng)建;如果表已存在,根據(jù)指定的模式進行重寫或追加。

步驟 5:統(tǒng)計數(shù)據(jù)

comment_count = newdf.filter(newdf.commentlist != "[]").count()
total_count = newdf.count()

print("有效長度:", comment_count)
print("總長度:", total_count)
  • 篩選非空數(shù)據(jù)行: newdf.filter(newdf.commentlist != "[]") 這部分代碼使用 filter() 方法篩選出 newdf DataFrame 中 commentlist 列不為空的行。這里使用的條件是 commentlist != "[]",即不等于空列表的行。也就是篩選出之前udf函數(shù)里返還為空的那些解析JSON異常的行。
  • 統(tǒng)計有效數(shù)據(jù)長度: 通過 count() 方法統(tǒng)計篩選后的 DataFrame 中的行數(shù),即得到了符合條件的有效數(shù)據(jù)的長度,并將結(jié)果存儲在變量 comment_count 中。
  • 統(tǒng)計總長度: newdf.count() 統(tǒng)計了整個 DataFrame 的行數(shù),即獲取了總長度,并將結(jié)果存儲在變量 total_count 中。

截圖示例

清洗后示意圖:

hive表中查看清洗后的數(shù)據(jù):

輸出的字符串中包含了轉(zhuǎn)義字符(例如 \u597d),這些字符實際上是 Unicode 字符的表示方式,而不是真正的亂碼。 Python 中的 json.dumps() 方法默認將非 ASCII 字符串轉(zhuǎn)換為 Unicode 轉(zhuǎn)義序列。這種轉(zhuǎn)義是為了確保 JSON 字符串可以被準確地傳輸和解析,但可能會在輸出時顯示為 Unicode 轉(zhuǎn)義字符。

JSON 是一種數(shù)據(jù)交換格式,它使用 Unicode 轉(zhuǎn)義序列(比如 \uXXXX)來表示非 ASCII 字符。在默認情況下,json.dumps() 將非 ASCII 字符轉(zhuǎn)義為 Unicode 字符以確保其正確性,并且這種轉(zhuǎn)義對于網(wǎng)絡傳輸和解析是非常重要的

總結(jié)

本文介紹了使用 PySpark 對數(shù)據(jù)進行清洗和 JSON 格式轉(zhuǎn)換的過程。通過上述步驟,我們可以連接到遠程 Spark 服務器,加載數(shù)據(jù),應用自定義函數(shù)對數(shù)據(jù)進行清洗和格式轉(zhuǎn)換,并最終保存清洗后的數(shù)據(jù)。這個流程展示了 PySpark 在數(shù)據(jù)處理中的強大功能,特別是在大規(guī)模數(shù)據(jù)集的處理和轉(zhuǎn)換方面的優(yōu)勢。

以上就是使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解的詳細內(nèi)容,更多關于PySpark數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的資料請關注腳本之家其它相關文章!

相關文章

  • python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實例

    python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實例

    在實際應用的過程中,遇到列表或是數(shù)組的維數(shù)不同,需要變換的問題,如二維列表/數(shù)組變成了一維列表/數(shù)組,下面這篇文章主要給大家介紹了關于python將二維數(shù)組升為一維數(shù)組或二維降為一維的相關資料,需要的朋友可以參考下
    2022-11-11
  • Python實現(xiàn)合并兩個字典的8種方法

    Python實現(xiàn)合并兩個字典的8種方法

    Python有多種方法可以通過使用各種函數(shù)和構(gòu)造函數(shù)來合并字典,本文主要介紹了Python實現(xiàn)合并兩個字典的8種方法,具有一定的參考價值,感興趣的可以了解一下
    2024-07-07
  • Pytorch中retain_graph的坑及解決

    Pytorch中retain_graph的坑及解決

    這篇文章主要介紹了Pytorch中retain_graph的坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 使用k8s部署Django項目的方法步驟

    使用k8s部署Django項目的方法步驟

    這篇文章主要介紹了使用k8s部署Django項目的方法步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • Python條件語句與循環(huán)語句

    Python條件語句與循環(huán)語句

    這篇文章主要介紹了Python條件語句與循環(huán)語句,條件語句就是通過指定的表達式的運行結(jié)果來判斷當前是執(zhí)行還是跳過某些指定的語句塊,循環(huán)語句就是對某些語句的重復執(zhí)行,這個重復執(zhí)行是通過指定表達式來控制的,下面來看具體內(nèi)容及續(xù)航管案例吧,需要的朋友可以參考一下
    2021-11-11
  • python語音信號處理詳細教程

    python語音信號處理詳細教程

    在深度學習中,語音的輸入都是需要處理的,下面這篇文章主要給大家介紹了關于python語音信號處理的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2023-01-01
  • 深入了解Django View(視圖系統(tǒng))

    深入了解Django View(視圖系統(tǒng))

    這篇文章主要介紹了簡單了解Django View(視圖系統(tǒng)),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-07-07
  • python編寫暴力破解zip文檔程序的實例講解

    python編寫暴力破解zip文檔程序的實例講解

    下面小編就為大家分享一篇python編寫暴力破解zip文檔程序的實例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-04-04
  • Python Pandas兩個表格內(nèi)容模糊匹配的實現(xiàn)

    Python Pandas兩個表格內(nèi)容模糊匹配的實現(xiàn)

    模糊查詢大家應該都不會陌生,下面這篇文章主要給大家介紹了關于Python Pandas兩個表格內(nèi)容模糊匹配的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考下
    2021-11-11
  • Python實現(xiàn)半角轉(zhuǎn)全角的方法示例

    Python實現(xiàn)半角轉(zhuǎn)全角的方法示例

    本文介紹了使用Python實現(xiàn)半角字符到全角字符的轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2025-01-01

最新評論