Java多線程處理千萬級數據更新操作
故事背景
之前因為用戶信息安全把好幾個敏感信息用AES加密保存了,應業(yè)務需求,需要通過后臺userId等信息查詢訂單,所以需要明文保存數據。但是目前訂單表已經有7000萬+訂單數據,操作起來比較麻煩,肯定需要分批操作的,否則會有內存溢出等問題。初步想到的解決方案是新建一個數據庫表,保存處理訂單表的id是處理到哪一條數據,然后用limit查詢一萬條數據,處理完id+10000。
偽代碼
//查詢id
while(id<maxid){
Integer id = mapper.selectList().get(0);
List list = mapper.selectList(Wrappers.<order>lambdaQuery()
.gt(order::getId,id)
.last("limit 10000");
for(){
處理數據
update數據
}
id = list.get(10000).getId();
updateId();
}
這樣update數據處理可能會比較慢,可以用多線程處理。
方案一
用多線程的話需要考慮一下怎么設計,首先是想著拿到一萬條數據之后,分成10份,給10條線程去處理數據,但是這樣可能會因為線程處理太慢,線程任務隊列太多導致OOM異常,所以可以等待線程全都執(zhí)行完畢再處理下一批數據。那么怎么等待線程執(zhí)行完再處理呢?百度完發(fā)現可以用CountDownLatch實現~

方案二
如果不想等線程執(zhí)行完再處理,可以轉換下思路。把list當做消息隊列,每個線程獲取1千條數據,如果隊列數據為空則再查詢后面1萬條數據做處理。這里有一個多線程處理共享隊列的問題,所以需要在取數據的時候和增刪隊列數據的時候加鎖。

偽代碼
private static List list;
//線程處理
public void run(){
List dataList;
while(true){
synchronized(xx.class){
if(list.size() == 0){
list = getList();
}
//再判斷一次,如果為空就是處理完了
if(orderList == null || orderList.size() == 0) {
return;
}
//取最后1千條
dataList = new ArrayList<>(list.subList(list.size()-1000,list.size()));
//減去最后一千條
list = list.subList(0,list.size()-1000);
}
//處理dataList
for(Order order:dataList){
...
}
}
}
多線程的坑
多線程一定要注意數據的并發(fā)安全問題,比如說用subList的話并不會復制一個新的list出來,只是原有l(wèi)ist的基礎上的一個視圖,如果原來的list改變,subList得到的list也會跟著改變,如果想要用subList一定要new一個List接收。
DEMO
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for(int i=0;i<100;i++){
list.add(i);
}
while(list.size() >=10){
//取最后10條
List<Integer> dataList = list.subList(list.size()-10,list.size());
//List<Integer> dataList = list.stream().skip(Math.max(0, list.size() - 10)).collect(Collectors.toList()) ;
ListThread listThread = new ListThread(dataList);
listThread.start();
//去掉最后10條數據
list = list.subList(0,list.size()-10);
}
list.add(33333);
}
class ListThread extends Thread{
private List<Integer> list;
public ListThread(List list){
this.list = list;
}
public void run(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+"開始打印數據======");
for(int i=0;i<list.size();i++){
System.out.println(list.get(i));
}
}
}
如果sleep5秒模擬隊列的增刪操作,原來的list已經變了,會影響線程里的list變動,會報錯
java.util.ConcurrentModificationException
要解決這個問題需要用創(chuàng)建一個新的list,這就不會受原來的list影響
List<Integer> dataList = new ArrayList<>(list.subList(list.size()-10,list.size())); //或者使用stream流 List<Integer> dataList = list.stream().skip(Math.max(0, list.size() - 10)).collect(Collectors.toList()) ;
后面還遇到一個問題,new出來的多線程類無法使用@Autowired注入,解決辦法:
1、將需要的Bean作為線程的的構造函數的參數傳入
2、使用ApplicationContext.getBean方法來靜態(tài)的獲取Bean(推薦)
總結
1、使用多線程一定要注意線程安全問題,操作共享變量需要加鎖處理,或者分割獨立的數據給各自的線程處理。如果是簡單且不追求性能的業(yè)務還是用單線程比較安全。
2、使用subList要注意返回的是視圖,如果原來的list有變化,需要創(chuàng)建一個新的list接收,或者用stream流做數據分割。
3、查詢+處理數據速度大概是1分鐘35萬條,大概是這個時間,批量update可能還能優(yōu)化,我用的是mybatis-plus自帶的updateBatchById。
到此這篇關于Java多線程處理千萬級數據更新操作的文章就介紹到這了,更多相關Java多線程處理數據更新內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
restemplate請求亂碼之content-encoding=“gzip“示例詳解
RestTemplate從Spring3.0開始支持的一個HTTP請求工具,它提供了常見的REST請求方案的模板,及一些通用的請求執(zhí)行方法 exchange 以及 execute,接下來通過本文給大家介紹restemplate請求亂碼之content-encoding=“gzip“,需要的朋友可以參考下2024-03-03
關于報錯IDEA Terminated with exit code
如果在IDEA構建項目時遇到下面這樣的報錯IDEA Terminated with exit code 1,那必然是Maven的設置參數重置了,導致下載錯誤引起的,本文給大家分享兩種解決方法,需要的朋友可以參考下2022-08-08

