SparkSQl簡(jiǎn)介及運(yùn)行原理
一:什么是SparkSQL?
(一)SparkSQL簡(jiǎn)介
Spark SQL是Spark的一個(gè)模塊,用于處理結(jié)構(gòu)化的數(shù)據(jù),它提供了一個(gè)數(shù)據(jù)抽象DataFrame(最核心的編程抽象就是DataFrame),并且SparkSQL作為分布式SQL查詢引擎。
Spark SQL就是將SQL轉(zhuǎn)換成一個(gè)任務(wù),提交到集群上運(yùn)行,類似于Hive的執(zhí)行方式。
(二)SparkSQL運(yùn)行原理
將Spark SQL轉(zhuǎn)化為RDD,然后提交到集群執(zhí)行。
(三)SparkSQL特點(diǎn)
(1)容易整合,Spark SQL已經(jīng)集成在Spark中
(2)提供了統(tǒng)一的數(shù)據(jù)訪問(wèn)方式:JSON、CSV、JDBC、Parquet等都是使用統(tǒng)一的方式進(jìn)行訪問(wèn)
(3)兼容 Hive
(4)標(biāo)準(zhǔn)的數(shù)據(jù)連接:JDBC、ODBC
二:DataFrame
(一)什么是DataFrame?
在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。
DataFrame是組織成命名列的數(shù)據(jù)集。
它在概念上等同于關(guān)系數(shù)據(jù)庫(kù)中的表,但在底層具有更豐富的優(yōu)化。
關(guān)系型數(shù)據(jù)庫(kù)中的表由表結(jié)構(gòu)和數(shù)據(jù)組成,而DataFrame也類似,由schema(結(jié)構(gòu))和數(shù)據(jù)組成,其數(shù)據(jù)集是RDD。
DataFrame可以根據(jù)很多源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件,hive中的表,外部的關(guān)系型數(shù)據(jù)庫(kù),以及RDD
補(bǔ)充:Spark中的RDD、DataFrame和DataSet講解
(一)Spark中的模塊
上圖展示了Spark的模塊及各模塊之間的關(guān)系:
底層是Spark-core核心模塊,Spark每個(gè)模塊都有一個(gè)核心抽象,Spark-core的核心抽象是RDD,
Spark SQL等都基于RDD封裝了自己的抽象,在Spark SQL中是DataFrame/DataSet。
相對(duì)來(lái)說(shuō)RDD是更偏底層的抽象,DataFrame/DataSet是在其上做了一層封裝,做了優(yōu)化,使用起來(lái)更加方便。
從功能上來(lái)說(shuō),DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。
(二)RDD和DataFrame的區(qū)別
DataFrame與RDD的主要區(qū)別在于:
DataFrame
DataFrame帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。
使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對(duì)藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。
RDD
RDD,由于無(wú)從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線優(yōu)化。
DataFrame和RDD聯(lián)系:
DataFrame底層是以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,和RDD的主要區(qū)別的是:RDD中沒(méi)有schema信息,而DataFrame中數(shù)據(jù)每一行都包含schema
DataFrame = RDD[Row] + shcema
三:SparkSession
(一)SparkSession簡(jiǎn)介
SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統(tǒng)一的切入點(diǎn),來(lái)讓用戶學(xué)習(xí)spark的各項(xiàng)功能。
在spark的早期版本中,SparkContext是spark的主要切入點(diǎn),由于RDD是主要的API,我們通過(guò)sparkcontext來(lái)創(chuàng)建和操作RDD。
對(duì)于每個(gè)其他的API,我們需要使用不同的context。
例如,對(duì)于Streming,我們需要使用StreamingContext;對(duì)于sql,使用sqlContext;對(duì)于Hive,使用hiveContext。
但是隨著DataSet和DataFrame的API逐漸成為標(biāo)準(zhǔn)的API,就需要為他們建立接入點(diǎn)。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點(diǎn)。
SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來(lái)。
(二)SparkSession實(shí)質(zhì)
SparkSession實(shí)質(zhì)上是SQLContext和HiveContext的組合(未來(lái)可能還會(huì)加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。
SparkSession內(nèi)部封裝了sparkContext,所以計(jì)算實(shí)際上是由sparkContext完成的。
(三)SparkSession特點(diǎn)
----為用戶提供一個(gè)統(tǒng)一的切入點(diǎn)使用Spark各項(xiàng)功能
----允許用戶通過(guò)它調(diào)用DataFrame和Dataset相關(guān) API來(lái)編寫(xiě)程序
----減少了用戶需要了解的一些概念,可以很容易的與Spark進(jìn)行交互
----與Spark交互之時(shí)不需要顯示的創(chuàng)建SparkConf, SparkContext以及 SQlContext,這些對(duì)象已經(jīng)封閉在SparkSession中
四:通過(guò)RDD創(chuàng)建DataFrame
(一)通過(guò)樣本類創(chuàng)建(反射)
case class People(val name:String,val age:Int) //可以聲明數(shù)據(jù)類型 object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設(shè)置運(yùn)行模式為本地運(yùn)行,不然默認(rèn)是集群模式 //conf.setMaster("local") //默認(rèn)是集群模式 //設(shè)置任務(wù)名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //設(shè)置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據(jù)SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD People(line.split(",")(0),line.split(",")(1).trim().toInt) }) import Sqlsc.implicits._ //引入全部方法 //將RDD轉(zhuǎn)換成DataFrame val df = peopleRDD.toDF() //將DataFrame轉(zhuǎn)換成一個(gè)臨時(shí)的視圖 df.createOrReplaceTempView("people") //使用SQL語(yǔ)句進(jìn)行查詢 Sqlsc.sql("select * from people").show() } }
(二)通過(guò)SparkSession創(chuàng)建DataFrame
object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設(shè)置運(yùn)行模式為本地運(yùn)行,不然默認(rèn)是集群模式 //conf.setMaster("local") //默認(rèn)是集群模式 //設(shè)置任務(wù)名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //設(shè)置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據(jù)SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") //1.需要將RDD數(shù)據(jù)映射成Row,需要引入import org.apache.spark.sql.Row val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD val fields = line.split(",") Row(fields(0),fields(1).trim().toInt) }) //2.創(chuàng)建StructType定義結(jié)構(gòu) val st:StructType = StructType( //字段名,字段類型,是否可以為空 List( //傳參是列表類型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil來(lái)構(gòu)建列表 StructField("name",StringType,true), StructField("age",IntegerType,true) ) ) //3.使用SparkSession建立DataFrame val df = Sqlsc.createDataFrame(peopleRDD,st) //將DataFrame轉(zhuǎn)換成一個(gè)臨時(shí)的視圖 df.createOrReplaceTempView("people") //使用SQL語(yǔ)句進(jìn)行查詢 Sqlsc.sql("select * from people").show() } }
(三)通過(guò) json 文件創(chuàng)建DataFrames
[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={ val conf = new SparkConf() //設(shè)置運(yùn)行模式為本地運(yùn)行,不然默認(rèn)是集群模式 //conf.setMaster("local") //默認(rèn)是集群模式 //設(shè)置任務(wù)名 conf.setAppName("WordCount").setMaster("local") //設(shè)置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據(jù)SparkContext生成SQLContext //通過(guò)json數(shù)據(jù)直接創(chuàng)建DataFrame val df = Sqlsc.read.json("E:\\1.json") //將DataFrame轉(zhuǎn)換成一個(gè)臨時(shí)的視圖 df.createOrReplaceTempView("people1") //使用SQL語(yǔ)句進(jìn)行查詢 Sqlsc.sql("select * from people1").show() }
五:臨時(shí)視圖
(一)什么是視圖
視圖是一個(gè)虛表,跟Mysql里的概念是一樣的,視圖基于實(shí)際的表而存在,其實(shí)質(zhì)是一系列的查詢語(yǔ)句
(二)類型
局部視圖(Temoporary View):只在當(dāng)前會(huì)話中有效,如果創(chuàng)建它的會(huì)話終止,則視圖也會(huì)消失。
全局視圖(Global Temporary View): 在全局范圍內(nèi)有效,不同的Session中都可以訪問(wèn),生命周期是Spark的Application運(yùn)行周期,全局視圖會(huì)綁定到系統(tǒng)保留的數(shù)據(jù)庫(kù)global_temp中,因此使用它的時(shí)候必須加上相應(yīng)前綴。
(三)創(chuàng)建視圖
創(chuàng)建局部視圖:df.createOrReplaceTempView("emp")
創(chuàng)建全局視圖:df.createOrReplaceGlobalTempView("empG")
(四)視圖查詢
spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show //查詢?nèi)忠晥D,需要添加前綴
(五)會(huì)話周期
spark.newSession.sql("select * from emp").show -----> 報(bào)錯(cuò),Table or View Not Found
spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查詢
六:DataFrame的read和save和savemode
(一)數(shù)據(jù)讀取
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:\\666\\people.json") val df2 = sqlContext.read.parquet("E:\\666\\users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:\\666\\people.json") val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet") //方式三,默認(rèn)是parquet格式 val df5 = sqlContext.load("E:\\666\\users.parquet") //方式四,使用MySQL進(jìn)行數(shù)據(jù)源讀取 val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要傳入Mysql的URL、表明、properties(連接數(shù)據(jù)庫(kù)的用戶名密碼) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show()
使用Hive作為數(shù)據(jù)源:需要在pom.xml文件中添加依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
開(kāi)發(fā)環(huán)境則把resource文件夾下添加hive-site.xml文件,集群環(huán)境把hive的配置文件要發(fā)到$SPARK_HOME/conf目錄下
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一個(gè)服務(wù)器節(jié)點(diǎn),那么請(qǐng)更改 hadoop02 為 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration> hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show()
(二)數(shù)據(jù)保存
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:\\666\\people.json") //方式一 df1.write.json("E:\\111") df1.write.parquet("E:\\222") //方式二 df1.write.format("json").save("E:\\333") df1.write.format("parquet").save("E:\\444") //方式三 df1.write.save("E:\\555")
(三)數(shù)據(jù)保存模式
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
七:數(shù)據(jù)集DataSet
Dataset也是一個(gè)分布式數(shù)據(jù)容器,簡(jiǎn)單來(lái)說(shuō)是類似二維表,Dataset里頭存有schema數(shù)據(jù)結(jié)構(gòu)信息和原生數(shù)據(jù),Dataset的底層封裝的是RDD,當(dāng)RDD的泛型是Row類型的時(shí)候,我們也可以稱它為DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset。
Spark整合了Dataset和DataFrame,前者是有明確類型的數(shù)據(jù)集,后者是無(wú)明確類型的數(shù)據(jù)集。根據(jù)官方的文檔:
Dataset是一種強(qiáng)類型集合,與領(lǐng)域?qū)ο笙嚓P(guān),可以使用函數(shù)或者關(guān)系進(jìn)行分布式的操作。
每個(gè)Dataset也有一個(gè)無(wú)類型的視圖,叫做DataFrame,也就是關(guān)于Row的Dataset。
簡(jiǎn)單來(lái)說(shuō),Dataset一般都是Dataset[T]形式,這里的T是指數(shù)據(jù)的類型,如上圖中的Person,而DataFrame就是一個(gè)Dataset[Row]。
Datasets是懶加載的,即只有actions被調(diào)用的時(shí)候才會(huì)觸發(fā)計(jì)算。在內(nèi)部,Dataset代表一個(gè)邏輯計(jì)劃,用來(lái)描述產(chǎn)生數(shù)據(jù)需要的計(jì)算。當(dāng)一個(gè)action被調(diào)用的時(shí)候,Spark的query優(yōu)化器會(huì)優(yōu)化這個(gè)邏輯計(jì)劃并以分布式的方式在物理上進(jìn)行實(shí)際的計(jì)算操作。
(一)創(chuàng)建和使用DataSet---使用序列
(1,"Tom") (2,"Mary")
測(cè)試數(shù)據(jù)
(1)定義case class
case class MyData(a:Int,b:String)
(2)使用序列創(chuàng)建DataSet
val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
(二)創(chuàng)建和使用DataSet---通過(guò)case class作為編碼器,將DataFrame轉(zhuǎn)換成DataSet
(1)定義case class
case class Person(name:String,age:BigInt)
(2)讀入JSON的數(shù)據(jù)
val df = spark.read.json("/root/temp/people.json")
(3)將DataFrame轉(zhuǎn)換成DataSet
val PersonDS =df.as[Person]
(三)創(chuàng)建和使用DataSet---讀取HDFS數(shù)據(jù)文件
(1)讀取HDFS的文件,直接創(chuàng)建DataSet
val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String]
(2)分詞操作,查詢長(zhǎng)度大于3的單詞
val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
到此這篇關(guān)于SparkSQl簡(jiǎn)介及運(yùn)行原理的文章就介紹到這了,更多相關(guān)SparkSQl使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java構(gòu)造代碼塊,靜態(tài)代碼塊原理與用法實(shí)例分析
這篇文章主要介紹了Java構(gòu)造代碼塊,靜態(tài)代碼塊,結(jié)合實(shí)例形式分析了Java構(gòu)造代碼塊,靜態(tài)代碼塊的功能、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下2020-04-04java使用URLDecoder和URLEncoder對(duì)中文字符進(jìn)行編碼和解碼
這篇文章主要介紹了java 使用 URLDecoder 和 URLEncoder 對(duì)中文字符進(jìn)行編碼和解碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07SpringBoot 內(nèi)嵌 camunda的配置方法
Camunda是一個(gè)基于Java的框架,支持用于工作流和流程自動(dòng)化的BPMN、用于案例管理的CMMN和用于業(yè)務(wù)決策管理的DMN,這篇文章主要介紹了SpringBoot 內(nèi)嵌 camunda,需要的朋友可以參考下2024-06-06數(shù)據(jù)庫(kù)連接超時(shí)java處理的兩種方式
這篇文章主要介紹了數(shù)據(jù)庫(kù)連接超時(shí)java處理的兩種方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04Java運(yùn)行時(shí)jar終端輸出的中文日志亂碼兩種解決方式
jar包啟動(dòng),今天java開(kāi)發(fā)過(guò)來(lái)找,說(shuō)jar包啟動(dòng)日志是亂碼,這篇文章主要給大家介紹了關(guān)于Java運(yùn)行時(shí)jar終端輸出的中文日志亂碼的兩種解決方式,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01