使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解
簡介
PySpark 是 Apache Spark 的 Python API,可用于處理大規(guī)模數(shù)據(jù)集。它提供了豐富的功能和庫,使得數(shù)據(jù)清洗和轉(zhuǎn)換變得更加高效和便捷。
代碼實踐
本文將以一個示例數(shù)據(jù)集為例,演示如何使用 PySpark 對數(shù)據(jù)進行清洗和轉(zhuǎn)換。以下是代碼實現(xiàn)的主要步驟:
步驟 1:連接到遠程 Spark 服務器
# Author: 冷月半明 # Date: 2023/12/14 # Description: This script does XYZ. from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("RemoteSparkConnection") \ .master("yarn") \ .config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python") \ .config("spark.sql.warehouse.dir", "/hive/warehouse") \ .config("hive.metastore.uris", "thrift://node01:9083") \ .config("spark.sql.parquet.writeLegacyFormat", "true") \ .enableHiveSupport() \ .getOrCreate()
當使用 PySpark 進行大數(shù)據(jù)處理時,首先需要建立與 Spark 集群的連接。在這段代碼中,我們使用了 SparkSession
類來創(chuàng)建一個與遠程 Spark 服務器的連接,并設置了連接所需的參數(shù)。
導入必要的庫: 我們首先導入了 SparkSession
類,這是 PySpark 中用于管理 Spark 應用程序的入口點。
建立連接: 在接下來的代碼中,我們使用 SparkSession.builder
來創(chuàng)建一個 SparkSession
對象。這個對象允許我們設置應用程序的名稱、集群的主節(jié)點、配置項等參數(shù)。在這個例子中:
.appName("RemoteSparkConnection")
:為我們的 Spark 應用程序設置了一個名稱,這有助于在集群中識別應用程序。.master("yarn")
:指定 Spark 應用程序的主節(jié)點,這里使用的是 YARN 資源管理器,用于分配和管理集群資源。.config("spark.pyspark.python", "/opt/apps/anaconda3/envs/myspark/bin/python")
:設置了 PySpark 使用的 Python 解釋器路徑,確保在集群中使用正確的 Python 環(huán)境。(因為所使用的環(huán)境為anaconda創(chuàng)建的虛擬環(huán)境,先進入虛擬環(huán)境,然后使用which python查看解釋器位置).config("spark.sql.warehouse.dir", "/hive/warehouse")
:指定了 Spark SQL 的倉庫目錄,這對于數(shù)據(jù)存儲和管理非常重要。如果不指定的話使用sparksql獲取hive的時候可能會出現(xiàn)問題。.config("hive.metastore.uris", "thrift://node01:9083")
:配置 Hive 元數(shù)據(jù)存儲的 URI,Hive 是 Hadoop 生態(tài)系統(tǒng)中的一部分,用于管理數(shù)據(jù)倉庫。如果不指定的話使用sparksql獲取hive的時候可能會出只能獲取define默認倉庫的情況。.config("spark.sql.parquet.writeLegacyFormat", "true")
:設置了寫入 Parquet 格式數(shù)據(jù)時使用傳統(tǒng)格式,確保兼容性和向后兼容性。因為spark寫入和hive不同,使用該配置可保證spark寫入hive的數(shù)據(jù),hive能正常訪問。.enableHiveSupport()
:啟用了對 Hive 的支持,允許在 Spark 中使用 Hive 的功能和特性。.getOrCreate()
:最后使用.getOrCreate()
方法創(chuàng)建或獲取 SparkSession 實例。
總而言之,這段代碼建立了與遠程 Spark 服務器的連接,并配置了各種參數(shù)以確保應用程序能夠正確地運行和訪問集群資源。這是使用 PySpark 開展大數(shù)據(jù)處理工作的第一步,為后續(xù)的數(shù)據(jù)處理和分析創(chuàng)建了必要的環(huán)境和基礎設施。
步驟 2:加載數(shù)據(jù)
df = spark.sql("SELECT * FROM cjw_data.xiecheng;")
使用 PySpark 的 spark.sql()
函數(shù)執(zhí)行 SQL 查詢,將查詢結(jié)果加載到 DataFrame 中,為后續(xù)的數(shù)據(jù)操作和分析做好準備。這種靈活性和強大的數(shù)據(jù)處理能力是 PySpark 在大數(shù)據(jù)處理中的關鍵優(yōu)勢之一。
步驟 3:數(shù)據(jù)清洗與 JSON 格式轉(zhuǎn)換
from pyspark.sql.functions import udf import json def json_clean(commentlist): try: jsonstr = str(commentlist) s = jsonstr.replace("'", '"') s = '[' + s.replace('}{', '},{') + ']' python_obj = json.loads(s, strict=False) json_str = json.dumps(python_obj) return json_str except: return None json_clean_udf = udf(json_clean, StringType()) df = df.withColumn("new_commentlist", json_clean_udf(df["commentlist"])) newdf = df.withColumn("commentlist", df["new_commentlist"]) newdf = newdf.drop("new_commentlist")
在 PySpark 中定義并應用一個用戶自定義函數(shù)(UDF)來對數(shù)據(jù)進行清洗和轉(zhuǎn)換。
定義數(shù)據(jù)清洗函數(shù): json_clean()
函數(shù)接收一個名為 commentlist
的參數(shù),這個函數(shù)用于將從數(shù)據(jù)庫中檢索到的評論數(shù)據(jù)進行清洗。具體來說:
jsonstr = str(commentlist)
:將傳入的commentlist
轉(zhuǎn)換為字符串格式。s = jsonstr.replace("'", '"')
:將字符串中的單引號替換為雙引號,以滿足 JSON 格式的要求。s = '[' + s.replace('}{', '},{') + ']'
:在字符串中的每個對象之間添加逗號,并將整個字符串包含在一個數(shù)組中,以滿足 JSON 格式。python_obj = json.loads(s, strict=False)
:將字符串解析為 Python 對象。json_str = json.dumps(python_obj)
:將 Python 對象轉(zhuǎn)換回 JSON 字符串格式。return json_str
:返回清洗后的 JSON 字符串,如果清洗失敗則返回None
。
創(chuàng)建用戶定義函數(shù)(UDF): 使用 udf()
函數(shù)將 Python 函數(shù) json_clean()
封裝為 PySpark 的用戶定義函數(shù)(UDF),以便在 Spark 中使用。
應用函數(shù)到 DataFrame: df.withColumn()
函數(shù)將定義的 UDF 應用于 DataFrame 中的 commentlist
列,并將處理后的結(jié)果存儲到名為 new_commentlist
的新列中。
更新 DataFrame: 創(chuàng)建了一個新的 DataFrame newdf
,通過在原始 DataFrame df
的基礎上添加了經(jīng)過清洗的 commentlist
列,并刪除了原始的 new_commentlist
列。
步驟 4:保存清洗后的數(shù)據(jù)
newdf.write.mode("overwrite").saveAsTable("cjw_data.xiechengsentiment")
- 使用
write
方法:write
方法用于將 DataFrame 中的數(shù)據(jù)寫入外部存儲,可以是文件系統(tǒng)或數(shù)據(jù)庫。 - 指定保存模式(Mode): 在這個例子中,
.mode("overwrite")
指定了保存模式為 "overwrite",即如果目標位置已存在同名的表或數(shù)據(jù),將覆蓋(重寫)已有的內(nèi)容。 - 保存為表(
saveAsTable
):saveAsTable()
方法將 DataFrame 中的數(shù)據(jù)保存為一個新的表,名為cjw_data.xiechengsentiment
。這意味著,如果該表不存在,它將會被創(chuàng)建;如果表已存在,根據(jù)指定的模式進行重寫或追加。
步驟 5:統(tǒng)計數(shù)據(jù)
comment_count = newdf.filter(newdf.commentlist != "[]").count() total_count = newdf.count() print("有效長度:", comment_count) print("總長度:", total_count)
- 篩選非空數(shù)據(jù)行:
newdf.filter(newdf.commentlist != "[]")
這部分代碼使用filter()
方法篩選出newdf
DataFrame 中commentlist
列不為空的行。這里使用的條件是commentlist != "[]"
,即不等于空列表的行。也就是篩選出之前udf函數(shù)里返還為空的那些解析JSON異常的行。 - 統(tǒng)計有效數(shù)據(jù)長度: 通過
count()
方法統(tǒng)計篩選后的 DataFrame 中的行數(shù),即得到了符合條件的有效數(shù)據(jù)的長度,并將結(jié)果存儲在變量comment_count
中。 - 統(tǒng)計總長度:
newdf.count()
統(tǒng)計了整個 DataFrame 的行數(shù),即獲取了總長度,并將結(jié)果存儲在變量total_count
中。
截圖示例
清洗后示意圖:
hive表中查看清洗后的數(shù)據(jù):
輸出的字符串中包含了轉(zhuǎn)義字符(例如 \u597d),這些字符實際上是 Unicode 字符的表示方式,而不是真正的亂碼。 Python 中的 json.dumps() 方法默認將非 ASCII 字符串轉(zhuǎn)換為 Unicode 轉(zhuǎn)義序列。這種轉(zhuǎn)義是為了確保 JSON 字符串可以被準確地傳輸和解析,但可能會在輸出時顯示為 Unicode 轉(zhuǎn)義字符。
JSON 是一種數(shù)據(jù)交換格式,它使用 Unicode 轉(zhuǎn)義序列(比如 \uXXXX)來表示非 ASCII 字符。在默認情況下,json.dumps() 將非 ASCII 字符轉(zhuǎn)義為 Unicode 字符以確保其正確性,并且這種轉(zhuǎn)義對于網(wǎng)絡傳輸和解析是非常重要的
總結(jié)
本文介紹了使用 PySpark 對數(shù)據(jù)進行清洗和 JSON 格式轉(zhuǎn)換的過程。通過上述步驟,我們可以連接到遠程 Spark 服務器,加載數(shù)據(jù),應用自定義函數(shù)對數(shù)據(jù)進行清洗和格式轉(zhuǎn)換,并最終保存清洗后的數(shù)據(jù)。這個流程展示了 PySpark 在數(shù)據(jù)處理中的強大功能,特別是在大規(guī)模數(shù)據(jù)集的處理和轉(zhuǎn)換方面的優(yōu)勢。
以上就是使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解的詳細內(nèi)容,更多關于PySpark數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的資料請關注腳本之家其它相關文章!
相關文章
python將二維數(shù)組升為一維數(shù)組或二維降為一維方法實例
在實際應用的過程中,遇到列表或是數(shù)組的維數(shù)不同,需要變換的問題,如二維列表/數(shù)組變成了一維列表/數(shù)組,下面這篇文章主要給大家介紹了關于python將二維數(shù)組升為一維數(shù)組或二維降為一維的相關資料,需要的朋友可以參考下2022-11-11Python Pandas兩個表格內(nèi)容模糊匹配的實現(xiàn)
模糊查詢大家應該都不會陌生,下面這篇文章主要給大家介紹了關于Python Pandas兩個表格內(nèi)容模糊匹配的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考下2021-11-11Python實現(xiàn)半角轉(zhuǎn)全角的方法示例
本文介紹了使用Python實現(xiàn)半角字符到全角字符的轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2025-01-01