亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

java中brew安裝rabbitmq以及簡單實(shí)例

 更新時(shí)間:2024年10月12日 10:57:42   作者:Lvan的前端筆記  
RabbitMQ是基于AMQP協(xié)議,由Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),廣泛應(yīng)用于分布式系統(tǒng)中,用于應(yīng)用程序間的消息傳遞,它支持多種交換機(jī)類型,如直連交換機(jī)、扇形交換機(jī)和主題交換機(jī)等,能夠滿足不同的消息路由需求

什么是消息隊(duì)列mq

可以看我之前寫的這篇 消息隊(duì)列MQ

rabbitmq簡介

RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。

brew安裝rabbitmq

1、安裝

官方文檔

brew install rabbitmq

這是安裝的 log

Management UI: http://localhost:15672
Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html

To start rabbitmq now and restart at login:
  brew services start rabbitmq
Or, if you don't want/need a background service you can just run:
  CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server

2、打開 http://localhost:15672

默認(rèn)的用戶名密碼都是 guest,登錄后可以在 Admin 那一列菜單內(nèi)添加自己的用戶

權(quán)限

  • 超級管理員(administrator)

可登陸管理控制臺,可查看所有的信息,并且可以對用戶,策略(policy)進(jìn)行操作。

  • 監(jiān)控者(monitoring)

可登陸管理控制臺,同時(shí)可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù),內(nèi)存使用情況,磁盤使用情況等)

  • 策略制定者(policymaker)

可登陸管理控制臺, 同時(shí)可以對policy進(jìn)行管理。但無法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識的部分)。

  • 普通管理者(management)

僅可登陸管理控制臺,無法看到節(jié)點(diǎn)信息,也無法對策略進(jìn)行管理。

  • 其他

無法登陸管理控制臺,通常就是普通的生產(chǎn)者和消費(fèi)者。

創(chuàng)建虛擬主機(jī)(Virtual Hosts)

為了讓各個(gè)用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(jī)(Virtual Hosts)的概念。

其實(shí)就是一個(gè)獨(dú)立的訪問路徑,不同用戶使用不同路徑,各自有自己的隊(duì)列、交換機(jī),互相不會影響。

交換機(jī)類型

1. 直連交換機(jī)(Direct Exchange)

特點(diǎn):

  • 基于路由鍵(Routing Key)的完全匹配來路由消息。

應(yīng)用場景:

  • 當(dāng)消息需要明確指定到某個(gè)隊(duì)列時(shí),使用直連交換機(jī)。

舉例說明:

  • 假設(shè)有一個(gè)訂單處理系統(tǒng),其中包含“訂單處理”和“訂單審核”兩個(gè)隊(duì)列。
  • 生產(chǎn)者發(fā)送一個(gè)消息時(shí),指定路由鍵為“order.process”,那么只有綁定了該路由鍵的“訂單處理”隊(duì)列會收到消息。
  • 如果指定路由鍵為“order.review”,則只有“訂單審核”隊(duì)列會收到消息。

2. 扇形交換機(jī)(Fanout Exchange)

特點(diǎn):

  • 將消息廣播到所有綁定到它的隊(duì)列,忽略路由鍵。
  • 如果N個(gè)隊(duì)列綁定到某個(gè)扇型交換機(jī)上,當(dāng)有消息發(fā)送給此扇型交換機(jī)時(shí),交換機(jī)會將消息的發(fā)送給這所有的N個(gè)隊(duì)列

應(yīng)用場景:

  • 當(dāng)需要將消息發(fā)送給多個(gè)隊(duì)列,且每個(gè)隊(duì)列都需要接收到相同消息的副本時(shí),使用扇出交換機(jī)。

舉例說明:

  • 假設(shè)有一個(gè)實(shí)時(shí)新聞發(fā)布系統(tǒng),包含“新聞訂閱者A”和“新聞訂閱者B”兩個(gè)隊(duì)列。
  • 生產(chǎn)者發(fā)布一條新聞消息到扇出交換機(jī),那么這條消息將被同時(shí)發(fā)送到“新聞訂閱者A”和“新聞訂閱者B”兩個(gè)隊(duì)列,實(shí)現(xiàn)廣播效果。

3. 主題交換機(jī)(Topic Exchange)

特點(diǎn):

  • 基于路由鍵和通配符(*和#)的模糊匹配來路由消息。

應(yīng)用場景:

  • 當(dāng)需要根據(jù)消息的不同類型或?qū)傩詫⑵渎酚傻讲煌年?duì)列時(shí),使用主題交換機(jī)。

