JVM優(yōu)先級線程池做任務隊列的實現(xiàn)方法
前言
我們都知道 web 服務的工作大多是接受 http 請求,并返回處理后的結果。服務器接受的每一個請求又可以看是一個任務。一般而言這些請求任務會根據(jù)請求的先后有序處理,如果請求任務的處理比較耗時,往往就需要排隊了。而同時不同的任務直接可能會存在一些優(yōu)先級的變化,這時候就需要引入任務隊列并進行管理了。可以做任務隊列的東西有很多,Java 自帶的線程池,以及其他的消息中間件都可以。
同步與異步
這個問題在之前已經(jīng)提過很多次了,有些任務是需要請求后立即返回結果的,而有的則不需要。設想一下你下單購物的場景,付完錢后,系統(tǒng)只需要返回一個支付成功即可,后續(xù)的積分增加、優(yōu)惠券發(fā)放、安排發(fā)貨等等業(yè)務都不需要實時返回給用戶的,這些就是異步的任務。大量的異步任務到達我們部署的服務上,由于處理效率的瓶頸,無法達到實時處理,因此與需要用隊列將他們暫時保存起來,排隊處理。
線程池
在 Java 中提到隊列,我們除了想到基本的數(shù)據(jù)結構之外,應該還有線程池。線程池自帶一套機制可以實現(xiàn)任務的排隊和執(zhí)行,可以滿足單點環(huán)境下絕大多數(shù)異步化的場景。下面是典型的一個處理流程:
// 注入合適類型的線程池 @Autowired private final ThreadPoolExecutor asyncPool; @RequestMapping(value = "/async/someOperate", method = RequestMethod.POST) public RestResult someOperate(HttpServletRequest request, String params,String callbackUrl { // 接受請求后 submit 到線程池排隊處理 asyncPool.submit(new Task(params,callbackUrl); return new RestResult(ResultCode.SUCCESS.getCode(), null) {{ setMsg("successful!" + prop.getShowMsg()); }}; } // 異步任務處理 @Slf4j public class Task extends Callable<RestResult> { private String params; private String callbackUrl; private final IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.class); private final ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.class); public ImageTask(String params,String callbackUrl) { this.params = params; this.callbackUrl = callbackUrl; } @Override public RestResult call() { try { // 業(yè)務處理 CarDamageResult result = algorithmService.someOperate(this.params); // 回調 return serviceUtils.callback(this.callbackUrl, this.caseNum, ResultCode.SUCCESS.getCode(), result, this.isAsync); } catch (ServiceException e) { return serviceUtils.callback(this.callbackUrl, this.caseNum, e.getCode(), null, this.isAsync); } } }
對于線程池這里就不具體展開講了,僅僅簡單理了下具體的流程:
- 收到請求后,參數(shù)校驗后傳入線程池排隊。
- 返回結果:“請求成功,正在處理”。
- 任務排到后由相應的線程處理,處理完后進行接口回調。
上面的例子描述了一個生產(chǎn)速度遠遠大于消費速度的模型,普通面向數(shù)據(jù)庫開發(fā)的企業(yè)級應用,由于數(shù)據(jù)庫的連接池開發(fā)的連接數(shù)較大,一般不需要這樣通過線程池來處理,而一些 GPU 密集型的應用場景,由于顯存的瓶頸導致消費速度慢時,就需要隊列來作出調整了。
帶優(yōu)先級的線程池
更復雜的,例如考慮到任務的優(yōu)先級,還需要對線程池進行重寫,通過 PriorityBlockingQueue 來替換默認的阻塞隊列。直接上代碼。
import lombok.Data; import java.util.concurrent.Callable; /** * @author Fururur * @create 2020-01-14-10:37 */ @Data public abstract class PriorityCallable<T> implements Callable<T> { private int priority; }
import lombok.Getter; import lombok.Setter; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; /** * 優(yōu)先級線程池的實現(xiàn) * * @author Fururur * @create 2019-07-23-10:19 */ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { private ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0); public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue()); } public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory); } public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler); } public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler); } private static PriorityBlockingQueue getWorkQueue() { return new PriorityBlockingQueue(); } @Override public void execute(Runnable command) { int priority = local.get(); try { this.execute(command, priority); } finally { local.set(0); } } public void execute(Runnable command, int priority) { super.execute(new PriorityRunnable(command, priority)); } public <T> Future<T> submit(PriorityCallable<T> task) { local.set(task.getPriority()); return super.submit(task); } public <T> Future<T> submit(Runnable task, T result, int priority) { local.set(priority); return super.submit(task, result); } public Future<?> submit(Runnable task, int priority) { local.set(priority); return super.submit(task); } @Getter @Setter protected static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> { private final static AtomicLong seq = new AtomicLong(); private final long seqNum; private Runnable run; private int priority; PriorityRunnable(Runnable run, int priority) { seqNum = seq.getAndIncrement(); this.run = run; this.priority = priority; } @Override public void run() { this.run.run(); } @Override public int compareTo(PriorityRunnable other) { int res = 0; if (this.priority == other.priority) { if (other.run != this.run) { // ASC res = (seqNum < other.seqNum ? -1 : 1); } } else { // DESC res = this.priority > other.priority ? -1 : 1; } return res; } } }
要點如下:
- 替換線程池默認的阻塞隊列為 PriorityBlockingQueue,響應的傳入的線程類需要實現(xiàn) Comparable<T> 才能進行比較。
- PriorityBlockingQueue 的數(shù)據(jù)結構決定了,優(yōu)先級相同的任務無法保證 FIFO,需要自己控制順序。
- 需要重寫線程池的 execute() 方法??催^線程池源碼的會發(fā)現(xiàn),執(zhí)行 submit(task) 方法后,都會轉化成 RunnableFuture<T> 再進一步執(zhí)行,由于傳入的 task 雖然實現(xiàn)了 Comparable<T> 到,但是內部轉換成的 RunnableFuture<T> 并未實現(xiàn),因此直接 submit 會拋出 Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 這樣一個異常,所以需要重寫 execute() 方法,構造一個 PriorityRunnable 作為中轉。
總結
JVM 線程池是實現(xiàn)異步任務隊列最簡單最原生的一種方式,本文介紹了基本的使用流程和帶有優(yōu)先隊列需求的用法。這種方法可有滿足到一些簡單的業(yè)務場景,但也存在一定的局限性:
- JVM 線程池是單機的,橫向擴展多個服務下做負載均衡時,就會存在多個線程池了他們是分開工作的,無法很好的統(tǒng)一和管理,不太適合分布式場景。
- JVM 線程池是基于內存的,一旦服務掛了,會出現(xiàn)任務丟失的情況,可靠性低。
- 缺少作為任務隊列的 ack 機制,一旦任務失敗不會重新執(zhí)行,且無法很好地對線程池隊列進行監(jiān)控。
顯然簡單的 JVM 線程池是無法 handle 到負載的業(yè)務場景的,這就需要引入其他中間件了,在接下來的文章中我們會繼續(xù)探討。
參考文獻
ThreadPoolExecutor 優(yōu)先級的線程池
implementing PriorityQueue on ThreadPoolExecutor
ThreadPoolExecutor 的 PriorityBlockingQueue 類型轉化問題
到此這篇關于JVM優(yōu)先級線程池做任務隊列的實現(xiàn)方法的文章就介紹到這了,更多相關java線程池優(yōu)先級內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot整合JSR303參數(shù)校驗與全局異常處理的方法
JSR-303 是 JAVA EE 6 中的一項子規(guī)范,叫做 Bean Validation,官方參考實現(xiàn)是Hibernate Validator,這篇文章主要介紹了springboot整合JSR303參數(shù)校驗與全局異常處理,需要的朋友可以參考下2022-09-09Spring Mvc中傳遞參數(shù)方法之url/requestMapping詳解
在開發(fā)中,參數(shù)傳遞是必不可少的一個功能,下面這篇文章主要給大家介紹了關于Spring Mvc中傳遞參數(shù)方法之url/requestMapping的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07java如何獲取request中json數(shù)據(jù)
這篇文章主要給大家介紹了關于java如何獲取request中json數(shù)據(jù)的相關資料,文中通過代碼示例以及圖文將獲取的方法介紹的非常詳細,對大家學習或者使用java具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08