亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringCloudStream原理和深入使用小結(jié)

 更新時(shí)間:2024年06月20日 12:29:59   作者:7仔要加油  
Spring?Cloud?Stream是一個(gè)用于構(gòu)建與共享消息傳遞系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動(dòng)型微服務(wù)的框架,本文給大家介紹SpringCloudStream原理和深入使用,感興趣的朋友跟隨小編一起看看吧

簡單概述

Spring Cloud Stream是一個(gè)用于構(gòu)建與共享消息傳遞系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動(dòng)型微服務(wù)的框架。

應(yīng)用程序通過inputs或outputs來與Spring Cloud Stream中binder對(duì)象交互,binder對(duì)象負(fù)責(zé)與消息中間件交互。也就是說:Spring Cloud Stream能夠屏蔽底層消息中間件【RabbitMQ,kafka等】的差異,降低切換成本,統(tǒng)一消息的編程模型

相關(guān)概念

Channel(通道):Channel是消息的傳輸管道,用于在生產(chǎn)者和消費(fèi)者之間傳遞消息。生產(chǎn)者通過輸出通道將消息發(fā)送到Destination,消費(fèi)者通過輸入通道從Destination接收消息。

在Spring Cloud Stream中,有兩種類型的通道:輸入(input)和輸出(output)。這兩種通道分別用于消費(fèi)者接收消息和生產(chǎn)者發(fā)送消息。

  • Input(輸入):Input通道用于消費(fèi)者從消息代理接收消息。消費(fèi)者可以通過監(jiān)聽Input通道來實(shí)時(shí)接收傳入的消息
  • Output(輸出):Output通道用于生產(chǎn)者向消息代理發(fā)送消息。生產(chǎn)者可以通過向Output通道發(fā)送消息來發(fā)布新的消息

Destination(目標(biāo)):Destination是消息的目的地,通常對(duì)應(yīng)于消息代理中的Topic或Queue。生產(chǎn)者將消息發(fā)送到特定的Destination,消費(fèi)者從其中接收消息。

Binder(綁定器):Binder是Spring Cloud Stream的核心組件之一。它作為消息代理與外部消息中間件進(jìn)行交互,并負(fù)責(zé)將消息發(fā)送到消息總線或從消息總線接收消息。Binder負(fù)責(zé)處理消息傳遞、序列化、反序列化、消息路由等底層細(xì)節(jié),使得開發(fā)者能夠以統(tǒng)一的方式與不同的消息中間件進(jìn)行交互。Spring Cloud Stream提供了多個(gè)可用的Binder實(shí)現(xiàn),包括RabbitMQ、Kafka等。

**消費(fèi)者組:**在Spring Cloud Stream中,消費(fèi)組(Consumer Group)是一組具有相同功能的消費(fèi)者實(shí)例。當(dāng)多個(gè)消費(fèi)者實(shí)例屬于同一個(gè)消費(fèi)組時(shí),消息代理會(huì)將消息均勻地分發(fā)給消費(fèi)者實(shí)例,以實(shí)現(xiàn)負(fù)載均衡。如果其中一個(gè)消費(fèi)者實(shí)例失效,消息代理會(huì)自動(dòng)將消息重新分配給其他可用的消費(fèi)者實(shí)例,以實(shí)現(xiàn)高可用性。(對(duì)于一個(gè)消息來說,每個(gè)消費(fèi)者組只會(huì)有一個(gè)消費(fèi)者消費(fèi)消息)

分區(qū):Spring Cloud Stream支持在多個(gè)消費(fèi)者實(shí)例之間創(chuàng)建分區(qū),這樣我們通過某些特征量做消息分發(fā),保證相同標(biāo)識(shí)的消息總是能被同一個(gè)消費(fèi)者處理

Spring Message

Spring Message是Spring Framework的一個(gè)模塊,其作用就是統(tǒng)一消息的編程模型。

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,調(diào)用send方法可以將消息發(fā)送至該消息通道中:

@FunctionalInterface
public interface MessageChannel {
	long INDEFINITE_TIMEOUT = -1;
	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}
	boolean send(Message<?> message, long timeout);
}

消息通道里的消息由消息通道的子接口可訂閱的消息通道SubscribableChannel實(shí)現(xiàn),被MessageHandler消息處理器所訂閱

public interface SubscribableChannel extends MessageChannel {
	boolean subscribe(MessageHandler handler);
	boolean unsubscribe(MessageHandler handler);
}

MessageHandler真正地消費(fèi)/處理消息

@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Integration

