Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作
如果說簡(jiǎn)單聚合是對(duì)一些特定統(tǒng)計(jì)需求的實(shí)現(xiàn),那么 reduce 算子就是一個(gè)一般化的聚合統(tǒng)計(jì)操作了。從大名鼎鼎的 MapReduce 開始,我們對(duì) reduce 操作就不陌生:它可以對(duì)已有的
數(shù)據(jù)進(jìn)行歸約處理,把每一個(gè)新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來的值,再做一個(gè)聚合計(jì)算。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會(huì)改變流的元
素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時(shí),需要傳入一個(gè)參數(shù),實(shí)現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:
@Public @FunctionalInterface public interface ReduceFunction<T> extends Function, Serializable { /** * The core method of ReduceFunction, combining two values into one value of the same type. The * reduce function is consecutively applied to all values of a group until only a single value * remains. * * @param value1 The first value to combine. * @param value2 The second value to combine. * @return The combined value of both input values. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ T reduce(T value1, T value2) throws Exception; }
ReduceFunction 接口里需要實(shí)現(xiàn) reduce()方法,這個(gè)方法接收兩個(gè)輸入事件,經(jīng)過轉(zhuǎn)換處理之后輸出一個(gè)相同類型的事件;所以,對(duì)于一組數(shù)據(jù),我們可以先取兩個(gè)進(jìn)行合并,然后再
將合并的結(jié)果看作一個(gè)數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會(huì)將它“簡(jiǎn)化”成唯一的一個(gè)數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實(shí)現(xiàn)過程中,實(shí)際上是將中間“合并的結(jié)果”
作為任務(wù)的一個(gè)狀態(tài)保存起來的;之后每來一個(gè)新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。
其實(shí),reduce 的語義是針對(duì)列表進(jìn)行規(guī)約操作,運(yùn)算規(guī)則由 ReduceFunction 中的 reduce方法來定義,而在 ReduceFunction 內(nèi)部會(huì)維護(hù)一個(gè)初始值為空的累加器,注意累加器的類型
和輸入元素的類型相同,當(dāng)?shù)谝粭l元素到來時(shí),累加器的值更新為第一條元素的值,當(dāng)新的元素到來時(shí),新元素會(huì)和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運(yùn)算規(guī)
則。然后將更新以后的累加器的值向下游輸出。
我們可以單獨(dú)定義一個(gè)函數(shù)類實(shí)現(xiàn) ReduceFunction 接口,也可以直接傳入一個(gè)匿名類。當(dāng)然,同樣也可以通過傳入 Lambda 表達(dá)式實(shí)現(xiàn)類似的功能。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會(huì)改變流的元素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。下面我們來看一個(gè)稍復(fù)雜的例子。
我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)用戶訪問的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,記錄所有用戶中訪問頻次最高的那個(gè),也就是當(dāng)前訪問量最大的用戶是誰。
package com.rosh.flink.test; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * 我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè) * 用戶訪問的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能, * 記錄所有用戶中訪問頻次最高的那個(gè),也就是當(dāng)前訪問量最大的用戶是誰。 */ public class TransReduceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //隨機(jī)生成數(shù)據(jù) Random random = new Random(); List<Integer> userIds = new ArrayList<>(); for (int i = 1; i <= 10; i++) { userIds.add(random.nextInt(5)); } DataStreamSource<Integer> userIdDS = env.fromCollection(userIds); //每個(gè)ID訪問記錄一次 SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(Integer value) throws Exception { return new Tuple2<>(value, 1L); } }); //統(tǒng)計(jì)每個(gè)user訪問多少次 SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }); sumDS.print("sumDS ->>>>>>>>>>>>>"); //把所有分區(qū)合并,求出最大的訪問量 SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { if (value1.f1 > value2.f1) { return value1; } else { return value2; } } }); maxDS.print("maxDS ->>>>>>>>>>>"); env.execute("TransReduceTest"); } }
到此這篇關(guān)于Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解
這篇文章主要為大家介紹了java Disruptor構(gòu)建高性能內(nèi)存隊(duì)列使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12java實(shí)現(xiàn)網(wǎng)站微信掃碼支付
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)網(wǎng)站微信掃碼支付,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07SpringBoot中使用@ControllerAdvice注解詳解
這篇文章主要介紹了SpringBoot中使用@ControllerAdvice注解詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個(gè)Controller增強(qiáng)器,可對(duì)controller中被 @RequestMapping注解的方法加一些邏輯處理,需要的朋友可以參考下2023-10-10JAVA實(shí)現(xiàn)的CrazyArcade泡泡堂游戲
CrazyArcade泡泡堂游戲,一款用Java編寫的JavaSwing游戲程序。 使用了MVC模式,分離了模型、視圖和控制器,使得項(xiàng)目結(jié)構(gòu)清晰易于擴(kuò)展,使用配置文件來設(shè)置游戲基本配置,擴(kuò)展地圖人物道具等。同時(shí),該程序編寫期間用了單例模式、工廠模式、模板模式等設(shè)計(jì)模式。2021-04-04Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)
這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04springboot連接多個(gè)數(shù)據(jù)庫的實(shí)現(xiàn)方法
有時(shí)候一個(gè)SpringBoot項(xiàng)目需要同時(shí)連接兩個(gè)數(shù)據(jù)庫,本文就來介紹一下springboot連接多個(gè)數(shù)據(jù)庫的實(shí)現(xiàn)方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-08-08SpringMVC+Mybatis二維碼實(shí)現(xiàn)多平臺(tái)付款(附源碼)
本文主要實(shí)現(xiàn)微信支付寶等支付平臺(tái)合多為一的二維碼支付,并且實(shí)現(xiàn)有效時(shí)間內(nèi)支付有效,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08