亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring?Cloud?Stream實(shí)現(xiàn)數(shù)據(jù)流處理

 更新時(shí)間:2024年11月19日 08:24:07   作者:HBLOG  
Spring?Cloud?Stream的核心是Stream,準(zhǔn)確來講Spring?Cloud?Stream提供了一整套數(shù)據(jù)流走向(流向)的API,?它的最終目的是使我們不關(guān)心數(shù)據(jù)的流入和寫出,而只關(guān)心對(duì)數(shù)據(jù)的業(yè)務(wù)處理,本文給大家介紹了Spring?Cloud?Stream實(shí)現(xiàn)數(shù)據(jù)流處理,需要的朋友可以參考下

1.什么是Spring Cloud Stream?

我看很多回答都是“為了屏蔽消息隊(duì)列的差異,使我們?cè)谑褂孟㈥?duì)列的時(shí)候能夠用統(tǒng)一的一套API,無需關(guān)心具體的消息隊(duì)列實(shí)現(xiàn)”。 這樣理解是有些不全面的,Spring Cloud Stream的核心是Stream,準(zhǔn)確來講Spring Cloud Stream提供了一整套數(shù)據(jù)流走向(流向)的API, 它的最終目的是使我們不關(guān)心數(shù)據(jù)的流入和寫出,而只關(guān)心對(duì)數(shù)據(jù)的業(yè)務(wù)處理 我們舉一個(gè)例子:你們公司有一套系統(tǒng),這套系統(tǒng)由多個(gè)模塊組成,你負(fù)責(zé)其中一個(gè)模塊。數(shù)據(jù)會(huì)從第一個(gè)模塊流入,處理完后再交給下一個(gè)模塊。對(duì)于你負(fù)責(zé)的這個(gè)模塊來說,它的功能就是接收上一個(gè)模塊處理完成的數(shù)據(jù),自己再加工加工,扔給下一個(gè)模塊。

我們很容易總結(jié)出每個(gè)模塊的流程:

  • 從上一個(gè)模塊拉取數(shù)據(jù)
  • 處理數(shù)據(jù)
  • 將處理完成的數(shù)據(jù)發(fā)給下一個(gè)模塊

其中流程1和3代表兩個(gè)模塊間的數(shù)據(jù)交互,這種數(shù)據(jù)交互往往會(huì)采用一些中間件(middleware)。比如模塊1和模塊2間數(shù)據(jù)可能使用的是kafka,模塊1向kafka中push數(shù)據(jù),模塊2向kafka中poll數(shù)據(jù)。而模塊2和模塊3可能使用的是rabbitMQ。很明顯,它們的功能都是一樣的:**提供數(shù)據(jù)的流向,讓數(shù)據(jù)可以流入自己同時(shí)又可以從自己流出發(fā)給別人。**但由于中間件的不同,需要使用不同的API。 為了消除這種數(shù)據(jù)流入(輸入)和數(shù)據(jù)流出(輸出)實(shí)現(xiàn)上的差異性,因此便出現(xiàn)了Spring Cloud Stream。

2.環(huán)境準(zhǔn)備

采用docker-compose搭建kafaka環(huán)境

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
    container_name: zookeeper-server
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
    container_name: kafka
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
    ports:
      - "9092:9092"
    depends_on:
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
    container_name: kafka-map
    restart: unless-stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:
      - "9080:8080"
    depends_on:                         
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 訪問:http://127.0.0.1:9080
  • 賬號(hào)密碼:admin/123456

3.代碼工程

實(shí)驗(yàn)?zāi)繕?biāo)

  • 生成UUID并將其發(fā)送到Kafka主題batch-in。
  • batch-in主題接收UUID的批量消息,移除其中的數(shù)字,并將結(jié)果發(fā)送到batch-out主題。
  • 監(jiān)聽batch-out主題并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-cloud-stream-kafaka</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

</project>

處理流

/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author Steven Gantz
 */
@SpringBootApplication
public class CloudStreamsFunctionBatch {

   public static void main(String[] args) {
      SpringApplication.run(CloudStreamsFunctionBatch.class, args);
   }

   @Bean
   public Supplier<UUID> stringSupplier() {
      return () -> {
         var uuid = UUID.randomUUID();
         System.out.println(uuid + " -> batch-in");
         return uuid;
      };
   }

   @Bean
   public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
      return idBatch -> {
         System.out.println("Removed digits from batch of " + idBatch.size());
         return idBatch.stream()
            .map(UUID::toString)
            // Remove all digits from the UUID
            .map(uuid -> uuid.replaceAll("\\d",""))
            .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
            .toList();
      };
   }

   @KafkaListener(id = "batch-out", topics = "batch-out")
   public void listen(String in) {
      System.out.println("batch-out -> " + in);
   }

}
  • 定義一個(gè)名為stringSupplier的Bean,它實(shí)現(xiàn)了Supplier<UUID>接口。這個(gè)方法生成一個(gè)隨機(jī)的UUID,并打印到控制臺(tái),表示這個(gè)UUID將被發(fā)送到batch-in主題。

  • 定義一個(gè)名為digitRemovingConsumer的Bean,它實(shí)現(xiàn)了Function<List<UUID>, List<Message<String>>>接口。這個(gè)方法接受一個(gè)UUID的列表,打印出處理的UUID數(shù)量,然后將每個(gè)UUID轉(zhuǎn)換為字符串,移除其中的所有數(shù)字,最后將結(jié)果封裝為消息并返回。

  • 使用@KafkaListener注解定義一個(gè)Kafka監(jiān)聽器,監(jiān)聽batch-out主題。當(dāng)接收到消息時(shí),調(diào)用listen方法并打印接收到的消息內(nèi)容。

