SpringBoot集成flink全過程
SpringBoot集成flink
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供了數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。
最大亮點(diǎn)是流處理,最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理。
場景
高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。
環(huán)境搭建
①、安裝flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安裝Netcat
Netcat(又稱為NC)是一個(gè)計(jì)算機(jī)網(wǎng)絡(luò)工具,它可以在兩臺(tái)計(jì)算機(jī)之間建立 TCP/IP 或 UDP 連接。
用于測試網(wǎng)絡(luò)中的端口,發(fā)送文件等操作。
進(jìn)行網(wǎng)絡(luò)調(diào)試和探測,也可以進(jìn)行加密連接和遠(yuǎn)程管理等高級網(wǎng)絡(luò)操作
yum install -y nc # 安裝nc命令 nc -lk 8888 # 啟動(dòng)socket端口
無界流之讀取socket文本流
一、依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>flink</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 添加 Flink 依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>1.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>1.17.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.et.flink.job.SocketJob</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
二、SoketJob
public class SocketJob{ public static void main(String[] args)throws Exception{ // 創(chuàng)建執(zhí)行環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 指定并行度,默認(rèn)電腦線程數(shù) env.setParallelism(3); // 讀取數(shù)據(jù)socket文本流 指定監(jiān)聽 IP 端口 只有在接收到數(shù)據(jù)才會(huì)執(zhí)行任務(wù) DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888); // 處理數(shù)據(jù): 切換、轉(zhuǎn)換、分組、聚合 得到統(tǒng)計(jì)結(jié)果 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS .flatMap( (String value, Collector<Tuple2<String, Integer>> out) -> { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } ) .setParallelism(2) // // 顯式地提供類型信息:對于flatMap傳入Lambda表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式設(shè)置系統(tǒng)當(dāng)前返回類型,才能正確解析出完整數(shù)據(jù) .returns(new TypeHint<Tuple2<String, Integer>>() { }) // .returns(Types.TUPLE(Types.STRING,Types.INT)) .keyBy(value -> value.f0) .sum(1); // 輸出 sum.print(); // 執(zhí)行 env.execute(); } }
測試:
啟動(dòng)socket流:
nc -l 8888
本地執(zhí)行:直接ideal啟動(dòng)main程序,在socket流中輸入
abc bcd cde bcd cde fgh cde fgh hij
集群執(zhí)行:
執(zhí)行maven打包,將打包的jar上傳到集群中
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring Boot Maven Plugin打包異常解決方案
這篇文章主要介紹了Spring Boot Maven Plugin打包異常解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐
本文主要介紹了SpringBoot整合sharding-jdbc實(shí)現(xiàn)自定義分庫分表的實(shí)踐,將通過自定義算法來實(shí)現(xiàn)定制化的分庫分表來擴(kuò)展相應(yīng)業(yè)務(wù),感興趣的可以了解一下2021-11-11Java中String、StringBuffer和StringBuilder的區(qū)別
這篇文章主要介紹了Java中String、StringBuffer和StringBuilder的區(qū)別,StringBuilder與StringBuffer都繼承自AbstractStringBuilder類,在AbstractStringBuilder中也是使用字符數(shù)組保存字符串char[]value但是沒有final關(guān)鍵字修飾,所以這兩個(gè)可變,需要的朋友可以參考下2024-01-01解決swagger主頁訪問,返回報(bào)錯(cuò)500問題
在使用Swagger時(shí)遇到500錯(cuò)誤,通過仔細(xì)的debug發(fā)現(xiàn)問題源于注解使用不當(dāng),具體表現(xiàn)為一個(gè)接口的入?yún)⒈诲e(cuò)誤地注解了三個(gè)參數(shù),而實(shí)際上只有兩個(gè),這導(dǎo)致了Swagger在解析時(shí)拋出了NullPointerException異常,解決方法是刪除錯(cuò)誤的第三個(gè)參數(shù)的注解2024-09-09SpringBoot實(shí)現(xiàn)文件上傳下載功能小結(jié)
最近做的一個(gè)項(xiàng)目涉及到文件上傳與下載功能。SpringBoot后臺(tái)如何實(shí)現(xiàn)文件上傳下載呢?下面有單文件上傳和多文件上傳功能,感興趣的朋友一起看看吧2017-08-08SpringBoot + SpringSecurity 環(huán)境搭建的步驟
這篇文章主要介紹了SpringBoot + SpringSecurity 環(huán)境搭建的步驟,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05maven pom中內(nèi)置變量及引用的實(shí)現(xiàn)
maven其實(shí)有很多內(nèi)置變量供開發(fā)著在開發(fā)中使用,本文主要介紹了maven pom中內(nèi)置變量及引用的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01mybatis-plus IdWorker生成的Id和返回給前臺(tái)的不一致的解決
這篇文章主要介紹了mybatis-plus IdWorker生成的Id和返回給前臺(tái)的不一致的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03