亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細(xì)介紹

  發(fā)布時(shí)間:2019-06-28 17:26:24   作者:佚名   我要評(píng)論
Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime 模塊的核心組件之一,本文中介紹了Apache Flink網(wǎng)絡(luò)協(xié)議棧,感興趣的朋友可以閱讀本文參考一下

Flink 的網(wǎng)絡(luò)協(xié)議棧是組成 flink-runtime 模塊的核心組件之一,是每個(gè) Flink 作業(yè)的核心。它連接所有 TaskManager 的各個(gè)子任務(wù)(Subtask),因此,對(duì)于 Flink 作業(yè)的性能包括吞吐與延遲都至關(guān)重要。與 TaskManager 和 JobManager 之間通過(guò)基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網(wǎng)絡(luò)協(xié)議棧依賴于更加底層的 Netty API。

本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然后詳細(xì)介紹 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實(shí)現(xiàn)和各種優(yōu)化、優(yōu)化的效果以及 Flink 在吞吐量和延遲之間的權(quán)衡。

Apache Flink的網(wǎng)絡(luò)協(xié)議棧詳細(xì)介紹

1.邏輯視圖

Flink 的網(wǎng)絡(luò)協(xié)議棧為彼此通信的子任務(wù)提供以下邏輯視圖,例如在 A 通過(guò) keyBy() 操作進(jìn)行數(shù)據(jù) Shuffle :

這一過(guò)程建立在以下三種基本概念的基礎(chǔ)上:

▼ 子任務(wù)輸出類型(ResultPartitionType):

Pipelined(有限的或無(wú)限的):一旦產(chǎn)生數(shù)據(jù)就可以持續(xù)向下游發(fā)送有限數(shù)據(jù)流或無(wú)限數(shù)據(jù)流。 Blocking:僅在生成完整結(jié)果后向下游發(fā)送數(shù)據(jù)。

▼ 調(diào)度策略:

同時(shí)調(diào)度所有任務(wù)(Eager):同時(shí)部署作業(yè)的所有子任務(wù)(用于流作業(yè))。

上游產(chǎn)生第一條記錄部署下游(Lazy):一旦任何生產(chǎn)者生成任何輸出,就立即部署下游任務(wù)。

上游產(chǎn)生完整數(shù)據(jù)部署下游:當(dāng)任何或所有生產(chǎn)者生成完整數(shù)據(jù)后,部署下游任務(wù)。

▼ 數(shù)據(jù)傳輸:

高吞吐:Flink 不是一個(gè)一個(gè)地發(fā)送每條記錄,而是將若干記錄緩沖到其網(wǎng)絡(luò)緩沖區(qū)中并一次性發(fā)送它們。這降低了每條記錄的發(fā)送成本因此提高了吞吐量。 低延遲:當(dāng)網(wǎng)絡(luò)緩沖區(qū)超過(guò)一定的時(shí)間未被填滿時(shí)會(huì)觸發(fā)超時(shí)發(fā)送,通過(guò)減小超時(shí)時(shí)間,可以通過(guò)犧牲一定的吞吐來(lái)獲取更低的延遲。

我們將在下面深入 Flink 網(wǎng)絡(luò)協(xié)議棧的物理實(shí)現(xiàn)時(shí)看到關(guān)于吞吐延遲的優(yōu)化。對(duì)于這一部分,讓我們?cè)敿?xì)說(shuō)明輸出類型與調(diào)度策略。首先,需要知道的是子任務(wù)的輸出類型和調(diào)度策略是緊密關(guān)聯(lián)的,只有兩者的一些特定組合才是有效的。

Pipelined 結(jié)果是流式輸出,需要目標(biāo) Subtask 正在運(yùn)行以便接收數(shù)據(jù)。因此需要在上游 Task 產(chǎn)生數(shù)據(jù)之前或者產(chǎn)生第一條數(shù)據(jù)的時(shí)候調(diào)度下游目標(biāo) Task 運(yùn)行。批處理作業(yè)生成有界結(jié)果數(shù)據(jù),而流式處理作業(yè)產(chǎn)生無(wú)限結(jié)果數(shù)據(jù)。

