java實(shí)現(xiàn)MapReduce對(duì)文件進(jìn)行切分的示例代碼
比如有海量的文本文件,如訂單,頁面點(diǎn)擊事件的記錄,量特別大,很難搞定。
那么我們?cè)撛鯓咏鉀Q海量數(shù)據(jù)的計(jì)算?
1、獲取總行數(shù)
2、計(jì)算每個(gè)文件中存多少數(shù)據(jù)
3、split切分文件
4、reduce將文件進(jìn)行匯總
例如這里有百萬條數(shù)據(jù),單個(gè)文件操作太麻煩,所以我們需要進(jìn)行切分
在切分文件的過程中會(huì)出現(xiàn)文件不能整個(gè)切分的情況,可能有剩下的數(shù)據(jù)并沒有被讀取到,所以我們每個(gè)切分128條數(shù)據(jù),不足128條再保留到一個(gè)文件中
創(chuàng)建MapTask
import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.Set; public class MapTask extends Thread { //用來接收具體的哪一個(gè)文件 private File file; private int flag; public MapTask(File file, int flag) { this.file = file; this.flag = flag; } @Override public void run() { try { BufferedReader br = new BufferedReader(new FileReader(file)); String line; HashMap<String, Integer> map = new HashMap<String, Integer>(); while ((line = br.readLine()) != null) { /** * 統(tǒng)計(jì)班級(jí)人數(shù)HashMap存儲(chǔ) */ String clazz = line.split(",")[4]; if (!map.containsKey(clazz)) { map.put(clazz, 1); } else { map.put(clazz, map.get(clazz) + 1); } } br.close(); BufferedWriter bw = new BufferedWriter( new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag)); Set<Map.Entry<String, Integer>> entries = map.entrySet(); for (Map.Entry<String, Integer> entry : entries) { String key = entry.getKey(); Integer value = entry.getValue(); bw.write(key + ":" + value); bw.newLine(); } bw.flush(); bw.close(); } catch (Exception e) { e.printStackTrace(); } } }
創(chuàng)建Map
import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Map { public static void main(String[] args) { long start = System.currentTimeMillis(); // 多線程連接池(線程池) ExecutorService executorService = Executors.newFixedThreadPool(8); // 獲取文件列表 File file = new File("F:\\IDEADEMO\\shujiabigdata\\split"); File[] files = file.listFiles(); //創(chuàng)建多線程對(duì)象 int flag = 0; for (File f : files) { //為每一個(gè)文件啟動(dòng)一個(gè)線程 MapTask mapTask = new MapTask(f, flag); executorService.submit(mapTask); flag++; } executorService.shutdown(); long end = System.currentTimeMillis(); System.out.println(end-start); } }
創(chuàng)建ClazzSum
import java.io.BufferedReader; import java.io.FileReader; import java.util.HashMap; public class ClazzSum { public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); BufferedReader br = new BufferedReader( new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt")); String line; HashMap<String, Integer> map = new HashMap<String, Integer>(); while ((line = br.readLine()) != null) { String clazz = line.split(",")[4]; if (!map.containsKey(clazz)) { map.put(clazz, 1); } else { map.put(clazz, map.get(clazz) + 1); } } System.out.println(map); long end = System.currentTimeMillis(); System.out.println(end-start); } }
創(chuàng)建split128
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileReader; import java.io.FileWriter; import java.util.ArrayList; public class Split128 { public static void main(String[] args) throws Exception { BufferedReader br = new BufferedReader( new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt")); //用作標(biāo)記文件,也作為文件名稱 int index = 0; BufferedWriter bw = new BufferedWriter( new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index)); ArrayList<String> list = new ArrayList<String>(); String line; //用作累計(jì)讀取了多少行數(shù)據(jù) int flag = 0; int row = 0; while ((line = br.readLine()) != null) { list.add(line); flag++; // flag = 140 if (flag == 140) {// 一個(gè)文件讀寫完成,生成新的文件 row = 0 + 128 * index; for (int i = row; i <= row + 127; i++) { bw.write(list.get(i)); bw.newLine(); } bw.flush(); bw.close(); /** * 生成新的文件 * 計(jì)數(shù)清零 */ index++; flag = 12; bw = new BufferedWriter( new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index)); } } //文件讀取剩余128*1.1范圍之內(nèi) for (int i = list.size() - flag; i < list.size(); i++) { bw.write(list.get(i)); bw.newLine(); } bw.flush(); bw.close(); } }
創(chuàng)建Reduce
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.HashMap; public class Reduce { public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); HashMap<String, Integer> map = new HashMap<String, Integer>(); File file = new File("F:\\IDEADEMO\\shujiabigdata\\part"); File[] files = file.listFiles(); for (File f : files) { BufferedReader br = new BufferedReader(new FileReader(f)); String line; while ((line = br.readLine()) != null) { String clazz = line.split(":")[0]; int sum = Integer.valueOf(line.split(":")[1]); if (!map.containsKey(clazz)) { map.put(clazz, sum); } else { map.put(clazz, map.get(clazz) + sum); } } } long end = System.currentTimeMillis(); System.out.println(end-start); System.out.println(map); } }
最后將文件切分了8份,這里采用了線程池,建立線程連接,多個(gè)線程同時(shí)啟動(dòng),比單一文件采用多線程效率更高更好使。
到此這篇關(guān)于java實(shí)現(xiàn)MapReduce對(duì)文件進(jìn)行切分的示例代碼的文章就介紹到這了,更多相關(guān)java MapReduce 文件切分內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot利用Aop捕捉注解實(shí)現(xiàn)業(yè)務(wù)異步執(zhí)行
在開發(fā)過程中,盡量會(huì)將比較耗時(shí)且并不會(huì)影響請(qǐng)求的響應(yīng)結(jié)果的業(yè)務(wù)放在異步線程池中進(jìn)行處理,那么到時(shí)什么任務(wù)在執(zhí)行的時(shí)候會(huì)創(chuàng)建單獨(dú)的線程進(jìn)行處理呢?這篇文章主要介紹了Springboot利用Aop捕捉注解實(shí)現(xiàn)業(yè)務(wù)異步執(zhí)行2023-04-04springboot用controller跳轉(zhuǎn)html頁面的實(shí)現(xiàn)
這篇文章主要介紹了springboot用controller跳轉(zhuǎn)html頁面的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09java并發(fā)學(xué)習(xí)之Executor源碼解析
這篇文章主要為大家介紹了java并發(fā)學(xué)習(xí)之Executor源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07java連接mysql數(shù)據(jù)庫詳細(xì)步驟解析
以下是對(duì)java連接mysql數(shù)據(jù)庫的具體詳細(xì)步驟進(jìn)行了分析介紹,需要的朋友可以過來參考下2013-08-08SpringBoot的10個(gè)參數(shù)驗(yàn)證技巧分享
參數(shù)驗(yàn)證很重要,是平時(shí)開發(fā)環(huán)節(jié)中不可少的一部分,但是我想很多后端同事會(huì)偷懶,干脆不錯(cuò),這樣很可能給系統(tǒng)的穩(wěn)定性和安全性帶來嚴(yán)重的危害,那么在Spring Boot應(yīng)用中如何做好參數(shù)校驗(yàn)工作呢,本文提供了10個(gè)小技巧,需要的朋友可以參考下2023-09-09java將數(shù)據(jù)寫入內(nèi)存,磁盤的方法
下面小編就為大家分享一篇java將數(shù)據(jù)寫入內(nèi)存,磁盤的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01fastjson轉(zhuǎn)換對(duì)象實(shí)體@JsonProperty不生效問題及解決
這篇文章主要介紹了fastjson轉(zhuǎn)換對(duì)象實(shí)體@JsonProperty不生效問題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08