SpringBoot整合RabbitMQ及原理
1、相關(guān)依賴
這里無需指定版本號,讓其跟著SpringBoot版本走。本示例使用SpringBoot版本號為2.7.10。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
2、生產(chǎn)者、消費者
創(chuàng)建兩個SpringBoot應(yīng)用,模擬消息生產(chǎn)者與消費者【publisher、consumer】。
2-1生產(chǎn)者
編寫配置文件,用戶名和密碼等自行修改 這里虛擬機的名稱是上一篇文章中新建的。
server.port=8082 #rabbitmq服務(wù)器ip spring.rabbitmq.host=localhost #rabbitmq的端口 spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=用戶名 #密碼 spring.rabbitmq.password=密碼 #配置虛擬機 spring.rabbitmq.virtual-host=demo
聲明交換機、隊列并綁定:
@Configuration public class RabbitMqConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public DirectExchange getExchange(){ return new DirectExchange("directExchange",false,false); } @Bean public Queue getQueue(){ return new Queue("publisher.addUser",true,false,false); } @Bean public Binding getBinding(DirectExchange exchange,Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser"); } }
新建User實體類
@Data public class User { private Long id; private String name; private String desc; }
在方法中使用RabbitTemplate來發(fā)送消息:
public interface PublisherService { /** * 添加用戶 * @param user 用戶信息 */ void addUser(User user); }
@RequiredArgsConstructor @Service public class PublisherServiceImpl implements PublisherService{ private final RabbitTemplate rabbitTemplate; @Override public void addUser(User user) { rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user); } }
以上需要注意的就是交換機的名稱、隊列名、routingKey。示例中使用的是直連交換機,routingKey需要和隊列名保持一致。不懂的可以查看上一篇文章。
controller:
@RequiredArgsConstructor @RestController @RequestMapping("/user") public class UserController { private final PublisherService publisherService; @PostMapping("/add") public void add(){ User user = new User(); user.setId(1000L); user.setName("黃忠"); user.setDesc("老兵不死,只是逐漸凋零"); publisherService.addUser(user); } }
2-2消費者
消費者的配置和生產(chǎn)者一樣,不贅述了,直接看代碼:
@Service @Slf4j public class ConsumerService { @RabbitListener(queues ="publisher.addUser") public void addUser(String userStr){ User user = JSONObject.parseObject(userStr,User.class); log.info(user.toString()); } }
@RabbitListener 注解是指定某方法作為消息消費的方法,指定隊列名稱。@RabbitListener 如果標注在類上,需配合 @RabbitHandler 注解一起使用,根據(jù)接受的參數(shù)類型進入具體的方法中。
2-3測試
消費端在啟動時可能會報找不到交換機或隊列,只需要讓生產(chǎn)者發(fā)送一次消息,從控制臺就可以看到相關(guān)的交換機和隊列等信息了。
可以看到消費者成功消費了消息:
3、消費流程
通過上述操作,我們已經(jīng)會簡單地使用RabbitMQ了,接下來了解一下它的整個流程。如此可以讓我們掌握的更牢固。
生產(chǎn)者:
- 生產(chǎn)者連接到Message Broker【也就是RabbitMQ服務(wù)】,建立一個連接( Connection)開啟一個信道(Channel)。
- 生產(chǎn)者聲明一個交換機,并設(shè)置相關(guān)屬性,比如交換機類型、是否持久化等。
- 生產(chǎn)者聲明一個隊列并設(shè)置相關(guān)屬性。
- 生產(chǎn)者通過路由鍵【Routing Key】將交換機和隊列綁定。
- 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
- 相應(yīng)的交換機根據(jù)接收到的路由鍵查找相匹配的隊列。
- 如果找到,則將從生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊列中。
- 如果沒有找到,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道。
- 關(guān)閉連接。
消費者:
- 消費者連接到RabbitMQ Broker ,建立一個連接(Connection),開啟一個信道(Channel) 。
- 消費者向RabbitMQ Broker 請求消費相應(yīng)隊列中的消息,可能會設(shè)置相應(yīng)的回調(diào)函數(shù),
- 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊列中的消息,消費者接收消息。
- 消費者確認(ack) 接收到的消息。
- RabbitMQ 從隊列中刪除相應(yīng)己經(jīng)被確認的消息。
- 關(guān)閉信道。
- 關(guān)閉連接。
到此這篇關(guān)于SpringBoot整合RabbitMQ及其原理分析的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot實現(xiàn)在一個模塊中引入另一個模塊
這篇文章主要介紹了SpringBoot實現(xiàn)在一個模塊中引入另一個模塊的方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-10-10SpringBoot快速整合RabbitMq小案例(使用步驟)
這篇文章主要介紹了SpringBoot快速整合RabbitMq小案例,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06Java線程池隊列PriorityBlockingQueue原理分析
這篇文章主要介紹了Java線程池隊列PriorityBlockingQueue原理分析,PriorityBlockingQueue隊列是?JDK1.5?的時候出來的一個阻塞隊列,但是該隊列入隊的時候是不會阻塞的,永遠會加到隊尾,需要的朋友可以參考下2023-12-12