舉例說明:

  • 假設(shè)有一個(gè)日志系統(tǒng),包含“error.log”、“info.log”和“debug.log”三個(gè)隊(duì)列。
  • 生產(chǎn)者發(fā)送一條錯(cuò)誤日志消息,路由鍵為“system.error”。
  • 此時(shí),綁定了“*.error”或“system.#”的隊(duì)列將接收到這條消息,因此“error.log”隊(duì)列會收到它,而“info.log”和“debug.log”則不會(除非它們也綁定了匹配的路由鍵)。

* (星號) 用來表示一個(gè)單詞 (必須出現(xiàn)的)

# (井號) 用來表示任意數(shù)量(零個(gè)或多個(gè))單詞

通配的綁定鍵是跟隊(duì)列進(jìn)行綁定的,舉個(gè)小例子

  • 隊(duì)列Q1 綁定鍵為*.TT.* 隊(duì)列Q2綁定鍵為 TT.#
  • 如果一條消息攜帶的路由鍵為 A.TT.B,那么隊(duì)列Q1將會收到;
  • 如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊(duì)列Q2將會收到;

4. 頭部交換機(jī)(Headers Exchange)

特點(diǎn):

  • 基于消息的頭部信息進(jìn)行匹配,而不是路由鍵。

應(yīng)用場景:

  • 當(dāng)需要根據(jù)消息的復(fù)雜屬性進(jìn)行路由時(shí),使用頭部交換機(jī)。
  • 這種交換機(jī)類型使用較少,因?yàn)樗枰鼜?fù)雜的匹配邏輯。

舉例說明(假設(shè)場景):

  • 假設(shè)有一個(gè)消息處理系統(tǒng),其中消息具有多個(gè)頭部屬性,如“priority”(優(yōu)先級)、“type”(類型)等。
  • 生產(chǎn)者發(fā)送一條高優(yōu)先級的緊急消息,并設(shè)置相應(yīng)的頭部屬性。
  • 此時(shí),只有那些設(shè)置了匹配這些頭部屬性(如“priority=high”)的隊(duì)列才會接收到這條消息。

RabbitMQ的工作原理

組成部分說明:

  • Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue
  • Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對消息進(jìn)行過慮。
  • Queue:消息隊(duì)列,存儲消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)者
  • Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送
  • Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。

生產(chǎn)者發(fā)送消息流程:

  • 1、生產(chǎn)者和Broker建立TCP連接。
  • 2、生產(chǎn)者和Broker建立通道。
  • 3、生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
  • 4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)

消費(fèi)者接收消息流程:

  • 1、消費(fèi)者和Broker建立TCP連接
  • 2、消費(fèi)者和Broker建立通道
  • 3、消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列)
  • 4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
  • 5、消費(fèi)者接收到消息。
  • 6、ack回復(fù)

使用

基本配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.application.name=zy-rabbitmq

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
#虛擬host 可以不設(shè)置,使用server默認(rèn)host
spring.rabbitmq.virtual-host: JCcccHost

直連交換機(jī)示例

創(chuàng)建一個(gè)配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class DirectRabbitConfig {
 
    //隊(duì)列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效
        // exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable
        // autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。
        // return new Queue("TestDirectQueue",true,true,false);
 
        //一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)false
        return new Queue("TestDirectQueue",true);
    }
 
    //Direct交換機(jī) 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
      //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }
 
    //綁定  
    //將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
     
    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
}

寫個(gè)簡單的接口進(jìn)行消息推送(根據(jù)需求也可以改為定時(shí)任務(wù)等等,具體看需求)

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class SendMessageController {

    //使用RabbitTemplate,這提供了接收/發(fā)送等等方法
    @Autowired
    RabbitTemplate rabbitTemplate;  
 
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}

可以看到消息已經(jīng)推送到rabbitMq服務(wù)器上面了。接下來要消費(fèi)這個(gè)消息了。

一般來說要開啟另一個(gè)服務(wù)來消費(fèi)了,但是我們是練習(xí)就在當(dāng)前服務(wù)進(jìn)行消費(fèi)。

然后是創(chuàng)建消息接收監(jiān)聽類

@Component
//監(jiān)聽的隊(duì)列名稱 TestDirectQueue
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {
 
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費(fèi)者收到消息  : " + testMessage.toString());
    }
 
}

運(yùn)行項(xiàng)目,就把這個(gè)消息消費(fèi)掉了。

扇形交換機(jī)、主題交換機(jī)

和上面的差不多,就先不寫了

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論