java并發(fā)編程_線(xiàn)程池的使用方法(詳解)
一、任務(wù)和執(zhí)行策略之間的隱性耦合
Executor可以將任務(wù)的提交和任務(wù)的執(zhí)行策略解耦
只有任務(wù)是同類(lèi)型的且執(zhí)行時(shí)間差別不大,才能發(fā)揮最大性能,否則,如將一些耗時(shí)長(zhǎng)的任務(wù)和耗時(shí)短的任務(wù)放在一個(gè)線(xiàn)程池,除非線(xiàn)程池很大,否則會(huì)造成死鎖等問(wèn)題
1.線(xiàn)程饑餓死鎖
類(lèi)似于:將兩個(gè)任務(wù)提交給一個(gè)單線(xiàn)程池,且兩個(gè)任務(wù)之間相互依賴(lài),一個(gè)任務(wù)等待另一個(gè)任務(wù),則會(huì)發(fā)生死鎖;表現(xiàn)為池不夠
定義:某個(gè)任務(wù)必須等待池中其他任務(wù)的運(yùn)行結(jié)果,有可能發(fā)生饑餓死鎖
2.線(xiàn)程池大小
注意:線(xiàn)程池的大小還受其他的限制,如其他資源池:數(shù)據(jù)庫(kù)連接池
如果每個(gè)任務(wù)都是一個(gè)連接,那么線(xiàn)程池的大小就受制于數(shù)據(jù)庫(kù)連接池的大小
3.配置ThreadPoolExecutor線(xiàn)程池
實(shí)例:
1.通過(guò)Executors的工廠方法返回默認(rèn)的一些實(shí)現(xiàn)
2.通過(guò)實(shí)例化ThreadPoolExecutor(.....)自定義實(shí)現(xiàn)
線(xiàn)程池的隊(duì)列
1.無(wú)界隊(duì)列:任務(wù)到達(dá),線(xiàn)程池飽滿(mǎn),則任務(wù)在隊(duì)列中等待,如果任務(wù)無(wú)限達(dá)到,則隊(duì)列會(huì)無(wú)限擴(kuò)張
如:?jiǎn)卫凸潭ù笮〉木€(xiàn)程池用的就是此種
2.有界隊(duì)列:如果新任務(wù)到達(dá),隊(duì)列滿(mǎn)則使用飽和策略
3.同步移交:如果線(xiàn)程池很大,將任務(wù)放入隊(duì)列后在移交就會(huì)產(chǎn)生延時(shí),如果任務(wù)生產(chǎn)者很快也會(huì)導(dǎo)致任務(wù)排隊(duì)
SynchronousQueue直接將任務(wù)移交給工作線(xiàn)程
機(jī)制:將一個(gè)任務(wù)放入,必須有一個(gè)線(xiàn)程等待接受,如果沒(méi)有,則新增線(xiàn)程,如果線(xiàn)程飽和,則拒絕任務(wù)
如:CacheThreadPool就是使用的這種策略
飽和策略:
setRejectedExecutionHandler來(lái)修改飽和策略
1.終止Abort(默認(rèn)):拋出異常由調(diào)用者處理
2.拋棄Discard
3.拋棄DiscardOldest:拋棄最舊的任務(wù),注意:如果是優(yōu)先級(jí)隊(duì)列將拋棄優(yōu)先級(jí)最高的任務(wù)
4.CallerRuns:回退任務(wù),有調(diào)用者線(xiàn)程自行處理
4.線(xiàn)程工廠ThreadFactoy
每當(dāng)創(chuàng)建線(xiàn)程時(shí):其實(shí)是調(diào)用了線(xiàn)程工廠來(lái)完成
自定義線(xiàn)程工廠:implements ThreadFactory
可以定制該線(xiàn)程工廠的行為:如UncaughtExceptionHandler等
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //設(shè)置該線(xiàn)程工廠創(chuàng)建的線(xiàn)程的 未捕獲異常的行為 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
5.擴(kuò)展ThreadPoolExecutor
可以被自定義子類(lèi)覆蓋的方法:
1.afterExecute:結(jié)束后,如果拋出RuntimeException則方法不會(huì)執(zhí)行
2.beforeExecute:開(kāi)始前,如果拋出RuntimeException則任務(wù)不會(huì)執(zhí)行
3.terminated:在線(xiàn)程池關(guān)閉時(shí),可以用來(lái)釋放資源等
二、遞歸算法的并行化
1.循環(huán)
在循環(huán)中,每次循環(huán)操作都是獨(dú)立的
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2.迭代
如果每個(gè)迭代操作是彼此獨(dú)立的,則可以串行執(zhí)行
如:深度優(yōu)先搜索算法;注意:遞歸還是串行的,但是,每個(gè)節(jié)點(diǎn)的計(jì)算是并行的
//串行 計(jì)算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 計(jì)算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //調(diào)用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
實(shí)例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中閉鎖解開(kāi),則表示已經(jīng)找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最終主線(xiàn)程關(guān)閉線(xiàn)程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一個(gè)線(xiàn)程找到了答案,則return,通過(guò)ValueLatch中isSet CountDownlatch閉鎖實(shí)現(xiàn); //為類(lèi)避免死鎖,將已經(jīng)掃描的節(jié)點(diǎn)放入set集合中,避免繼續(xù)掃描產(chǎn)生死循環(huán) if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
以上這篇java并發(fā)編程_線(xiàn)程池的使用方法(詳解)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Java線(xiàn)程池并發(fā)執(zhí)行多個(gè)任務(wù)方式
- Java并發(fā)包線(xiàn)程池ThreadPoolExecutor的實(shí)現(xiàn)
- Java并發(fā)編程面試之線(xiàn)程池
- 詳解Java并發(fā)包中線(xiàn)程池ThreadPoolExecutor
- java并發(fā)包中CountDownLatch和線(xiàn)程池的使用詳解
- Java并發(fā)線(xiàn)程之線(xiàn)程池的知識(shí)總結(jié)
- Java并發(fā)之線(xiàn)程池Executor框架的深入理解
- Java并發(fā)之串行線(xiàn)程池實(shí)例解析
- Java并發(fā)線(xiàn)程池實(shí)例分析講解
相關(guān)文章
springboot2.x默認(rèn)使用的代理是cglib代理操作
這篇文章主要介紹了springboot2.x默認(rèn)使用的代理是cglib代理操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java+swing實(shí)現(xiàn)經(jīng)典貪吃蛇游戲
貪吃蛇(也叫做貪食蛇)游戲是一款休閑益智類(lèi)游戲,有PC和手機(jī)等多平臺(tái)版本。既簡(jiǎn)單又耐玩。本文將通過(guò)java的swing來(lái)實(shí)現(xiàn)這一游戲,需要的可以參考一下2022-01-01java自定義封裝StringUtils常用工具類(lèi)
這篇文章主要為大家詳細(xì)介紹了java自定義封裝StringUtils常用工具類(lèi),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03mybatis整合ehcache做三級(jí)緩存的實(shí)現(xiàn)方法
ehcache是一個(gè)快速內(nèi)存緩存框架,java項(xiàng)目里用起來(lái)很方便,下面這篇文章主要給大家介紹了關(guān)于mybatis整合ehcache做三級(jí)緩存的實(shí)現(xiàn)方法,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06Java中的lambda和stream實(shí)現(xiàn)排序
這篇文章主要介紹了Java中的lambda和stream實(shí)現(xiàn)排序,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09mybatis中orderBy(排序字段)和sort(排序方式)引起的bug及解決
這篇文章主要介紹了mybatis中orderBy(排序字段)和sort(排序方式)引起的bug,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java進(jìn)程cpu頻繁100%問(wèn)題解決方案
這篇文章主要介紹了Java進(jìn)程cpu頻繁100%問(wèn)題解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10Java中使用MongoDB數(shù)據(jù)庫(kù)實(shí)例Demo
MongoDB是由C++語(yǔ)言編寫(xiě)的,基于分布式文件存儲(chǔ)的數(shù)據(jù)庫(kù),是一個(gè)介于關(guān)系數(shù)據(jù)庫(kù)和非關(guān)系數(shù)據(jù)庫(kù)之間的產(chǎn)品,是最接近于關(guān)系型數(shù)據(jù)庫(kù)的NoSQL數(shù)據(jù)庫(kù),下面這篇文章主要給大家介紹了關(guān)于Java中使用MongoDB數(shù)據(jù)庫(kù)的相關(guān)資料,需要的朋友可以參考下2023-12-12