亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Flink JobGraph生成源碼解析

 更新時間:2022年12月01日 10:41:06   作者:xiangel  
這篇文章主要為大家介紹了Flink JobGraph生成源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

在DataStream基礎(chǔ)中,由于其中的內(nèi)容較多,只是介紹了JobGraph的結(jié)果,而沒有涉及到StreamGraph到JobGraph的轉(zhuǎn)換過程。本篇我們來介紹下JobGraph的生成的詳情,重點是Operator可以串聯(lián)成Chain的條件

概念

首先我們來回顧下JobGraph中的相關(guān)概念

  • JobVertex:job的頂點,即對應(yīng)的計算邏輯(這里用的是Vertex, 而前面用的是Node,有點差異),通過inputs記錄了所有來源的Edge,而輸出是ArrayList來記錄
  • JobEdge: job的邊,記錄了源Vertex和目標(biāo)表Vertex.
  • IntermediateDataSet: 定義了一個中間數(shù)據(jù)集,但并沒有存儲,只是記錄了一個Producer(JobVertex)和一個Consumer(JobEdge)

JobGraph生成

前面我們在介紹部署的時候,有介紹具體是通過PipelineExecutor的execute()方法來提交對應(yīng)的任務(wù),StreamGraph到JobGraph的轉(zhuǎn)換邏輯就是在該方法中處理的,具體是通過如下方法來進行處理

public static JobGraph getJobGraph(
            @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)

最后執(zhí)行轉(zhuǎn)換的類為FlinkPipelineTranslator,調(diào)用的是其中的translateToJobGraph方法。

JobGraph translateToJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);

這里有2個不同的實現(xiàn)類

  • StreamGraphTranslator:對StreamGraph的Pipeline進行轉(zhuǎn)換處理
  • PlanTranslator:對Plan類型的Pipeline進行轉(zhuǎn)換處理,用于SQL場景。 而這2個分別對應(yīng)到2個不同的類來生成JobGraph,分別如下:
  • StreamingJobGraphGenerator
  • JobGraphGenerator 本篇我們重點介紹StreamGraph到JobGraph的轉(zhuǎn)換StreamingJobGraphGenerator, JogGraphGenerator這塊等到介紹FlinkSQL的時候來介紹。StreamingJobGraphGenerator類中具體轉(zhuǎn)換處理的邏輯如下:
 private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());
![image.png](https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0603957ea34f4d6b9af96b686bd5fdb1~tplv-k3u1fbpfcp-watermark.image?)
        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
        setChaining(hashes, legacyHashes);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
        configureCheckpointing();
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }
        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig."
                            + "This indicates that non-serializable types (like custom serializers) were registered");
        }
        addVertexIndexPrefixInVertexName();
        setVertexDescription();
        return jobGraph;
    }

重點我們介紹以下幾點

生成hash值

對每個streamNode生成一個hash值,用于來標(biāo)識節(jié)點,用于重新提交任務(wù)后涉及恢復(fù)作業(yè)的場景。具體生成hash值的邏輯如下:

  • 如果指定了id信息,如Transformation.getUid(), 就用該值來生成hash值
  • 否則使用鏈上的輸出node和節(jié)點的輸入nodes的hash值來生成一個hash值 對具體的算法細節(jié)感興趣的同學(xué)可以深入研究StreamGraphHasherV2的具體內(nèi)容。

生成chain

如果連接的2個節(jié)點滿足一定的條件,就會把這2個節(jié)點放到一個chain里面,這樣可以避免上下游算子間發(fā)送數(shù)據(jù)的網(wǎng)絡(luò)開銷和序列化反序列化的性能開銷。判斷算子是否可以組成一個chain的判斷邏輯如下:

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
    }
    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && arePartitionerAndExchangeModeChainable(
                        edge.getPartitioner(),
                        edge.getExchangeMode(),
                        streamGraph.getExecutionConfig().isDynamicGraph())
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled())) {
            return false;
        }
        // check that we do not have a union operation, because unions currently only work
        // through the network/byte-channel stack.
        // we check that by testing that each "type" (which means input position) is used only once
        for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
            if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }

