解決線程池中ThreadGroup的坑
線程池中ThreadGroup的坑
在Java中每一個線程都歸屬于某個線程組管理的一員,例如在主函數(shù)main()主工作流程中產(chǎn)生一個線程,則產(chǎn)生的線程屬于main這個線程組管理的一員。簡單地說,線程組(ThreadGroup)就是由線程組成的管理線程的類,這個類是java.lang.ThreadGroup類。
定義一個線程組,通過以下代碼可以實現(xiàn)。
ThreadGroup group=new ThreadGroup(“groupName”); Thread thread=new Thread(group,”the first thread of group”);
ThreadGroup類中的某些方法,可以對線程組中的線程產(chǎn)生作用。例如,setMaxPriority()方法可以設(shè)定線程組中的所有線程擁有最大的優(yōu)先權(quán)。
所有線程都隸屬于一個線程組。那可以是一個默認線程組(不指定group),亦可是一個創(chuàng)建線程時明確指定的組。在創(chuàng)建之初,線程被限制到一個組里,而且不能改變到一個不同的組。每個應(yīng)用都至少有一個線程從屬于系統(tǒng)線程組。若創(chuàng)建多個線程而不指定一個組,它們就會自動歸屬于系統(tǒng)線程組。
線程組也必須從屬于其他線程組。必須在構(gòu)建器里指定新線程組從屬于哪個線程組。若在創(chuàng)建一個線程組的時候沒有指定它的歸屬,則同樣會自動成為系統(tǒng)線程組的一名屬下。因此,一個應(yīng)用程序中的所有線程組最終都會將系統(tǒng)線程組作為自己的“父”。
那么假如我們需要在線程池中實現(xiàn)一個帶自定義ThreadGroup的線程分組,該怎么實現(xiàn)呢?
我們在給線程池(ThreadPoolExecutor)提交任務(wù)的時候可以通過execute(Runnable command)來將一個線程任務(wù)加入到該線程池,那么我們是否可以通過new一個指定了ThreadGroup的Thread實例來加入線程池來達到前面說到的目的呢?
ThreadGroup是否可行
通過new Thread(threadGroup,runnable)實現(xiàn)線程池中任務(wù)分組
public static void main(String[] args) {
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
final ThreadGroup group = new ThreadGroup("Main_Test_Group");
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(group, new Runnable() {
@Override
public void run() {
int sleep = (int)(Math.random() * 10);
try {
Thread.sleep(1000 * 3);
System.out.println(Thread.currentThread().getName()+"執(zhí)行完畢");
System.out.println("當前線程組中的運行線程數(shù)"+group.activeCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, group.getName()+" #"+i+"");
pool.execute(thread);
}
}
運行結(jié)果
pool-1-thread-3執(zhí)行完畢
pool-1-thread-1執(zhí)行完畢
當前線程組中的運行線程數(shù)0
pool-1-thread-2執(zhí)行完畢
當前線程組中的運行線程數(shù)0
當前線程組中的運行線程數(shù)0
pool-1-thread-4執(zhí)行完畢
pool-1-thread-5執(zhí)行完畢
當前線程組中的運行線程數(shù)0
當前線程組中的運行線程數(shù)0
運行結(jié)果中可以看到group中的線程并沒有因為線程池啟動了這個線程任務(wù)而運行起來.因此通過線程組來對線程池中的線層任務(wù)分組不可行.
從java.util.concurrent.ThreadPoolExecutor源碼中可以看到如下構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
如果我們在實例化ThreadPoolExecutor時不指定ThreadFactory,那么將以默認的ThreadFactory來創(chuàng)建Thread.
Executors內(nèi)部類DefaultThreadFactory
下面的源碼即是默認的Thread工廠
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
從唯一的構(gòu)造函數(shù)可以看到DefaultThreadFactory以SecurityManager 實例中的ThreadGroup來指定線程的group,如果SecurityManager 獲取到的ThreadGroup為null才默認以當前線程的group來指定.public Thread newThread(Runnable r) 則以group來new 一個Thead.這樣我們可以在實例化ThreadPoolExecutor對象的時候在其構(gòu)造函數(shù)內(nèi)傳入自定義的ThreadFactory實例即可達到目的.
public class MyTheadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private ThreadGroup defaultGroup;
public MyTheadFactory() {
SecurityManager s = System.getSecurityManager();
defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public MyTheadFactory(ThreadGroup group) {
this.defaultGroup = group;
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadGroup的使用及手寫線程池
監(jiān)聽線程異常關(guān)閉
以下代碼在window下不方便測試,需在linux 上 測試
// 以下線程如果強制關(guān)閉的話,是無法打印`線程被殺掉了`
// 模擬關(guān)閉 kill PID
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread( () -> {
System.out.println("線程被殺掉了");
}));
while(true){
System.out.println("i am working ...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如何拿到Thread線程中異常
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
int i = 10/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.setUncaughtExceptionHandler((t,e)->{
System.out.println("線程的名字"+ t.getName());
System.out.println(e);
}); // 通過注入接口的方式
thread.start();
}
ThreadGroup
注意: threadGroup 設(shè)置為isDaemon 后,會隨最后一個線程結(jié)束而銷毀,如果沒有設(shè)置isDaemon ,則需要手動調(diào)用 destory()
線程池使用
自己搭建的簡單線程池實現(xiàn)
其中ThreadGroup 的應(yīng)用沒有寫,但是我們可以觀察線程關(guān)閉后,檢查ThreadGroup 中是否還有活躍的線程等,具體參考ThreadGroup API
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
* @Author: shengjm
* @Date: 2020/2/10 9:52
* @Description:
*/
public class SimpleThreadPool extends Thread{
/**
* 線程數(shù)量
*/
private int size;
private final int queueSize;
/**
* 默認線程隊列數(shù)量
*/
private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
private static volatile int seq = 0;
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
private final DiscardPolicy discardPolicy;
private volatile boolean destory = false;
private int min;
private int max;
private int active;
/**
* 定義異常策略的實現(xiàn)
*/
private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
throw new DiscardException("線程池已經(jīng)被撐爆了,后繼多余的人將丟失");
};
/**
*
*/
public SimpleThreadPool(){
this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
}
/**
*
*/
public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
this.min = min;
this.active = active;
this.max = max;
this.queueSize = queueSize;
this.discardPolicy = discardPolicy;
init();
}
/**
* 初始化
*/
private void init() {
for(int i = 0; i < min; i++){
createWorkTask();
}
this.size = min;
this.start();
}
private void createWorkTask(){
WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
task.start();
THREAD_QUEUE.add(task);
}
/**
* 線程池自動擴充
*/
@Override
public void run() {
while(!destory){
System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+ TASK_QUEUE.size());
try {
Thread.sleep(1000);
if(TASK_QUEUE.size() > active && size < active){
for (int i = size; i < active;i++){
createWorkTask();
}
size = active;
}else if(TASK_QUEUE.size() > max && size < max){
for (int i = size; i < max;i++){
createWorkTask();
}
size = max;
}
synchronized (THREAD_QUEUE){
if(TASK_QUEUE.isEmpty() && size > active){
int release = size - active;
for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
if(release <=0){
break;
}
WorkerTask task = it.next();
task.close();
task.interrupt();
it.remove();
release--;
}
size = active;
}
}
} catch (InterruptedException e) {
break;
}
}
}
public void submit(Runnable runnable){
synchronized (TASK_QUEUE){
if(destory){
throw new DiscardException("線程池已經(jīng)被摧毀了...");
}
if(TASK_QUEUE.size() > queueSize){
discardPolicy.discard();
}
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
/**
* 關(guān)閉
*/
public void shutdown(){
while(!TASK_QUEUE.isEmpty()){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (THREAD_QUEUE) {
int initVal = THREAD_QUEUE.size();
while (initVal > 0) {
for (WorkerTask workerTask : THREAD_QUEUE) {
if (workerTask.getTaskState() == TaskState.BLOCKED) {
workerTask.interrupt();
workerTask.close();
initVal--;
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
this.destory = true;
}
}
public int getSize() {
return size;
}
public int getMin() {
return min;
}
public int getMax() {
return max;
}
public int getActive() {
return active;
}
/**
* 線程狀態(tài)
*/
private enum TaskState{
FREE , RUNNING , BLOCKED , DEAD
}
/**
* 自定義異常類
*/
public static class DiscardException extends RuntimeException{
public DiscardException(String message){
super(message);
}
}
/**
* 定義異常策略
*/
@FunctionalInterface
public interface DiscardPolicy{
void discard() throws DiscardException;
}
private static class WorkerTask extends Thread{
private volatile TaskState taskState = TaskState.FREE;
public TaskState getTaskState(){
return this.taskState;
}
public WorkerTask(ThreadGroup group , String name){
super(group , name);
}
@Override
public void run(){
OUTER:
while(this.taskState != TaskState.DEAD){
Runnable runnable;
synchronized (TASK_QUEUE){
while(TASK_QUEUE.isEmpty()){
try {
taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
break OUTER;
}
}
runnable = TASK_QUEUE.removeFirst();
}
if(runnable != null){
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}
}
}
public void close(){
this.taskState = TaskState.DEAD;
}
}
/**
* 測試
* @param args
*/
public static void main(String[] args) {
SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
// SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
IntStream.rangeClosed(0,40).forEach(i -> {
simpleThreadPool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());
});
});
// try {
// Thread.sleep(15000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
simpleThreadPool.shutdown();
}
}
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java并發(fā)包工具類CountDownLatch的應(yīng)用詳解
CountDownLatch是Java并發(fā)包中非常實用的一個工具類,它可以幫助我們實現(xiàn)線程之間的同步和協(xié)作。本文主要介紹了CountDownLatch的應(yīng)用場景及最佳實踐,希望對大家有所幫助2023-04-04
SpringBoot?DataSource數(shù)據(jù)源實現(xiàn)自動配置流程詳解
這篇文章主要介紹了SpringBoot?DataSource數(shù)據(jù)源實現(xiàn)自動配置流程,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2022-10-10

