Flink狀態(tài)和容錯源碼解析
引言
- 計算模型
- DataStream基礎框架
- 事件時間和窗口
- 狀態(tài)和容錯
- 部署&調度
- 存儲體系
- 底層支撐
Flink中提供了State(狀態(tài))這個概念來保存中間計算結果和緩存數(shù)據,按照不同的場景,F(xiàn)link提供了多種不同類型的State,同時為了實現(xiàn)Exactly once的語義,F(xiàn)link參考Chandy-Lamport算法實現(xiàn)了Asynchronous Barrier Snapshotting算法(簡稱ABS),本篇我們來了解Flink狀態(tài)的底層實現(xiàn)及如何進行快照處理。
概述
Flink中提供了2種State,一種是Keyed State,是用在Keyed DataStream(即每條記錄是有一個key)。即每個key對應有一個狀態(tài)信息,用于數(shù)據處理場景中需要按key記錄狀態(tài)的場景,如風控場景下記錄用戶的狀態(tài),window操作中記錄窗口的數(shù)據信息也是用keyed State來實現(xiàn)的。另一種是Operator State,即算子的狀態(tài),如在KafkaSource中記錄消費的偏移量。本篇將從狀態(tài)的管理、數(shù)據的存儲和備份恢復等來介紹底層機制。
State
Keyed State
Keyed State按照存儲數(shù)據類型的不同,分為如下幾類
- ValueState: 保存一個值,支持查詢和更新
- ListState: 保存一個元素列表
- ReducingState: 保存一個單值,表示添加到狀態(tài)的所有數(shù)據的聚合,有定義一個ReduceFunction來進行聚合處理
- AggregatingState<IN, OUT>: 保留一個單值,也是添加到狀態(tài)的所有數(shù)據的聚合,和ReducingState不同的是,輸入數(shù)據和結果值的類型可以不同。
- MapState: 維護的一個映射列表 創(chuàng)建這些狀態(tài)時需要同時定義一個StateDescriptor,這里面定義一個狀態(tài)的名字(唯一名稱),通過StateDescriptor就可以來引用具體狀態(tài)實例。
狀態(tài)實例管理及數(shù)據存儲
從Keyed State的定義來看,這里的關系應該是一個key到State的映射,而在使用時卻沒有看到這個key,另外具體的狀態(tài)是如何保存的,本節(jié)我們來深入分析

從上圖可以看出,各種不同的State,是通過KeyedStateStore來獲取到的,KeyedStateStore只是一個代理類,其底層是調用KeyedStateBackend來負責具體的處理,具體的實現(xiàn)類有如下2個
- HeapKeyedStateBackend: 基于內存的StateBackend,把數(shù)據保存在Java的Heap里面
- RocksDBKeyedStateBackend: 狀態(tài)數(shù)據保存在RocksDB中
下面我們從如下幾個功能點來看具體各個StateBackend的實現(xiàn)
- 各個實例的管理
- 內部數(shù)據保存
- 數(shù)據過期處理
- 數(shù)據快照處理
- 數(shù)據重分布
HeapKeyedStateBackend
基于內存的StateBackend,將數(shù)據保存在Java的Heap中,適合小數(shù)據狀態(tài)場景
各個實例的管理
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
使用一個Map來保存各個不同的KeyedState,key為定義的名字,StateTable中存儲的具體的數(shù)據。StateTable的內部在下面內部數(shù)據保存中介紹

