JAVA多線程之實(shí)現(xiàn)用戶任務(wù)排隊(duì)并預(yù)估排隊(duì)時(shí)長(zhǎng)
實(shí)現(xiàn)流程
初始化一定數(shù)量的任務(wù)處理線程和緩存線程池,用戶每次調(diào)用接口,開啟一個(gè)線程處理。
假設(shè)初始化5個(gè)處理器,代碼執(zhí)行 BlockingQueue.take 時(shí)候,每次take都會(huì)處理器隊(duì)列就會(huì)減少一個(gè),當(dāng)處理器隊(duì)列為空時(shí),take就是阻塞線程,當(dāng)用戶處理某某任務(wù)完成時(shí)候,調(diào)用資源釋放接口,在處理器隊(duì)列put 一個(gè)處理器對(duì)象,原來阻塞的take ,就繼續(xù)執(zhí)行。
排隊(duì)論簡(jiǎn)介
排隊(duì)論是研究系統(tǒng)隨機(jī)聚散現(xiàn)象和隨機(jī)系統(tǒng)工作工程的數(shù)學(xué)理論和方法,又稱隨機(jī)服務(wù)系統(tǒng)理論,為運(yùn)籌學(xué)的一個(gè)分支。我們下面對(duì)排隊(duì)論做下簡(jiǎn)化處理,先看下圖:
代碼具體實(shí)現(xiàn)
任務(wù)隊(duì)列初始化 TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * 初始化隊(duì)列及線程池 * @author tarzan * */ @Component public class TaskQueue { //處理器隊(duì)列 public static BlockingQueue<TaskProcessor> taskProcessors; //等待任務(wù)隊(duì)列 public static BlockingQueue<CompileTask> waitTasks; //處理任務(wù)隊(duì)列 public static BlockingQueue<CompileTask> executeTasks; //線程池 public static ExecutorService exec; //初始處理器數(shù)(計(jì)算機(jī)cpu可用線程數(shù)) public static Integer processorNum=Runtime.getRuntime().availableProcessors(); /** * 初始化處理器、等待任務(wù)、處理任務(wù)隊(duì)列及線程池 */ @PostConstruct public static void initEquipmentAndUsersQueue(){ exec = Executors.newCachedThreadPool(); taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum); //將空閑的設(shè)備放入設(shè)備隊(duì)列中 setFreeDevices(processorNum); waitTasks =new LinkedBlockingQueue<CompileTask>(); executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum); } /** * 將空閑的處理器放入處理器隊(duì)列中 */ private static void setFreeDevices(int num) { //獲取可用的設(shè)備 for (int i = 0; i < num; i++) { TaskProcessor dc=new TaskProcessor(); try { taskProcessors.put(dc); } catch (InterruptedException e) { e.printStackTrace(); } } } public static CompileTask getWaitTask(Long clazzId) { return get(TaskQueue.waitTasks,clazzId); } public static CompileTask getExecuteTask(Long clazzId) { return get(TaskQueue.executeTasks,clazzId); } private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) { CompileTask compileTask =null; if (CollectionUtils.isNotEmpty(users)){ Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst(); if(optional.isPresent()){ compileTask = optional.get(); } } return compileTask; } public static Integer getSort(Long clazzId) { AtomicInteger index = new AtomicInteger(-1); BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks; if (CollectionUtils.isNotEmpty(compileTasks)){ compileTasks.stream() .filter(e -> { index.getAndIncrement(); return e.getClazzId().longValue() == clazzId.longValue(); }) .findFirst(); } return index.get(); } //單位秒 public static int estimatedTime(Long clazzId){ return estimatedTime(60,getSort(clazzId)+1); } //單位秒 public static int estimatedTime(int cellMs,int num){ int a= (num-1)/processorNum; int b= cellMs*(a+1); return b; }
編譯任務(wù)類 CompileTask
import lombok.Data; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.gis.common.enums.DataScheduleEnum; import org.springblade.gis.dynamicds.service.DynamicDataSourceService; import org.springblade.gis.modules.feature.schedule.service.DataScheduleService; import java.util.Date; @Data public class CompileTask implements Runnable { //當(dāng)前請(qǐng)求的線程對(duì)象 private Long clazzId; //用戶id private Long userId; //當(dāng)前請(qǐng)求的線程對(duì)象 private Thread thread; //綁定處理器 private TaskProcessor taskProcessor; //任務(wù)狀態(tài) private Integer status; //開始時(shí)間 private Date startTime; //結(jié)束時(shí)間 private Date endTime; private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class); private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class); @Override public void run() { compile(); } /** * 編譯 */ public void compile() { try { //取出一個(gè)設(shè)備 TaskProcessor taskProcessor = TaskQueue.taskProcessors.take(); //取出一個(gè)任務(wù) CompileTask compileTask = TaskQueue.waitTasks.take(); //任務(wù)和設(shè)備綁定 compileTask.setTaskProcessor(taskProcessor); //放入 TaskQueue.executeTasks.put(compileTask); System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId); //切換用戶數(shù)據(jù)源 dataSourceService.switchDataSource(userId); //添加進(jìn)度 dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState()); } catch (InterruptedException e) { System.err.println( e.getMessage()); } } }
任務(wù)處理器 TaskProcessor?
import lombok.Data; import java.util.Date; @Data public class TaskProcessor { /** * 釋放 */ public static Boolean release(CompileTask task) { Boolean flag=false; Thread thread=task.getThread(); synchronized (thread) { try { if(null!=task.getTaskProcessor()){ TaskQueue.taskProcessors.put(task.getTaskProcessor()); TaskQueue.executeTasks.remove(task); task.setEndTime(new Date()); long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime(); flag=true; System.out.println("用戶"+task.getClazzId()+"耗時(shí)"+intervalMilli+"ms"); } } catch (InterruptedException e) { e.printStackTrace(); } return flag; } } }
Controller控制器接口實(shí)現(xiàn)
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springblade.core.tool.api.R; import org.springblade.gis.multithread.TaskProcessor; import org.springblade.gis.multithread.TaskQueue; import org.springblade.gis.multithread.CompileTask; import org.springframework.web.bind.annotation.*; import java.util.Date; @RestController @RequestMapping("task") @Api(value = "數(shù)據(jù)編譯任務(wù)", tags = "數(shù)據(jù)編譯任務(wù)") public class CompileTaskController { @ApiOperation(value = "添加等待請(qǐng)求 @author Tarzan Liu") @PostMapping("compile/{clazzId}") public R<Integer> compile(@PathVariable("clazzId") Long clazzId) { CompileTask checkUser=TaskQueue.getWaitTask(clazzId); if(checkUser!=null){ return R.fail("已經(jīng)正在排隊(duì)!"); } checkUser=TaskQueue.getExecuteTask(clazzId); if(checkUser!=null){ return R.fail("正在執(zhí)行編譯!"); } //獲取當(dāng)前的線程 Thread thread=Thread.currentThread(); //創(chuàng)建當(dāng)前的用戶請(qǐng)求對(duì)象 CompileTask compileTask =new CompileTask(); compileTask.setThread(thread); compileTask.setClazzId(clazzId); compileTask.setStartTime(new Date()); //將當(dāng)前用戶請(qǐng)求對(duì)象放入隊(duì)列中 try { TaskQueue.waitTasks.put(compileTask); } catch (InterruptedException e) { e.printStackTrace(); } TaskQueue.exec.execute(compileTask); return R.data(TaskQueue.waitTasks.size()-1); } @ApiOperation(value = "查詢當(dāng)前任務(wù)前還有多少任務(wù)等待 @author Tarzan Liu") @PostMapping("sort/{clazzId}") public R<Integer> sort(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.getSort(clazzId)); } @ApiOperation(value = "查詢當(dāng)前任務(wù)預(yù)估時(shí)長(zhǎng) @author Tarzan Liu") @PostMapping("estimate/time/{clazzId}") public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) { return R.data(TaskQueue.estimatedTime(clazzId)); } @ApiOperation(value = "任務(wù)釋放 @author Tarzan Liu") @PostMapping("release/{clazzId}") public R<Boolean> release(@PathVariable("clazzId") Long clazzId) { CompileTask task=TaskQueue.getExecuteTask(clazzId); if(task==null){ return R.fail("資源釋放異常"); } return R.status(TaskProcessor.release(task)); } @ApiOperation(value = "執(zhí)行 @author Tarzan Liu") @PostMapping("exec") public R exec() { Long start=System.currentTimeMillis(); for (Long i = 1L; i < 100; i++) { compile(i); } System.out.println("消耗時(shí)間:"+(System.currentTimeMillis()-start)+"ms"); return R.status(true); } }
接口測(cè)試
根據(jù)任務(wù)id查詢?cè)撊蝿?wù)前還有多少個(gè)任務(wù)待執(zhí)行
根據(jù)任務(wù)id查詢?cè)撊蝿?wù)預(yù)估執(zhí)行完成的剩余時(shí)間,單位秒
補(bǔ)充知識(shí)
BlockingQueue
BlockingQueue即阻塞隊(duì)列,它是基于ReentrantLock,依據(jù)它的基本原理,我們可以實(shí)現(xiàn)Web中的長(zhǎng)連接聊天功能,當(dāng)然其最常用的還是用于實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者模式,大致如下圖所示:
在Java中,BlockingQueue是一個(gè)接口,它的實(shí)現(xiàn)類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區(qū)別主要體現(xiàn)在存儲(chǔ)結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌?,但是?duì)于take與put操作的原理,卻是類似的。
阻塞與非阻塞
入隊(duì)
offer(E e):如果隊(duì)列沒滿,立即返回true; 如果隊(duì)列滿了,立即返回false-->不阻塞
put(E e):如果隊(duì)列滿了,一直阻塞,直到隊(duì)列不滿了或者線程被中斷-->阻塞
offer(E e, long timeout, TimeUnit unit):在隊(duì)尾插入一個(gè)元素,,如果隊(duì)列已滿,則進(jìn)入等待,直到出現(xiàn)以下三種情況:-->阻塞
被喚醒
等待時(shí)間超時(shí)
當(dāng)前線程被中斷
出隊(duì)
poll():如果沒有元素,直接返回null;如果有元素,出隊(duì)
take():如果隊(duì)列空了,一直阻塞,直到隊(duì)列不為空或者線程被中斷-->阻塞
poll(long timeout, TimeUnit unit):如果隊(duì)列不空,出隊(duì);如果隊(duì)列已空且已經(jīng)超時(shí),返回null;如果隊(duì)列已空且時(shí)間未超時(shí),則進(jìn)入等待,直到出現(xiàn)以下三種情況:
被喚醒
等待時(shí)間超時(shí)
當(dāng)前線程被中斷?
到此這篇關(guān)于JAVA多線程之實(shí)現(xiàn)用戶任務(wù)排隊(duì)并預(yù)估排隊(duì)時(shí)長(zhǎng)的文章就介紹到這了,更多相關(guān)JAVA 多線程 用戶任務(wù)排隊(duì)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)按中文首字母排序的具體實(shí)例
這篇文章主要介紹了Java實(shí)現(xiàn)按中文首字母排序的具體實(shí)例,有需要的朋友可以參考一下2013-12-12spring cloud consul使用ip注冊(cè)服務(wù)的方法示例
這篇文章主要介紹了spring cloud consul使用ip注冊(cè)服務(wù)的方法示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03Java實(shí)現(xiàn)添加文字水印和圖片水印功能
為圖片添加水印是一種常用的圖片處理技術(shù),本文主要介紹了Java實(shí)現(xiàn)添加文字水印和圖片水印功能,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05Java Swing null絕對(duì)布局的實(shí)現(xiàn)示例
這篇文章主要介紹了Java Swing null絕對(duì)布局的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12java發(fā)送heartbeat心跳包(byte轉(zhuǎn)16進(jìn)制)
這篇文章主要介紹了java發(fā)送heartbeat心跳包(byte轉(zhuǎn)16進(jìn)制),需要的朋友可以參考下2014-05-05在springboot中使用AOP進(jìn)行全局日志記錄
這篇文章主要介紹就在springboot中使用AOP進(jìn)行全局日志記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java更新調(diào)度器(update scheduler)的使用詳解
Java更新調(diào)度器是Java中的一個(gè)特性,可以自動(dòng)化Java應(yīng)用程序的更新過程,它提供了一種方便的方式來安排Java應(yīng)用程序的更新,確保其與最新的功能、錯(cuò)誤修復(fù)和安全補(bǔ)丁保持同步,本文將深入介紹如何使用Java更新調(diào)度器,并解釋它對(duì)Java開發(fā)人員和用戶的好處2023-11-11