Flink JobGraph生成源碼解析
引言
在DataStream基礎(chǔ)中,由于其中的內(nèi)容較多,只是介紹了JobGraph的結(jié)果,而沒有涉及到StreamGraph到JobGraph的轉(zhuǎn)換過程。本篇我們來介紹下JobGraph的生成的詳情,重點是Operator可以串聯(lián)成Chain的條件
概念
首先我們來回顧下JobGraph中的相關(guān)概念
- JobVertex:job的頂點,即對應(yīng)的計算邏輯(這里用的是Vertex, 而前面用的是Node,有點差異),通過inputs記錄了所有來源的Edge,而輸出是ArrayList來記錄
- JobEdge: job的邊,記錄了源Vertex和目標(biāo)表Vertex.
- IntermediateDataSet: 定義了一個中間數(shù)據(jù)集,但并沒有存儲,只是記錄了一個Producer(JobVertex)和一個Consumer(JobEdge)
JobGraph生成
前面我們在介紹部署的時候,有介紹具體是通過PipelineExecutor的execute()方法來提交對應(yīng)的任務(wù),StreamGraph到JobGraph的轉(zhuǎn)換邏輯就是在該方法中處理的,具體是通過如下方法來進行處理
public static JobGraph getJobGraph( @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
最后執(zhí)行轉(zhuǎn)換的類為FlinkPipelineTranslator,調(diào)用的是其中的translateToJobGraph方法。
JobGraph translateToJobGraph( Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);
這里有2個不同的實現(xiàn)類
- StreamGraphTranslator:對StreamGraph的Pipeline進行轉(zhuǎn)換處理
- PlanTranslator:對Plan類型的Pipeline進行轉(zhuǎn)換處理,用于SQL場景。 而這2個分別對應(yīng)到2個不同的類來生成JobGraph,分別如下:
- StreamingJobGraphGenerator
- JobGraphGenerator 本篇我們重點介紹StreamGraph到JobGraph的轉(zhuǎn)換StreamingJobGraphGenerator, JogGraphGenerator這塊等到介紹FlinkSQL的時候來介紹。StreamingJobGraphGenerator類中具體轉(zhuǎn)換處理的邏輯如下:
private JobGraph createJobGraph() { preValidate(); jobGraph.setJobType(streamGraph.getJobType());  jobGraph.enableApproximateLocalRecovery( streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()); // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // Generate legacy version hashes for backwards compatibility List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size()); for (StreamGraphHasher hasher : legacyStreamGraphHashers) { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } setChaining(hashes, legacyHashes); setPhysicalEdges(); setSlotSharingAndCoLocation(); setManagedMemoryFraction( Collections.unmodifiableMap(jobVertices), Collections.unmodifiableMap(vertexConfigs), Collections.unmodifiableMap(chainedConfigs), id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(), id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases()); configureCheckpointing(); jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings()); final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries = JobGraphUtils.prepareUserArtifactEntries( streamGraph.getUserArtifacts().stream() .collect(Collectors.toMap(e -> e.f0, e -> e.f1)), jobGraph.getJobID()); for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : distributedCacheEntries.entrySet()) { jobGraph.addUserArtifact(entry.getKey(), entry.getValue()); } // set the ExecutionConfig last when it has been finalized try { jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); } catch (IOException e) { throw new IllegalConfigurationException( "Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered"); } addVertexIndexPrefixInVertexName(); setVertexDescription(); return jobGraph; }
重點我們介紹以下幾點
生成hash值
對每個streamNode生成一個hash值,用于來標(biāo)識節(jié)點,用于重新提交任務(wù)后涉及恢復(fù)作業(yè)的場景。具體生成hash值的邏輯如下:
- 如果指定了id信息,如Transformation.getUid(), 就用該值來生成hash值
- 否則使用鏈上的輸出node和節(jié)點的輸入nodes的hash值來生成一個hash值 對具體的算法細節(jié)感興趣的同學(xué)可以深入研究StreamGraphHasherV2的具體內(nèi)容。
生成chain
如果連接的2個節(jié)點滿足一定的條件,就會把這2個節(jié)點放到一個chain里面,這樣可以避免上下游算子間發(fā)送數(shù)據(jù)的網(wǎng)絡(luò)開銷和序列化反序列化的性能開銷。判斷算子是否可以組成一個chain的判斷邏輯如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); } private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && arePartitionerAndExchangeModeChainable( edge.getPartitioner(), edge.getExchangeMode(), streamGraph.getExecutionConfig().isDynamicGraph()) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { return false; } // check that we do not have a union operation, because unions currently only work // through the network/byte-channel stack. // we check that by testing that each "type" (which means input position) is used only once for (StreamEdge inEdge : downStreamVertex.getInEdges()) { if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) { return false; } } return true; }
具體解讀如下:
- 下游節(jié)點只有1個輸入邊
- 上游節(jié)點和下游節(jié)點是在同一個SlotSharingGroup,slotSharingGroup在沒有設(shè)置的情況下,默認為default;
- 上下游節(jié)點的算子的chaining策略是支持chain的,上游算子的chaining策略為ALWAYS\HEAD\HEAD_WITH_SOURCES,下游算子的chaining策略為ALWAYS或者(HEAD_WITH_SOURCES且上游算子為source算子,具體這些策略的說明見ChainingStrategy.java
- 邊的分區(qū)策略是ForwardForConsecutiveHashPartitioner或者分區(qū)策略是ForwardPartitioner且數(shù)據(jù)交換方式(StreamExchangeMode)不是批模式
- 上下游節(jié)點的并行度一致
- StreamGraph是允許Chaining的
總結(jié)
本篇介紹了StreamGraph到JobGraph的生成流程,重點是在上下游節(jié)點是需要滿足什么條件才能chain到一起的具體邏輯。
以上就是Flink JobGraph生成源碼解析的詳細內(nèi)容,更多關(guān)于Flink JobGraph生成的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析詳解
這篇文章主要介紹了SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析,服務(wù)續(xù)期和服務(wù)注冊非常相似,服務(wù)注冊在Eureka?Client程序啟動之后開啟,并同時開啟服務(wù)續(xù)期的定時任務(wù)2022-11-11Spring?Cloud?Gateway遠程命令執(zhí)行漏洞分析(CVE-2022-22947)
使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠程主機上執(zhí)行任意遠程執(zhí)行的請求,這篇文章主要介紹了Spring?Cloud?Gateway遠程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下2023-03-03JDK17、JDK19、JDK1.8輕松切換(無坑版,小白也可以看懂!)
在做不同的java項目時候,因項目需要很可能來回切換jdk版本,下面這篇文章主要介紹了JDK17、JDK19、JDK1.8輕松切換的相關(guān)資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2023-02-02Springboot如何通過yml配置文件為靜態(tài)成員變量賦值
這篇文章主要介紹了Springboot如何通過yml配置文件為靜態(tài)成員變量賦值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10