批處理作業(yè)也可能以阻塞方式產(chǎn)生結(jié)果,具體取決于所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結(jié)果,然后才能調(diào)度下游的接收 Task 運(yùn)行。這能夠提高批處理作業(yè)的效率并且占用更少的資源。

下表總結(jié)了 Task 輸出類型以及調(diào)度策略的有效組合:

注釋:

目前 Flink 未使用 批處理 / 流計(jì)算統(tǒng)一完成后,可能適用于流式作業(yè)

此外,對(duì)于具有多個(gè)輸入的子任務(wù),調(diào)度以兩種方式啟動(dòng):當(dāng)所有或者任何上游任務(wù)產(chǎn)生第一條數(shù)據(jù)或者產(chǎn)生完整數(shù)據(jù)時(shí)調(diào)度任務(wù)運(yùn)行。要調(diào)整批處理作業(yè)中的輸出類型和調(diào)度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.物理數(shù)據(jù)傳輸

為了理解物理數(shù)據(jù)連接,請(qǐng)回想一下,在 Flink 中,不同的任務(wù)可以通過(guò) Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個(gè) Slot,以允許將同一任務(wù)的多個(gè)子任務(wù)調(diào)度到同一個(gè) TaskManager 上。

對(duì)于下圖所示的示例,我們假設(shè) 2 個(gè)并發(fā)為 4 的任務(wù)部署在 2 個(gè) TaskManager 上,每個(gè) TaskManager 有兩個(gè) Slot。TaskManager 1 執(zhí)行子任務(wù) A.1,A.2,B.1 和 B.2,TaskManager 2 執(zhí)行子任務(wù) A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來(lái)自于 A 的 keyBy() 操作,在每個(gè) TaskManager 上會(huì)有 2x4 個(gè)邏輯連接,其中一些是本地的,另一些是遠(yuǎn)程的:

不同任務(wù)(遠(yuǎn)程)之間的每個(gè)網(wǎng)絡(luò)連接將在 Flink 的網(wǎng)絡(luò)堆棧中獲得自己的 TCP 通道。但是,如果同一任務(wù)的不同子任務(wù)被調(diào)度到同一個(gè) TaskManager,則它們與同一個(gè) TaskManager 的網(wǎng)絡(luò)連接將多路復(fù)用并共享同一個(gè) TCP 信道以減少資源使用。在我們的例子中,這適用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:

每個(gè)子任務(wù)的輸出結(jié)果稱為 ResultPartition,每個(gè) ResultPartition 被分成多個(gè)單獨(dú)的 ResultSubpartition- 每個(gè)邏輯通道一個(gè)。Flink 的網(wǎng)絡(luò)協(xié)議棧在這一點(diǎn)的處理上,不再處理單個(gè)記錄,而是將一組序列化的記錄填充到網(wǎng)絡(luò)緩沖區(qū)中進(jìn)行處理。每個(gè)子任務(wù)本地緩沖區(qū)中最多可用 Buffer 數(shù)目為(每個(gè)發(fā)送方和接收方各一個(gè)):

#channels * buffers-per-channel + floating-buffers-per-gate

單個(gè) TaskManager 上的網(wǎng)絡(luò)層 Buffer 總數(shù)通常不需要配置。有關(guān)如何在需要時(shí)進(jìn)行配置的詳細(xì)信息,請(qǐng)參閱配置網(wǎng)絡(luò)緩沖區(qū)的文檔。

▼ 造成反壓(1)

每當(dāng)子任務(wù)的數(shù)據(jù)發(fā)送緩沖區(qū)耗盡時(shí)——數(shù)據(jù)駐留在 Subpartition 的緩沖區(qū)隊(duì)列中或位于更底層的基于 Netty 的網(wǎng)絡(luò)堆棧內(nèi),生產(chǎn)者就會(huì)被阻塞,無(wú)法繼續(xù)發(fā)送數(shù)據(jù),而受到反壓。接收端以類似的方式工作:Netty 收到任何數(shù)據(jù)都需要通過(guò)網(wǎng)絡(luò) Buffer 傳遞給 Flink。如果相應(yīng)子任務(wù)的網(wǎng)絡(luò)緩沖區(qū)中沒(méi)有足夠可用的網(wǎng)絡(luò) Buffer,F(xiàn)link 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路復(fù)用上的所有發(fā)送子任務(wù),因此也限制了其他接收子任務(wù)。下圖說(shuō)明了過(guò)載的子任務(wù) B.4,它會(huì)導(dǎo)致多路復(fù)用的反壓,也會(huì)導(dǎo)致子任務(wù) B.3 無(wú)法接受和處理數(shù)據(jù),即使是 B.3 還有足夠的處理能力。

