亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中RabbitMQ消息隊列的交換機詳解

 更新時間:2023年07月31日 10:15:09   作者:迷鹿小女子  
這篇文章主要介紹了Java中的RabbitMQ交換機詳解,消息隊列是指利用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成,是在消息的傳輸過程中保存消息的容器,需要的朋友可以參考下

RabbitMQ交換機

在這里插入圖片描述

交換機屬性

  • Name:交換機名稱
  • Type:交換機類型 direct、topic、fanout、headers
  • Durability:是否需要持久化,true為持久化
  • Auto Delete:當(dāng)最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange
  • Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
  • Arguments:擴展參數(shù),用于擴展AMQP協(xié)議,定制化使用

直流交換機

直連交換機Direct Exchange(完全匹配路由key)

所有發(fā)送到Direct Exchange的消息會被轉(zhuǎn)發(fā)到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄;

在這里插入圖片描述

消費端代碼

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4DirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示聲明了一個交換機
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示聲明了一個隊列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一個綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒有消息,這一步將會一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author 小李飛刀
 * @site www.javaxl.com
 * @company
 * @create  2019-11-18 10:22
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
//        String routingKey = "test.direct111"; //收不到
        //5 發(fā)送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

代碼的區(qū)別: 一條消息只會發(fā)送在一個隊列里

在這里插入圖片描述

創(chuàng)建一個交換機與隊列

在這里插入圖片描述

所綁定的交換機

在這里插入圖片描述

控制臺輸出

在這里插入圖片描述

主題交換機

主題交換機Topic Exchange(匹配路由規(guī)則的交換機)

所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)系RouteKey中指定Topic的Queue上;

Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic;

注意:可以使用通配符進行模糊匹配

  • 符號:“#” 匹配一個或者多個詞
  • 符號:“” 匹配不多不少一個詞

列如:

  • “log.#” 能夠匹配到 “log.info.oa”
  • “log.” 能夠匹配到 “log.err”

在這里插入圖片描述

消費端代碼

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
//        String routingKey = "user.*";
        // 1 聲明交換機
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 聲明隊列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交換機和隊列的綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒有消息,這一步將會一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發(fā)送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

代碼的區(qū)別: 一條消息會發(fā)送在多個隊列里 消費端:

在這里插入圖片描述

生產(chǎn)端:

在這里插入圖片描述

控制臺輸出

在這里插入圖片描述

并且可以同時綁定多個交換機

在這里插入圖片描述

輸出交換機

輸出交換機Fanout Exchange(不做路由)

  • 不處理路由鍵,只需要簡單的將隊列綁定到交換機上;
  • 發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上;
  • Fanout交換機轉(zhuǎn)發(fā)消息是最快的

在這里插入圖片描述

消費端代碼

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";    //不設(shè)置路由鍵
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒有消息,這一步將會一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生產(chǎn)端代碼

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        //5 發(fā)送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消費端:

在這里插入圖片描述

生產(chǎn)端:

在這里插入圖片描述

在這里插入圖片描述

控制臺輸出

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

Binding-綁定

  • Exchange和Exchange、Queue之間的連接關(guān)系;
  • Binding中可以包含RoutingKey或者參數(shù)

Queue-消息隊列

  • 消息隊列,實際存儲消息數(shù)據(jù)
  • Durability:是否持久化
  • Durable:是,Transient:否
  • Auto delete:如選yes,代表當(dāng)最后一個監(jiān)聽被移除之后,該Queue會自動被刪除

Message-消息

  • 服務(wù)器和應(yīng)用程序之間傳遞的數(shù)據(jù)
  • 本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成
  • 常用屬性:delivery model、headers(自定義屬性)

Message-其他屬性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • Timestamp、type、user_id、app_id、cluster_id

Virtual host-虛擬主機

  • 虛擬地址,用于進行邏輯隔離,最上層的消息路由
  • 一個Virtual Host里面可以有若干個Exchange和Queue
  • 同一個Virtual Host里面不能有相同名稱的Exchange或Queue

總結(jié)一下

交換機的概念

在沒有交換機的時候,我們的消息隊列會處理所有發(fā)給這個消息隊列的消息,然后由消費者一個一個消費這個隊列里面的消息,如果由集群的話還會分?jǐn)倢@個消息隊列的處理。只不過這里面有一個

Message acknowledgment的概念

這將會導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會越來越多

在這里插入圖片描述

當(dāng)然一般的消息中間件都不會這么干,我們使用了交換機后,我們看到我們的三種策略,其實都可以說由交換機去找跟它所綁定的消息隊列,如果生產(chǎn)端的路由鍵不符合要求或找不到消息隊列定好的路由鍵的話就會進行其他處理。

在這里插入圖片描述

到此這篇關(guān)于Java中RabbitMQ消息隊列的交換機詳解的文章就介紹到這了,更多相關(guān)RabbitMQ交換機內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論