Java Flink與kafka實現(xiàn)實時告警功能過程
引出問題
項目使用告警系統(tǒng)的邏輯是將實時數(shù)據(jù)保存到本地數(shù)據(jù)庫再使用定時任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時實在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會存在時間差,按照保存數(shù)據(jù)定時5分鐘一次,定時任務(wù)5分鐘一次。最高會產(chǎn)生10分鐘的誤差,這種告警就沒什么意義了。
demo設(shè)計
為了簡單的還原業(yè)務(wù)場景,做了簡單的demo假設(shè)
實現(xiàn)一個對于學(xué)生成績評價的實時處理程序
數(shù)學(xué)成績,基準范圍是90-140,超出告警
物理成績,基準范圍是60-95,超出告警
環(huán)境搭建
使用windows環(huán)境演示
準備工作
1、安裝jdk
2、安裝zookeeper
解壓壓縮包
zoo_sample.cfg將它重命名為zoo.cfg
修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data
配置環(huán)境變量
3、安裝kafka
解壓壓縮包
修改config/server.properties
log.dirs=D://tools//kafka_2.11-2.1.0//log
flink程序代碼
pom
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.0</version> </dependency>
主程序
public class StreamAlertDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer); DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper()); resultStream.print().setParallelism(4); resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties)); env.execute(); } }
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap { private RuleMap(){} public final static Map<String,List<AlertRule>> initialRuleMap; private static List<AlertRule> ruleList = new ArrayList<>(); private static List<String> ruleStringList = new ArrayList<>(Arrays.asList( "{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}", "{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}", "{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}", "{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}")); static { for (String i : ruleStringList) { ruleList.add(JSON.parseObject(i, AlertRule.class)); } initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget)); } }
AlertFlatMapper,處理告警邏輯
public class AlertFlatMapper implements FlatMapFunction<String, String> { @Override public void flatMap(String inVal, Collector<String> out) throws Exception { Achievement user = JSON.parseObject(inVal, Achievement.class); Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap; List<AlertInfo> resList = new ArrayList<>(); List<AlertRule> mathRule = initialRuleMap.get("MathVal"); for (AlertRule rule : mathRule) { if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal"); for (AlertRule rule : physicsRule) { if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } String result = JSON.toJSONString(resList); out.collect(result); } private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) { switch (type) { case 0: return actVal < targetVal; case 1: return actVal.equals(targetVal); case 2: return actVal > targetVal; default: return false; } } }
三個實體類
@Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class Achievement implements Serializable { private static final long serialVersionUID = -1L; private String name; private Integer mathVal; private Integer physicsVal; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertInfo implements Serializable { private static final long serialVersionUID = -1L; private String name; private String descInfo; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertRule implements Serializable { private static final long serialVersionUID = -1L; private String target; //0小于 1等于 2大于 private Integer type; private Integer criticalVal; private String descInfo; }
項目演示
創(chuàng)建kafka生產(chǎn)者 test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
創(chuàng)建kafka消費者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
啟動flink應(yīng)用
給topic test發(fā)送消息
{"name":"liu","MathVal":45,"PhysicsVal":76}
消費topic demo
告警系統(tǒng)架構(gòu)
到此這篇關(guān)于Java Flink與kafka實現(xiàn)實時告警功能過程的文章就介紹到這了,更多相關(guān)Java Flink與kafka實時告警內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息
這篇文章主要給大家介紹了關(guān)于Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解
鏈表(Linked?list)是一種常見的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是一種線性表。Java的LinkedList(鏈表)?類似于?ArrayList,是一種常用的數(shù)據(jù)容器,本文就來簡單講講它的使用吧2023-05-05Springboot?HTTP如何調(diào)用其他服務(wù)
這篇文章主要介紹了Springboot?HTTP如何調(diào)用其他服務(wù),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01使用Java的Lucene搜索工具對檢索結(jié)果進行分組和分頁
這篇文章主要介紹了使用Java的搜索工具Lucene對檢索結(jié)果進行分組和分頁的方法,Luence是Java環(huán)境中的一個全文檢索引擎工具包,需要的朋友可以參考下2016-03-03SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼
@JsonView 是 Jackson 庫中的一個注解,它允許你定義哪些屬性應(yīng)該被序列化到 JSON 中,基于不同的“視圖”或“配置”,在本文中,通過了解@JsonView,你將能夠更好地掌握如何在Spring Boot應(yīng)用中定制JSON數(shù)據(jù)的輸出,需要的朋友可以參考下2024-05-05