為了防止這種情況發(fā)生,F(xiàn)link 1.5 引入了自己的流量控制機(jī)制。

3.Credit-based 流量控制

Credit-based 流量控制可確保發(fā)送端已經(jīng)發(fā)送的任何數(shù)據(jù),接收端都具有足夠的能力(Buffer)來(lái)接收。新的流量控制機(jī)制基于網(wǎng)絡(luò)緩沖區(qū)的可用性,作為 Flink 之前機(jī)制的自然延伸。每個(gè)遠(yuǎn)程輸入通道(RemoteInputChannel)現(xiàn)在都有自己的一組獨(dú)占緩沖區(qū)(Exclusive buffer),而不是只有一個(gè)共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區(qū)稱為流動(dòng)緩沖區(qū)(Floating buffer),因?yàn)樗鼈儠?huì)在輸出通道間流動(dòng)并且可用于每個(gè)輸入通道。

數(shù)據(jù)接收方會(huì)將自身的可用 Buffer 作為 Credit 告知數(shù)據(jù)發(fā)送方(1 buffer = 1 credit)。每個(gè) Subpartition 會(huì)跟蹤下游接收端的 Credit(也就是可用于接收數(shù)據(jù)的 Buffer 數(shù)目)。只有在相應(yīng)的通道(Channel)有 Credit 的時(shí)候 Flink 才會(huì)向更底層的網(wǎng)絡(luò)協(xié)議棧發(fā)送數(shù)據(jù)(以 Buffer 為粒度),并且每發(fā)送一個(gè) Buffer 的數(shù)據(jù),相應(yīng)的通道上的 Credit 會(huì)減 1。除了發(fā)送數(shù)據(jù)本身外,數(shù)據(jù)發(fā)送端還會(huì)發(fā)送相應(yīng) Subpartition 中有多少正在排隊(duì)發(fā)送的 Buffer 數(shù)(稱之為 Backlog)給下游。數(shù)據(jù)接收端會(huì)利用這一信息(Backlog)去申請(qǐng)合適數(shù)量的 Floating buffer 用于接收發(fā)送端的數(shù)據(jù),這可以加快發(fā)送端堆積數(shù)據(jù)的處理。接收端會(huì)首先申請(qǐng)和 Backlog 數(shù)量相等的 Buffer,但可能無(wú)法申請(qǐng)到全部,甚至一個(gè)都申請(qǐng)不到,這時(shí)接收端會(huì)利用已經(jīng)申請(qǐng)到的 Buffer 進(jìn)行數(shù)據(jù)接收,并監(jiān)聽(tīng)是否有新的 Buffer 可用。

Credit-based 的流控使用 Buffers-per-channel 來(lái)指定每個(gè) Channel 有多少獨(dú)占的 Buffer,使用 Floating-buffers-per-gate 來(lái)指定共享的本地緩沖池(Local buffer pool)大小(可選3),通過(guò)共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數(shù)目可以達(dá)到與原來(lái)非 Credit-based 流控同樣的大小。這兩個(gè)參數(shù)的默認(rèn)值是被精心選取的,以保證新的 Credit-based 流控在網(wǎng)絡(luò)健康延遲正常的情況下至少可以達(dá)到與原策略相同的吞吐。可以根據(jù)實(shí)際的網(wǎng)絡(luò) RRT (round-trip-time)和帶寬對(duì)這兩個(gè)參數(shù)進(jìn)行調(diào)整。

注釋3:如果沒(méi)有足夠的 Buffer 可用,則每個(gè)緩沖池將獲得全局可用 Buffer 的相同份額(±1)。

相關(guān)文章

最新評(píng)論