亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ簡單隊列實例及原理解析

 更新時間:2019年12月10日 09:40:07   作者:海向  
這篇文章主要介紹了RabbitMQ簡單隊列實例及原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

這篇文章主要介紹了RabbitMQ簡單隊列實例及原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

RabbitMQ 簡述

RabbitMQ是一個消息代理:它接受并轉發(fā)消息。 您可以將其視為郵局:當您將要把寄發(fā)的郵件投遞到郵箱中時,您可以確信Postman 先生最終會將郵件發(fā)送給收件人。 在這個比喻中,RabbitMQ是一個郵箱,郵局和郵遞員,用來接受,存儲和轉發(fā)二進制數(shù)據(jù)塊的消息。

隊列就像是在RabbitMQ中扮演郵箱的角色。 雖然消息經(jīng)過RabbitMQ和應用程序,但它們只能存儲在隊列中。 隊列只受主機的內存和磁盤限制的限制,它本質上是一個大的消息緩沖區(qū)。 許多生產者可以發(fā)送到一個隊列的消息,許多消費者可以嘗試從一個隊列接收數(shù)據(jù)。

producer即為生產者,用來產生消息發(fā)送給隊列。consumer是消費者,需要去讀隊列內的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個主機上;確實在大多數(shù)應用程序中它們是這樣分布的。

簡單隊列

簡單隊列是最簡單的一種模式,由生產者、隊列、消費者組成。生產者將消息發(fā)送給隊列,消費者從隊列中讀取消息完成消費。

在下圖中,“P”是我們的生產者,“C”是我們的消費者。 中間的框是隊列 - RabbitMQ代表消費者的消息緩沖區(qū)。

java 方式

生產者

package com.anqi.mq.nat;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MyProducer {
  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1. 創(chuàng)建一個 ConnectionFactory 并進行設置
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連接工廠來創(chuàng)建連接
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來創(chuàng)建 Channel
    Channel channel = connection.createChannel();

    //實際場景中,消息多為json格式的對象
    String msg = "hello";
    //4. 發(fā)送三條數(shù)據(jù)
    for (int i = 1; i <= 3 ; i++) {
      channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
      System.out.println("Send message" + i +" : " + msg);
    }

    //5. 關閉連接
    channel.close();
    connection.close();
  }
}
  /**
   * Declare a queue
   * @param queue the name of the queue
   * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
   * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
   * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
   * @param arguments other properties (construction arguments) for the queue
   * @return a declaration-confirm method to indicate the queue was successfully declared
   * @throws java.io.IOException if an error is encountered
   */
  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

  /**
   * Publish a message
   * @see com.rabbitmq.client.AMQP.Basic.Publish
   * @param exchange the exchange to publish the message to
   * @param routingKey the routing key
   * @param props other properties for the message - routing headers etc
   * @param body the message body
   * @throws java.io.IOException if an error is encountered
   */
  void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;


  /**
   * Start a non-nolocal, non-exclusive consumer, with
   * a server-generated consumerTag.
   * @param queue the name of the queue
   * @param autoAck true if the server should consider messages
   * acknowledged once delivered; false if the server should expect
   * explicit acknowledgements
   * @param callback an interface to the consumer object
   * @return the consumerTag generated by the server
   * @throws java.io.IOException if an error is encountered
   * @see com.rabbitmq.client.AMQP.Basic.Consume
   * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
   * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
   */
  String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

消費者

package com.anqi.mq.nat;

import com.rabbitmq.client.*;
import java.io.IOException;

public class MyConsumer {

  private static final String QUEUE_NAME = "ITEM_QUEUE";

  public static void main(String[] args) throws Exception {
    //1. 創(chuàng)建一個 ConnectionFactory 并進行設置
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");

    //2. 通過連接工廠來創(chuàng)建連接
    Connection connection = factory.newConnection();

    //3. 通過 Connection 來創(chuàng)建 Channel
    Channel channel = connection.createChannel();

    //4. 聲明一個隊列
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    /*
      true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經(jīng)成功消費
      false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態(tài),等待消費者的反饋,如果消費者一
      直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務器會認為該消費者已經(jīng)掛掉,不會再給其發(fā)送消息,
      直到該消費者反饋。
    */

    //5. 創(chuàng)建消費者并接收消息
    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };

    //6. 設置 Channel 消費者綁定隊列
    channel.basicConsume(QUEUE_NAME, true, consumer);

  }
}
Send message1 : hello
Send message2 : hello
Send message3 : hello

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello'
 [x] Received 'hello'
 [x] Received 'hello'

當我們啟動生產者之后查看RabbitMQ管理后臺可以看到有一條消息正在等待被消費。

當我們啟動消費者之后再次查看,可以看到積壓的一條消息已經(jīng)被消費。

