Spark Streaming算子開發(fā)實(shí)例
Spark Streaming算子開發(fā)實(shí)例
transform算子開發(fā)
transform操作應(yīng)用在DStream上時(shí),可以用于執(zhí)行任意的RDD到RDD的轉(zhuǎn)換操作,還可以用于實(shí)現(xiàn)DStream API中所沒有提供的操作,比如說,DStreamAPI中并沒有提供將一個(gè)DStream中的每個(gè)batch,與一個(gè)特定的RDD進(jìn)行join的操作,DStream中的join算子只能join其他DStream,但是我們自己就可以使用transform操作來實(shí)現(xiàn)該功能。
實(shí)例:黑名單用戶實(shí)時(shí)過濾
package StreamingDemo import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 實(shí)時(shí)黑名單過濾 */ object TransformDemo { def main(args: Array[String]): Unit = { //設(shè)置日志級別 Logger.getLogger("org").setLevel(Level.WARN) val conf = new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) //創(chuàng)建一個(gè)黑名單的RDD val blackRDD = ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true))) //通過socket從nc中獲取數(shù)據(jù) val linesDStream = ssc.socketTextStream("Hadoop01", 6666) /** * 過濾黑名單用戶發(fā)言 * zs sb sb sb sb * lisi fuck fuck fuck * jack hello */ linesDStream .map(x => { val info = x.split(" ") (info(0), info.toList.tail.mkString(" ")) }) .transform(rdd => { //transform是一個(gè)RDD->RDD的操作,所以返回值必須是RDD /** * 經(jīng)過leftouterjoin操作之后,產(chǎn)生的結(jié)果如下: * (zs,(sb sb sb sb),Some(true))) * (lisi,(fuck fuck fuck),some(true))) * (jack,(hello,None)) */ val joinRDD = rdd.leftOuterJoin(blackRDD) //如果是Some(true)的,說明就是黑名單用戶,如果是None的,說明不在黑名單內(nèi),把非黑名單的用戶保留下來 val filterRDD = joinRDD.filter(x => x._2._2.isEmpty) filterRDD }) .map(x=>(x._1,x._2._1)).print() ssc.start() ssc.awaitTermination() } }
測試
啟動(dòng)nc,傳入用戶及其發(fā)言信息
可以看到程序?qū)崟r(shí)的過濾掉了在黑名單里的用戶發(fā)言
updateStateByKey算子開發(fā)
updateStateByKey算子可以保持任意狀態(tài),同時(shí)不斷有新的信息進(jìn)行更新,這個(gè)算子可以為每個(gè)key維護(hù)一份state,并持續(xù)不斷的更新state。對于每個(gè)batch來說,Spark都會(huì)為每個(gè)之前已經(jīng)存在的key去應(yīng)用一次State更新函數(shù),無論這個(gè)key在batch中是否有新的值,如果State更新函數(shù)返回的值是none,那么這個(gè)key對應(yīng)的state就會(huì)被刪除;對于新出現(xiàn)的key也會(huì)執(zhí)行state更新函數(shù)。
要使用該算子,必須進(jìn)行兩個(gè)步驟
- 定義state——state可以是任意的數(shù)據(jù)類型
- 定義state更新函數(shù)——用一個(gè)函數(shù)指定如何使用之前的狀態(tài),以及從輸入流中獲取新值更新狀態(tài)
注意:updateStateByKey操作,要求必須開啟Checkpoint機(jī)制
實(shí)例:基于緩存的實(shí)時(shí)WordCount
package StreamingDemo import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 基于緩存的實(shí)時(shí)WordCount,在全局范圍內(nèi)統(tǒng)計(jì)單詞出現(xiàn)次數(shù) */ object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { //設(shè)置日志級別 Logger.getLogger("org").setLevel(Level.WARN) /** * 如果沒有啟用安全認(rèn)證或者從Kerberos獲取的用戶為null,那么獲取HADOOP_USER_NAME環(huán)境變量, * 并將它的值作為Hadoop執(zhí)行用戶設(shè)置hadoop username * 這里實(shí)驗(yàn)了一下在沒有啟用安全認(rèn)證的情況下,就算不顯式添加,也會(huì)自動(dòng)獲取我的用戶名 */ //System.setProperty("HADOOP_USER_NAME","Setsuna") val conf = new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(2)) //設(shè)置Checkpoint存放的路徑 ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint") //創(chuàng)建輸入DStream val lineDStream = ssc.socketTextStream("Hadoop01", 6666) val wordDStream = lineDStream.flatMap(_.split(" ")) val pairsDStream = wordDStream.map((_, 1)) /** * state:代表之前的狀態(tài)值 * values:代表當(dāng)前batch中key對應(yīng)的values值 */ val resultDStream = pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => { //當(dāng)state為none,表示沒有對這個(gè)單詞做統(tǒng)計(jì),則返回0值給計(jì)數(shù)器count var count = state.getOrElse(0) //遍歷values,累加新出現(xiàn)的單詞的value值 for (value <- values) { count += value } //返回key對應(yīng)的新state,即單詞的出現(xiàn)次數(shù) Option(count) }) //在控制臺輸出 resultDStream.print() ssc.start() ssc.awaitTermination() } }
測試
開啟nc,輸入單詞
控制臺實(shí)時(shí)輸出的結(jié)果
window滑動(dòng)窗口算子開發(fā)
Spark Streaming提供了滑動(dòng)窗口操作的支持,可以對一個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù)執(zhí)行計(jì)算操作
在滑動(dòng)窗口中,包含批處理間隔、窗口間隔、滑動(dòng)間隔
- 對于窗口操作而言,在其窗口內(nèi)部會(huì)有N個(gè)批處理數(shù)據(jù)
- 批處理數(shù)據(jù)的大小由窗口間隔決定,而窗口間隔指的就是窗口的持續(xù)時(shí)間,也就是窗口的長度
- 滑動(dòng)時(shí)間間隔指的是經(jīng)過多長時(shí)間窗口滑動(dòng)一次,形成新的窗口,滑動(dòng)間隔默認(rèn)情況下和批處理時(shí)間間隔的相同
注意:滑動(dòng)時(shí)間間隔和窗口時(shí)間間隔的大小一定得設(shè)置為批處理間隔的整數(shù)倍
用一個(gè)官方的圖來作為說明
批處理間隔是1個(gè)時(shí)間單位,窗口間隔是3個(gè)時(shí)間單位,滑動(dòng)間隔是2個(gè)時(shí)間單位。對于初始的窗口time1-time3,只有窗口間隔滿足了才觸發(fā)數(shù)據(jù)的處理。所以滑動(dòng)窗口操作都必須指定兩個(gè)參數(shù),窗口長度和滑動(dòng)時(shí)間間隔。在Spark Streaming中對滑動(dòng)窗口的支持是比Storm更加完善的。
Window滑動(dòng)算子操作
算子 | 描述 |
window() | 對每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行自定義的計(jì)算 |
countByWindow() | 對每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行count操作 |
reduceByWindow() | 對每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduce操作 |
reduceByKeyAndWindow() | 對每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduceByKey操作 |
countByValueAndWindow() | 對每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行countByValue操作 |
reduceByKeyAndWindow算子開發(fā)
實(shí)例:在線熱點(diǎn)搜索詞實(shí)時(shí)滑動(dòng)統(tǒng)計(jì)
每隔2秒鐘,統(tǒng)計(jì)最近5秒鐘的搜索詞中排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù)
package StreamingDemo import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 需求:每隔2秒鐘,統(tǒng)計(jì)最近5秒鐘的搜索詞中排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù) */ object ReduceByKeyAndWindowDemo { def main(args: Array[String]): Unit = { //設(shè)置日志級別 Logger.getLogger("org").setLevel(Level.WARN) //基礎(chǔ)配置 val conf = new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[2]") //批處理間隔設(shè)置為1s val ssc = new StreamingContext(conf, Seconds(1)) val linesDStream = ssc.socketTextStream("Hadoop01", 6666) linesDStream .flatMap(_.split(" ")) //根據(jù)空格來做分詞 .map((_, 1)) //返回(word,1) .reduceByKeyAndWindow( //定義窗口如何計(jì)算的函數(shù) //x代表的是聚合后的結(jié)果,y代表的是這個(gè)Key對應(yīng)的下一個(gè)需要聚合的值 (x: Int, y: Int) => x + y, //窗口長度為5秒 Seconds(5), //窗口時(shí)間間隔為2秒 Seconds(2) ) .transform(rdd => { //transform算子對rdd做處理,轉(zhuǎn)換為另一個(gè)rdd //根據(jù)Key的出現(xiàn)次數(shù)來進(jìn)行排序,然后降序排列,獲取最靠前的3個(gè)搜索詞 val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3) //將Array轉(zhuǎn)換為resultRDD val resultRDD = ssc.sparkContext.parallelize(info) resultRDD }) .map(x => s"${x._1}出現(xiàn)的次數(shù)是:${x._2}") .print() ssc.start() ssc.awaitTermination() } }
測試結(jié)果
DStream Output操作概覽
Spark Streaming允許DStream的數(shù)據(jù)輸出到外部系統(tǒng),DSteram中的所有計(jì)算,都是由output操作觸發(fā)的,foreachRDD輸出操作,也必須在里面對RDD執(zhí)行action操作,才能觸發(fā)對每一個(gè)batch的計(jì)算邏輯。
轉(zhuǎn)換 | 描述 |
print() | 在Driver中打印出DStream中數(shù)據(jù)的前10個(gè)元素。主要用于測試,或者是不需要執(zhí)行什么output操作時(shí),用于簡單觸發(fā)一下job。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內(nèi)容以文本的形式保存為文本文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix , [suffix]) |
將DStream中的內(nèi)容按對象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(pref ix, [suffix]) |
將DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件 以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) | 最基本的輸出操作,將func函數(shù)應(yīng)用于DStream中的RDD上,這個(gè)操作會(huì)輸出數(shù)據(jù)到外部系 統(tǒng),比如保存RDD到文件或者網(wǎng)絡(luò)數(shù)據(jù)庫等。需要注意的是func函數(shù)是在運(yùn)行該streaming 應(yīng)用的Driver進(jìn)程里執(zhí)行的。 |
foreachRDD算子開發(fā)
foreachRDD是最常用的output操作,可以遍歷DStream中的每個(gè)產(chǎn)生的RDD并進(jìn)行處理,然后將每個(gè)RDD中的數(shù)據(jù)寫入外部存儲(chǔ),如文件、數(shù)據(jù)庫、緩存等,通常在其中針對RDD執(zhí)行action操作,比如foreach
使用foreachRDD操作數(shù)據(jù)庫
通常在foreachRDD中都會(huì)創(chuàng)建一個(gè)Connection,比如JDBC Connection,然后通過Connection將數(shù)據(jù)寫入外部存儲(chǔ)
誤區(qū)一:在RDD的foreach操作外部創(chuàng)建Connection
dstream.foreachRDD { rdd => val connection=createNewConnection() rdd.foreach { record => connection.send(record) } }
這種方式是錯(cuò)誤的,這樣的方式會(huì)導(dǎo)致Connection對象被序列化后被傳輸?shù)矫恳粋€(gè)task上,但是Connection對象是不支持序列化的,所以也就無法被傳輸
誤區(qū)二:在RDD的foreach操作內(nèi)部創(chuàng)建Connection
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
這種方式雖然是可以的,但是執(zhí)行效率會(huì)很低,因?yàn)樗鼤?huì)導(dǎo)致對RDD中的每一條數(shù)據(jù)都創(chuàng)建一個(gè)Connection對象,通常Connection對象的創(chuàng)建都是很消耗性能的
合理的方式
- 第一種:使用RDD的foreachPartition操作,并且在該操作內(nèi)部創(chuàng)建Connection對象,這樣就相當(dāng)于為RDD的每個(gè)partition創(chuàng)建一個(gè)Connection對象,節(jié)省了很多資源
- 第二種:自己手動(dòng)封裝一個(gè)靜態(tài)連接池,使用RDD的foreachPartition操作,并且在該操作內(nèi)部從靜態(tài)連接池中,通過靜態(tài)方法獲取到一個(gè)連接,連接使用完之后再放回連接池中。這樣的話,可以在多個(gè)RDD的partition之間復(fù)用連接了
實(shí)例:實(shí)時(shí)全局統(tǒng)計(jì)WordCount,并將結(jié)果保存到MySQL數(shù)據(jù)庫中
MySQL數(shù)據(jù)庫建表語句如下
CREATE TABLE wordcount ( word varchar(100) CHARACTER SET utf8 NOT NULL, count int(10) NOT NULL, PRIMARY KEY (word) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
在IDEA中添加mysql-connector-java-5.1.40-bin.jar
代碼如下
連接池的代碼,其實(shí)一開始有想過用靜態(tài)塊來寫個(gè)池子直接獲取,但是如果考慮到池子寬度不夠用的問題,這樣的方式其實(shí)更好,一開始,實(shí)例化一個(gè)連接池出來,被調(diào)用獲取連接,當(dāng)連接全部都被獲取了的時(shí)候,池子空了,就再實(shí)例化一個(gè)池子出來
package StreamingDemo import java.sql.{Connection, DriverManager, SQLException} import java.util object JDBCManager { var connectionQue: java.util.LinkedList[Connection] = null /** * 從數(shù)據(jù)庫連接池中獲取連接對象 * @return */ def getConnection(): Connection = { synchronized({ try { //如果連接池是空的,那么就實(shí)例化一個(gè)Connection類型的鏈表 if (connectionQue == null) { connectionQue = new util.LinkedList[Connection]() for (i <- 0 until (10)) { //生成10個(gè)連接,并配置相關(guān)信息 val connection = DriverManager.getConnection( "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8", "root", "root") //將連接push進(jìn)連接池 connectionQue.push(connection) } } } catch { //捕獲異常并輸出 case e: SQLException => e.printStackTrace() } //如果連接池不為空,則返回表頭元素,并將它在鏈表里刪除 return connectionQue.poll() }) } /** * 當(dāng)連接對象用完后,需要調(diào)用這個(gè)方法歸還連接 * @param connection */ def returnConnection(connection: Connection) = { //插入元素 connectionQue.push(connection) } def main(args: Array[String]): Unit = { //main方法測試 getConnection() println(connectionQue.size()) } }
wordcount代碼
package StreamingDemo import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, streaming} import org.apache.spark.streaming.{Seconds, StreamingContext} object ForeachRDDDemo { def main(args: Array[String]): Unit = { //設(shè)置日志級別,避免INFO信息過多 Logger.getLogger("org").setLevel(Level.WARN) //設(shè)置Hadoop的用戶,不加也可以 System.setProperty("HADOOP_USER_NAME", "Setsuna") //Spark基本配置 val conf = new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[2]") val ssc = new StreamingContext(conf, streaming.Seconds(2)) //因?yàn)橐褂胾pdateStateByKey,所以需要使用checkpoint ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint") //設(shè)置socket,跟nc配置的一樣 val linesDStream = ssc.socketTextStream("Hadoop01", 6666) val wordCountDStream = linesDStream .flatMap(_.split(" ")) //根據(jù)空格做分詞 .map((_, 1)) //生成(word,1) .updateStateByKey((values: Seq[Int], state: Option[Int]) => { //實(shí)時(shí)更新狀態(tài)信息 var count = state.getOrElse(0) for (value <- values) { count += value } Option(count) }) wordCountDStream.foreachRDD(rdd => { if (!rdd.isEmpty()) { rdd.foreachPartition(part => { //從連接池中獲取連接 val connection = JDBCManager.getConnection() part.foreach(data => { val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有則更新無則插入 s"insert into wordcount (word,count) " + s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}" //使用prepareStatement來使用sql語句 val pstmt = connection.prepareStatement(sql) pstmt.executeUpdate() }) //在連接處提交完數(shù)據(jù)后,歸還連接到連接池 JDBCManager.returnConnection(connection) }) } }) ssc.start() ssc.awaitTermination() } }
打開nc,輸入數(shù)據(jù)
在另一個(gè)終端對wordcount的結(jié)果進(jìn)行查詢,可以發(fā)現(xiàn)是實(shí)時(shí)發(fā)生變化的
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java Map接口及其實(shí)現(xiàn)類原理解析
這篇文章主要介紹了Java Map接口及其實(shí)現(xiàn)類原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03關(guān)于SpringSecurity配置403權(quán)限訪問頁面的完整代碼
本文給大家分享SpringSecurity配置403權(quán)限訪問頁面的完整代碼,配置之前和配置之后的詳細(xì)介紹,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-06-06spring boot使用properties定義短信模板的方法教程
這篇文章主要給大家介紹了關(guān)于spring boot使用properties定義短信模板的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-01-01@Scheduled 如何讀取動(dòng)態(tài)配置文件
這篇文章主要介紹了@Scheduled 如何讀取動(dòng)態(tài)配置文件的操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java中ThreadLocal線程變量的實(shí)現(xiàn)原理
本文主要介紹了Java中ThreadLocal線程變量的實(shí)現(xiàn)原理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06基于Restful接口調(diào)用方法總結(jié)(超詳細(xì))
下面小編就為大家?guī)硪黄赗estful接口調(diào)用方法總結(jié)(超詳細(xì))。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-08-08