亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringAMQP的使用方式案例詳解

 更新時(shí)間:2024年01月06日 09:30:08   作者:Winter.169  
這篇文章主要介紹了SpringAMQP的使用方式,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧

MQ介紹

MQ,中文是消息隊(duì)列(MessageQueue),字面來看就是存放消息的隊(duì)列。也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。

比較常見的MQ實(shí)現(xiàn):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

幾種常見MQ的對比:

RabbitMQActiveMQRocketMQKafka
公司/社區(qū)RabbitApache阿里Apache
開發(fā)語言ErlangJavaJavaScala&Java
協(xié)議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義協(xié)議自定義協(xié)議
可用性一般
單機(jī)吞吐量一般非常高
消息延遲微秒級毫秒級毫秒級毫秒以內(nèi)
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

RabbitMQ消息模型

RabbitMQ官方提供了7個(gè)不同的Demo示例,對應(yīng)了不同的消息模型:

RabbitMQ Tutorials — RabbitMQ

SpringAMQP

AMQP,(Advanced Message Queuing Protocol),是用于在應(yīng)用程序之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。該協(xié)議與語言和平臺無關(guān),更符合服務(wù)中獨(dú)立性的要求。

SpringAMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三個(gè)功能:

  • 自動(dòng)聲明隊(duì)列、交換機(jī)及其綁定關(guān)系
  • 基于注解的監(jiān)聽器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息

1 "HelloWorld"隊(duì)列模型

簡單隊(duì)列模式的模型圖:

  • publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列queue
  • queue:消息隊(duì)列,負(fù)責(zé)接受并緩存消息
  • consumer:訂閱隊(duì)列,處理隊(duì)列中的消息

操作步驟:

引入依賴:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username: ddddddd # 用戶名
    password: 123321 # 密碼

添加配置:

在publisher服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username: ddddddd # 用戶名
    password: 123321 # 密碼

實(shí)現(xiàn)消息發(fā)送

依賴注入RabbitTemplate,調(diào)用convertAndSend方法。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 隊(duì)列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

實(shí)現(xiàn)消息接收

在consumer模塊中也需要引入與上面相同的依賴和寫入配置

1,引入依賴

2,寫入配置

3,編寫消息接收類:

兩個(gè)注解: @Component            @RabbitListener

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消費(fèi)者接收到消息:【" + msg + "】");
    }
}

消息一旦消費(fèi)就會(huì)從隊(duì)列刪除,RabbitMQ沒有消息回溯功能。

2 "WorkQueues"隊(duì)列模型

Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。

當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理。

此時(shí)就可以使用work 模型,多個(gè)消費(fèi)者共同處理消息處理,速度就能大大提高了。

  • 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理
  • 通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量

消息發(fā)送

循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。

在publisher服務(wù)中的SpringAmqpTest類中添加一個(gè)測試方法:

