亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spark調(diào)優(yōu)多線程并行處理任務(wù)實(shí)現(xiàn)方式

 更新時間:2020年08月06日 10:21:41   作者:lshan  
這篇文章主要介紹了Spark調(diào)優(yōu)多線程并行處理任務(wù)實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下

方式1:

1. 明確 Spark中Job 與 Streaming中 Job 的區(qū)別

1.1 Spark Core

一個 RDD DAG Graph 可以生成一個或多個 Job(Action操作)

一個Job可以認(rèn)為就是會最終輸出一個結(jié)果RDD的一條由RDD組織而成的計(jì)算

Job在spark里應(yīng)用里是一個被調(diào)度的單位

1.2 Streaming

一個 batch 的數(shù)據(jù)對應(yīng)一個 DStreamGraph

而一個 DStreamGraph 包含一或多個關(guān)于 DStream 的輸出操作

每一個輸出對應(yīng)于一個Job,一個 DStreamGraph 對應(yīng)一個JobSet,里面包含一個或多個Job

2. Streaming Job的并行度

Job的并行度由兩個配置決定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一個 Batch 可能會有多個 Action 執(zhí)行,比如注冊了多個 Kafka 數(shù)據(jù)流,每個Action都會產(chǎn)生一個Job

所以一個 Batch 有可能是一批 Job,也就是 JobSet 的概念

這些 Job 由 jobExecutor 依次提交執(zhí)行

而 JobExecutor 是一個默認(rèn)池子大小為1的線程池,所以只能執(zhí)行完一個Job再執(zhí)行另外一個Job

這里說的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 決定了向 Spark Core 提交Job的并行度

提交一個Job,必須等這個執(zhí)行完了,才會提交第二個

假設(shè)我們把它設(shè)置為2,則會并發(fā)的把 Job 提交給 Spark Core

Spark 有自己的機(jī)制決定如何運(yùn)行這兩個Job,這個機(jī)制其實(shí)就是FIFO或者FAIR(決定了資源的分配規(guī)則)

默認(rèn)是 FIFO,也就是先進(jìn)先出,把 concurrentJobs 設(shè)置為2,但是如果底層是FIFO,那么會優(yōu)先執(zhí)行先提交的Job

雖然如此,如果資源夠兩個job運(yùn)行,還是會并行運(yùn)行兩個Job

Spark Streaming 不同Batch任務(wù)可以并行計(jì)算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行對
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你會發(fā)現(xiàn),不同batch的job其實(shí)也可以并行運(yùn)行的,這里需要有幾個條件:

有延時發(fā)生了,batch無法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源

Mode是FAIR則盡力保證你的Job是并行運(yùn)行的,毫無疑問是可以并行的。

方式2:

場景1:

程序每次處理的數(shù)據(jù)量是波動的,比如周末比工作日多很多,晚八點(diǎn)比凌晨四點(diǎn)多很多。

一個spark程序處理的時間在1-2小時波動是OK的。而spark streaming程序不可以,如果每次處理的時間是1-10分鐘,就很蛋疼。
設(shè)置10分鐘吧,實(shí)際上10分鐘的也就那一段高峰時間,如果設(shè)置每次是1分鐘,很多時候會出現(xiàn)程序處理不過來,排隊(duì)過多的任務(wù)延遲更久,還可能出現(xiàn)程序崩潰的可能。

場景2:

  • 程序需要處理的相似job數(shù)隨著業(yè)務(wù)的增長越來越多
  • 我們知道spark的api里無相互依賴的stage是并行處理的,但是job之間是串行處理的。
  • spark程序通常是離線處理,比如T+1之類的延遲,時間變長是可以容忍的。而spark streaming是準(zhǔn)實(shí)時的,如果業(yè)務(wù)增長導(dǎo)致延遲增加就很不合理。

spark雖然是串行執(zhí)行job,但是是可以把job放到線程池里多線程執(zhí)行的。如何在一個SparkContext中提交多個任務(wù)

DStream.foreachRDD{
   rdd =>
    //創(chuàng)建線程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //將規(guī)則放入線程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //執(zhí)行規(guī)則
       runRule(ru,spark)
      }
     })
    }
    //每次創(chuàng)建的線程池執(zhí)行完所有規(guī)則后shutdown
    executors.shutdown()
  }

