" />

亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Flink入門(mén)級(jí)應(yīng)用域名處理示例

 更新時(shí)間:2022年03月21日 17:54:48   作者:andandan  
這篇文章主要介紹了一個(gè)比較簡(jiǎn)單的入門(mén)級(jí)Flink應(yīng)用,代碼很容易寫(xiě),主要用到的算子有FlatMap、KeyBy、Reduce,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

概述

最近做了一個(gè)小任務(wù),要使用Flink處理域名數(shù)據(jù),在4GB的域名文檔中求出每個(gè)域名的頂級(jí)域名,最后輸出每個(gè)頂級(jí)域名下的前10個(gè)子級(jí)域名。一個(gè)比較簡(jiǎn)單的入門(mén)級(jí)Flink應(yīng)用,代碼很容易寫(xiě),主要用到的算子有FlatMap、KeyBy、Reduce。但是由于Maven打包問(wèn)題,總是提示找不到入口類(lèi),卡了好久,最后也是成功解決了。

主體代碼如下:

public class FlinkStreamingTopDomain {
    public static void main(String[] args) throws Exception{
        // 獲取流處理運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 獲取kafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
        // 從當(dāng)前消費(fèi)組下標(biāo)開(kāi)始讀取
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource text = env.addSource(kafkaConsumer);

        // 算子
        DataStream<Tuple2<String,String>> windowCount = text.flatMap(new FlatMap())
                .keyBy(0).reduce(new Reduce());
        //把數(shù)據(jù)打印到控制臺(tái)
        windowCount.print()
                .setParallelism(16);//使用16個(gè)并行度
        //注意:因?yàn)閒link是懶加載的,所以必須調(diào)用execute方法,上面的代碼才會(huì)執(zhí)行
        env.execute("streaming topDomain calculate");
    }
}

算子

FlatMap

Flatmap是對(duì)一行字符進(jìn)行處理的,官網(wǎng)上的解釋如下

FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

其實(shí)和Hadoop的Map差不多,都是把一行字符串進(jìn)行處理,得到我們想要的<key,value>,不同之處在于Map處理后得到的是<key,values[]>。即Hadoop的Map操作會(huì)按key自動(dòng)的將value處理成數(shù)組的形式,而Flink的FlatMap算子只會(huì)把每行數(shù)據(jù)處理成key、value。

下面是我處理業(yè)務(wù)的FlatMap代碼

    // FlatMap分割域名,并輸出二元組<頂級(jí)域名,域名>
    public static class FlatMap implements FlatMapFunction<String, Tuple2<String,String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
            String[] values = s.split("\\^");   // 按字符^分割
            if(values.length - 1 < 2) {
                return;
            }
            String domain = values[2];
            out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain));
        }
    }

我這里把數(shù)據(jù)處理成了二元組形式,之后reduce也是對(duì)這個(gè)二元組進(jìn)行處理。

KeyBy

先來(lái)看看官網(wǎng)的解釋

KeyBy
DataStream → KeyedStream
    
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -&gt; value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -&gt; value.f0) // Key by the first element of a Tuple

Attention:A type cannot be a key if:
    1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
    2.it is an array of any type.   

keyBy會(huì)按照一個(gè)keySelector定義的方式進(jìn)行哈希分區(qū),會(huì)將一個(gè)流分成多個(gè)Partition,相同key的會(huì)被分在同一個(gè)分區(qū),經(jīng)過(guò)keyBy的流變成KeyedStream。

需要注意的有兩點(diǎn):

1.pojo類(lèi)型作為key,必須重寫(xiě)hashcode()方法

2.數(shù)組類(lèi)型不能作為key

Reduce

官網(wǎng)的解釋如下

Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

reduce是進(jìn)行”滾動(dòng)“處理的,即reduce方法的第一個(gè)參數(shù)是當(dāng)前已經(jīng)得到的結(jié)果記為currentResult,第二個(gè)參數(shù)是當(dāng)前要處理的<key,value>。流式計(jì)算會(huì)一條一條的處理數(shù)據(jù),每處理完一條數(shù)據(jù)就得到新的currentResult。

業(yè)務(wù)處理代碼如下

    // 拼接同一分區(qū)下的ip
    public static class Reduce implements ReduceFunction<Tuple2<String,String>>{
        @Override
        public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception {
            String[] domains = t1.f1.toString().split("\\^");
            if(domains.length == 10){
                return t1;
            }
            t1.f1 = t1.f1.toString() + "^" + t2.f1.toString();
            System.out.println(t1.f1 );
            return t1;
        }
   }

連接socket測(cè)試

1.將主體代碼里的kafka獲取數(shù)據(jù),改成socket獲取數(shù)據(jù)

//        int port;
//        try {
//            ParameterTool parameterTool = ParameterTool.fromArgs(args);
//            port = parameterTool.getInt("port");
//        } catch (Exception e){
//            System.out.println("沒(méi)有指定port參數(shù),使用默認(rèn)值1112");
//            port = 1112;
//        }

        // 連接socket獲取輸入數(shù)據(jù)
