java線程池ExecutorService超時處理小結
場景問題:使用線程池ExecutorService,想設置每個子線程的執(zhí)行超時時間,使用future.get()來監(jiān)聽超時,當有子線程阻塞時,導致有的隊列任務還未執(zhí)行就被取消了。
方式一、使用 future.get() 來監(jiān)聽超時取消
這種辦法看似能解決問題,但是當任務累積處理不過來時,會漏執(zhí)行。
比如下面的例子,就實際只會執(zhí)行一個子線程。
package com.study;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;
public class Test {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
Future<?> future = threadPool.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println(LocalDateTime.now().format(formatter));
Thread.sleep(5000);
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
});
new Thread(new Runnable() {
@Override
public void run() {
try {
future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {//超時異常
future.cancel(true); // 超時后取消任務
}
}
}).start();
}
}
}
方式二、在子線程內(nèi)部,超時后去發(fā)送中斷信號
package com.study;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;
public class Test {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final ExecutorService threadPool = Executors.newFixedThreadPool(1);
private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(1);//監(jiān)聽超時,這個數(shù)量要和線程池數(shù)量相同
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
// int delay = 3;
int delay = i + 1;
threadPool.submit(new Runnable() {
@Override
public void run() {
ScheduledFuture<?> schedule = null;
try {
Thread thread = Thread.currentThread();
// 啟動一個定時器,如果任務執(zhí)行超過3秒則中斷當前線程
schedule = timeoutExecutor.schedule(() -> {
thread.interrupt(); // 中斷當前正在執(zhí)行的任務
}, delay, TimeUnit.SECONDS);
System.out.println(LocalDateTime.now().format(formatter));
Thread.sleep(5000);
// FileOutputStream fos = new FileOutputStream("d:/test.txt" + k);
// for (int j = 0; j < 1000000; j++) {
// fos.write("123".getBytes());
// }
// fos.close();
} catch (InterruptedException e) {
// e.printStackTrace();
} finally {
if (schedule != null) {
//取消任務
schedule.cancel(true);
}
}
}
});
}
}
}
這里其實還是有問題 ,把 Thread.sleep(5000);改成注釋的io阻塞,還是要等線程執(zhí)行結束后才會取消線程執(zhí)行。
所以單純使用 future 是實現(xiàn)不了這個場景的邏輯的。
timeoutExecutor 數(shù)量和 線程池數(shù)量要一致的原因如下示例。
package com.study;
import java.time.LocalDateTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceExample {
private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(2);
public static void main(String[] args) throws InterruptedException {
// 調(diào)用schedule方法兩次
scheduleTask("Task 1");
scheduleTask("Task 2");
scheduleTask("Task 3");
}
private static void scheduleTask(String taskName) {
timeoutExecutor.schedule(() -> {
System.out.println(taskName + " started at: " + LocalDateTime.now());
try {
// 模擬任務執(zhí)行
Thread.sleep(2000); // 假設每個任務執(zhí)行2秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 3, TimeUnit.SECONDS);
}
}方式三、自己定義鎖來實現(xiàn)
package com.study;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;
public class Test {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final ExecutorService threadPool = Executors.newFixedThreadPool(16);//這里不能設置為1了,這里已經(jīng)不是用來控制并發(fā)數(shù)量了,只是為了重復利用線程
private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(1);//監(jiān)聽超時,這個數(shù)量要和線程池數(shù)量相同
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(50);
// int delay = 3;
int delay = i + 1;
int k = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
ScheduledFuture<?> schedule = null;
try {
ThreadPool.awaitThread();
// 啟動一個定時器,如果任務執(zhí)行超過3秒則中斷當前線程
// timeoutExecutor如果只有一個線程池,這里面的代碼片段會阻塞,上一個線程在這里的代碼片段執(zhí)行完后,當前線程才會執(zhí)行這里的代碼片段,
// 但是影響不大,因為這里的代碼片段只是釋放動作,一瞬間就會執(zhí)行完,所以影響不大,
// 如果其他場景這里阻塞時間比較久,那么timeoutExecutor線程大小要和threadPool線程大小一致。
schedule = timeoutExecutor.schedule(() -> {
System.out.println("釋放1");
ThreadPool.releaseThread();
}, delay, TimeUnit.SECONDS);
System.out.println("【" + Thread.currentThread().getName() + "】" + LocalDateTime.now().format(formatter));
Thread.sleep(5000);
// FileOutputStream fos = new FileOutputStream("d:/test.txt" + k);
// for (int j = 0; j < 1000000; j++) {
// fos.write("123".getBytes());
// }
// fos.close();
} catch (Exception e) {
System.out.println("異常");
} finally {
if (schedule != null) {
//cancel返回true任務還未執(zhí)行,需要取消任務
if (schedule.cancel(true)) {
System.out.println("釋放2");
ThreadPool.releaseThread();
}
}
}
}
});
}
}
}package com.study;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 線程池
*/
public class ThreadPool {
private static final int MAX_POOL_SIZE = 1; // 最大線程數(shù),控制并發(fā)數(shù)量
private static int totalThread = 0; // 總線程數(shù)
private static final Lock lock = new ReentrantLock(true);
private static final Condition notice = lock.newCondition();
/**
* 從線程池獲取線程
*/
public static boolean awaitThread() {
lock.lock();
try {
// 嘗試從線程池中獲取線程
if (totalThread < MAX_POOL_SIZE) {
totalThread++;
return true;
}
// 線程已到達最大線程數(shù),等待歸還線程,最長等待1小時,await()會釋放當前線程的鎖
if (notice.await(1, TimeUnit.HOURS)) {
totalThread++;
return true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return false;
}
/**
* 釋放線程到線程池
*/
public static void releaseThread() {
lock.lock();
try {
totalThread--;
// 通知有空閑,signal()會喚醒其中一個await()線程
notice.signal();
} finally {
lock.unlock();
}
}
}到此這篇關于java線程池ExecutorService超時處理小結的文章就介紹到這了,更多相關java線程池ExecutorService超時內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java開啟遠程debug竟有兩種參數(shù)(最新推薦)
這篇文章主要介紹了java開啟遠程debug竟有兩種參數(shù),本文結合實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07
SpringBoot3通過GraalVM生成exe執(zhí)行文件問題
文章介紹了如何安裝GraalVM和Visual Studio,并通過Spring Boot項目將Java應用程序封裝成可執(zhí)行文件(.exe)2024-12-12
SpringBoot使用Jasypt對YML文件配置內(nèi)容加密的方法(數(shù)據(jù)庫密碼加密)
本文介紹了如何在SpringBoot項目中使用Jasypt對application.yml文件中的敏感信息(如數(shù)據(jù)庫密碼)進行加密,通過引入Jasypt依賴、配置加密密鑰、加密敏感信息并測試解密功能,可以提高配置文件的安全性,減少因配置文件泄露導致的安全風險,感興趣的朋友一起看看吧2025-03-03
Java創(chuàng)建多線程的幾種方式實現(xiàn)
這篇文章主要介紹了Java創(chuàng)建多線程的幾種方式實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10
10k+點贊的 SpringBoot 后臺管理系統(tǒng)教程詳解
這篇文章主要介紹了10k+點贊的 SpringBoot 后臺管理系統(tǒng)教程詳解,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
Java HttpClient實現(xiàn)socks代理的示例代碼
這篇文章主要介紹了Java HttpClient 實現(xiàn) socks 代理的示例代碼,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-11-11

