Java如何處理延遲任務(wù)過(guò)程解析
1、利用延遲隊(duì)列
延時(shí)隊(duì)列,第一他是個(gè)隊(duì)列,所以具有對(duì)列功能第二就是延時(shí),這就是延時(shí)對(duì)列,功能也就是將任務(wù)放在該延時(shí)對(duì)列中,只有到了延時(shí)時(shí)刻才能從該延時(shí)對(duì)列中獲取任務(wù)否則獲取不到……
應(yīng)用場(chǎng)景比較多,比如延時(shí)1分鐘發(fā)短信,延時(shí)1分鐘再次執(zhí)行等,下面先看看延時(shí)隊(duì)列demo之后再看延時(shí)隊(duì)列在項(xiàng)目中的使用:
簡(jiǎn)單的延時(shí)隊(duì)列要有三部分:第一實(shí)現(xiàn)了Delayed接口的消息體、第二消費(fèi)消息的消費(fèi)者、第三存放消息的延時(shí)隊(duì)列,那下面就來(lái)看看延時(shí)隊(duì)列demo。
一、消息體
package com.delqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 消息體定義 實(shí)現(xiàn)Delayed接口就是實(shí)現(xiàn)兩個(gè)方法即compareTo 和 getDelay最重要的就是getDelay方法,這個(gè)方法用來(lái)判斷是否到期…… */ public class Message implements Delayed { private int id; private String body; // 消息內(nèi)容 private long excuteTime;// 延遲時(shí)長(zhǎng),這個(gè)是必須的屬性因?yàn)橐凑者@個(gè)判斷延時(shí)時(shí)長(zhǎng)。 public int getId() { return id; } public String getBody() { return body; } public long getExcuteTime() { return excuteTime; } public Message(int id, String body, long delayTime) { this.id = id; this.body = body; this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); } // 自定義實(shí)現(xiàn)比較方法返回 1 0 -1三個(gè)參數(shù) @Override public int compareTo(Delayed delayed) { Message msg = (Message) delayed; return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); } // 延遲任務(wù)是否到時(shí)就是按照這個(gè)方法判斷如果返回的是負(fù)數(shù)則說(shuō)明到期否則還沒(méi)到期 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); } }
二、消息消費(fèi)者
package com.delqueue; import java.util.concurrent.DelayQueue; public class Consumer implements Runnable { // 延時(shí)隊(duì)列 ,消費(fèi)者從其中獲取消息進(jìn)行消費(fèi) private DelayQueue<Message> queue; public Consumer(DelayQueue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消費(fèi)消息id:" + take.getId() + " 消息體:" + take.getBody()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
三、延時(shí)隊(duì)列
package com.delqueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class DelayQueueTest { public static void main(String[] args) { // 創(chuàng)建延時(shí)隊(duì)列 DelayQueue<Message> queue = new DelayQueue<Message>(); // 添加延時(shí)消息,m1 延時(shí)3s Message m1 = new Message(1, "world", 3000); // 添加延時(shí)消息,m2 延時(shí)10s Message m2 = new Message(2, "hello", 10000); //將延時(shí)消息放到延時(shí)隊(duì)列中 queue.offer(m2); queue.offer(m1); // 啟動(dòng)消費(fèi)線程 消費(fèi)添加到延時(shí)隊(duì)列中的消息,前提是任務(wù)到了延期時(shí)間 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Consumer(queue)); exec.shutdown(); } }
將消息體放入延遲隊(duì)列中,在啟動(dòng)消費(fèi)者線程去消費(fèi)延遲隊(duì)列中的消息,如果延遲隊(duì)列中的消息到了延遲時(shí)間則可以從中取出消息否則無(wú)法取出消息也就無(wú)法消費(fèi)。
這就是延遲隊(duì)列demo,下面我們來(lái)說(shuō)說(shuō)在真實(shí)環(huán)境下的使用。
使用場(chǎng)景描述:
在打車(chē)軟件中對(duì)訂單進(jìn)行派單的流程,當(dāng)有訂單的時(shí)候給該訂單篩選司機(jī),然后給當(dāng)訂單綁定司機(jī),但是有時(shí)運(yùn)氣沒(méi)那么好,訂單進(jìn)來(lái)后第一次沒(méi)有篩選到合適的司機(jī),但我們也不能就此結(jié)束派單,而是將該訂單的信息放到延時(shí)隊(duì)列中過(guò)個(gè)2秒鐘在進(jìn)行一次,其實(shí)這個(gè)2秒鐘就是一個(gè)延遲,所以這里我們就可以使用延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)……
下面看看簡(jiǎn)單的流程圖:
下面來(lái)看看具體代碼實(shí)現(xiàn):
在項(xiàng)目中有如下幾個(gè)類(lèi):第一 、任務(wù)類(lèi) 第二、按照任務(wù)類(lèi)組裝的消息體類(lèi) 第三、延遲隊(duì)列管理類(lèi)
任務(wù)類(lèi)即執(zhí)行篩選司機(jī)、綁單、push消息的任務(wù)類(lèi)
package com.test.delayqueue; /** * 具體執(zhí)行相關(guān)業(yè)務(wù)的業(yè)務(wù)類(lèi) * @author whd * @date 2017年9月25日 上午12:49:32 */ public class DelayOrderWorker implements Runnable { @Override public void run() { // TODO Auto-generated method stub //相關(guān)業(yè)務(wù)邏輯處理 System.out.println(Thread.currentThread().getName()+" do something ……"); } }
消息體類(lèi),在延時(shí)隊(duì)列中這個(gè)實(shí)現(xiàn)了Delayed接口的消息類(lèi)是比不可少的,實(shí)現(xiàn)接口時(shí)有一個(gè)getDelay(TimeUnit unit)方法,這個(gè)方法就是判斷是否到期的
這里定義的是一個(gè)泛型類(lèi),所以可以將我們上面的任務(wù)類(lèi)作為其中的task,這樣就將任務(wù)類(lèi)分裝成了一個(gè)消息體
package com.test.delayqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 延時(shí)隊(duì)列中的消息體將任務(wù)封裝為消息體 * * @author whd * @date 2017年9月25日 上午12:48:30 * @param <T> */ public class DelayOrderTask<T extends Runnable> implements Delayed { private final long time; private final T task; // 任務(wù)類(lèi),也就是之前定義的任務(wù)類(lèi) /** * @param timeout * 超時(shí)時(shí)間(秒) * @param task * 任務(wù) */ public DelayOrderTask(long timeout, T task) { this.time = System.nanoTime() + timeout; this.task = task; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub DelayOrderTask other = (DelayOrderTask) o; long diff = time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; } else { return 0; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int hashCode() { return task.hashCode(); } public T getTask() { return task; } }
延時(shí)隊(duì)列管理類(lèi),這個(gè)類(lèi)主要就是將任務(wù)類(lèi)封裝成消息并并添加到延時(shí)隊(duì)列中,以及輪詢(xún)延時(shí)隊(duì)列從中取出到時(shí)的消息體,在獲取任務(wù)類(lèi)放到線程池中執(zhí)行任務(wù)
package com.test.delayqueue; import java.util.Map; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 延時(shí)隊(duì)列管理類(lèi),用來(lái)添加任務(wù)、執(zhí)行任務(wù) * * @author whd * @date 2017年9月25日 上午12:44:59 */ public class DelayOrderQueueManager { private final static int DEFAULT_THREAD_NUM = 5; private static int thread_num = DEFAULT_THREAD_NUM; // 固定大小線程池 private ExecutorService executor; // 守護(hù)線程 private Thread daemonThread; // 延時(shí)隊(duì)列 private DelayQueue<DelayOrderTask<?>> delayQueue; private static final AtomicLong atomic = new AtomicLong(0); private final long n = 1; private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); private DelayOrderQueueManager() { executor = Executors.newFixedThreadPool(thread_num); delayQueue = new DelayQueue<>(); init(); } public static DelayOrderQueueManager getInstance() { return instance; } /** * 初始化 */ public void init() { daemonThread = new Thread(() -> { execute(); }); daemonThread.setName("DelayQueueMonitor"); daemonThread.start(); } private void execute() { while (true) { Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); System.out.println("當(dāng)前存活線程數(shù)量:" + map.size()); int taskNum = delayQueue.size(); System.out.println("當(dāng)前延時(shí)任務(wù)數(shù)量:" + taskNum); try { // 從延時(shí)隊(duì)列中獲取任務(wù) DelayOrderTask<?> delayOrderTask = delayQueue.take(); if (delayOrderTask != null) { Runnable task = delayOrderTask.getTask(); if (null == task) { continue; } // 提交到線程池執(zhí)行task executor.execute(task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 添加任務(wù) * * @param task * @param time * 延時(shí)時(shí)間 * @param unit * 時(shí)間單位 */ public void put(Runnable task, long time, TimeUnit unit) { // 獲取延時(shí)時(shí)間 long timeout = TimeUnit.NANOSECONDS.convert(time, unit); // 將任務(wù)封裝成實(shí)現(xiàn)Delayed接口的消息體 DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); // 將消息體放到延時(shí)隊(duì)列中 delayQueue.put(delayOrder); } /** * 刪除任務(wù) * * @param task * @return */ public boolean removeTask(DelayOrderTask task) { return delayQueue.remove(task); } }
測(cè)試類(lèi)
package com.delqueue; import java.util.concurrent.TimeUnit; import com.test.delayqueue.DelayOrderQueueManager; import com.test.delayqueue.DelayOrderWorker; public class Test { public static void main(String[] args) { DelayOrderWorker work1 = new DelayOrderWorker();// 任務(wù)1 DelayOrderWorker work2 = new DelayOrderWorker();// 任務(wù)2 DelayOrderWorker work3 = new DelayOrderWorker();// 任務(wù)3 // 延遲隊(duì)列管理類(lèi),將任務(wù)轉(zhuǎn)化消息體并將消息體放入延遲對(duì)列中等待執(zhí)行 DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); manager.put(work1, 3000, TimeUnit.MILLISECONDS); manager.put(work2, 6000, TimeUnit.MILLISECONDS); manager.put(work3, 9000, TimeUnit.MILLISECONDS); } }
OK 這就是項(xiàng)目中的具體使用情況,當(dāng)然具體內(nèi)容被忽略,整體框架就是這樣,還有這里使用java的延時(shí)隊(duì)列但是這種方式是有問(wèn)題的如果如果down機(jī)則會(huì)出現(xiàn)任務(wù)丟失,所以也可以考慮使用mq、redis來(lái)實(shí)現(xiàn)
2、mq實(shí)現(xiàn)延遲消息
在rabbitmq 3.5.7及以上的版本提供了一個(gè)插件(rabbitmq-delayed-message-exchange)來(lái)實(shí)現(xiàn)延遲隊(duì)列功能。同時(shí)插件依賴(lài)Erlang/OPT 18.0及以上。
插件源碼地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
安裝:
進(jìn)入插件安裝目錄
{rabbitmq-server}/plugins/(可以查看一下當(dāng)前已存在的插件)
下載插件
rabbitmq_delayed_message_exchange wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
(如果下載的文件名稱(chēng)不規(guī)則就手動(dòng)重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
關(guān)閉插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
插件使用
通過(guò)聲明一個(gè)x-delayed-message類(lèi)型的exchange來(lái)使用delayed-messaging特性
x-delayed-message是插件提供的類(lèi)型,并不是rabbitmq本身的,發(fā)送消息的時(shí)候通過(guò)在header添加”x-delay”參數(shù)來(lái)控制消息的延時(shí)時(shí)間
直接在maven工程的pom.xml文件中加入
接下來(lái)在 application.properties 文件中加入redis配置:
也很簡(jiǎn)單,代碼如下:
實(shí)現(xiàn)消息發(fā)送
x-delay
在這里我設(shè)置的延遲時(shí)間是3秒。
消息消費(fèi)者
直接在main方法里運(yùn)行Spring Boot程序,Spring Boot會(huì)自動(dòng)解析 MessageReceiver 類(lèi)的。
接下來(lái)只需要用Junit運(yùn)行一下發(fā)送消息的接口即可。
運(yùn)行完后,可以看到如下信息:
消息發(fā)送時(shí)間:2018-05-03 12:44:53 3秒鐘后,Spring Boot控制臺(tái)會(huì)輸出: 消息接收時(shí)間:2018-05-03 12:44:56 接收到的消息:hello i am delay msg
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Java實(shí)現(xiàn)任務(wù)超時(shí)處理方法
- Java實(shí)現(xiàn)的并發(fā)任務(wù)處理實(shí)例
- Java應(yīng)用多機(jī)器部署解決大量定時(shí)任務(wù)問(wèn)題
- Quartz實(shí)現(xiàn)JAVA定時(shí)任務(wù)的動(dòng)態(tài)配置的方法
- 在Java Web項(xiàng)目中添加定時(shí)任務(wù)的方法
- java實(shí)現(xiàn)多線程之定時(shí)器任務(wù)
- Java定時(shí)任務(wù):利用java Timer類(lèi)實(shí)現(xiàn)定時(shí)執(zhí)行任務(wù)的功能
- 最流行的java后臺(tái)框架spring quartz定時(shí)任務(wù)
相關(guān)文章
JAVA spark創(chuàng)建DataFrame的方法
這篇文章主要介紹了JAVA spark創(chuàng)建DataFrame的方法,幫助大家更好的理解和學(xué)習(xí)spark,感興趣的朋友可以了解下2020-08-08SpringBoot thymeleaf eclipse熱部署方案操作步驟
今天小編就為大家分享一篇關(guān)于SpringBoot thymeleaf eclipse熱部署方案操作步驟,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03spring boot validation參數(shù)校驗(yàn)實(shí)例分析
這篇文章主要介紹了spring boot validation參數(shù)校驗(yàn),結(jié)合實(shí)例形式分析了spring boot validation進(jìn)行數(shù)據(jù)有效性驗(yàn)證的相關(guān)操作技巧,需要的朋友可以參考下2019-11-11基于java.lang.IllegalArgumentException異常報(bào)錯(cuò)問(wèn)題及解決
這篇文章主要介紹了基于java.lang.IllegalArgumentException異常報(bào)錯(cuò)問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03在SpringBoot中實(shí)現(xiàn)一個(gè)訂單號(hào)生成系統(tǒng)的示例代碼
在Spring Boot中設(shè)計(jì)一個(gè)訂單號(hào)生成系統(tǒng),主要考慮到生成的訂單號(hào)需要滿足的幾個(gè)要求:唯一性、可擴(kuò)展性、以及可能的業(yè)務(wù)相關(guān)性,本文給大家介紹了幾種常見(jiàn)的解決方案及相應(yīng)的示例代碼,需要的朋友可以參考下2024-02-02java實(shí)現(xiàn)上傳文件類(lèi)型檢測(cè)過(guò)程解析
這篇文章主要介紹了java實(shí)現(xiàn)上傳文件類(lèi)型檢測(cè)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12詳解Guava Cache本地緩存在Spring Boot應(yīng)用中的實(shí)踐
Guava Cache是一個(gè)全內(nèi)存的本地緩存實(shí)現(xiàn),本文將講述如何將 Guava Cache緩存應(yīng)用到 Spring Boot應(yīng)用中。具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01