亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java Flink與kafka實現(xiàn)實時告警功能過程

 更新時間:2023年01月18日 09:45:29   作者:欽拆大仁  
這篇文章主要介紹了Java Flink與kafka實現(xiàn)實時告警功能,本文通過示例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下

引出問題

項目使用告警系統(tǒng)的邏輯是將實時數(shù)據(jù)保存到本地數(shù)據(jù)庫再使用定時任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時實在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會存在時間差,按照保存數(shù)據(jù)定時5分鐘一次,定時任務(wù)5分鐘一次。最高會產(chǎn)生10分鐘的誤差,這種告警就沒什么意義了。

demo設(shè)計

為了簡單的還原業(yè)務(wù)場景,做了簡單的demo假設(shè)

實現(xiàn)一個對于學(xué)生成績評價的實時處理程序

數(shù)學(xué)成績,基準范圍是90-140,超出告警

物理成績,基準范圍是60-95,超出告警

環(huán)境搭建

使用windows環(huán)境演示

準備工作

1、安裝jdk

2、安裝zookeeper

解壓壓縮包

zoo_sample.cfg將它重命名為zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置環(huán)境變量

3、安裝kafka

解壓壓縮包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代碼

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);
		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}
}
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap {
	private RuleMap(){}
	public final static Map<String,List<AlertRule>> initialRuleMap;
	private static List<AlertRule> ruleList = new ArrayList<>();
	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}
}

AlertFlatMapper,處理告警邏輯

public class AlertFlatMapper implements FlatMapFunction<String, String> {
	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}
	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三個實體類

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
	private static final long serialVersionUID = -1L;
	private String target;
	//0小于 1等于 2大于
	private Integer type;
	private Integer criticalVal;
	private String descInfo;
}

項目演示

創(chuàng)建kafka生產(chǎn)者 test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

創(chuàng)建kafka消費者 demo

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

啟動flink應(yīng)用

給topic test發(fā)送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消費topic demo

告警系統(tǒng)架構(gòu)

到此這篇關(guān)于Java Flink與kafka實現(xiàn)實時告警功能過程的文章就介紹到這了,更多相關(guān)Java Flink與kafka實時告警內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring中的循環(huán)依賴問題

    Spring中的循環(huán)依賴問題

    在Spring框架中,循環(huán)依賴是指兩個或多個Bean相互依賴,這導(dǎo)致在Bean的創(chuàng)建過程中出現(xiàn)依賴死鎖,為了解決這一問題,Spring引入了三級緩存機制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息

    Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息

    這篇文章主要給大家介紹了關(guān)于Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • SpringBoot整個啟動過程的分析

    SpringBoot整個啟動過程的分析

    今天小編就為大家分享一篇關(guān)于SpringBoot整個啟動過程的分析,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解

    Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解

    鏈表(Linked?list)是一種常見的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是一種線性表。Java的LinkedList(鏈表)?類似于?ArrayList,是一種常用的數(shù)據(jù)容器,本文就來簡單講講它的使用吧
    2023-05-05
  • Springboot?HTTP如何調(diào)用其他服務(wù)

    Springboot?HTTP如何調(diào)用其他服務(wù)

    這篇文章主要介紹了Springboot?HTTP如何調(diào)用其他服務(wù),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • 使用Java的Lucene搜索工具對檢索結(jié)果進行分組和分頁

    使用Java的Lucene搜索工具對檢索結(jié)果進行分組和分頁

    這篇文章主要介紹了使用Java的搜索工具Lucene對檢索結(jié)果進行分組和分頁的方法,Luence是Java環(huán)境中的一個全文檢索引擎工具包,需要的朋友可以參考下
    2016-03-03
  • Spring自定義注解配置簡單日志示例

    Spring自定義注解配置簡單日志示例

    這篇文章主要介紹了Spring自定義注解配置簡單日志示例,注解可以增強我們的java代碼,同時利用反射技術(shù)可以擴充實現(xiàn)很多功能,它們被廣泛應(yīng)用于三大框架底層,需要的朋友可以參考下
    2023-05-05
  • SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼

    SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼

    @JsonView 是 Jackson 庫中的一個注解,它允許你定義哪些屬性應(yīng)該被序列化到 JSON 中,基于不同的“視圖”或“配置”,在本文中,通過了解@JsonView,你將能夠更好地掌握如何在Spring Boot應(yīng)用中定制JSON數(shù)據(jù)的輸出,需要的朋友可以參考下
    2024-05-05
  • 解決Spring?MVC中文亂碼的編碼配置

    解決Spring?MVC中文亂碼的編碼配置

    這篇文章主要為大家介紹了解決SpringMVC中文亂碼的編碼配置示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-10-10
  • Java OOP三大特征之封裝繼承與多態(tài)詳解

    Java OOP三大特征之封裝繼承與多態(tài)詳解

    本文主要講述的是面向?qū)ο蟮娜筇匦裕悍庋b,繼承,多態(tài),內(nèi)容含括從封裝到繼承再到多態(tài)的所有重點內(nèi)容以及使用細節(jié)和注意事項,內(nèi)容有點長,請大家耐心看完
    2022-07-07

最新評論