具體解讀如下:

  • 下游節(jié)點只有1個輸入邊
  • 上游節(jié)點和下游節(jié)點是在同一個SlotSharingGroup,slotSharingGroup在沒有設(shè)置的情況下,默認為default;
  • 上下游節(jié)點的算子的chaining策略是支持chain的,上游算子的chaining策略為ALWAYS\HEAD\HEAD_WITH_SOURCES,下游算子的chaining策略為ALWAYS或者(HEAD_WITH_SOURCES且上游算子為source算子,具體這些策略的說明見ChainingStrategy.java
  • 邊的分區(qū)策略是ForwardForConsecutiveHashPartitioner或者分區(qū)策略是ForwardPartitioner且數(shù)據(jù)交換方式(StreamExchangeMode)不是批模式
  • 上下游節(jié)點的并行度一致
  • StreamGraph是允許Chaining的

總結(jié)

本篇介紹了StreamGraph到JobGraph的生成流程,重點是在上下游節(jié)點是需要滿足什么條件才能chain到一起的具體邏輯。

以上就是Flink JobGraph生成源碼解析的詳細內(nèi)容,更多關(guān)于Flink JobGraph生成的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析詳解

    SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析詳解

    這篇文章主要介紹了SpringCloud微服務(wù)續(xù)約實現(xiàn)源碼分析,服務(wù)續(xù)期和服務(wù)注冊非常相似,服務(wù)注冊在Eureka?Client程序啟動之后開啟,并同時開啟服務(wù)續(xù)期的定時任務(wù)
    2022-11-11
  • Spring?Cloud?Gateway遠程命令執(zhí)行漏洞分析(CVE-2022-22947)

    Spring?Cloud?Gateway遠程命令執(zhí)行漏洞分析(CVE-2022-22947)

    使用Spring Cloud Gateway的應(yīng)用程序在Actuator端點啟用、公開和不安全的情況下容易受到代碼注入的攻擊,攻擊者可以惡意創(chuàng)建允許在遠程主機上執(zhí)行任意遠程執(zhí)行的請求,這篇文章主要介紹了Spring?Cloud?Gateway遠程命令執(zhí)行漏洞(CVE-2022-22947),需要的朋友可以參考下
    2023-03-03
  • JDK17、JDK19、JDK1.8輕松切換(無坑版,小白也可以看懂!)

    JDK17、JDK19、JDK1.8輕松切換(無坑版,小白也可以看懂!)

    在做不同的java項目時候,因項目需要很可能來回切換jdk版本,下面這篇文章主要介紹了JDK17、JDK19、JDK1.8輕松切換的相關(guān)資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下
    2023-02-02
  • 深入理解JDK8中Stream使用

    深入理解JDK8中Stream使用

    Stream 是 Java8 中處理集合的關(guān)鍵抽象概念,它可以指定你希望對集合進行的操作,可以執(zhí)行非常復(fù)雜的查找、過濾和映射數(shù)據(jù)等操作。這篇文章主要介紹了JDK8中Stream使用解析,需要的朋友可以參考下
    2021-06-06
  • idea 如何查找類中的某個方法

    idea 如何查找類中的某個方法

    這篇文章主要介紹了idea 如何查找類中的某個方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Springboot如何通過yml配置文件為靜態(tài)成員變量賦值

    Springboot如何通過yml配置文件為靜態(tài)成員變量賦值

    這篇文章主要介紹了Springboot如何通過yml配置文件為靜態(tài)成員變量賦值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • Effective Java (異常處理)

    Effective Java (異常處理)

    Effective Java (異常處理),需要的朋友可以參考一下
    2013-02-02
  • Spring Boot jpa Service層代碼實例

    Spring Boot jpa Service層代碼實例

    這篇文章主要介紹了Spring Boot jpa Service層代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-10-10
  • 詳解MyBatis延遲加載是如何實現(xiàn)的

    詳解MyBatis延遲加載是如何實現(xiàn)的

    MyBatis 的延遲加載(懶加載)特性允許在需要使用關(guān)聯(lián)對象數(shù)據(jù)時才進行加載,而不是在執(zhí)行主查詢時就加載所有相關(guān)數(shù)據(jù),我們將通過以下幾個方面來深入了解MyBatis的延遲加載實現(xiàn)機制,需要的朋友可以參考下
    2024-07-07
  • 使用迭代器Iterator遍歷Collection問題

    使用迭代器Iterator遍歷Collection問題

    這篇文章主要介紹了使用迭代器Iterator遍歷Collection問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11

最新評論