Spring?Cloud?Stream實(shí)現(xiàn)數(shù)據(jù)流處理
1.什么是Spring Cloud Stream?
我看很多回答都是“為了屏蔽消息隊(duì)列的差異,使我們?cè)谑褂孟㈥?duì)列的時(shí)候能夠用統(tǒng)一的一套API,無需關(guān)心具體的消息隊(duì)列實(shí)現(xiàn)”。 這樣理解是有些不全面的,Spring Cloud Stream的核心是Stream,準(zhǔn)確來講Spring Cloud Stream提供了一整套數(shù)據(jù)流走向(流向)的API, 它的最終目的是使我們不關(guān)心數(shù)據(jù)的流入和寫出,而只關(guān)心對(duì)數(shù)據(jù)的業(yè)務(wù)處理 我們舉一個(gè)例子:你們公司有一套系統(tǒng),這套系統(tǒng)由多個(gè)模塊組成,你負(fù)責(zé)其中一個(gè)模塊。數(shù)據(jù)會(huì)從第一個(gè)模塊流入,處理完后再交給下一個(gè)模塊。對(duì)于你負(fù)責(zé)的這個(gè)模塊來說,它的功能就是接收上一個(gè)模塊處理完成的數(shù)據(jù),自己再加工加工,扔給下一個(gè)模塊。
我們很容易總結(jié)出每個(gè)模塊的流程:
- 從上一個(gè)模塊拉取數(shù)據(jù)
- 處理數(shù)據(jù)
- 將處理完成的數(shù)據(jù)發(fā)給下一個(gè)模塊
其中流程1和3代表兩個(gè)模塊間的數(shù)據(jù)交互,這種數(shù)據(jù)交互往往會(huì)采用一些中間件(middleware)。比如模塊1和模塊2間數(shù)據(jù)可能使用的是kafka,模塊1向kafka中push數(shù)據(jù),模塊2向kafka中poll數(shù)據(jù)。而模塊2和模塊3可能使用的是rabbitMQ。很明顯,它們的功能都是一樣的:**提供數(shù)據(jù)的流向,讓數(shù)據(jù)可以流入自己同時(shí)又可以從自己流出發(fā)給別人。**但由于中間件的不同,需要使用不同的API。 為了消除這種數(shù)據(jù)流入(輸入)和數(shù)據(jù)流出(輸出)實(shí)現(xiàn)上的差異性,因此便出現(xiàn)了Spring Cloud Stream。
2.環(huán)境準(zhǔn)備
采用docker-compose搭建kafaka環(huán)境
version: '3' networks: kafka: ipam: driver: default config: - subnet: "172.22.6.0/24" services: zookepper: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest container_name: zookeeper-server restart: unless-stopped volumes: - "/etc/localtime:/etc/localtime" environment: ALLOW_ANONYMOUS_LOGIN: yes ports: - "2181:2181" networks: kafka: ipv4_address: 172.22.6.11 kafka: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1 container_name: kafka restart: unless-stopped volumes: - "/etc/localtime:/etc/localtime" environment: ALLOW_PLAINTEXT_LISTENER: yes KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092 ports: - "9092:9092" depends_on: - zookepper networks: kafka: ipv4_address: 172.22.6.12 kafka-map: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map container_name: kafka-map restart: unless-stopped volumes: - "./kafka/kafka-map/data:/usr/local/kafka-map/data" environment: DEFAULT_USERNAME: admin DEFAULT_PASSWORD: 123456 ports: - "9080:8080" depends_on: - kafka networks: kafka: ipv4_address: 172.22.6.13
run
docker-compose -f docker-compose-kafka.yml -p kafka up -d
kafka-map
https://github.com/dushixiang/kafka-map
- 訪問:http://127.0.0.1:9080
- 賬號(hào)密碼:admin/123456
3.代碼工程
實(shí)驗(yàn)?zāi)繕?biāo)
- 生成UUID并將其發(fā)送到Kafka主題
batch-in
。 - 從
batch-in
主題接收UUID的批量消息,移除其中的數(shù)字,并將結(jié)果發(fā)送到batch-out
主題。 - 監(jiān)聽
batch-out
主題并打印接收到的消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>spring-cloud-stream-kafaka</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Boot Starter Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
處理流
/* * Copyright 2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.et; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import java.util.List; import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; /** * @author Steven Gantz */ @SpringBootApplication public class CloudStreamsFunctionBatch { public static void main(String[] args) { SpringApplication.run(CloudStreamsFunctionBatch.class, args); } @Bean public Supplier<UUID> stringSupplier() { return () -> { var uuid = UUID.randomUUID(); System.out.println(uuid + " -> batch-in"); return uuid; }; } @Bean public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() { return idBatch -> { System.out.println("Removed digits from batch of " + idBatch.size()); return idBatch.stream() .map(UUID::toString) // Remove all digits from the UUID .map(uuid -> uuid.replaceAll("\\d","")) .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()) .toList(); }; } @KafkaListener(id = "batch-out", topics = "batch-out") public void listen(String in) { System.out.println("batch-out -> " + in); } }
定義一個(gè)名為
stringSupplier
的Bean,它實(shí)現(xiàn)了Supplier<UUID>
接口。這個(gè)方法生成一個(gè)隨機(jī)的UUID,并打印到控制臺(tái),表示這個(gè)UUID將被發(fā)送到batch-in
主題。定義一個(gè)名為
digitRemovingConsumer
的Bean,它實(shí)現(xiàn)了Function<List<UUID>, List<Message<String>>>
接口。這個(gè)方法接受一個(gè)UUID的列表,打印出處理的UUID數(shù)量,然后將每個(gè)UUID轉(zhuǎn)換為字符串,移除其中的所有數(shù)字,最后將結(jié)果封裝為消息并返回。使用
@KafkaListener
注解定義一個(gè)Kafka監(jiān)聽器,監(jiān)聽batch-out
主題。當(dāng)接收到消息時(shí),調(diào)用listen
方法并打印接收到的消息內(nèi)容。
配置文件
spring: cloud: function: definition: stringSupplier;digitRemovingConsumer stream: bindings: stringSupplier-out-0: destination: batch-in digitRemovingConsumer-in-0: destination: batch-in group: batch-in consumer: batch-mode: true digitRemovingConsumer-out-0: destination: batch-out kafka: binder: brokers: localhost:9092 bindings: digitRemovingConsumer-in-0: consumer: configuration: # Forces consumer to wait 5 seconds before polling for messages fetch.max.wait.ms: 5000 fetch.min.bytes: 1000000000 max.poll.records: 10000000
參數(shù)解釋
spring: cloud: function: definition: stringSupplier;digitRemovingConsumer
spring.cloud.function.definition
:定義了兩個(gè)函數(shù),stringSupplier
和digitRemovingConsumer
。這兩個(gè)函數(shù)將在應(yīng)用程序中被使用。
stream: bindings: stringSupplier-out-0: destination: batch-in
stream.bindings.stringSupplier-out-0.destination
:將stringSupplier
函數(shù)的輸出綁定到Kafka主題batch-in
。
digitRemovingConsumer-in-0: destination: batch-in group: batch-in consumer: batch-mode: true
stream.bindings.digitRemovingConsumer-in-0.destination
:將digitRemovingConsumer
函數(shù)的輸入綁定到Kafka主題batch-in
。group: batch-in
:指定消費(fèi)者組為batch-in
,這意味著多個(gè)實(shí)例可以共享這個(gè)組來處理消息。consumer.batch-mode: true
:啟用批處理模式,允許消費(fèi)者一次處理多條消息。
digitRemovingConsumer-out-0: destination: batch-out
stream.bindings.digitRemovingConsumer-out-0.destination
:將digitRemovingConsumer
函數(shù)的輸出綁定到Kafka主題batch-out
。
以上只是一些關(guān)鍵代碼
4.測(cè)試
啟動(dòng)弄Spring Boot應(yīng)用,可以看到控制臺(tái)輸出日志如下:
291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in Removed digits from batch of 5 batch-out -> eacc-ee-dfb-b-dead batch-out -> cbae-e-f-c-acfb batch-out -> ab-dd---adade batch-out -> db-fb-f-bbb-bfdec batch-out -> bdb--d-ad-bbbb
以上就是Spring Cloud Stream實(shí)現(xiàn)數(shù)據(jù)流處理的詳細(xì)內(nèi)容,更多關(guān)于Spring Cloud Stream數(shù)據(jù)流處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java map中相同的key保存多個(gè)value值方式
這篇文章主要介紹了java map中相同的key保存多個(gè)value值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實(shí)例
這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
這篇文章主要為大家介紹了Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01JAVA泛型的繼承和實(shí)現(xiàn)、擦除原理解析
這篇文章主要介紹了JAVA泛型的繼承和實(shí)現(xiàn)、擦除原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11如何在Java中調(diào)用python文件執(zhí)行詳解
豐富的第三方庫使得python非常適合用于進(jìn)行數(shù)據(jù)分析,最近在項(xiàng)目中就涉及到j(luò)ava調(diào)用python實(shí)現(xiàn)的算法,下面這篇文章主要給大家介紹了關(guān)于如何在Java中調(diào)用python文件執(zhí)行的相關(guān)資料,需要的朋友可以參考下2022-05-05Java操作mongodb增刪改查的基本操作實(shí)戰(zhàn)指南
MongoDB是一個(gè)基于分布式文件存儲(chǔ)的數(shù)據(jù)庫,由c++語言編寫,旨在為WEB應(yīng)用提供可擴(kuò)展的高性能數(shù)據(jù)存儲(chǔ)解決方案,下面這篇文章主要給大家介紹了關(guān)于Java操作mongodb增刪改查的基本操作實(shí)戰(zhàn)指南,需要的朋友可以參考下2023-05-05springboot多個(gè)service互相調(diào)用的事務(wù)處理方式
這篇文章主要介紹了springboot多個(gè)service互相調(diào)用的事務(wù)處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02