//        DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);

2.在服務(wù)器開(kāi)啟一個(gè)端口號(hào):nc -l -p 1112

3.運(yùn)行代碼

4.服務(wù)器輸入測(cè)試數(shù)據(jù)就可以實(shí)時(shí)的獲取處理結(jié)果

連接kafka

正式

使用kafka命令創(chuàng)建主題

kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test

kafka建立topic需要先開(kāi)啟zookeeper

運(yùn)行生產(chǎn)者jar包,用生產(chǎn)者讀取數(shù)據(jù)

java -jar $jar包路徑  $topic $path

測(cè)試

另外,還可以使用測(cè)試生產(chǎn)者實(shí)現(xiàn)和socket測(cè)試相同的效果

/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1

打包上傳服務(wù)器

打包上傳服務(wù)器注意不要使用idea提供的build方式,反正我使用build會(huì)一直報(bào)錯(cuò)找不到主類(lèi),即便我反編譯jar包發(fā)現(xiàn)主類(lèi)在里面,并且MF文件也有配置主類(lèi)信息。這個(gè)問(wèn)題卡了我很久,最后我使用mvn pakage的方式打包并運(yùn)行成功,把我的打包插件貼出來(lái)幫助遇到和我相同問(wèn)題的人

<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<!--							<createDependencyReducedPom>false</createDependencyReducedPom>-->
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

Flink運(yùn)行指令為:

/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar

或者可以訪(fǎng)問(wèn)Flink集群的8081端口,在提供的UI頁(yè)面上傳運(yùn)行

以上就是Flink入門(mén)級(jí)應(yīng)用域名處理示例的詳細(xì)內(nèi)容,更多關(guān)于Flink域名處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java的枚舉,注解和反射(一)

    Java的枚舉,注解和反射(一)

    今天小編就為大家分享一篇關(guān)于Java枚舉,注解與反射原理說(shuō)明,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2021-07-07
  • 淺談@RequestBody和@RequestParam可以同時(shí)使用

    淺談@RequestBody和@RequestParam可以同時(shí)使用

    這篇文章主要介紹了@RequestBody和@RequestParam可以同時(shí)使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java實(shí)現(xiàn)調(diào)用MySQL存儲(chǔ)過(guò)程詳解

    Java實(shí)現(xiàn)調(diào)用MySQL存儲(chǔ)過(guò)程詳解

    相信大家都知道存儲(chǔ)過(guò)程是在大型數(shù)據(jù)庫(kù)系統(tǒng)中,一組為了完成特定功能的SQL語(yǔ)句集。存儲(chǔ)過(guò)程是數(shù)據(jù)庫(kù)中的一個(gè)重要對(duì)象,任何一個(gè)設(shè)計(jì)良好的數(shù)據(jù)庫(kù)應(yīng)用程序都應(yīng)該用到存儲(chǔ)過(guò)程。Java調(diào)用mysql存儲(chǔ)過(guò)程,實(shí)現(xiàn)如下,有需要的朋友們可以參考借鑒,下面來(lái)一起看看吧。
    2016-11-11
  • 淺析java異常棧

    淺析java異常棧

    給大家通過(guò)一個(gè)簡(jiǎn)單的代碼實(shí)例給大家分型了java異常棧問(wèn)題,需要的朋友參考一下吧。
    2017-12-12
  • java開(kāi)放地址法和鏈地址法解決hash沖突的方法示例

    java開(kāi)放地址法和鏈地址法解決hash沖突的方法示例

    這篇文章主要介紹了java開(kāi)放地址法和鏈地址法解決hash沖突的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • Java中判斷字符串是否相等的實(shí)現(xiàn)

    Java中判斷字符串是否相等的實(shí)現(xiàn)

    這篇文章主要介紹了Java中判斷字符串是否相等的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • MyBatis?Plus如何實(shí)現(xiàn)獲取自動(dòng)生成主鍵值

    MyBatis?Plus如何實(shí)現(xiàn)獲取自動(dòng)生成主鍵值

    這篇文章主要介紹了MyBatis?Plus如何實(shí)現(xiàn)獲取自動(dòng)生成主鍵值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Java實(shí)現(xiàn)簡(jiǎn)單聊天機(jī)器人

    Java實(shí)現(xiàn)簡(jiǎn)單聊天機(jī)器人

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單聊天機(jī)器人,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • Mybatis中updateBatch實(shí)現(xiàn)批量更新

    Mybatis中updateBatch實(shí)現(xiàn)批量更新

    本文主要介紹了Mybatis中updateBatch實(shí)現(xiàn)批量更新,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • MyBatisPlus中@TableField注解的基本使用

    MyBatisPlus中@TableField注解的基本使用

    這篇文章主要介紹了MyBatisPlus中@TableField注解的基本使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07

最新評(píng)論