亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

windows環(huán)境下flink入門實踐操作示例

KlBlog   發(fā)布時間:2022-02-28 16:18:38   作者:kl   我要評論
這篇文章主要為大家介紹了windows環(huán)境下flink的入門實踐操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言碎語

為了應(yīng)對凱京科技集團的飛速發(fā)展,凱京科技研發(fā)中心2019定下了數(shù)據(jù)中臺的目標。數(shù)據(jù)處理我們選擇了批處理+流處理結(jié)合的大數(shù)據(jù)應(yīng)用軟件新秀Apache Flink,前幾天阿里又發(fā)出好信息稱將開源Blink(Flink早期分支遷出迭代優(yōu)化),所以今天來近距離認識下Flink。博主之前沒接觸過大數(shù)據(jù)相關(guān)的東西,所以不細究其設(shè)計概念了。目標就是跑一個最簡單的流處理的例子,后面慢慢深入后在和大家分享具體的組件概念以及api設(shè)計。

Apache Flink是什么?

Apache Flink 是一個分布式大數(shù)據(jù)處理引擎,可對有限數(shù)據(jù)流和無限數(shù)據(jù)流進行有狀態(tài)計算。可部署在各種集群環(huán)境,對各種大小的數(shù)據(jù)規(guī)模進行快速計算。上面是非常官方的描述,說白了我們?yōu)槭裁催x擇Flink,是因為他在社區(qū)口碑非常不錯。在國內(nèi)的話有阿里這種大數(shù)據(jù)大流量的公司一直在輸出,當(dāng)然像騰訊、華為、餓了么、滴滴等也都有使用Apache Flink。

進入正題

本篇博文涉及到的軟件工具以及下載地址:

Apache Flink :https://flink.apache.org/downloads.html

Netcat:https://eternallybored.org/misc/netcat/

Netcat是一個有“瑞士軍刀”美譽的網(wǎng)絡(luò)工具,這里用來綁定端口等待Apache Flink的連接

第一步:啟動FLINK

從上面的地址下載Flink后是一個壓縮包,解壓后的目錄結(jié)構(gòu)如下:

/conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm內(nèi)存(默認1024M)大小,web控制臺的端口(默認8081)等。我們可以不該任何配置,然后進入到bin下,執(zhí)行start-cluster.bat。這里要注意不是并不是flink.bat。flink.bat是用來提交job的。還有要確保相關(guān)的端口沒有被占用

運行成功后會有兩個java黑窗口(一個TaskManager、一個JobManager),如果只有一個java黑窗口,很可能是你的TaskManager因為端口占用沒有啟動起來,成功后訪問:http://localhost:8081.就會看到如下的web管理控制臺了:

如果啟動失敗的話,上面箭頭所指向的地方應(yīng)該是0.

第二步:JOB任務(wù)編寫

1.首先需要新建一個maven工程,然后導(dǎo)入Flink的接口依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.7.1</version>
</dependency>

2.編寫具體的job,官方提供了一個單詞統(tǒng)計的demo

package com.kl;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
   public static void main(String[] args) throws Exception {
      // the host and the port to connect to
      final String hostname;
      final int port;
      try {
         final ParameterTool params = ParameterTool.fromArgs(args);
         hostname = params.has("hostname") ? params.get("hostname") : "localhost";
         port = params.has("port") ? params.getInt("port"):9000;
      } catch (Exception e) {
         System.err.println("No port specified. Please run 'SocketWindowWordCount " +
            "--hostname--port', where hostname (localhost by default) " +
            "and port is the address of the text server");
         System.err.println("To start a simple text server, run 'netcat -l' and " +
            "type the input text into the command line");
         return;
      }
      // get the execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // get input data by connecting to the socket
      DataStreamtext = env.socketTextStream(hostname, port, "\n");
      // parse the data, group it, window it, and aggregate the counts
      DataStreamwindowCounts = text
            .flatMap(new FlatMapFunction() {
               public void flatMap(String value, Collectorout) {
                  for (String word : value.split("\\s")) {
                     out.collect(new WordWithCount(word, 1L));
                  } }})
            .keyBy("word")
            .timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction() {
               public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                  return new WordWithCount(a.word, a.count + b.count);
               }});
      // print the results with a single thread, rather than in parallel
      windowCounts.print().setParallelism(1);
      env.execute("Socket Window WordCount");
   }
   /**
    * Data type for words with count.
    */
   public static class WordWithCount {
      public String word;
      public long count;
      public WordWithCount() {}
      public WordWithCount(String word, long count) {
         this.word = word;
         this.count = count;
      }
      @Override
      public String toString() {
         return word + " : " + count;
      }
   }
}

上面demo實現(xiàn)了從啟動參數(shù)中獲取ip和端口,然后連接從輸入流接收文本信息,然后統(tǒng)計文本里單詞出現(xiàn)的次數(shù)。因為要打成可運行的jar,所以,還需要引入maven的jar打包插件,如下:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.kl.SocketWindowWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

mainClass標簽中就是你的main方法所在類全類名。然后mvn install就可以打出一個可運行的jar包了。

第三步:NETCAT監(jiān)聽端口,等待連接

從上面貼的地址下載Netcat后,是一個壓縮包,有些安全軟件可能會報病毒,請忽略就好了。然后解壓文件目錄如下:

進入到這個目錄,然后執(zhí)行: nc64.exe -l -p 9000。相當(dāng)于打開了9000端口,并監(jiān)聽了入站信息。最后實現(xiàn)的效果就是從這個窗口中輸入的數(shù)據(jù),回車后會發(fā)送Apache Flink中我們提交的job中處理輸出,所以這里的9000端口,要和我們等下啟動job的啟動參數(shù)端口一致。

第四步:提交JOB運行

運行job有兩種方式:可以通過Flink.bat運行,也可以通過web控制臺運行。

命令行運行:

FLINK RUN E:\FLINKWORKINGSPCE\FLINKDEMO\TARGET\FINLK-DEMO-1.0-SNAPSHOT.JAR --PORT 9000

WEB控制臺運行:

如上圖,點擊Add New后選擇你的jar包然后上傳,上傳成功就會在列表里列出來。然后選中你上傳的jar。就會出現(xiàn)如下圖的輸入框,可以輸入你的啟動參數(shù),然后點擊submit提交就可以了

第五步:驗證效果

提交后如果沒有問題,job的詳情頁面如下:

這個時候我們從Netcat的監(jiān)聽的黑窗口中敲入一些長文本,就會在Flink的job里統(tǒng)計輸出出來如:

文末結(jié)語

Flink的Windows環(huán)境入門實例還算順利,這只是第一步,后面Apache Flink的生產(chǎn)落地肯定還會有更多的問題和挑戰(zhàn)。我們會把落地過程中的問題拿到osc來和大家一起交流,歡迎大家關(guān)注凱京科技。

以上就是windows環(huán)境下flink入門實踐操作示例的詳細內(nèi)容,更多關(guān)于windows環(huán)境flink入門實踐的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論