注意點(diǎn)

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()會發(fā)生未執(zhí)行完的task強(qiáng)制關(guān)閉線程。
  • 如果使用executors.awaitTermination()則會發(fā)生阻塞,不是我們想要的結(jié)果。
  • 如果沒有這個shutdowm操作,程序會正常執(zhí)行,但是長時間會產(chǎn)生大量無用的線程池,因?yàn)槊看蝔oreachRDD都會創(chuàng)建一個線程池。

2.可不可以將創(chuàng)建線程池放到foreachRDD外面?

不可以,這個關(guān)系到對于scala閉包到理解,經(jīng)測試,第一次或者前幾次batch是正常的,后面的batch無線程可用。

3.線程池executor崩潰了就會導(dǎo)致數(shù)據(jù)丟失

原則上是這樣的,但是正常的代碼一般不會發(fā)生executor崩潰。至少我在使用的時候沒遇到過。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 解析ConcurrentHashMap: put方法源碼分析

    解析ConcurrentHashMap: put方法源碼分析

    ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu),今天給大家普及java面試常見問題---ConcurrentHashMap知識,一起看看吧
    2021-06-06
  • Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實(shí)現(xiàn)過程詳解

    Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實(shí)現(xiàn)過程詳解

    這篇文章主要介紹了Spring數(shù)據(jù)源及配置文件數(shù)據(jù)加密實(shí)現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-05-05
  • 淺談多線程中的鎖的幾種用法總結(jié)(必看)

    淺談多線程中的鎖的幾種用法總結(jié)(必看)

    下面小編就為大家?guī)硪黄獪\談多線程中的鎖的幾種用法總結(jié)(必看)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-05-05
  • IDEA加載項(xiàng)目沒有src目錄的問題及解決

    IDEA加載項(xiàng)目沒有src目錄的問題及解決

    這篇文章主要介紹了IDEA加載項(xiàng)目沒有src目錄的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Spring?Security認(rèn)證器實(shí)現(xiàn)過程詳解

    Spring?Security認(rèn)證器實(shí)現(xiàn)過程詳解

    一些權(quán)限框架一般都包含認(rèn)證器和決策器,前者處理登陸驗(yàn)證,后者處理訪問資源的控制,這篇文章主要介紹了Spring?Security認(rèn)證器實(shí)現(xiàn)過程,需要的朋友可以參考下
    2022-06-06
  • 分布式調(diào)度器之Spring Task 的使用詳解

    分布式調(diào)度器之Spring Task 的使用詳解

    SpringTask是Spring框架中用于任務(wù)調(diào)度的組件,通過簡單的注解就能實(shí)現(xiàn)定時任務(wù)的創(chuàng)建和調(diào)度,可以通過配置線程池來實(shí)現(xiàn),本文給大家介紹分布式調(diào)度器之Spring Task 的使用,感興趣的朋友跟隨小編一起看看吧
    2024-10-10
  • jmap執(zhí)行失敗如何獲取heapdump詳解

    jmap執(zhí)行失敗如何獲取heapdump詳解

    這篇文章主要為大家介紹了jmap執(zhí)行失敗如何獲取heapdump詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Java代碼實(shí)現(xiàn)酒店管理系統(tǒng)

    Java代碼實(shí)現(xiàn)酒店管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java代碼實(shí)現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • 新手初學(xué)Java-Map

    新手初學(xué)Java-Map

    Map簡介:將鍵映射到值的對象。一個映射不能包含重復(fù)的鍵;每個鍵最多只能映射到一個值。此接口取代 Dictionary 類,后者完全是一個抽象類,而不是一個接口
    2021-07-07
  • Spring?Boot指定外部配置文件簡單示例

    Spring?Boot指定外部配置文件簡單示例

    Spring Boot可以讓你將配置外部化,這樣你就可以在不同的環(huán)境中使用相同的應(yīng)用程序代碼,這篇文章主要給大家介紹了關(guān)于Spring?Boot指定外部配置文件的相關(guān)資料,需要的朋友可以參考下
    2024-01-01

最新評論