SparkSQL快速入門教程
(一)概述
SparkSQL可以理解為在原生的RDD上做的一層封裝,通過SparkSQL可以在scala和java中寫SQL語句,并將結(jié)果作為Dataset/DataFrame返回。簡(jiǎn)單來講,SparkSQL可以讓我們像寫SQL一樣去處理內(nèi)存中的數(shù)據(jù)。
Dataset是一個(gè)數(shù)據(jù)的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的優(yōu)點(diǎn)和SparkSQL優(yōu)化執(zhí)行引擎的優(yōu)點(diǎn),一個(gè)Dataset相當(dāng)于RDD+Schema的結(jié)合。
Dataset的底層封裝是RDD,當(dāng)RDD的泛型是Row類型時(shí),該類型就可以稱為DataFrame。DataFrame是一種表格型的數(shù)據(jù)結(jié)構(gòu),就和傳統(tǒng)的Mysql結(jié)構(gòu)一樣,通過DataFrame我們可以更加高效地去執(zhí)行Sql。
特點(diǎn)
- 易整合,在程序中既可以使用SQL,還可以使用API!
- 統(tǒng)一的數(shù)據(jù)訪問, 不同數(shù)據(jù)源中的數(shù)據(jù),都可以使用SQL或DataFrameAPI進(jìn)行操作,還可以進(jìn)行不同數(shù)據(jù)源的Join!
- 對(duì)Hive的無縫支持
- 支持標(biāo)準(zhǔn)的JDBC和ODBC
(二)SparkSQL實(shí)戰(zhàn)
使用SparkSQL首先需要引入相關(guān)的依賴:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency>
該依賴需要和sparkCore保持一致。
SparkSQL的編碼主要通過四步:
- 創(chuàng)建SparkSession
- 獲取數(shù)據(jù)
- 執(zhí)行SQL
- 關(guān)閉SparkSession
public class SqlTest { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); Dataset<Row> json = sparkSession.read().json("data/json"); json.printSchema(); json.show(); sparkSession.stop(); } }
在data的目錄下創(chuàng)建一個(gè)名為json的文件
{"name":"a","age":23} {"name":"b","age":24} {"name":"c","age":25} {"name":"d","age":26} {"name":"e","age":27} {"name":"f","age":28}
運(yùn)行項(xiàng)目后輸出兩個(gè)結(jié)果,schema結(jié)果如下:
Dataset<Row>
輸出結(jié)果如下:
通過SparkSQL可以執(zhí)行和SQL十分相似的查詢操作:
public class SqlTest { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); Dataset<Row> json = sparkSession.read().json("data/json"); json.select("age","name").where("age > 26").show(); sparkSession.stop(); } }
在上面的語句中,通過一系列的API實(shí)現(xiàn)了SQL查詢操作,除此之外,SparkSQL還支持直接寫原始SQL語句的操作。
在寫SQL語句之前,首先需要讓Spark知道對(duì)哪個(gè)表進(jìn)行查詢,因此需要建立一張臨時(shí)表,再執(zhí)行SQL查詢:
json.createOrReplaceTempView("json"); sparkSession.sql("select * from json where age > 26").show();
(三)非JSON格式的Dataset創(chuàng)建
在上一節(jié)中創(chuàng)建Dataset時(shí)使用了最簡(jiǎn)單的json,因?yàn)閖son自己帶有schema結(jié)構(gòu),因此不需要手動(dòng)去增加,如果是一個(gè)txt文件,就需要在創(chuàng)建Dataset時(shí)手動(dòng)塞入schema。
下面展示讀取txt文件的例子,首先創(chuàng)建一個(gè)user.txt
a 23 b 24 c 25 d 26
現(xiàn)在我要將上面的這幾行變成DataFrame,第一列表示姓名,第二列表示年齡,于是就可以像下面這樣操作:
public class SqlTest2 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); SparkContext sparkContext = sparkSession.sparkContext(); JavaSparkContext sc = new JavaSparkContext(sparkContext); JavaRDD<String> lines = sc.textFile("data/user.txt"); //將String類型轉(zhuǎn)化為Row類型 JavaRDD<Row> rowJavaRDD = lines.map(new Function<String, Row>() { @Override public Row call(String v1) throws Exception { String[] split = v1.split(" "); return RowFactory.create( split[0], Integer.valueOf(split[1]) ); } }); //定義schema List<StructField> structFields = Arrays.asList( DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType structType = DataTypes.createStructType(structFields); //生成dataFrame Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD, structType); dataFrame.show(); } }
(四)通過JDBC創(chuàng)建DataFrame
通過JDBC可直接將對(duì)應(yīng)數(shù)據(jù)庫中的表放入Spark中進(jìn)行一些處理,下面通過MySQL進(jìn)行展示。
使用MySQL需要在依賴中引入MySQL的引擎:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency>
接著通過類似JDBC的方式讀取MySQL數(shù)據(jù):
public class SqlTest3 { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("sql") .master("local") .getOrCreate(); Map<String,String> options = new HashMap<>(); options.put("url","jdbc:mysql://127.0.0.1:3306/books"); options.put("driver","com.mysql.jdbc.Driver"); options.put("user","root"); options.put("password","123456"); options.put("dbtable","book"); Dataset<Row> jdbc = sparkSession.read().format("jdbc").options(options).load(); jdbc.show(); sparkSession.close(); } }
讀取到的數(shù)據(jù)是DataFrame,接下來的操作就是對(duì)DataFrame的操作了。
(五)總結(jié)
SparkSQL是對(duì)Spark原生RDD的增強(qiáng),雖然很多功能通過RDD就可以實(shí)現(xiàn),但是SparkSQL可以更加靈活地實(shí)現(xiàn)一些功能。
到此這篇關(guān)于SparkSQL快速入門教程的文章就介紹到這了,更多相關(guān)SparkSQL入門內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中使用instanceof判斷對(duì)象類型的示例
在List<Object>中遍歷Object時(shí),先判斷類型,再定向轉(zhuǎn)換,本文給大家介紹Java中使用instanceof判斷對(duì)象類型,感興趣的朋友跟隨小編一起看看吧2023-08-08springboot集成nacos報(bào)錯(cuò):get data from Nacos
這篇文章給大家介紹了springboot集成nacos報(bào)錯(cuò):get data from Nacos error,dataId:null.yaml的原因及解決方法,如果又遇到相同問題的朋友可以參考閱讀本文2023-10-10JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)原理解析
這篇文章主要介紹了JVM運(yùn)行時(shí)數(shù)據(jù)區(qū)原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08基于Java代碼實(shí)現(xiàn)判斷春節(jié)、端午節(jié)、中秋節(jié)等法定節(jié)假日的方法
這篇文章主要介紹了基于Java代碼實(shí)現(xiàn)判斷春節(jié)、端午節(jié)、中秋節(jié)等法定節(jié)假日的方法 的相關(guān)資料,需要的朋友可以參考下2016-01-01前置++和后置++ 運(yùn)算的詳解及實(shí)例代碼
這篇文章主要介紹了前置++和后置++ 的相關(guān)資料,并附示例代碼,幫助大家學(xué)習(xí)參考,需要的朋友可以參考下2016-09-09Java開發(fā)人員最常犯的10個(gè)錯(cuò)誤
這篇文章主要介紹了Java開發(fā)人員最常犯的10個(gè)錯(cuò)誤,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-07-07Spring Boot集成Shiro實(shí)現(xiàn)動(dòng)態(tài)加載權(quán)限的完整步驟
這篇文章主要給大家介紹了關(guān)于Spring Boot集成Shiro實(shí)現(xiàn)動(dòng)態(tài)加載權(quán)限的完整步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09