/**
     * workQueue
     * 向隊(duì)列中不停發(fā)送消息,模擬消息堆積。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 隊(duì)列名稱
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

要模擬多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,我們在consumer服務(wù)的SpringRabbitListener中添加2個(gè)新的方法:

默認(rèn)情況下,消息是平均分配給每個(gè)消費(fèi)者,叫做消息預(yù)取。并沒有考慮到消費(fèi)者的處理能力。這樣顯然是有問題的。

要解決這個(gè)問題:

能者多勞

在消費(fèi)者中配置yml:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息

3 "Publish/Subscribe"隊(duì)列模型

發(fā)布訂閱的模型如圖:

發(fā)布訂閱 模式與之前案例的區(qū)別就是允許將同一消息發(fā)給多個(gè)消費(fèi)者。實(shí)現(xiàn)方式是加入了exchange(交換機(jī))

可以看到,在訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:

  • Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給Exchange(交換機(jī))
  • Exchange:交換機(jī),圖中的exchange。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
    • Direct:定向,把消息交給符合指定routing key 的隊(duì)列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
  • Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒有變化
  • Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

三種交換機(jī)的使用:

Fanout:廣播

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1) 可以有多個(gè)隊(duì)列
  • 2) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
  • 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定
  • 4) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
  • 5) 訂閱隊(duì)列的消費(fèi)者都能拿到消息

下面操作:

  • 創(chuàng)建一個(gè)交換機(jī) itcast.fanout,類型是Fanout
  • 創(chuàng)建兩個(gè)隊(duì)列fanout.queue1和fanout.queue2,綁定到交換機(jī)itcast.fanout

步驟:

基于@bean聲明隊(duì)列和交換機(jī)(下面有基于注解的方式)

Consumer中創(chuàng)建一個(gè)配置類,聲明交換機(jī)和隊(duì)列

package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
    /**
     * 聲明交換機(jī)
     * @return Fanout類型交換機(jī)
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    /**
     * 第1個(gè)隊(duì)列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /**
     * 第2個(gè)隊(duì)列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個(gè)方法,作為消費(fèi)者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費(fèi)者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費(fèi)者2接收到Fanout消息:【" + msg + "】");
}

消息發(fā)送:

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

這里convertAndSend的參數(shù)是交換機(jī)的名稱而不是隊(duì)列名稱了。

@Test
public void testFanoutExchange() {
    // 隊(duì)列名稱
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

Direct:定向

在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息

案例需求如下

  • 利用@RabbitListener聲明Exchange、Queue、RoutingKey
  • 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2
  • 在publisher中編寫測試方法,向itcast. direct發(fā)送消息

基于注解聲明隊(duì)列和交換機(jī)

基于@Bean的方式聲明隊(duì)列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來聲明。

在consumer的SpringRabbitListener中添加兩個(gè)消費(fèi)者,同時(shí)基于注解來聲明隊(duì)列和交換機(jī):

添加消費(fèi)者 的同時(shí)聲明了隊(duì)列和交換機(jī)

添加兩個(gè)消費(fèi)者 的同時(shí)聲明了隊(duì)列和交換機(jī):

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費(fèi)者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費(fèi)者接收到direct.queue2的消息:【" + msg + "】");
}

消息發(fā)送

交換機(jī)會(huì)根據(jù)RoutingKey去發(fā)送給對應(yīng)的隊(duì)列。

@Test
public void testSendDirectExchange() {
    // 交換機(jī)名稱
    String exchangeName = "itcast.direct";
    // 消息
    String message = "紅色警報(bào)!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

Topic:通配符

`Topic`類型的`Exchange`與`Direct`相比,都是可以根據(jù)`RoutingKey`把消息路由到不同的隊(duì)列。只不過`Topic`類型`Exchange`可以讓隊(duì)列在綁定`Routing key` 的時(shí)候使用通配符!

Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

  • #:匹配一個(gè)或多個(gè)詞
  • *:匹配不多不少恰好1個(gè)詞

舉例:

  • item.#:能夠匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

圖示:

解釋:

  • Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會(huì)被匹配到。包括china.news和china.weather
  • Queue2:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配。包括china.news和japan.news

案例需求:

實(shí)現(xiàn)思路如下:

  • 并利用@RabbitListener聲明Exchange、Queue、RoutingKey
  • 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽topic.queue1和topic.queue2
  • 在publisher中編寫測試方法,向itcast. topic發(fā)送消息

消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消費(fèi)者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消費(fèi)者接收到topic.queue2的消息:【" + msg + "】");
}

消息發(fā)送

/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交換機(jī)名稱
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜報(bào)!孫悟空大戰(zhàn)哥斯拉,勝!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

描述下Direct交換機(jī)與Topic交換機(jī)的差異?

  • Topic交換機(jī)接收的消息RoutingKey必須是多個(gè)單詞,以 **.** 分割
  • Topic交換機(jī)與隊(duì)列綁定時(shí)的bindingKey可以指定通配符
  • #:代表0個(gè)或多個(gè)詞
  • *:代表1個(gè)詞

消息轉(zhuǎn)換器

1,測試發(fā)送Object類型消息

在SpringAMQP的發(fā)送方法中,接收消息的類型是Object,也就是說我們可以發(fā)送任意對象類型的消息,SpringAMQP會(huì)幫我們序列化為字節(jié)后發(fā)送。

驗(yàn)證:

在consumer中利用@bean聲明一個(gè)隊(duì)列:

在Publisher中測試類中發(fā)送一個(gè)集合類型的消息:

發(fā)現(xiàn)發(fā)送的消息被序列化了:

解決(加個(gè)依賴,加個(gè)bean)

配置JSON轉(zhuǎn)換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

在publisher和consumer兩個(gè)服務(wù)中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在啟動(dòng)類中添加一個(gè)Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

再重新發(fā)送消息,會(huì)發(fā)現(xiàn)是json格式了:

2,接收消息:

發(fā)送消息的類型怎么寫,接收消息的類型也怎么寫:

到此這篇關(guān)于SpringAMQP的使用方式的文章就介紹到這了,更多相關(guān)SpringAMQP使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論