Java優(yōu)雅的關(guān)閉線程池的方法
背景
前幾天在和同事聊一個需求,說是有個數(shù)據(jù)查詢的功能,因為涉及到多個第三方接口調(diào)用,想用線程池并行來做。
很正常的一個方案,但是上線后發(fā)現(xiàn),每次服務(wù)發(fā)布的時候,這個數(shù)據(jù)查詢的功能就會掛掉,后來發(fā)現(xiàn)是線程池沒有做好關(guān)閉,這里總結(jié)一下。
關(guān)鍵字:線程池、shutdown、shutdownNow、interrupt
一、線程中斷 interrupt
先補一補基礎(chǔ)的知識:線程中斷。線程中斷的含義,并不是強制把運行中的線程給“咔嚓”中斷,而是把線程的中斷標志位置為true,這樣等線程之后阻塞(wait、join、sleep)的時候,就會拋出 InterruptedException,程序通過捕獲 InterruptedException 來做一定的善后處理,然后讓線程退出。
來看個例子,下面這段代碼是起一個線程,打印一百行文本,打印過程中,會把線程的中斷標志位置為true
public static void test02() throws InterruptedException { Thread t = new Thread(() -> { for (int i = 0; i < 100; i++) { System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted()); } }); t.start(); Thread.sleep(1); t.interrupt(); }
看看控制臺的輸出,發(fā)現(xiàn)在打印到 57 的時候,中斷標志位已經(jīng)成功置為true了,但是線程任然在打印,說明只是設(shè)置了中斷標志位,而不是直接粗暴的把線程中斷。
...
process i=55,interrupted:false
process i=56,interrupted:false
process i=57,interrupted:true
process i=58,interrupted:true
process i=59,interrupted:true
...
再看看這個示例,同樣是打印一百行文本,打印過程中會判斷中斷標志位,如果中斷就自行退出。
public static void test02() throws InterruptedException { Thread t = new Thread(() -> { for (int i = 0; i < 100; i++) { if (Thread.interrupted()) { System.out.println("線程已中斷,退出執(zhí)行"); break; } System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted()); } }); t.start(); Thread.sleep(1); t.interrupt(); }
控制臺輸出如下,:
process i=49,interrupted:false
process i=50,interrupted:false
process i=51,interrupted:false
線程已中斷,退出執(zhí)行
二、線程池的關(guān)閉 shutdown 方法
了解完線程中斷,再來看看線程池的關(guān)閉方法。
關(guān)閉線程池有兩個方法 shutdown() 和 shutdownNow(),具體有什么區(qū)別?我們先來看看 shutdown() 方法
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 1. 把線程池的狀態(tài)設(shè)置為 SHUTDOWN interruptIdleWorkers(); // 2. 把空閑的工作線程置為中斷 onShutdown(); // 3. 一個空實現(xiàn),暫不用關(guān)注 } finally { mainLock.unlock(); } tryTerminate(); }
看源碼先看注釋,翻譯下:
啟動有序關(guān)閉會執(zhí)行以前提交的任務(wù),但不接受任何新任務(wù)。如果已經(jīng)關(guān)閉,則調(diào)用不會產(chǎn)生額外的影響。此方法不等待活動執(zhí)行的任務(wù)終止。如果需要,可使用 awaitTermination() 做到這一點。
2.1、第一步:advanceRunState(SHUTDOWN) 把線程池置為 SHUTDOWN
線程池狀態(tài)流轉(zhuǎn)如下。調(diào)用 shutdown() 方法會把線程池的狀態(tài)置為 SHUTDOWN,后續(xù)再往線程池提交任務(wù)就會被拒絕(execute() 方法中做了判斷)。
2.2、第二步:interruptIdleWorkers() 把空閑的工作線程置為中斷
interruptIdleWorkers() 方法遍歷所有的工作線程,如果 tryLock() 成功,就把線程置為中斷。這里,如果 tryLock() 成功,說明對應(yīng)的 woker 是一個空閑的,沒有在執(zhí)行任務(wù)的線程,如果沒成功,說明對應(yīng)的 worker 正在執(zhí)行任務(wù)。也就是說,這里的中斷,對正在執(zhí)行中的任務(wù)并沒有影響。
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
2.3、 第三步:onShutdown() 一個空實現(xiàn),暫不用關(guān)注
這個沒啥,就是個留空的方法。
2.4、 小結(jié)
shutdown() 方法干兩件事:
- 把線程池狀態(tài)置為 SHUTDOWN 狀態(tài)
- 中斷空閑線程
我們來看個例子,加深下印象。
public static void test01() throws InterruptedException { // corePoolSize 是 2,maximumPoolSize 是 2 ThreadPoolExecutor es = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); es.prestartAllCoreThreads(); // 啟動所有 worker es.execute(new Task()); // Task是一個訪問某網(wǎng)站的 HTTP 請求,跑的慢,后面會貼出來完整代碼,這里把他當做一個跑的慢的異步任務(wù)就行 es.shutdown(); es.execute(new Task()); // 在線程池 shutdown() 后 繼續(xù)添加任務(wù),這里預(yù)期是拋出異常 }
這個例子我們主要觀察兩個現(xiàn)象。
一個是線程池會有兩個woker( prestartAllCoreThreads() 方法的調(diào)用使得已啟動就有兩個 worker),其中一個正在執(zhí)行,一個處于空閑。所以當調(diào)用shutdown() 方法,走進 interruptIdleWorkers() 的時候,只有那個空閑的線程會調(diào)用 t.interrupt()。
第二個是調(diào)用 shutdown() 方法后,再調(diào)用 execute() 時,會拋出異常,因為線程池的狀態(tài)已經(jīng)置為 SHUTDOWN,不再接受新的任務(wù)添加進來。
三、線程池的關(guān)閉 shutdownNow 方式
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 1:把線程池設(shè)置為STOP interruptWorkers(); // 2.中斷工作線程 tasks = drainQueue(); // 3.把線程池中的任務(wù)都 drain 出來 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
注釋的意思是:
嘗試停止所有正在執(zhí)行的任務(wù),暫停正在等待的任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。從該方法返回時,這些任務(wù)將從任務(wù)隊列中清空(移除)。此方法不等待活動執(zhí)行的任務(wù)終止。如果需要,可使用 awaitTermination() 做到這一點。除了盡最大努力嘗試停止處理主動執(zhí)行的任務(wù)之外,沒有其他保證。此實現(xiàn)通過 Thread.Interrupt() 取消任務(wù),因此任何無法響應(yīng)中斷的任務(wù)都可能永遠不會終止。
3.1、第一步:advanceRunState() 把線程池設(shè)置為STOP
和 shutdown() 方法不同的是,shutdownNow() 方法會把線程池的狀態(tài)設(shè)置為 STOP。
3.2、 第二步:interruptWorkers() 中斷工作線程
interruptWorkers() 如下,可以看到,和 shutdown() 方法不同的是,所有的工作線程都調(diào)用了 interrupt() 方法
/** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
3.3、第三步:drainQueue() 把線程池中的任務(wù)都 drain 出來
drainQueue() 方法如下,把阻塞隊列里面等待的任務(wù)都拿出來,并返回。關(guān)閉線程池的時候,可以基于這個特性,把返回的任務(wù)都打印出來,做個記錄。
/** * Drains the task queue into a new list, normally using * drainTo. But if the queue is a DelayQueue or any other kind of * queue for which poll or drainTo may fail to remove some * elements, it deletes them one by one. */ private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
3.4、小結(jié)
shutdownNow() 方法干三件事:
- 把線程池狀態(tài)置為 STOP 狀態(tài)
- 中斷工作線程
- 把線程池中的任務(wù)都 drain 出來并返回
我們來看個例子,代碼合剛才的一樣,只是關(guān)閉線程用的是shutdownNow()
public static void test01() throws InterruptedException { // corePoolSize 是 1,maximumPoolSize 是 1,無限容量 ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); es.prestartAllCoreThreads(); // 啟動所有 worker es.execute(new Task()); // Task是一個訪問某網(wǎng)站的 HTTP 請求,跑的慢,后面會貼出來完整代碼,這里把他當做一個跑的慢的異步任務(wù)就行 es.execute(new Task()); List<Runnable> result = es.shutdownNow(); System.out.println(result); es.execute(new Task()); // 在線程池 shutdownNow() 后 繼續(xù)添加任務(wù),這里預(yù)期是拋出異常 }
這個例子我們主要觀察三個現(xiàn)象。一個是線程池有兩個woker,所以當調(diào)用shutdownNow() 方法,走進 interruptWorkers() 的時候,所有的 woker 都會調(diào)用 t.interrupt()。
第二個是 shutdownNow() 方法會返回還沒來得及執(zhí)行的task,并打印出來。第三個是調(diào)用 shutdownNow() 方法后,再調(diào)用 execute() 時,會拋出異常,因為線程池的狀態(tài)已經(jīng)置為 STOP,不再接受新的任務(wù)添加
四、實戰(zhàn),與 JVM 鉤子配合
實際工作中,我們一般是使用 shutdown() 方法,因為它比較“溫和”,會等待我們把線程池中的任務(wù)都執(zhí)行完,這里也已 shutdown() 方法為例。
我們回到最開頭聊到的那個 case,機器重新發(fā)布,但是線程池中還有沒執(zhí)行完任務(wù),機器一關(guān),這些任務(wù)全部被kill,怎么辦呢?有什么機制能夠阻塞一下,等待這個任務(wù)執(zhí)行完再關(guān)閉嗎?
有的,用 JVM 的鉤子!
實例代碼如下,一個線程池,提交了三個任務(wù)去執(zhí)行,執(zhí)行完得半分鐘。然后增加一個JVM的鉤子,這個鉤子可以簡單理解為監(jiān)聽器,注冊后,JVM在關(guān)閉的時候就會調(diào)用這個方法,調(diào)用完才會正式關(guān)閉JVM。
public static void test01() throws InterruptedException { ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); es.execute(new Task()); es.execute(new Task()); es.execute(new Task()); Thread shutdownHook = new Thread(() -> { es.shutdown(); try { es.awaitTermination(3, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("等待超時,直接關(guān)閉"); } }); Runtime.getRuntime().addShutdownHook(shutdownHook); }
在機器上執(zhí)行,會發(fā)現(xiàn),我使用 ctrl + c (注意不是ctrl + z )關(guān)閉進程,會發(fā)現(xiàn)進程并沒有直接關(guān)閉,線程池任然執(zhí)行,一直等到線程池的任務(wù)執(zhí)行完,進程才會正式退出。
怎么樣,是不是很神奇。本文中涉及的 Task 的源碼如下。這個任務(wù)是對 stackoverflow 網(wǎng)站發(fā)起 10 次請求,用來模擬跑的比較慢的任務(wù),當然這不是重點,可以忽略,有興趣動手試一下本文代碼的同學(xué)可以參考下。
public static class Task implements Runnable { @Override public void run() { System.out.println("task start"); for (int i = 0; i < 10; i++) { httpGet(); System.out.println("task execute " + i); } System.out.println("task finish"); } private void httpGet() { String url = "https://stackoverflow.com/"; String result = ""; BufferedReader in = null; try { String urlName = url; URL realUrl = new URL(urlName); // 打開和URL之間的連接 URLConnection conn = realUrl.openConnection(); // 設(shè)置通用的請求屬性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)"); // 建立實際的連接 conn.connect(); // 獲取所有響應(yīng)頭字段 Map<String, List<String>> map = conn.getHeaderFields(); // 遍歷所有的響應(yīng)頭字段 // for (String key : map.keySet()) { // System.out.println(key + "--->" + map.get(key)); // } // 定義BufferedReader輸入流來讀取URL的響應(yīng) in = new BufferedReader( new InputStreamReader(conn.getInputStream())); String line; while ((line = in.readLine()) != null) { result += "/n" + line; } } catch (Exception e) { e.printStackTrace(); } // 使用finally塊來關(guān)閉輸入流 finally { try { if (in != null) { in.close(); } } catch (Exception ex) { ex.printStackTrace(); } } // System.out.print(result); } }
五、總結(jié)
想要優(yōu)雅的關(guān)閉線程池,首先要理解線程中斷的含義。
其次,關(guān)閉線程池有兩種方式:shutdown() 和 shutdownNow(),二者最大的區(qū)別是 shutdown() 只是把空閑的 woker 置為中斷,不影響正在運行的woker,并且會繼續(xù)把待執(zhí)行的任務(wù)給處理完。shutdonwNow() 則是把所有的 woker 都置為中斷,待執(zhí)行的任務(wù)全部抽出并返回,日常工作中更多是使用 shutdown()。
最后,單純的使用 shutdown() 也不靠譜,還得使用 awaitTermination() 和 JVM 的鉤子,才算優(yōu)雅的關(guān)閉線程池。
到此這篇關(guān)于Java優(yōu)雅的關(guān)閉線程池的方法的文章就介紹到這了,更多相關(guān)Java 關(guān)閉線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea?2024使用Maven創(chuàng)建Java?Web項目詳細圖文教程
這篇文章主要給大家介紹了關(guān)于idea?2024使用Maven創(chuàng)建Java?Web項目的相關(guān)資料,介紹了如何使用Maven創(chuàng)建一個Spring?MVC項目,并配置Tomcat服務(wù)器以運行一個簡單的Helloworld?JSP頁面,需要的朋友可以參考下2024-12-12

Java負載均衡算法實現(xiàn)之輪詢和加權(quán)輪詢