java項目中的多線程實踐記錄
項目開發(fā)中對于一些數(shù)據(jù)的處理需要用到多線程,比如文件的批量上傳,數(shù)據(jù)庫的分批寫入,大文件的分段下載等。 通常會使用spring自帶的線程池處理,做到對線程的定制化處理和更好的可控,建議使用自定義的線程池。 主要涉及到的幾個點:
1. 自定義線程工廠(ThreadFactoryBuilder),主要用于線程的命名,方便追蹤
2. 自定義的線程池(ThreadPoolExecutorUtils),可以按功能優(yōu)化配置參數(shù)
3. 一個抽象的多線程任務處理接口(OperationThreadService)和通用實現(xiàn)(OperationThread)
4. 統(tǒng)一的調(diào)度實現(xiàn)(MultiThreadOperationUtils)
核心思想:分治歸并,每個線程計算出自己的結(jié)果,最后統(tǒng)一匯總。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
/**
* description: 自定義實現(xiàn)的線程池,遵循alibaba編程規(guī)范,使用ThreadPoolExecutor創(chuàng)建線程池使用
* 設置更有描述意義的線程名稱,默認的ThreadFactory,它給線程起名字大概規(guī)律就是pool-m-thread-n,如pool-1-thread-1。
* 當分析一個thread dump時,很難知道線程的目的,需要有描述意義的線程名稱來分析追蹤問題
* 設置線程是否是守護線程,默認的ThreadFactory總是提交非守護線程
* 設置線程優(yōu)先級,默認ThreadFactory總是提交的一般優(yōu)先級線程
* <p>
* CustomThreadFactoryBuilder類實現(xiàn)了一種優(yōu)雅的Builder Mechanism方式去得到一個自定義ThreadFactory實例。
* ThreadFactory接口中有一個接受Runnable類型參數(shù)的方法newThread(Runnable r),
* 業(yè)務的factory邏輯就應該寫在這個方法中,去配置線程名稱、優(yōu)先級、守護線程狀態(tài)等屬性。
* 原文鏈接:https://blog.csdn.net/zombres/article/details/80497515
*
* @author Hlingoes
* @date 2019/12/22 0:45
*/
public class ThreadFactoryBuilder {
private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);
private String nameFormat = null;
private boolean daemon = false;
private int priority = Thread.NORM_PRIORITY;
public ThreadFactoryBuilder setNameFormat(String nameFormat) {
if (nameFormat == null) {
throw new NullPointerException();
}
this.nameFormat = nameFormat;
return this;
}
public ThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public ThreadFactoryBuilder setPriority(int priority) {
if (priority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));
}
if (priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(String.format(
"Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));
}
this.priority = priority;
return this;
}
public ThreadFactory build() {
return build(this);
}
private static ThreadFactory build(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final AtomicLong count = new AtomicLong(0);
return (Runnable runnable) -> {
Thread thread = new Thread(runnable);
if (nameFormat != null) {
thread.setName(String.format(nameFormat, count.getAndIncrement()));
}
if (daemon != null) {
thread.setDaemon(daemon);
}
thread.setPriority(priority);
thread.setUncaughtExceptionHandler((t, e) -> {
String threadName = t.getName();
logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
});
return thread;
};
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
* description: 創(chuàng)建通用的線程池
* <p>
* corePoolSize:線程池中核心線程數(shù)量
* maximumPoolSize:線程池同時允許存在的最大線程數(shù)量
* 內(nèi)部處理邏輯如下:
* 當線程池中工作線程數(shù)小于corePoolSize,創(chuàng)建新的工作線程來執(zhí)行該任務,不管線程池中是否存在空閑線程。
* 如果線程池中工作線程數(shù)達到corePoolSize,新任務嘗試放入隊列,入隊成功的任務將等待工作線程空閑時調(diào)度。
* 1. 如果隊列滿并且線程數(shù)小于maximumPoolSize,創(chuàng)建新的線程執(zhí)行該任務(注意:隊列中的任務繼續(xù)排序)。
* 2. 如果隊列滿且線程數(shù)超過maximumPoolSize,拒絕該任務
* <p>
* keepAliveTime
* 當線程池中工作線程數(shù)大于corePoolSize,并且線程空閑時間超過keepAliveTime,則這些線程將被終止。
* 同樣,可以將這種策略應用到核心線程,通過調(diào)用allowCoreThreadTimeout來實現(xiàn)。
* <p>
* BlockingQueue
* 任務等待隊列,用于緩存暫時無法執(zhí)行的任務。分為如下三種堵塞隊列:
* 1. 直接遞交,如SynchronousQueue,該策略直接將任務直接交給工作線程。如果當前沒有空閑工作線程,創(chuàng)建新線程。
* 這種策略最好是配合unbounded線程數(shù)來使用,從而避免任務被拒絕。但當任務生產(chǎn)速度大于消費速度,將導致線程數(shù)不斷的增加。
* 2. 無界隊列,如LinkedBlockingQueue,當工作的線程數(shù)達到核心線程數(shù)時,新的任務被放在隊列上。
* 因此,永遠不會有大于corePoolSize的線程被創(chuàng)建,maximumPoolSize參數(shù)失效。
* 這種策略比較適合所有的任務都不相互依賴,獨立執(zhí)行。
* 但是當任務處理速度小于任務進入速度的時候會引起隊列的無限膨脹。
* 3. 有界隊列,如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue處理邏輯處理。
* 隊列長度和maximumPoolSize兩個值會相互影響:
* 長隊列 + 小maximumPoolSize。會減少CPU的使用、操作系統(tǒng)資源、上下文切換的消耗,但是會降低吞吐量,
* 如果任務被頻繁的阻塞如IO線程,系統(tǒng)其實可以調(diào)度更多的線程。
* 短隊列 + 大maximumPoolSize。CPU更忙,但會增加線程調(diào)度的消耗.
* 總結(jié)一下,IO密集型可以考慮多些線程來平衡CPU的使用,CPU密集型可以考慮少些線程減少線程調(diào)度的消耗
*
* @author Hlingoes
* @citation https://blog.csdn.net/wanghao112956/article/details/99292107
* @citation https://www.jianshu.com/p/896b8e18501b
* @date 2020/2/26 0:46
*/
public class ThreadPoolExecutorUtils {
private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);
public static int defaultCoreSize = Runtime.getRuntime().availableProcessors();
private static int pollWaitingTime = 60;
private static int defaultQueueSize = 10 * 1000;
private static int defaultMaxSize = 4 * defaultCoreSize;
private static String threadName = "custom-pool";
/**
* description: 創(chuàng)建線程池
*
* @param waitingTime
* @param coreSize
* @param maxPoolSize
* @param queueSize
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/4/12
*/
public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) {
pollWaitingTime = waitingTime;
defaultCoreSize = coreSize;
defaultMaxSize = maxPoolSize;
defaultQueueSize = queueSize;
return getExecutorPool();
}
/**
* description: 創(chuàng)建線程池
*
* @param waitingTime
* @param queueSize
* @param maxPoolSize
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/3/20
*/
public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) {
pollWaitingTime = waitingTime;
defaultQueueSize = queueSize;
defaultMaxSize = maxPoolSize;
return getExecutorPool();
}
/**
* description: 創(chuàng)建線程池
*
* @param waitingTime
* @param queueSize
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/3/20
*/
public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) {
pollWaitingTime = waitingTime;
defaultQueueSize = queueSize;
return getExecutorPool();
}
/**
* description: 創(chuàng)建線程池
*
* @param waitingTime
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/3/20
*/
public static ThreadPoolExecutor getExecutorPool(int waitingTime) {
pollWaitingTime = waitingTime;
return getExecutorPool();
}
/**
* description: 創(chuàng)建線程池
*
* @param
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/6/6
*/
public static ThreadPoolExecutor getExecutorPool() {
return getExecutorPool(threadName);
}
/**
* description: 創(chuàng)建線程池
*
* @param
* @return java.util.concurrent.ThreadPoolExecutor
* @author Hlingoes 2020/3/20
*/
public static ThreadPoolExecutor getExecutorPool(String threadName) {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat(threadName + "-%d")
.build();
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize,
defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory,
(r, executor) -> {
/**
* 自定義的拒絕策略
* 當提交給線程池的某一個新任務無法直接被線程池中“核心線程”直接處理,
* 又無法加入等待隊列,也無法創(chuàng)建新的線程執(zhí)行;
* 又或者線程池已經(jīng)調(diào)用shutdown()方法停止了工作;
* 又或者線程池不是處于正常的工作狀態(tài);
* 這時候ThreadPoolExecutor線程池會拒絕處理這個任務
*/
if (!executor.isShutdown()) {
logger.warn("ThreadPoolExecutor is over working, please check the thread tasks! ");
}
}) {
/**
* description: 針對提交給線程池的任務可能會拋出異常這一問題,
* 可自行實現(xiàn)線程池的afterExecute方法,或者實現(xiàn)Thread的UncaughtExceptionHandler接口
* ThreadFactoryBuilder中已經(jīng)實現(xiàn)了UncaughtExceptionHandler接口,這里是為了進一步兼容
*
* @param r
* @param t
* @return void
* @author Hlingoes 2020/5/27
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
logger.error("customThreadPool error msg: {}", t.getMessage(), t);
}
}
};
/**
* 備選方法,事先知道會有很多任務會提交給這個線程池,可以在初始化的時候完成核心線程的創(chuàng)建,提高系統(tǒng)性能
* 一個線程池創(chuàng)建出來之后,在沒有給它提交任何任務之前,這個線程池中的線程數(shù)為0
* 一個個去創(chuàng)建新線程開銷太大,影響系統(tǒng)性能
* 可以在創(chuàng)建線程池的時候就將所有的核心線程全部一次性創(chuàng)建完畢,系統(tǒng)起來之后就可以直接使用
*/
poolExecutor.prestartAllCoreThreads();
return poolExecutor;
}
/**
* description: 所有任務執(zhí)行完之后,釋放線程池資源
*
* @param pool
* @return void
* @author Hlingoes 2020/3/20
*/
public static void closeAfterComplete(ThreadPoolExecutor pool) {
/**
* 當線程池調(diào)用該方法時,線程池的狀態(tài)則立刻變成SHUTDOWN狀態(tài)。
* 此時,則不能再往線程池中添加任何任務,否則將會拋出RejectedExecutionException異常。
* 但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經(jīng)處理完成,才會退出。
* 唯一的影響就是不能再提交任務了,正則執(zhí)行的任務即使在阻塞著也不會結(jié)束,在排隊的任務也不會取消。
*/
pool.shutdown();
try {
/**
* awaitTermination方法可以設定線程池在關閉之前的最大超時時間,
* 如果在超時時間結(jié)束之前線程池能夠正常關閉,這個方法會返回true,否則,一旦超時,就會返回false。
* 通常來說不可能無限制地等待下去,因此需要預估一個合理的超時時間,然后使用這個方法
*/
if (!pool.awaitTermination(pollWaitingTime, TimeUnit.SECONDS)) {
/**
* 如果awaitTermination方法返回false,又希望盡可能在線程池關閉之后再做其他資源回收工作,
* 可以考慮再調(diào)用一下shutdownNow方法,
* 此時隊列中所有尚未被處理的任務都會被丟棄,同時會設置線程池中每個線程的中斷標志位。
* shutdownNow并不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務能夠正確響應中斷。
* 到了這一步,可以考慮繼續(xù)調(diào)用awaitTermination方法,或者直接放棄,去做接下來要做的事情。
*/
pool.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("ThreadPool overtime: {}", e.getMessage());
//(重新)丟棄所有尚未被處理的任務,同時會設置線程池中每個線程的中斷標志位
pool.shutdownNow();
// 保持中斷狀態(tài)
Thread.currentThread().interrupt();
}
}
}
import java.util.Arrays;
/**
* description: 分段參數(shù)
*
* @author Hlingoes
* @date 2020/5/22 23:50
*/
public class PartitionElements {
/**
* 當前的分段任務索引
*/
private long index;
/**
* 批量處理的任務個數(shù)
*/
private long batchCounts;
/**
* 任務的分段個數(shù)
*/
private long partitions;
/**
* 任務總數(shù)
*/
private long totalCounts;
private Object[] args;
private Object data;
public PartitionElements() {
}
public PartitionElements(long batchCounts, long totalCounts, Object[] args) {
this.batchCounts = batchCounts;
this.totalCounts = totalCounts;
this.partitions = aquirePartitions(totalCounts, batchCounts);
this.args = args;
}
public PartitionElements(long index, PartitionElements elements) {
this.index = index;
this.batchCounts = elements.getBatchCounts();
this.partitions = elements.getPartitions();
this.totalCounts = elements.getTotalCounts();
this.args = elements.getArgs();
}
/**
* description: 根據(jù)任務總量和單次任務處理量,計算任務個數(shù)
*
* @param totalCounts
* @param batchCounts
* @return long partitions
* @author Hlingoes 2020/5/23
*/
public long aquirePartitions(long totalCounts, long batchCounts) {
long partitions = totalCounts / batchCounts;
if (totalCounts % batchCounts != 0) {
partitions = partitions + 1;
}
// 兼容任務總數(shù)total = 1 的情況
if (partitions == 0) {
partitions = 1;
}
return partitions;
}
public long getIndex() {
return index;
}
public void setIndex(long index) {
this.index = index;
}
public long getBatchCounts() {
return batchCounts;
}
public void setBatchCounts(long batchCounts) {
this.batchCounts = batchCounts;
}
public long getPartitions() {
return partitions;
}
public void setPartitions(long partitions) {
this.partitions = partitions;
}
public long getTotalCounts() {
return totalCounts;
}
public void setTotalCounts(long totalCounts) {
this.totalCounts = totalCounts;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
@Override
public String toString() {
return "PartitionElements{" +
"index=" + index +
", batchCounts=" + batchCounts +
", partitions=" + partitions +
", totalCounts=" + totalCounts +
", args=" + Arrays.toString(args) +
'}';
}
}
import cn.henry.study.common.bo.PartitionElements;
/**
* description: 業(yè)務分治歸并處理接口
*
* @author Hlingoes 2020/5/22
*/
public interface OperationThreadService {
/**
* description: 任務總量
*
* @param args
* @return long
* @throws Exception
* @author Hlingoes 2020/5/22
*/
long count(Object[] args) throws Exception;
/**
* description: 在多線程分治任務之前的預處理方法,返回業(yè)務數(shù)據(jù)
*
* @param args
* @return Object
* @throws Exception
* @author Hlingoes 2020/5/23
*/
Object prepare(Object[] args) throws Exception;
/**
* description: 多線程的任務邏輯
*
* @param elements
* @return java.lang.Object
* @throws Exception
* @author Hlingoes 2020/5/24
*/
Object invoke(PartitionElements elements) throws Exception;
/**
* description: 多線程單個任務結(jié)束后的歸并方法
*
* @param elements
* @param object
* @return void
* @throws Exception
* @author Hlingoes 2020/5/23
*/
void post(PartitionElements elements, Object object) throws Exception;
/**
* description: 歸并結(jié)果之后的尾處理
*
* @param object
* @return java.lang.Object
* @throws Exception
* @author Hlingoes 2020/5/24
*/
Object finished(Object object) throws Exception;
}
import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.thread.OperationThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* description: 多線程業(yè)務分治歸并處理
*
* @author Hlingoes
* @date 2020/5/22 0:42
*/
public class MultiThreadOperationUtils {
private static Logger logger = LoggerFactory.getLogger(MultiThreadOperationUtils.class);
/**
* description: 開啟多線程執(zhí)行任務,按順序歸并處理任務結(jié)果
* 按照默認線程數(shù),計算批量任務數(shù)
*
* @param service
* @param args
* @return void
* @author Hlingoes 2020/5/23
*/
public static Object batchExecute(OperationThreadService service, Object[] args) throws Exception {
long totalCounts = service.count(args);
long batchCounts = totalCounts / ThreadPoolExecutorUtils.defaultCoreSize;
// 兼容任務少于核心線程數(shù)的情況
if (batchCounts == 0) {
batchCounts = 1L;
}
PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
return batchExecute(service, elements);
}
/**
* description: 開啟多線程執(zhí)行任務,按順序歸并處理任務結(jié)果
* 給定每頁顯示條目個數(shù)
*
* @param service
* @param batchCounts
* @param args
* @return void
* @author Hlingoes 2020/5/23
*/
public static Object batchExecute(OperationThreadService service, long batchCounts, Object[] args) throws Exception {
long totalCounts = service.count(args);
PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
return batchExecute(service, elements);
}
/**
* description: 開啟多線程執(zhí)行分治任務,按順序歸并處理任務結(jié)果
*
* @param service
* @param elements
* @return void
* @author Hlingoes 2020/5/23
*/
private static Object batchExecute(OperationThreadService service, PartitionElements elements) throws Exception {
ThreadPoolExecutor executor = ThreadPoolExecutorUtils.getExecutorPool();
// 在多線程分治任務之前的預處理方法,返回業(yè)務數(shù)據(jù)
final Object obj = service.prepare(elements.getArgs());
// 預防list和map的resize,初始化給定容量,可提高性能
ArrayList<Future<PartitionElements>> futures = new ArrayList<>((int) elements.getPartitions());
OperationThread opThread = null;
Future<PartitionElements> future = null;
// 添加線程任務
for (int i = 0; i < elements.getPartitions(); i++) {
// 劃定任務分布
opThread = new OperationThread(new PartitionElements(i + 1, elements), service);
future = executor.submit(opThread);
futures.add(future);
}
// 關閉線程池
executor.shutdown();
// 阻塞線程,同步處理數(shù)據(jù)
futures.forEach(f -> {
try {
// 線程單個任務結(jié)束后的歸并方法
service.post(f.get(), obj);
} catch (Exception e) {
logger.error("post routine fail", e);
}
});
return service.finished(obj);
}
}
import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.utils.MultiThreadOperationUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* description: 多線程的測試用例
*
* @author Hlingoes
* @date 2020/6/12 20:52
*/
public class MultiThreadServiceTest implements OperationThreadService {
private static Logger logger = LoggerFactory.getLogger(MultiThreadServiceTest.class);
@Override
public long count(Object[] args) throws Exception {
return 100L;
}
@Override
public Object prepare(Object[] args) throws Exception {
return "success";
}
@Override
public Object invoke(PartitionElements elements) throws Exception {
List<Object> list = new ArrayList<>((int) elements.getBatchCounts());
for (int i = 0; i < elements.getIndex(); i++) {
list.add("test_" + i);
}
return list;
}
@Override
public void post(PartitionElements elements, Object object) throws Exception {
String insertSql = "insert into test (id) values ";
StringBuilder sb = new StringBuilder();
List<Object> datas = (List<Object>) elements.getData();
for (int i = 0; i < datas.size(); i++) {
if ((i + 1) % 5 == 0 || (i + 1) == datas.size()) {
sb.append("('" + datas.get(i) + "')");
logger.info("{}: 測試insert sql: {}", elements, insertSql + sb.toString());
sb = new StringBuilder();
} else {
sb.append("('" + datas.get(i) + "'),");
}
}
}
@Override
public Object finished(Object object) throws Exception {
return object;
}
@Test
public void testBatchExecute() {
try {
Object object = MultiThreadOperationUtils.batchExecute(new MultiThreadServiceTest(), 10, new Object[]{"test"});
logger.info("測試完成: {}", object.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
總結(jié):這是一個抽象之后的多線程業(yè)務流程處理方式,已在生產(chǎn)環(huán)境使用,多線程的重點在業(yè)務分割和思想上,有清晰的責任劃分。
到此這篇關于java項目中的多線程實踐的文章就介紹到這了,更多相關java多線程實踐內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring boot實現(xiàn)上傳圖片并在頁面上顯示及遇到的問題小結(jié)
最近在使用spring boot搭建網(wǎng)站的過程之中遇到了有點小問題,最終解決方案是在main目錄下新建了一個webapp文件夾,并且對其路徑進行了配置,本文重點給大家介紹spring boot實現(xiàn)上傳圖片并在頁面上顯示功能,需要的朋友參考下吧2017-12-12
Java的MyBatis框架中Mapper映射配置的使用及原理解析
Mapper用于映射SQL語句,可以說是MyBatis操作數(shù)據(jù)庫的核心特性之一,這里我們來討論Java的MyBatis框架中Mapper映射配置的使用及原理解析,包括對mapper的xml配置文件的讀取流程解讀.2016-06-06
Spring MVC 學習 之 - URL參數(shù)傳遞詳解
本篇文章主要介紹了SpringMVC-URL參數(shù)傳遞,在學習 Spring Mvc 過程中,有必要來先了解幾個關鍵參數(shù),有興趣的可以了解一下。2017-01-01