- 內部數(shù)據保存 內部數(shù)據存儲是通過StateTable來管理的,里面定義了一個StateMap的數(shù)組來存儲具體的數(shù)據,具體的數(shù)據通過計算key的KeyGroupIndex來確定把數(shù)據存放到哪個StateMap。
- StateMap:存儲數(shù)據,具體的實現(xiàn)有CopyOnWriteStateMap和CopyOnWriteSkipListStateMap2個。
- KeyGroup:將key分組管理,這樣方便在后續(xù)通過savepoint來恢復任務時如果調整了并行度,這樣方便對key按KeyGroup重新分布。keyGroup的計算為將key做hash處理后按最大并行度(maxParallelism)取余
- 數(shù)據過期處理 如果有設置了數(shù)據過期處理,這種生成的State會每個value都帶上一個時間戳數(shù)據,如MapState是每個value都帶一個時間戳,具體的實例類型也會不同,對應為如下幾個
- TtlValueState
- TtlListState
- TtlReducingState
- TtlAggregatingState
- TtlMapState 具體數(shù)據的清理處理通過TtlIncrementalCleanup類來實現(xiàn)
- 數(shù)據快照處理 HeapKeyedStateBackend的snapshot處理由類HeapSnapshotStrategy來負責,調用的方法為asyncSnapshot(),只支持全量的快照處理。
- 數(shù)據重分布 在實際的數(shù)據處理中由于并行度設置的不合理,在日常運維過程中會涉及到并行度的調整,算子的并行度調整后,那對key的分布也會進行調整,這樣就會導致keyedState的數(shù)據進行重分布。Flink中引入了一個KeyGroup的概念,其對key做了個分組管理,KeyGroup的個數(shù)為最大并行度,具體實現(xiàn)為將key進行hash后然分類(hash值對KeyGroup數(shù)取余)到其中一個KeyGroup中。
RocksDBKeyedStateBackend
使用RocksDB來存儲狀態(tài),這個State backend可以存儲非常大的狀態(tài),如果超過了內存可以split到磁盤上。同樣我們分如下幾個階段來了解相關具體的實現(xiàn)
- 各個實例的管理 通過如下的Map來存儲定義的各個狀態(tài)信息,和前面HeapKeyedStateBackend類似,key為自己定義的名稱
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
- 內部數(shù)據保存 從上面可以看到每個key對應的value是RocksDbKvStateInfo,這個其中有如下2個屬性 ColumnFamilyHandle:rocksdb庫中的類,提供了一個handle對應到rocksdb的ColumnFamily。
public static class RocksDbKvStateInfo implements AutoCloseable {
public final ColumnFamilyHandle columnFamilyHandle;
public final RegisteredStateMetaInfoBase metaInfo;
}
- 數(shù)據過期處理 通過RocksDB的CompactionFilter[1]功能來實現(xiàn)數(shù)據過期的處理,RocksDB項目中專門有個FlinkCompactionFilter的類用于Flink項目 CompactionFilter提供了一種在rocksdb進行compaction時候,根據自定義邏輯去刪除/修改key/value對的方法。如根據業(yè)務的ttl屬性刪除過期keys。
- 數(shù)據快照處理 RocksDBKeyedStateBackend支持2種快照處理方式,全量和增量,由于RocksDB底層是使用lsm樹來進行存儲的,所以比較方便實現(xiàn)增量數(shù)據的獲取
- 數(shù)據恢復處理 和前面HeapKeyedStateBackend的類似,這里就不再展開了。
OperatorState
接下面我們深入了解下OperatorState,在實際使用中如我們消費了kafka的數(shù)據需要記錄kafka消費的offset,還有一些場景需要將一些信息分發(fā)到所有的任務。這里需要使用到一類新的狀態(tài)處理,這種狀態(tài)是與每個算子綁定的,F(xiàn)link提供了如下3個類來支持
- ListState:通過一個list來保存相關的狀態(tài)信息,如果并行度調整了,其中的數(shù)據按新的并行度重新進行分布處理;
- UnionListState: 保存數(shù)據時和ListState類似,但是在出錯和從savepoint恢復數(shù)據時的策略會不一樣,其會將所有的狀態(tài)通過廣播的方式發(fā)給下游的每個任務,然后下游的任務來選擇自己需要的部分;
- BroadcastState: 用于每個任務上的狀態(tài)都要一樣的場景,這個在數(shù)據恢復時是把狀態(tài)復制到所有的任務。
類似KeyedState,這里也是通過一個Backend來管理對應的狀態(tài)數(shù)據,其接口定義為:OperatorStateStore。其實現(xiàn)類目前只有一個:DefaultOperatorStateBackend,我們也通過以下幾個部分來分別了解DefaultOperatorStateBackend的實現(xiàn)
- 各個實例的管理
private final Map<String, PartitionableListState<?>> registeredOperatorStates; private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
這里通過2個Map來分別管理ListState(含UnionListState,后面都統(tǒng)一使用ListState來代指)和BroadcastState。 2. 內部數(shù)據保存 ListState的實現(xiàn)類為PartitionableListState,底層通過一個ArrayList來保存數(shù)據。BroadcastState定義的接口是kv的,所以其實現(xiàn)類HeapBroadcastState使用Map來存儲相應的數(shù)據。 3. 數(shù)據過期處理 operatorState不涉及到數(shù)據過期的處理 4. 數(shù)據快照處理 DefaultOperatorStateBackendSnapshotStrategy類來負責具體的快照處理,調用的方法為asyncSnapshot,分別對ListState和BroadcastState進行快照處理 5. 數(shù)據重分布 數(shù)據重分布的策略前面介紹各個State時已經介紹了,這里就不再重復介紹了。
上層封裝
Flink中對狀態(tài)的backend和checkpoint存儲策略進行了封裝定義。 StateBackend:定義一個Streaming應用的state如何在集群中的本地存儲。不同的實現(xiàn)使用不同的數(shù)據結構來存儲對應的狀態(tài)。其具體實現(xiàn)有如下2個:
- HashMapStateBackend:將狀態(tài)保存在TaskManager的內存中(JVM heap),其底層KeyedStateBackend使用的是HeapKeyedStateBackend,OperatorStateBackend使用的是DefaultOperatorStateBackend
- EmbeddedRocksDBStateBackend:狀態(tài)保存在一個嵌入的Rockdb實例中,其底層實現(xiàn)KeyedStateBackend使用的是RocksDBKeyedStateBackend,OperatorStateBackend使用的和HashMapStateBackend一致,也是DefaultOperatorStateBackend 說明:原來的RocksDBStateBackend、FsStateBackend和MemoryStateBackend不建議使用了。 另外對checkpoint的存儲,CheckpointStorage接口定義了在Streaming應用中StateBackend在容錯性方面如何存儲其狀態(tài)數(shù)據。其實現(xiàn)有如下2個
- JobManagerCheckpointStorage:將checkpoints狀態(tài)數(shù)據存儲到JobManager的內存中,savepoints保存到文件系統(tǒng)
- FileSystemCheckpointStorage:將checkpoints狀態(tài)存儲到文件系統(tǒng),如保存到HDFS上。
總結
本篇我們了解了Flink的相關的狀態(tài)的內容和checkpoint的保存策略。
- State分為KeyedState和OperatorState,KeyedState主要存儲針對記錄級別的狀態(tài),如window操作時的狀態(tài)。OperatorState主要存儲針對算子的狀態(tài),如消費kafka的offset信息等;每類狀態(tài)分別提供了不同種類的狀態(tài)類來支持不同場景的狀態(tài)保存需求
- Flink底層通過StateBackend來保存對應的狀態(tài)數(shù)據,主要有通過內存保存和使用RocksDB保存這2類,另外在其具體實現(xiàn)中為了方便并行度調整后對狀態(tài)的重新拆分處理,引入了KeyGroup的概念
- 在用戶使用層,對上述2種狀態(tài)進行了封裝,接口為:StateBackend,來管理KeyedState和OperatorState,另外將checkpoint的存儲策略從原來的StateBackend中拆分出來。目前支持有基于內存和外部存儲的2類checkpoint存儲策略。 最后由于checkpoint的觸發(fā)以及任務恢復的處理與整體計算處理比較緊密,這塊等介紹完Flink部署模式后再來詳細梳理checkpoint的處理過程。
參考文檔
以上就是Flink狀態(tài)和容錯源碼解析的詳細內容,更多關于Flink狀態(tài)容錯的資料請關注腳本之家其它相關文章!
相關文章
解決 IDEA 創(chuàng)建 Gradle 項目沒有src目錄問題
這篇文章主要介紹了解決 IDEA 創(chuàng)建 Gradle 項目沒有src目錄問題,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2018-06-06
spring-cloud入門之spring-cloud-config(配置中心)
這篇文章主要介紹了spring-cloud入門之spring-cloud-config(配置中心),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01
Java使用Queryable-pageable實現(xiàn)分頁效果
這篇文章主要為大家介紹了Java如何使用Queryable-pageable從而實現(xiàn)分頁效果,文中的示例代碼簡潔易懂,感興趣的小伙伴可以動手嘗試一下2022-06-06
JavaScript中new運算符的實現(xiàn)過程解析
這篇文章主要介紹了JavaScript中new運算符的實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-10-10
SpringBoot集成Swagger2生成接口文檔的方法示例
我們提供Restful接口的時候,API文檔是尤為的重要,它承載著對接口的定義,描述等,本文主要介紹了SpringBoot集成Swagger2生成接口文檔的方法示例,需要的朋友們下面隨著小編來一起學習學習吧2018-12-12
SpringMVC xml文件路徑在web.xml中的配置方式
這篇文章主要介紹了SpringMVC xml文件路徑在web.xml中的配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09

