Java如何從Redis中批量讀取數(shù)據(jù)
一.背景概述
本周接到一個新的需求:從用戶dau日志文件中讀取用戶uid,然后到Redis中獲取對應(yīng)的用戶數(shù)據(jù)。用戶的uid存儲于login_day_20220913.txt文件,共1億2千多萬條數(shù)據(jù),數(shù)量達(dá)1.4G。
要求:盡量在2小時內(nèi)獲得結(jié)果,在數(shù)據(jù)處理過程中,Redis服務(wù)器QPS盡量低,不超過某個閾值,不然會觸發(fā)監(jiān)控報警。數(shù)據(jù)從Redis從庫讀取,只提供一個端口。
二.分析與實(shí)現(xiàn)
由于之前做過相同數(shù)據(jù)量的統(tǒng)計需求,所以從一開始就確定單線程完成此次數(shù)據(jù)處理也是可以的。實(shí)際上,對多線程和并發(fā)的使用需要慎之又慎,特別是在業(yè)務(wù)繁忙的系統(tǒng)或環(huán)境下。
接觸Redis的朋友都知道,Redis是支持批量讀取的,其中常用的兩個方法:mget()和hmget()。
本次處理的數(shù)據(jù)不是哈希結(jié)構(gòu),所以確定使用mget()。
此時,我自然而然地問了同事一個問題,那就是mget批量處理數(shù)據(jù)的最佳參數(shù)范圍是多少?因?yàn)閙get()接受一個字符串?dāng)?shù)組參數(shù),也就是說字符串?dāng)?shù)組的長度最佳為多少?
同事并沒有給我明確的答案,只是說他們?nèi)粘C颗翁幚?0000條,建議我自己可以嘗試一下,于是我打算試試50000條數(shù)據(jù)。
主要代碼如下:
package com.sina.weibo; import com.sina.weibo.util.FileUtils; import com.sina.weibo.util.ListUtil; import org.apache.commons.lang3.time.StopWatch; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.TimeUnit; public class Application { /** dau數(shù)據(jù)讀取路徑 */ private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt"; /** 結(jié)果輸出路徑 */ private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt"; /** 已處理過的uid數(shù)據(jù)存儲路徑 */ private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt"; public static void main(String[] args) { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); System.out.println("================程序開始==============="); transfer(dauDataPath, processedUidDataPath, outputPath); System.out.println("================程序結(jié)束==============="); // 結(jié)束時間 stopWatch.stop(); // 統(tǒng)計執(zhí)行時間(秒) System.out.println("執(zhí)行時長:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒."); } private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) { List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath); List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000); Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn",50000); List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath); LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList); for (List<String> list : bucket) { List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()])); for (int i = 0; i < list.size(); i++) { if (!linkedHashSet.contains(list.get(i))) { String uid = list.get(i); FileUtils.appendInfoToFile(processedUidDataPath, uid); String jsonStr = jsonStrList.get(i); if (jsonStr == null || jsonStr == "") continue; String content = uid + "\t" + jsonStr; FileUtils.appendInfoToFile(outputPath, content); } } System.out.println(list.size()); } } }
三.發(fā)現(xiàn)問題與屢次改進(jìn)
3.1.QPS過高而且波動很大
上述代碼上線后沒多久,就被同事找來,說QPS過高,開始的時候瞬間達(dá)到近100k,之后穩(wěn)定在70k~100k之間。因?yàn)閾?dān)心影響其他業(yè)務(wù),于是把jar包暫停,著手優(yōu)化。
于是,我多次修改如下代碼:
List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);
將50000,調(diào)整為10000,5000,1000,500,100等值逐一嘗試。
QPS確實(shí)逐步降下來了,但是即便是每次處理1000條,QPS也有40K左右。
3.2.程序中斷,拋異常
最終以每批次讀取500條數(shù)據(jù),將代碼上線。但是程序總是中斷報錯,拋出異常:
而這時候已處理的數(shù)據(jù)量達(dá)到幾千萬條。
最初懷疑是因?yàn)閖edis對象沒有調(diào)用close方法,于是修改代碼如下:
package com.sina.weibo; import com.sina.weibo.util.FileUtils; import com.sina.weibo.util.ListUtil; import org.apache.commons.lang3.time.StopWatch; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.TimeUnit; public class Application { /** dau數(shù)據(jù)讀取路徑 */ private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt"; /** 結(jié)果輸出路徑 */ private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt"; /** 已處理過的uid數(shù)據(jù)存儲路徑 */ private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt"; public static void main(String[] args) { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); System.out.println("================程序開始==============="); transfer(dauDataPath, processedUidDataPath, outputPath); System.out.println("================程序結(jié)束==============="); // 結(jié)束時間 stopWatch.stop(); // 統(tǒng)計執(zhí)行時間(秒) System.out.println("執(zhí)行時長:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒."); } private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) { List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath); List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000); List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath); LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList); for (List<String> list : bucket) { Jedis jedis = new Jedis(rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000); List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()])); for (int i = 0; i < list.size(); i++) { if (!linkedHashSet.contains(list.get(i))) { String uid = list.get(i); FileUtils.appendInfoToFile(processedUidDataPath, uid); String jsonStr = jsonStrList.get(i); if (jsonStr == null || jsonStr == "") continue; String content = uid + "\t" + jsonStr; FileUtils.appendInfoToFile(outputPath, content); } } jedis.close(); System.out.println(list.size()); } } }
修改后跑程序依舊沒有任何改善,繼續(xù)修改,代碼如下:
package com.sina.weibo; import com.sina.weibo.util.FileUtils; import com.sina.weibo.util.ListUtil; import org.apache.commons.lang3.time.StopWatch; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.TimeUnit; public class A { /** dau數(shù)據(jù)讀取路徑 */ private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt"; /** 結(jié)果輸出路徑 */ private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt"; /** 已處理過的uid數(shù)據(jù)存儲路徑 */ private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt"; public static void main(String[] args) { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); System.out.println("================程序開始==============="); transfer(dauDataPath, processedUidDataPath, outputPath); System.out.println("================程序結(jié)束==============="); // 結(jié)束時間 stopWatch.stop(); // 統(tǒng)計執(zhí)行時間(秒) System.out.println("執(zhí)行時長:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒."); } private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) { List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath); List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000); List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath); LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList); for (List<String> list : bucket) { Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000); List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()])); for (int i = 0; i < list.size(); i++) { if (!linkedHashSet.contains(list.get(i))) { String uid = list.get(i); FileUtils.appendInfoToFile(processedUidDataPath, uid); String jsonStr = jsonStrList.get(i); if (jsonStr == null || jsonStr == "") continue; String content = uid + "\t" + jsonStr; FileUtils.appendInfoToFile(outputPath, content); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } finally { jedis.close(); } System.out.println(list.size()); } } }
上線以后,觀測發(fā)現(xiàn)QPS區(qū)域穩(wěn)定,但是程序會空跑,也就是從頭開始將已處理的數(shù)據(jù)也要逐一讀取一次,很多時候都沒有跑到上次程序處理的地方就已經(jīng)被迫退出。
linkedHashSet本來是用來標(biāo)記上次程序運(yùn)行停止的地方,但是似乎并沒有完全發(fā)揮作用。
于是修改代碼,加入一個新的list集合,用于存放還沒有處理過的數(shù)據(jù),代碼如下:
package com.sina.weibo; import com.sina.weibo.util.FileUtils; import com.sina.weibo.util.ListUtil; import org.apache.commons.lang3.time.StopWatch; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.TimeUnit; /** * @author bingqing5 * @date 2022/09/14 15:00 * @version 1.0 */ public class Application { /** dau數(shù)據(jù)讀取路徑 */ private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt"; /** 結(jié)果輸出路徑 */ private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt"; /** 已處理過的uid數(shù)據(jù)存儲路徑 */ private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt"; public static void main(String[] args) { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); System.out.println("================程序開始==============="); // transfer(dauDataPath, processedUidDataPath, outputPath); List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath); // List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000); // Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000); List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath); LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList); List<String> uidList = new ArrayList<>(); for (String uid : dauDataList) { if (linkedHashSet.contains(uid)) { continue; } else { uidList.add(uid); } } List<List<String>> bucket; if (uidList.size() != 0) { bucket = ListUtil.splitList(uidList, 10000); } else { bucket = new ArrayList<>(); } for (List<String> list : bucket) { Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000); List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()])); for (int i = 0; i < list.size(); i++) { if (!linkedHashSet.contains(list.get(i))) { String uid = list.get(i); FileUtils.appendInfoToFile(processedUidDataPath, uid); String jsonStr = jsonStrList.get(i); if (jsonStr == null || jsonStr == "") continue; String content = uid + "\t" + jsonStr; FileUtils.appendInfoToFile(outputPath, content); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } finally { jedis.close(); } System.out.println(list.size()); } System.out.println("================程序結(jié)束==============="); // 結(jié)束時間 stopWatch.stop(); // 統(tǒng)計執(zhí)行時間(秒) System.out.println("執(zhí)行時長:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒."); } }
終于這次修改后,上線代碼,代碼平穩(wěn)運(yùn)行。
此時查看QPS,發(fā)現(xiàn)10000的批讀取量,QPS文檔在25K以下,此前同樣的數(shù)據(jù)量,QPS能達(dá)到40K。
3.3.內(nèi)存消耗過大
在上次修改后,程序平穩(wěn)運(yùn)行,期間我查看了機(jī)器狀態(tài),發(fā)現(xiàn)我跑的jar包竟然消耗了32%左右的內(nèi)存,那臺機(jī)器也不過62G的總內(nèi)存。雖然不缺內(nèi)存資源,但是還是決定趁著程序在跑的期間,回顧一下代碼。
List<List<String>> bucket = ListUtil.splitList(dauDataList, 10000);
上面這行代碼是將所有的用戶uid數(shù)據(jù)按照10000的大小均等分割,每次遍歷,要重復(fù)創(chuàng)建同一類Jedis對象,也會消耗大量內(nèi)存。
另外,下面這段程序:
List<String> uidList = new ArrayList<>(); for (String uid : dauDataList) { if (linkedHashSet.contains(uid)) { continue; } else { uidList.add(uid); } }
已經(jīng)對處理過的數(shù)據(jù)做過篩選,在循環(huán)中再次做如下判斷:
if (!linkedHashSet.contains(list.get(i))) { }
也是多次一舉,會增加耗時。
綜合以上考慮,我做了修改,代碼如下:
package com.sina.weibo; import com.sina.weibo.util.FileUtils; import com.sina.weibo.util.ListUtil; import org.apache.commons.lang3.time.StopWatch; import redis.clients.jedis.Jedis; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.TimeUnit; /** * @author bingqing5 * @date 2022/09/14 15:00 * @version 1.0 */ public class Application { /** dau數(shù)據(jù)讀取路徑 */ private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt"; /** 結(jié)果輸出路徑 */ // private static String outputPath = "/data1/bingqing5/redis_test/output/campus_data.txt"; private static String outputPath = "/data1/bingqing/redis_test/output/campus_data.txt"; /** 已處理過的uid數(shù)據(jù)存儲路徑 */ // private static String processedUidDataPath = "/data1/bingqing5/redis_test/process/processed_uid.txt"; private static String processedUidDataPath = "/data1/bingqing/redis_test/process/processed_uid.txt"; public static void main(String[] args) { StopWatch stopWatch = new StopWatch(); // 開始時間 stopWatch.start(); System.out.println("================程序開始==============="); transfer(dauDataPath, processedUidDataPath, outputPath); System.out.println("================程序結(jié)束==============="); // 結(jié)束時間 stopWatch.stop(); // 統(tǒng)計執(zhí)行時間(秒) System.out.println("執(zhí)行時長:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒."); } private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) { List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath); Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000); List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath); LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList); List<String> uidList = new ArrayList<>(); for (String uid : dauDataList) { if (linkedHashSet.contains(uid)) { continue; } else { uidList.add(uid); } } List<List<String>> bucket; if (uidList.size() != 0) { bucket = ListUtil.splitList(uidList, 50000); } else { bucket = new ArrayList<>(); } for (List<String> list : bucket) { List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()])); for (int i = 0; i < list.size(); i++) { String uid = list.get(i); FileUtils.appendInfoToFile(processedUidDataPath, uid); String jsonStr = jsonStrList.get(i); if (jsonStr == null || jsonStr == "") continue; String content = uid + "\t" + jsonStr; FileUtils.appendInfoToFile(outputPath, content); } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } finally { jedis.close(); } System.out.println(list.size()); } } }
修改代碼以后,替換掉原先運(yùn)行的jar包,接著運(yùn)行。發(fā)現(xiàn)內(nèi)存消耗明顯降低,穩(wěn)定占總內(nèi)存的20%。
然后嘗試修改了mget參數(shù)量,修改為50000條,再次運(yùn)行程序發(fā)現(xiàn)QPS穩(wěn)定在40K左右。
總結(jié)
本篇算是筆者剛接觸Redis不久的一篇隨手記。通過本次需求的開發(fā)經(jīng)歷,讓我對Redis有了直觀的了解,同時也理解了代碼優(yōu)化在實(shí)際生產(chǎn)工作和開發(fā)中的潛在價值。
關(guān)于Redis,在快速直接從Redis讀取數(shù)據(jù)的場景中,尤其是數(shù)據(jù)量大的時候,為了防止QPS過高,最好在處理一批次數(shù)據(jù)后空出一定的時間間隔,比如可以讓線程暫時休眠一定時間間隔,再進(jìn)行下批次讀取和處理。
關(guān)于代碼優(yōu)化,盡量創(chuàng)建可重復(fù)使用的對象,非必要不添加同類對象,避免大量創(chuàng)建對象帶來的資源消耗,本次經(jīng)歷也算是很鮮明的體會到這點(diǎn)。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
基于SpringBoot實(shí)現(xiàn)QQ郵箱驗(yàn)證碼注冊功能
QQ 郵箱是由騰訊公司推出的一款免費(fèi)郵箱服務(wù),它提供了完整的郵件發(fā)送和接收功能,并且還支持多種郵件格式和附件類型,QQ 郵箱還具有強(qiáng)大的反垃圾郵件功能,可以有效地過濾垃圾郵件,并保護(hù)用戶隱私和安全,所以本文給大家介紹了基于SpringBoot實(shí)現(xiàn)QQ郵箱驗(yàn)證碼注冊功能2024-11-11如何解決@value獲取不到y(tǒng)aml數(shù)組的問題
文章介紹了在使用YAML配置文件時,通過@Value注解獲取整數(shù)和數(shù)組列表的配置方法,并提供了兩種解決方案:一種適用于非嵌套列表,另一種適用于嵌套列表等復(fù)雜配置2024-11-11基于springboot微信公眾號開發(fā)(微信自動回復(fù))
這篇文章主要介紹了基于springboot微信公眾號開發(fā)(微信自動回復(fù)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11SpringBoot深入講解單元測試與熱部署應(yīng)用
這篇文章介紹了SpringBoot單元測試與熱部署,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-06-06