總結

  • 隊列聲明queueDeclare的參數(shù):第一個參數(shù)表示隊列名稱、第二個參數(shù)為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數(shù)為是否是獨占隊列(創(chuàng)建者可以使用的私有隊列,斷開后自動刪除)、第四個參數(shù)為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數(shù)為隊列的其他參數(shù)。
  • basicConsume的第二個參數(shù)autoAck: 應答模式,true:自動應答,即消費者獲取到消息,該消息就會從隊列中刪除掉,false:手動應答,當從隊列中取出消息后,需要程序員手動調用方法應答,如果沒有應答,該消息還會再放進隊列中,就會出現(xiàn)該消息一直沒有被消費掉的現(xiàn)象。
  • 這種簡單隊列的模式,系統(tǒng)會為每個隊列隱式地綁定一個默認交換機,交換機名稱為" (AMQP default)",類型為直連 direct,當你手動創(chuàng)建一個隊列時,系統(tǒng)會自動將這個隊列綁定到一個名稱為空的 Direct 類型的交換機上,綁定的路由鍵 routing key 與隊列名稱相同,相當于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實例沒有顯式聲明交換機,但是當路由鍵和隊列名稱一樣時,就會將消息發(fā)送到這個默認的交換機中。這種方式比較簡單,但是無法滿足復雜的業(yè)務需求,所以通常在生產環(huán)境中很少使用這種方式。
  • The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認交換機隱式綁定到每個隊列,其中路由鍵等于隊列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。 ——引自 RabbitMQ 官方文檔

spring-amqp方式

引入 Maven 依賴

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.6.0</version>
    </dependency>    
        <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.1.5.RELEASE</version>
    </dependency>

spring 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
      https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
      http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd">

  <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"
  username="guest" password="guest"/>
  <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
  <rabbit:admin connection-factory="connectionFactory"/>
  <rabbit:queue name="MY-QUEUE"/>
</beans>

使用測試

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
  public static void main(String[] args) {
    ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");
    AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("MY-QUEUE", "Item");
    String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");
    System.out.println(msg);
  }
}

參考方法

/**
 * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
 * with a specific routing key.
 *
 * @param exchange the name of the exchange
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
/**
   * Receive a message if there is one from a specific queue and convert it to a Java
   * object. Returns immediately, possibly with a null value.
   *
   * @param queueName the name of the queue to poll
   * @return a message or null if there is none waiting
   * @throws AmqpException if there is a problem
   */
@Nullable
Object receiveAndConvert(String queueName) throws AmqpException;

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • SpringBoot整合Activiti工作流框架的使用

    SpringBoot整合Activiti工作流框架的使用

    本文主要介紹了SpringBoot整合Activiti工作流框架的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • java實現(xiàn)簡單的小超市程序

    java實現(xiàn)簡單的小超市程序

    這篇文章主要為大家詳細介紹了java實現(xiàn)簡單的小超市程序,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-02-02
  • RabbitMQ消息拒絕如何解決

    RabbitMQ消息拒絕如何解決

    這篇文章主要介紹了RabbitMQ消息拒絕如何解決問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • javaweb頁面附件、圖片下載及打開(實現(xiàn)方法)

    javaweb頁面附件、圖片下載及打開(實現(xiàn)方法)

    下面小編就為大家?guī)硪黄猨avaweb頁面附件、圖片下載及打開(實現(xiàn)方法)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • Java日常練習題,每天進步一點點(15)

    Java日常練習題,每天進步一點點(15)

    下面小編就為大家?guī)硪黄狫ava基礎的幾道練習題(分享)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • SpringBoot使用AOP實現(xiàn)防重復提交功能

    SpringBoot使用AOP實現(xiàn)防重復提交功能

    這篇文章主要為大家詳細介紹了SpringBoot如何使用AOP實現(xiàn)防重復提交功能,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2024-03-03
  • 簡單聊聊工作中常用的Java?Lambda表達式

    簡單聊聊工作中常用的Java?Lambda表達式

    日常開發(fā)中,我們很多時候需要用到Java?8的Lambda表達式,它允許把函數(shù)作為一個方法的參數(shù),讓我們的代碼更優(yōu)雅、更簡潔。所以整理了一波工作中常用的Lambda表達式??赐暌欢〞袔椭?/div> 2022-11-11
  • Java之Pattern.compile函數(shù)用法詳解

    Java之Pattern.compile函數(shù)用法詳解

    這篇文章主要介紹了Java之Pattern.compile函數(shù)用法詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下
    2021-08-08
  • 如何基于SpringMVC實現(xiàn)斷點續(xù)傳(HTTP)

    如何基于SpringMVC實現(xiàn)斷點續(xù)傳(HTTP)

    這篇文章主要介紹了如何基于SpringMVC實現(xiàn)斷點續(xù)傳(HTTP),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • Spring Data的Domain Event的用法詳解

    Spring Data的Domain Event的用法詳解

    這篇文章主要介紹了Spring Data的Domain Event的用法詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-02-02

最新評論