Spring Integration 提供了 Spring 編程模型的擴(kuò)展用來支持企業(yè)集成模式(Enterprise Integration Patterns),是對(duì) Spring Messaging 的擴(kuò)展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分發(fā)MessageDispatcher、消息過濾Filter、消息轉(zhuǎn)換Transformer、消息聚合Aggregator、消息分割Splitter等等。同時(shí)還提供了MessageChannel和MessageHandler的實(shí)現(xiàn),分別包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等內(nèi)容。

Spring-Cloud-Stream的架構(gòu)

img

快速入門

引入依賴

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

增加配置文件

spring:
    cloud:
        stream:
            # 定義消息中間件
            binders:
              MyRabbit:
                  type: rabbit
                  environment:
                    spring:
                        rabbitmq:
                            host: localhost
                            port: 5672
                            username: root
                            password: root
                            vhost: /
            bindings:
            # 生產(chǎn)者中定義,定義發(fā)布對(duì)象
              myInput:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
            # 消費(fèi)者中定義,定義訂閱的對(duì)象
              myOutput-in-0:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
        # 消費(fèi)者中定義,定義輸出的函數(shù)
        function:
            definition: myOutput

生產(chǎn)者

@Resource
	private StreamBridge streamBridge;
	public void sendNormal() {
		streamBridge.send("myInput", "hello world");
	}

消費(fèi)者

@Bean("myOutput")
	public Consumer<Message<String>> myOutput() {
		return (message) -> {
			MessageHeaders headers = message.getHeaders();
			System.out.println("myOutput head is : " + headers);
			String payload = message.getPayload();
			System.out.println("myOutput payload is : " + payload);
		};
	}

如何自定義Binder

  • 添加spring-cloud-stream依賴
  • 提供ProvisioningProvider的實(shí)現(xiàn)提供
  • MessageProducer的實(shí)現(xiàn)提供
  • MessageHandler的實(shí)現(xiàn)提供
  • Binder的實(shí)現(xiàn)創(chuàng)建Binder的配置
  • META-INF/spring.binders中定義綁定器

添加spring-cloud-stream依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供ProvisioningProvider的實(shí)現(xiàn)

ProvisioningProvider負(fù)責(zé)提供消費(fèi)者和生產(chǎn)者目的地,并需要將 application.yml 或 application.properties 文件中包含的邏輯目的地轉(zhuǎn)換為物理目的地引用。

public class FileProvisioningProvider implements ProvisioningProvider<
	ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {
	public FileProvisioningProvider() {
		super();
	}
	@Override
	public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}
	@Override
	public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}
	private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {
		private final String destination;
		private FileMessageDestination(final String destination) {
			this.destination = destination;
		}
		@Override
		public String getName() {
			return destination.trim();
		}
		@Override
		public String getNameForPartition(int partition) {
			throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
		}
	}
}

提供MessageProducer的實(shí)現(xiàn)

MessageProducer負(fù)責(zé)使用事件并將其作為消息處理,發(fā)送給配置為使用此類事件的客戶端應(yīng)用程序。

super.onInit();
		executorService = Executors.newScheduledThreadPool(1);
	}
	@Override
	public void doStart() {
		executorService.scheduleWithFixedDelay(() -> {
			String payload = getPayload();
			if (payload != null) {
				Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
				sendMessage(receivedMessage);
			}
		}, 0, 50, TimeUnit.MILLISECONDS);
	}
	@Override
	protected void doStop() {
		executorService.shutdownNow();
	}
	private String getPayload() {
		try {
			List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
			String currentPayload = allLines.get(allLines.size() - 1);
			if (!currentPayload.equals(previousPayload)) {
				previousPayload = currentPayload;
				return currentPayload;
			}
		} catch (IOException e) {
			FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
		}
		return null;
	}
}

提供MessageHandler的實(shí)現(xiàn)

MessageHandler提供產(chǎn)生事件所需的邏輯。

public class FileMessageHandler extends AbstractMessageHandler {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	ProducerDestination destination;
	public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {
		this.destination = destination;
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}
	@Override
	protected void handleMessageInternal(Message<?> message) {
		try {
			if (message.getPayload() instanceof byte[]) {
				Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());
			} else {
				throw new RuntimeException("處理消息失敗");
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}

提供Binder的實(shí)現(xiàn)

提供自己的Binder抽象實(shí)現(xiàn):

  • 擴(kuò)展AbstractMessageChannelBinder
  • 將自定義的 ProvisioningProvider 指定為 AbstractMessageChannelBinder 的通用參數(shù)
  • 重寫createProducerMessageHandlercreateConsumerEndpoint方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder
	<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>
	implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {
		super(headersToEmbed, provisioningProvider);
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}
	@Override
	protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
		FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);
		return fileMessageHandler;
	}
	@Override
	protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {
		FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);
		return fileMessageProducerAdapter;
	}
	@Override
	public FileConsumerProperties getExtendedConsumerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);
	}
	@Override
	public FileProducerProperties getExtendedProducerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);
	}
	@Override
	public String getDefaultsPrefix() {
		return fileExtendedBindingProperties.getDefaultsPrefix();
	}
	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();
	}
}

