如何使用IDEA開(kāi)發(fā)Spark SQL程序(一文搞懂)
前言
大家好,我是DJ丶小哪吒,我又來(lái)跟你們分享知識(shí)了。對(duì)軟件開(kāi)發(fā)有著濃厚的興趣。喜歡與人分享知識(shí)。做博客的目的就是為了能與 他 人知識(shí)共享。由于水平有限。博客中難免會(huì)有一些錯(cuò)誤。如有 紕漏 之處,歡迎大家在留言區(qū)指正。小編也會(huì)及時(shí)改正。
DJ丶小哪吒又來(lái)與各位分享知識(shí)了。今天我們不飆車,今天就靜靜的坐下來(lái),我們來(lái)聊一聊關(guān)于sparkSQL。準(zhǔn)備好茶水,聽(tīng)老朽與你娓娓道來(lái)。
Spark SQL是什么
Spark SQL 是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個(gè)叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個(gè)分布式的SQL查詢引擎。
1、使用IDEA開(kāi)發(fā)Spark SQL
Spark會(huì)根據(jù)文件信息嘗試著去推斷DataFrame/DataSet的Schema,當(dāng)然我們也可以手動(dòng)指定,手動(dòng)指定的方式有以下幾種:
- 第1種:指定列名添加Schema
- 第2種:通過(guò)StructType指定Schema
- 第3種:編寫樣例類,利用反射機(jī)制推斷Schema
1.1、指定列名添加Schema
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object CreateDFDS { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ val personDF: DataFrame = rowRDD.toDF("id","name","age") personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.2、通過(guò)StructType指定Schema
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} object CreateDFDS2 { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換 //import spark.implicits._ val schema: StructType = StructType(Seq( StructField("id", IntegerType, true),//允許為空 StructField("name", StringType, true), StructField("age", IntegerType, true)) ) val personDF: DataFrame = spark.createDataFrame(rowRDD,schema) personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.3、反射推斷Schema–掌握
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object CreateDFDS3 { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
1.4、花式查詢
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} object QueryDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() //=======================SQL方式查詢======================= //0.注冊(cè)表 personDF.createOrReplaceTempView("t_person") //1.查詢所有數(shù)據(jù) spark.sql("select * from t_person").show() //2.查詢age+1 spark.sql("select age,age+1 from t_person").show() //3.查詢age最大的兩人 spark.sql("select name,age from t_person order by age desc limit 2").show() //4.查詢各個(gè)年齡的人數(shù) spark.sql("select age,count(*) from t_person group by age").show() //5.查詢年齡大于30的 spark.sql("select * from t_person where age > 30").show() //=======================DSL方式查詢======================= //1.查詢所有數(shù)據(jù) personDF.select("name","age") //2.查詢age+1 personDF.select($"name",$"age" + 1) //3.查詢age最大的兩人 personDF.sort($"age".desc).show(2) //4.查詢各個(gè)年齡的人數(shù) personDF.groupBy("age").count().show() //5.查詢年齡大于30的 personDF.filter($"age" > 30).show() sc.stop() spark.stop() } }
1.5、 相互轉(zhuǎn)化
RDD、DF、DS之間的相互轉(zhuǎn)換有很多(6種),但是我們實(shí)際操作就只有2類:
1)使用RDD算子操作
2)使用DSL/SQL對(duì)表操作
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object TransformDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.將RDD轉(zhuǎn)成DF //注意:RDD中原本沒(méi)有toDF方法,新版本中要給它增加一個(gè)方法,可以使用隱式轉(zhuǎn)換 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通過(guò)反射自動(dòng)獲取到并添加給DF //=========================相互轉(zhuǎn)換====================== //1.RDD-->DF val personDF: DataFrame = personRDD.toDF //2.DF-->RDD val rdd: RDD[Row] = personDF.rdd //3.RDD-->DS val DS: Dataset[Person] = personRDD.toDS() //4.DS-->RDD val rdd2: RDD[Person] = DS.rdd //5.DF-->DS val DS2: Dataset[Person] = personDF.as[Person] //6.DS-->DF val DF: DataFrame = DS2.toDF() sc.stop() spark.stop() } }
1.6、Spark SQL完成WordCount(案例)
1.6.1、SQL風(fēng)格
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object WordCount { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt") val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt") //fileDF.show() //fileDS.show() //3.對(duì)每一行按照空格進(jìn)行切分并壓平 //fileDF.flatMap(_.split(" ")) //注意:錯(cuò)誤,因?yàn)镈F沒(méi)有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.對(duì)上面的數(shù)據(jù)進(jìn)行WordCount wordDS.createOrReplaceTempView("t_word") val sql = """ |select value ,count(value) as count |from t_word |group by value |order by count desc """.stripMargin spark.sql(sql).show() sc.stop() spark.stop() } }
1.6.2、DQL風(fēng)格
package cn.itcast.sql import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object WordCount2 { def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.讀取文件 val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt") val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt") //fileDF.show() //fileDS.show() //3.對(duì)每一行按照空格進(jìn)行切分并壓平 //fileDF.flatMap(_.split(" ")) //注意:錯(cuò)誤,因?yàn)镈F沒(méi)有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正確,因?yàn)镈S有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.對(duì)上面的數(shù)據(jù)進(jìn)行WordCount wordDS.groupBy("value").count().orderBy($"count".desc).show() sc.stop() spark.stop() } }
好了,以上內(nèi)容就到這里了。你學(xué)到了嗎。
到此這篇關(guān)于如何使用IDEA開(kāi)發(fā)Spark SQL程序(一文搞懂)的文章就介紹到這了,更多相關(guān)IDEA開(kāi)發(fā)Spark SQL內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例
這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-03-03ThreadLocal?在上下文傳值場(chǎng)景實(shí)踐源碼
這篇文章主要為大家介紹了ThreadLocal在上下文傳值場(chǎng)景下的實(shí)踐源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03詳解SpringMVC的攔截器鏈實(shí)現(xiàn)及攔截器鏈配置
攔截器(Interceptor)是一種動(dòng)態(tài)攔截方法調(diào)用的機(jī)制,在SpringMVC中動(dòng)態(tài)攔截控制器方法的執(zhí)行。本文將詳細(xì)講講SpringMVC中攔截器參數(shù)及攔截器鏈配置,感興趣的可以嘗試一下2022-08-08SpringBoot中使用Redis對(duì)接口進(jìn)行限流的實(shí)現(xiàn)
本文將結(jié)合實(shí)例代碼,介紹SpringBoot中使用Redis對(duì)接口進(jìn)行限流的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07使用@Cacheable緩存解決雙冒號(hào)::的問(wèn)題
這篇文章主要介紹了使用@Cacheable緩存解決雙冒號(hào)::的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12