SpringBoot+Redis實現(xiàn)不重復(fù)消費的隊列的示例代碼
背景
最近我們新研發(fā)了一個“年夜飯訂購”功能(沒想到吧,雷襲在是一個程序猿的同時,也是一名優(yōu)秀的在廚子)。用戶使用系統(tǒng)選擇年夜飯,點擊“下單”時,后臺首先會生成一條訂單數(shù)據(jù),返回消息給用戶:“您已成功下單,后廚正在準(zhǔn)備菜品!”。同時,以線程的方式指揮各個廚子按菜單聯(lián)系供應(yīng)商準(zhǔn)備食材,制作菜品,最后打包寄給客戶。但是,用戶在使用這個功能時,系統(tǒng)卻有一定的機(jī)率卡死,這個問題極大的影響了用戶的體驗。年關(guān)將近,這個功能也顯得越發(fā)重要,客戶要求我們限期整改,三天內(nèi)必須解決該問題。
我首先對這個功能進(jìn)行了分析,很明顯,這是一個使用頻次不高,但是使用時間比較集中的功能。在大量用戶同時使用時,會導(dǎo)致后臺的廚師,食材,供應(yīng)商等全面告警(用程序員語言翻譯一下,這個功能耗CPU,耗內(nèi)存,耗IO)。但用戶對于實時性的要求并不高。下單之后,訂購的菜品是一天內(nèi)完成,還是兩天完成并沒有關(guān)系,只要年前能做完就可以。
因此,我們決定采用消息中間件的方式,以隊列的形式逐次的執(zhí)行“年夜飯制作”的操作, 來緩解服務(wù)器的各種資源的壓力。
之所以采用Redis來實現(xiàn)消息隊列,而不是使用更為成熟的ONS,Kafka。不是因為ONS用不起,而是Redis更有性價比(用戶只允許使用ONS中間件,但ONS會帶來額外的網(wǎng)絡(luò)開銷,學(xué)習(xí)成本和風(fēng)險都更大,這個功能使用頻度并不高,沒有必要為了它而引入一個重量級的中間件。)
代碼實踐
說干就干,咱們先看看源碼,如下:
// 訂單實體類 @Data public class OrderEntity implements Serializable { /** * 客戶姓名 */ private String customerName; /** * 訂單號 */ private String orderCode; /** * 菜單 */ List<String> menus; } @Slf4j @Service public class DinnerService { /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity entity) { // 存儲訂單信息 saveOrder(entity); // 異步開始做菜 CompletableFuture.runAsync(() -> doNewYearEveDinner(entity)); return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!"; } /** * 這里模擬的是做年夜飯的過程方法,該方法用時較長,整個過程需要10秒。 * 這個過程中存在多種意外,可能導(dǎo)致該方法執(zhí)行失敗 * * @param req 訂單信息 */ public void doNewYearEveDinner(OrderEntity entity) { System.out.println("開始做訂單 " + entity.getOrderCode() + " 的年夜飯"); try { Thread.sleep(10000); }catch (Exception e ) { e.printStackTrace(); System.out.println("廚子跑了,廚房著火了,供應(yīng)商堵路上了"); } System.out.println("訂單 " + entity.getOrderCode() + " 的年夜飯已經(jīng)完成"); } private void saveOrder(OrderEntity req) { //這里假設(shè)做的是訂單入庫操作 System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫, 做飯開始時間為 "+ new Date()); } }
1、引入maven依賴,在application.yml中添加redis配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
spring: redis: database: 9 host: 127.0.0.1 port: 6379 password: jedis: pool: max-active: 8 max-wait: -1 max-idle: 8 min-idle: 0
2、添加Redis隊列監(jiān)聽,添加Redis配置文件注冊監(jiān)聽
// 監(jiān)聽類 @Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; @Override public void onMessage(Message message, byte[] pattern) { OrderEntity entity= JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } //配置類,用于注冊監(jiān)聽 @Configuration public class RedisConfig { @Bean public ChannelTopic topic() { return new ChannelTopic("NEW_YEAR_DINNER"); } @Bean public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) { return new MessageListenerAdapter(listener); } @Bean public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter, ChannelTopic topic) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(messageListenerAdapter, topic); return container; } }
3、修改原方法,以及Controller調(diào)用
// DinnerService中的方法修改 /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity entity) { // 存儲訂單信息 saveOrder(entity); // 異步開始做菜 redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(entity)); return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!"; } @RestController public class DinnerController { private int i = 0; @Autowired private DinnerService service; @GetMapping("/orderDinner") public Object orderDinner() { OrderEntity entity = new OrderEntity(); entity.setOrderCode("Order" + (++i)); entity.setCustomerName("第"+i+"位客戶"); return service.orderNewYearEveDinner(entity); } }
4、通過postman調(diào)用四次請求,測試結(jié)果如下:
5、Listener中添加同步鎖
細(xì)看上文中打出來的注釋,我發(fā)現(xiàn)這和我設(shè)想的不一樣啊。原定的計劃是先做完第一份年夜飯,再做第二份,做完第二份再做第三份,為什么第一次沒執(zhí)行完就開始執(zhí)行第二次了?
在網(wǎng)上查了些資料后我才知道,要達(dá)到我想要的效果,得在Listener中添加上同步鎖,如下:
@Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; private final Object lock = new Object(); @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } }
再次執(zhí)行測試用例,結(jié)果如下:
6、多服務(wù)不重復(fù)消費消息
上面的結(jié)果已經(jīng)滿足了我們的要求,但是,客戶考慮到我們只有一個廚房,的確影響效率,決定給我們擴(kuò)建一個廚房(添加服務(wù)器),希望能達(dá)到廚房A做第一份訂單,廚房B做第二份訂單,以上的代碼能實現(xiàn)嗎?我們把剛才的項目拷貝一份,修改端口,啟動后測試。結(jié)果如下:
從上面的日志可以看出來,兩個服務(wù)都做了訂單1的年夜飯,消息被重復(fù)消費了。但是根據(jù)業(yè)務(wù)需求,我們不需要重復(fù)消費消息,我們想達(dá)到的效果是多服務(wù)實現(xiàn)負(fù)載均衡,本服務(wù)在處理的數(shù)據(jù),其他服務(wù)不需要再處理了,應(yīng)該怎么實現(xiàn)呢?咱們依然可以運用Redis,對代碼做如下調(diào)整:
@Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; @Autowired private RedisTemplate<String, String> redisTemplate; private final Object lock = new Object(); @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS); // 加鎖失敗,已有消費端在此時對此消息進(jìn)行處理,這里不再做處理 if (!flag) { return; } OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } }
從測試結(jié)果來看,這么調(diào)整解決達(dá)到了我們的效果。
7、添加日志監(jiān)控
仔細(xì)檢查,發(fā)現(xiàn)上面的代碼雖然滿足了我們的業(yè)務(wù)需求,但是在安全方面仍然沒有得到一定的保障,方法doNewYearEveDinner存在很多不可預(yù)見的隱患,如廚師跑了,廚房著了,供應(yīng)商堵路上了,這些都會導(dǎo)致方法執(zhí)行失敗,那么,我們怎么知道這個訂單執(zhí)行成功或者失敗了呢?看日志嗎?成百上千條數(shù)據(jù)堆起來,通過看日志來看結(jié)果多不方便?。吭蹅兪欠窨梢詫Υa做一下調(diào)整?基于這方面考慮,我對代碼做了以下調(diào)整
//訂單類進(jìn)行調(diào)整 @Data public class OrderEntity implements Serializable { /** * 客戶姓名 */ private String customerName; /** * 訂單號 */ private String orderCode; /** * 菜單 */ List<String> menus; /** * 出餐狀態(tài) */ private String dinnerState; /** * 做飯開始時間 */ private String dinnerStartTime; /** * 做飯結(jié)束時間 */ private String dinnerEndTime; /** * 備注 */ private String remark; } // DinnerService做如下調(diào)整, 添加一個訂單信息更新的方法 @Slf4j @Service public class DinnerService { @Autowired private RedisTemplate<String, String> redisTemplate; /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity req) { // 存儲訂單信息 saveOrder(req); // 異步開始做菜 redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(req)); return "您已成功下單,訂單號為"+ req.getOrderCode()+",后廚正在準(zhǔn)備預(yù)制菜!"; } /** * 這里模擬的是做年夜飯的過程方法,該方法用時較長,整個過程需要10秒,但是,這個過程中存在多種意外,該方法可能失敗 * * @param req 訂單信息 */ public void doNewYearEveDinner(OrderEntity req) throws Exception { System.out.println("開始做訂單 " + req.getOrderCode() + " 的年夜飯"); Thread.sleep(10000); System.out.println("訂單 " + req.getOrderCode() + " 的年夜飯已經(jīng)完成"); } private void saveOrder(OrderEntity req) { //這里假設(shè)做的是訂單入庫操作 System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫, 做飯開始時間為 "+ new Date()); } /** * 根據(jù)訂單編號修改訂單信息 * * @param orderCode 訂單編號 * @param dinnerStatus * @param remark */ public void updateOrder(String orderCode, String dinnerStatus, String remark) { // 根據(jù)訂單編號修改訂單的出餐結(jié)束時間,出餐狀態(tài),備注等信息。 System.out.println("更新訂單 "+ orderCode +" 信息,做飯結(jié)束時間為 "+ new Date() + ", 出餐狀態(tài)為"+ dinnerStatus +", 備注為 " +remark); } } // Listener中做如下調(diào)整 @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS); // 加鎖失敗,已有消費端在此時對此消息進(jìn)行處理,這里不再做處理 if (!flag) { return; } OrderEntity param = JSON.parseObject(message.toString(), OrderEntity.class); try { service.doNewYearEveDinner(param); service.updateOrder(param.getOrderCode(), "SUCCESS", "成功"); }catch (Exception e) { e.printStackTrace(); service.updateOrder(param.getOrderCode(), "FAIL", e.getMessage()); } } }
這部分代碼就不貼測試結(jié)果了,與上一次的測試結(jié)果一致,只不過提升了功能的可測試性,擴(kuò)展一下,這個結(jié)果能否達(dá)到我們的要求呢?其實仍然沒有,對于執(zhí)行失敗的訂單,我們需要一個機(jī)制來處理,根據(jù)報錯信息決定是重新執(zhí)行還是直接報警,人為介入處理,由此才能實現(xiàn)整個事務(wù)的閉環(huán)。
這是一次簡單的SpringBoot+Redis實現(xiàn)隊列的實踐,個人覺得這個過程比較有趣,分析問題出現(xiàn)的原因,需求的潛在歸約,根據(jù)業(yè)務(wù)的需要、當(dāng)前的條件選擇合適的方法和組件,快而有效的解決問題,所以我將它記錄了下來,供大家參考。實際上,已經(jīng)有大神對于Redis實現(xiàn)隊列的方法進(jìn)行了完整細(xì)致的歸納,如果想深入的了解這部分的知識,推薦你們看看這篇博客: Redis隊列詳解(springboot實戰(zhàn))
到此這篇關(guān)于SpringBoot+Redis實現(xiàn)不重復(fù)消費的隊列的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Redis不重復(fù)消費隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java隨機(jī)數(shù)算法原理與實現(xiàn)方法實例詳解
這篇文章主要介紹了Java隨機(jī)數(shù)算法原理與實現(xiàn)方法,簡單分析了隨機(jī)數(shù)算法的原理并結(jié)合具體實例形式給出了java編程計算隨機(jī)數(shù)的具體操作技巧,需要的朋友可以參考下2017-09-09Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置
這篇文章主要介紹了Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12java加載properties文件的六種方法總結(jié)
這篇文章主要介紹了java加載properties文件的六種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-05-05mybatis動態(tài)新增(insert)和修改(update)方式
這篇文章主要介紹了mybatis動態(tài)新增(insert)和修改(update)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05java GUI實現(xiàn)ATM機(jī)系統(tǒng)(3.0版)
這篇文章主要為大家詳細(xì)介紹了java GUI實現(xiàn)ATM機(jī)系統(tǒng)(3.0版),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-03-03Javaweb監(jiān)聽器實例之統(tǒng)計在線人數(shù)
這篇文章主要為大家詳細(xì)介紹了Javaweb監(jiān)聽器實例之統(tǒng)計在線人數(shù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-11-11spring/springboot整合dubbo詳細(xì)教程
今天教大家如何使用spring/springboot整合dubbo,文中有非常詳細(xì)的圖文介紹及代碼示例,對正在學(xué)習(xí)java的小伙伴有很好地幫助,需要的朋友可以參考下2021-05-05