配置文件

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
        digitRemovingConsumer-out-0:
          destination: batch-out
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          digitRemovingConsumer-in-0:
            consumer:
              configuration:
                # Forces consumer to wait 5 seconds before polling for messages
                fetch.max.wait.ms: 5000
                fetch.min.bytes: 1000000000
                max.poll.records: 10000000

參數(shù)解釋

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer

spring.cloud.function.definition:定義了兩個(gè)函數(shù),stringSupplierdigitRemovingConsumer。這兩個(gè)函數(shù)將在應(yīng)用程序中被使用。

stream:
  bindings:
    stringSupplier-out-0:
      destination: batch-in

stream.bindings.stringSupplier-out-0.destination:將stringSupplier函數(shù)的輸出綁定到Kafka主題batch-in。

    digitRemovingConsumer-in-0:
      destination: batch-in
      group: batch-in
      consumer:
        batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:將digitRemovingConsumer函數(shù)的輸入綁定到Kafka主題batch-in。

  • group: batch-in:指定消費(fèi)者組為batch-in,這意味著多個(gè)實(shí)例可以共享這個(gè)組來處理消息。

  • consumer.batch-mode: true:啟用批處理模式,允許消費(fèi)者一次處理多條消息。

    digitRemovingConsumer-out-0:
      destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:將digitRemovingConsumer函數(shù)的輸出綁定到Kafka主題batch-out

以上只是一些關(guān)鍵代碼

4.測(cè)試

啟動(dòng)弄Spring Boot應(yīng)用,可以看到控制臺(tái)輸出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

以上就是Spring Cloud Stream實(shí)現(xiàn)數(shù)據(jù)流處理的詳細(xì)內(nèi)容,更多關(guān)于Spring Cloud Stream數(shù)據(jù)流處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 史上最全圖文講解Java泛型

    史上最全圖文講解Java泛型

    泛型在java中有很重要的地位,在面向?qū)ο缶幊碳案鞣N設(shè)計(jì)模式中有非常廣泛的應(yīng)用,下面這篇文章主要給大家介紹了Java泛型的相關(guān)資料,需要的朋友可以參考下
    2022-02-02
  • java map中相同的key保存多個(gè)value值方式

    java map中相同的key保存多個(gè)value值方式

    這篇文章主要介紹了java map中相同的key保存多個(gè)value值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實(shí)例

    Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實(shí)例

    這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-12-12
  • Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式

    Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式

    這篇文章主要為大家介紹了Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • JAVA泛型的繼承和實(shí)現(xiàn)、擦除原理解析

    JAVA泛型的繼承和實(shí)現(xiàn)、擦除原理解析

    這篇文章主要介紹了JAVA泛型的繼承和實(shí)現(xiàn)、擦除原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • 如何在Java中調(diào)用python文件執(zhí)行詳解

    如何在Java中調(diào)用python文件執(zhí)行詳解

    豐富的第三方庫使得python非常適合用于進(jìn)行數(shù)據(jù)分析,最近在項(xiàng)目中就涉及到j(luò)ava調(diào)用python實(shí)現(xiàn)的算法,下面這篇文章主要給大家介紹了關(guān)于如何在Java中調(diào)用python文件執(zhí)行的相關(guān)資料,需要的朋友可以參考下
    2022-05-05
  • Spring循環(huán)依賴的解決方法詳解

    Spring循環(huán)依賴的解決方法詳解

    Spring的解決循環(huán)依賴是有前置條件的,要解決循環(huán)依賴我們首先要了解Spring Bean對(duì)象的創(chuàng)建過程和依賴注入的方式。依賴注入方式,我之前的博客有所分享,大家可以在看本篇文章之前進(jìn)行一下小小的回顧
    2022-08-08
  • Java操作mongodb增刪改查的基本操作實(shí)戰(zhàn)指南

    Java操作mongodb增刪改查的基本操作實(shí)戰(zhàn)指南

    MongoDB是一個(gè)基于分布式文件存儲(chǔ)的數(shù)據(jù)庫,由c++語言編寫,旨在為WEB應(yīng)用提供可擴(kuò)展的高性能數(shù)據(jù)存儲(chǔ)解決方案,下面這篇文章主要給大家介紹了關(guān)于Java操作mongodb增刪改查的基本操作實(shí)戰(zhàn)指南,需要的朋友可以參考下
    2023-05-05
  • java編程小白進(jìn)階包的作用詳解

    java編程小白進(jìn)階包的作用詳解

    這篇文章主要為大家介紹了java編程中包的作用詳解,文中通過示例分析方便大家更容易理解包的作用,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2021-10-10
  • springboot多個(gè)service互相調(diào)用的事務(wù)處理方式

    springboot多個(gè)service互相調(diào)用的事務(wù)處理方式

    這篇文章主要介紹了springboot多個(gè)service互相調(diào)用的事務(wù)處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02

最新評(píng)論