Java多線程批量數(shù)據(jù)導(dǎo)入的方法詳解
前言:
當(dāng)遇到大量數(shù)據(jù)導(dǎo)入時(shí),為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:
- 大文件導(dǎo)入數(shù)據(jù)庫(這個(gè)文件不一定是標(biāo)準(zhǔn)的CSV可導(dǎo)入文件或者需要在內(nèi)存中經(jīng)過一定的處理)
- 數(shù)據(jù)同步(從第三方接口拉取數(shù)據(jù)處理后寫入自己的數(shù)據(jù)庫)
以上的場景有一個(gè)共性,這類數(shù)據(jù)導(dǎo)入的場景簡單來說就是將數(shù)據(jù)從一個(gè)數(shù)據(jù)源移動(dòng)到另外一個(gè)數(shù)據(jù)源,而其中必定可以分為兩步
- 數(shù)據(jù)讀取:從數(shù)據(jù)源讀取數(shù)據(jù)到內(nèi)存
- 數(shù)據(jù)寫入:將內(nèi)存中的數(shù)據(jù)寫入到另外一個(gè)數(shù)據(jù)源,可能存在數(shù)據(jù)處理
而且根據(jù)讀取的速度一般會(huì)比數(shù)據(jù)寫入的速度快很多,即讀取快,寫入慢。
設(shè)計(jì)思路
由于場景的特點(diǎn)是讀取快,寫入慢,如果是使用多線程處理,建議是數(shù)據(jù)寫入部分改造為多線程。而數(shù)據(jù)讀取可以改造成批量讀取數(shù)據(jù)。簡單來說就是兩個(gè)要點(diǎn):
- 批量讀取數(shù)據(jù)
- 多線程寫入數(shù)據(jù)
示例
多線程批量處理最簡單的方案是使用線程池來進(jìn)行處理,下面會(huì)通過一個(gè)模擬批量讀取和寫入的服務(wù),以及對(duì)這個(gè)服務(wù)的多線程寫入調(diào)用作為示例,展示如何多線程批量數(shù)據(jù)導(dǎo)入。
模擬服務(wù)
import java.util.concurrent.atomic.AtomicLong;
/**
* 數(shù)據(jù)批量寫入用的模擬服務(wù)
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可讀取總數(shù)
*/
private long canReadTotal;
/**
* 寫入總數(shù)
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 寫入休眠時(shí)間(單位:毫秒)
*/
private final long sleepTime;
/**
* 構(gòu)造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量讀取數(shù)據(jù)接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 寫入數(shù)據(jù)接口
*/
public void writeData() {
try {
// 休眠一定時(shí)間模擬寫入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 寫入總數(shù)自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 獲取寫入的總數(shù)
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}
批量數(shù)據(jù)處理器
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于線程池的多線程批量寫入處理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量讀取的數(shù)據(jù)量
*/
private int batch;
/**
* 線程個(gè)數(shù)
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定數(shù)目的線程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 開始處理
*/
public void startHandle() {
// 開始處理的時(shí)間
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量讀取數(shù)據(jù),知道讀取不到數(shù)據(jù)才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 關(guān)閉線程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待線程池中的線程執(zhí)行完
}
// 結(jié)束時(shí)間
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 總耗時(shí)
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 寫入總數(shù)
System.out.println("total write num:" + service.getWriteTotal());
}
}
測試類
/**
* SimpleBatchHandler的測試類
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 總數(shù)
long total=100000;
// 休眠時(shí)間
long sleepTime=100;
// 每次拉取的數(shù)量
int batch=100;
// 線程個(gè)數(shù)
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
運(yùn)行結(jié)果
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分輸出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
分析
在單線程情況下的執(zhí)行時(shí)間應(yīng)該為total*sleepTime,即10000000ms,而改造為多線程后執(zhí)行時(shí)間為648447ms。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Spring?容器初始化?register?與?refresh方法
這篇文章主要介紹了Spring?容器初始化?register?與?refresh方法,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-07-07
Idea打包springboot項(xiàng)目沒有.original文件解決方案
這篇文章主要介紹了Idea打包springboot項(xiàng)目沒有.original文件解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
Spring Boot中自動(dòng)化配置的利弊以及解決方法
這篇文章主要給大家介紹了關(guān)于Spring Boot中自動(dòng)化配置的利弊以及解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看看吧。2017-08-08
Java基于Javafaker生成測試數(shù)據(jù)
這篇文章主要介紹了Java基于Javafaker生成測試數(shù)據(jù)的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-12-12
Java實(shí)現(xiàn)ftp上傳下載、刪除文件及在ftp服務(wù)器上傳文件夾的方法
這篇文章主要介紹了Java實(shí)現(xiàn)ftp上傳下載、刪除文件及在ftp服務(wù)器上傳文件夾的方法,需要的朋友可以參考下2015-11-11
Java結(jié)構(gòu)型設(shè)計(jì)模式之組合模式Composite Pattern詳解
組合模式,又叫部分整體模式,它創(chuàng)建了對(duì)象組的數(shù)據(jù)結(jié)構(gòu)組合模式使得用戶對(duì)單個(gè)對(duì)象和組合對(duì)象的訪問具有一致性。本文將通過示例為大家詳細(xì)介紹一下組合模式,需要的可以參考一下2022-11-11
SpringBoot如何使用自定義注解實(shí)現(xiàn)接口限流
這篇文章主要介紹了SpringBoot如何使用自定義注解實(shí)現(xiàn)接口限流,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06

