亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作

 更新時(shí)間:2023年02月08日 11:55:25   作者:響徹天堂丶  
這篇文章主要介紹了Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧

如果說簡(jiǎn)單聚合是對(duì)一些特定統(tǒng)計(jì)需求的實(shí)現(xiàn),那么 reduce 算子就是一個(gè)一般化的聚合統(tǒng)計(jì)操作了。從大名鼎鼎的 MapReduce 開始,我們對(duì) reduce 操作就不陌生:它可以對(duì)已有的

數(shù)據(jù)進(jìn)行歸約處理,把每一個(gè)新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來的值,再做一個(gè)聚合計(jì)算。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會(huì)改變流的元

素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時(shí),需要傳入一個(gè)參數(shù),實(shí)現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要實(shí)現(xiàn) reduce()方法,這個(gè)方法接收兩個(gè)輸入事件,經(jīng)過轉(zhuǎn)換處理之后輸出一個(gè)相同類型的事件;所以,對(duì)于一組數(shù)據(jù),我們可以先取兩個(gè)進(jìn)行合并,然后再

將合并的結(jié)果看作一個(gè)數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會(huì)將它“簡(jiǎn)化”成唯一的一個(gè)數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實(shí)現(xiàn)過程中,實(shí)際上是將中間“合并的結(jié)果”

作為任務(wù)的一個(gè)狀態(tài)保存起來的;之后每來一個(gè)新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。

其實(shí),reduce 的語義是針對(duì)列表進(jìn)行規(guī)約操作,運(yùn)算規(guī)則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內(nèi)部會(huì)維護(hù)一個(gè)初始值為空的累加器,注意累加器的類型

和輸入元素的類型相同,當(dāng)?shù)谝粭l元素到來時(shí),累加器的值更新為第一條元素的值,當(dāng)新的元素到來時(shí),新元素會(huì)和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運(yùn)算規(guī)

則。然后將更新以后的累加器的值向下游輸出。

我們可以單獨(dú)定義一個(gè)函數(shù)類實(shí)現(xiàn) ReduceFunction 接口,也可以直接傳入一個(gè)匿名類。當(dāng)然,同樣也可以通過傳入 Lambda 表達(dá)式實(shí)現(xiàn)類似的功能。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會(huì)改變流的元素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。下面我們來看一個(gè)稍復(fù)雜的例子。

我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)用戶訪問的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,記錄所有用戶中訪問頻次最高的那個(gè),也就是當(dāng)前訪問量最大的用戶是誰。

package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * 我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)
 * 用戶訪問的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,
 * 記錄所有用戶中訪問頻次最高的那個(gè),也就是當(dāng)前訪問量最大的用戶是誰。
 */
public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            userIds.add(random.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
        //每個(gè)ID訪問記錄一次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(Integer value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });
        //統(tǒng)計(jì)每個(gè)user訪問多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });
        sumDS.print("sumDS  ->>>>>>>>>>>>>");
        //把所有分區(qū)合并,求出最大的訪問量
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                if (value1.f1 > value2.f1) {
                    return value1;
                } else {
                    return value2;
                }
            }
        });
        maxDS.print("maxDS ->>>>>>>>>>>");
        env.execute("TransReduceTest");
    }
}

到此這篇關(guān)于Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解

    java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解

    這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • java實(shí)現(xiàn)網(wǎng)站微信掃碼支付

    java實(shí)現(xiàn)網(wǎng)站微信掃碼支付

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)網(wǎng)站微信掃碼支付,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • SpringBoot中使用@ControllerAdvice注解詳解

    SpringBoot中使用@ControllerAdvice注解詳解

    這篇文章主要介紹了SpringBoot中使用@ControllerAdvice注解詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個(gè)Controller增強(qiáng)器,可對(duì)controller中被 @RequestMapping注解的方法加一些邏輯處理,需要的朋友可以參考下
    2023-10-10
  • JAVA實(shí)現(xiàn)的CrazyArcade泡泡堂游戲

    JAVA實(shí)現(xiàn)的CrazyArcade泡泡堂游戲

    CrazyArcade泡泡堂游戲,一款用Java編寫的JavaSwing游戲程序。 使用了MVC模式,分離了模型、視圖和控制器,使得項(xiàng)目結(jié)構(gòu)清晰易于擴(kuò)展,使用配置文件來設(shè)置游戲基本配置,擴(kuò)展地圖人物道具等。同時(shí),該程序編寫期間用了單例模式、工廠模式、模板模式等設(shè)計(jì)模式。
    2021-04-04
  • 手把手教你如何獲取微信用戶openid

    手把手教你如何獲取微信用戶openid

    眾所周知小程序的openid相當(dāng)重要,它是用戶的唯一標(biāo)識(shí)id,牽扯的支付,登錄,授權(quán)等,下面這篇文章主要給大家介紹了關(guān)于如何獲取微信用戶openid的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • Java的Volatile實(shí)例用法及講解

    Java的Volatile實(shí)例用法及講解

    在本篇文章里小編給大家整理了關(guān)于Java的Volatile知識(shí)點(diǎn)相關(guān)內(nèi)容,有需要的朋友們可以跟著學(xué)習(xí)下。
    2019-09-09
  • Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)

    Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)

    這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • springboot連接多個(gè)數(shù)據(jù)庫的實(shí)現(xiàn)方法

    springboot連接多個(gè)數(shù)據(jù)庫的實(shí)現(xiàn)方法

    有時(shí)候一個(gè)SpringBoot項(xiàng)目需要同時(shí)連接兩個(gè)數(shù)據(jù)庫,本文就來介紹一下springboot連接多個(gè)數(shù)據(jù)庫的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-08-08
  • SpringMVC+Mybatis二維碼實(shí)現(xiàn)多平臺(tái)付款(附源碼)

    SpringMVC+Mybatis二維碼實(shí)現(xiàn)多平臺(tái)付款(附源碼)

    本文主要實(shí)現(xiàn)微信支付寶等支付平臺(tái)合多為一的二維碼支付,并且實(shí)現(xiàn)有效時(shí)間內(nèi)支付有效,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • Pulsar源碼徹底解決重復(fù)消費(fèi)問題

    Pulsar源碼徹底解決重復(fù)消費(fèi)問題

    這篇文章主要為大家介紹了Pulsar源碼徹底解決重復(fù)消費(fèi)問題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05

最新評(píng)論