創(chuàng)建Binder的配置

嚴(yán)格要求創(chuàng)建一個(gè) Spring 配置來初始化你的綁定器實(shí)現(xiàn)的 bean

@EnableConfigurationProperties(FileExtendedBindingProperties.class)
@Configuration
public class FileMessageBinderConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public FileProvisioningProvider fileMessageBinderProvisioner() {
		return new FileProvisioningProvider();
	}
	@Bean
	@ConditionalOnMissingBean
	public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {
		return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);
	}
	@Bean
	public FileProducerProperties fileConsumerProperties() {
		return new FileProducerProperties();
	}
}

詳細(xì)的代碼見https://gitee.com/xiaovcloud/spring-cloud-stream

到此這篇關(guān)于SpringCloudStream原理和深入使用的文章就介紹到這了,更多相關(guān)SpringCloudStream原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring @Retryable注解輕松搞定循環(huán)重試功能

    Spring @Retryable注解輕松搞定循環(huán)重試功能

    spring系列的spring-retry是另一個(gè)實(shí)用程序模塊,可以幫助我們以標(biāo)準(zhǔn)方式處理任何特定操作的重試。在spring-retry中,所有配置都是基于簡單注釋的。本文主要介紹了Spring@Retryable注解如何輕松搞定循環(huán)重試功能,有需要的朋友可以參考一下
    2023-04-04
  • SpringBoot項(xiàng)目application.yml文件數(shù)據(jù)庫配置密碼加密的方法

    SpringBoot項(xiàng)目application.yml文件數(shù)據(jù)庫配置密碼加密的方法

    這篇文章主要介紹了SpringBoot項(xiàng)目application.yml文件數(shù)據(jù)庫配置密碼加密的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • Spring Security基于散列加密方案實(shí)現(xiàn)自動(dòng)登錄功能

    Spring Security基于散列加密方案實(shí)現(xiàn)自動(dòng)登錄功能

    為了提高項(xiàng)目的用戶體驗(yàn),我們可以在項(xiàng)目中添加自動(dòng)登錄功能,當(dāng)然也要給用戶提供退出登錄的功能。接下來學(xué)習(xí)下Spring Security基于散列加密方案實(shí)現(xiàn)自動(dòng)登錄功能,一起看看吧
    2021-09-09
  • Java:String.split()特殊字符處理操作

    Java:String.split()特殊字符處理操作

    這篇文章主要介紹了Java:String.split()特殊字符處理操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-10-10
  • 詳解Java中CountDownLatch的用法

    詳解Java中CountDownLatch的用法

    這篇文章主要為大家詳細(xì)介紹了Java中CountDownLatch類的用法,本文通過一些簡單的示例進(jìn)行了簡單介紹,感興趣的小伙伴可以跟隨小編一起了解一下
    2023-05-05
  • 詳解Java編程中if...else語句的嵌套寫法

    詳解Java編程中if...else語句的嵌套寫法

    這篇文章主要介紹了Java編程中if...else語句的嵌套寫法,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-11-11
  • Java中ArrayList在foreach里remove的問題詳析

    Java中ArrayList在foreach里remove的問題詳析

    這篇文章主要給大家介紹了關(guān)于Java中ArrayList在foreach里remove問題的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看看吧
    2018-09-09
  • Java魔法值處理的四種方式

    Java魔法值處理的四種方式

    這篇文章主要介紹了Java魔法值處理的四種方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • springboot責(zé)任鏈模式實(shí)現(xiàn)多級(jí)校驗(yàn)

    springboot責(zé)任鏈模式實(shí)現(xiàn)多級(jí)校驗(yàn)

    責(zé)任鏈模式是將鏈中的每一個(gè)節(jié)點(diǎn)看作是一個(gè)對(duì)象,每個(gè)節(jié)點(diǎn)處理的請(qǐng)求不同,且內(nèi)部自動(dòng)維護(hù)一個(gè)下一節(jié)點(diǎn)對(duì)象,下面我們來聊聊springboot如何利用責(zé)任鏈模式實(shí)現(xiàn)多級(jí)校驗(yàn)吧
    2024-11-11
  • Java中特殊運(yùn)算符及其應(yīng)用詳解

    Java中特殊運(yùn)算符及其應(yīng)用詳解

    當(dāng)涉及位操作和位級(jí)運(yùn)算時(shí),Java?提供了一組特殊的運(yùn)算符,即左移(<<)和右移(>>)運(yùn)算符,下面小編就帶大家深入了解一下它們的具體應(yīng)用吧
    2023-08-08

最新評(píng)論