golang實(shí)現(xiàn)大文件讀取的代碼示例
在實(shí)際工作,我們需要讀取大數(shù)據(jù)文件,文件可能上G百G,所以我們不可能一次性的讀取到內(nèi)存,io.readAll不可用,那么我們可以考慮分塊,IO流的方式如io.copy.
對(duì)比兩者:
io.ReadAll:
io.ReadAll 是一個(gè)方便的函數(shù),可以將整個(gè)文件內(nèi)容一次性讀取到內(nèi)存中,并返回一個(gè)字節(jié)切片。這在處理小文件或者需要一次性加載數(shù)據(jù)的情況下非常適用。然而,對(duì)于大文件,使用 io.ReadAll 可能會(huì)導(dǎo)致以下問(wèn)題:
- 內(nèi)存消耗:讀取大文件可能導(dǎo)致內(nèi)存消耗急劇增加,甚至超出可用內(nèi)存限制。
- 性能問(wèn)題:應(yīng)用程序的響應(yīng)性可能下降,用戶可能會(huì)感到應(yīng)用程序不再響應(yīng)。
- 延遲問(wèn)題:大文件的讀取需要更多時(shí)間,可能導(dǎo)致較長(zhǎng)的延遲。
io.Copy:
io.Copy 函數(shù)通過(guò)逐塊的方式從源讀取數(shù)據(jù)并將其寫入目標(biāo),適用于流式傳輸大文件。它具有以下優(yōu)勢(shì):
- 低內(nèi)存消耗:io.Copy 逐塊處理數(shù)據(jù),不需要將整個(gè)文件加載到內(nèi)存中,從而降低內(nèi)存消耗。
- 高性能:流式傳輸提高了讀取和寫入的效率,適用于需要高性能處理大文件的情況。
- 更好的響應(yīng)性:io.Copy 不會(huì)一次性阻塞等待整個(gè)文件讀取完成,從而提高應(yīng)用程序的響應(yīng)性
示例:
package test
import (
"fmt"
"io"
"os"
"runtime"
"testing"
)
func largeFileRead(_file string) {
f, err := os.Open(_file)
if err != nil {
fmt.Errorf("打開(kāi)文件錯(cuò)誤:%v", err)
return
}
defer f.Close()
// 讀取數(shù)據(jù)大寫
buffer := make([]byte, 4096)
for {
getMemory()
n, err := f.Read(buffer)
if err != nil && err != io.EOF {
fmt.Errorf("讀取文件錯(cuò)誤:%v", err)
return
}
if n == 0 {
break
}
fmt.Println("內(nèi)容:", string(buffer))
}
fmt.Println("讀取完成")
}
func getMemory() {
// 獲取內(nèi)存信息
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("%d KB\n", m.Alloc/1024)
}
func Test_largeFileRead(t *testing.T) {
fileName := "D:xxxx.txt"
largeFileRead(fileName)
}運(yùn)行結(jié)果:

實(shí)時(shí)內(nèi)存占用:854KB,文件大小102M
拓展:Golang并發(fā)讀取超大文件
當(dāng)今世界的任何計(jì)算機(jī)系統(tǒng)每天都會(huì)生成大量的日志或數(shù)據(jù)。隨著系統(tǒng)的發(fā)展,將調(diào)試數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)中是不可行的,因?yàn)樗鼈兪遣豢勺兊模⑶抑荒苡糜诜治龊徒鉀Q故障。所以大部分公司傾向于將日志存儲(chǔ)在文件中,而這些文件通常位于本地磁盤中。
我們將使用Go語(yǔ)言,從一個(gè)大小為16GB的.txt或.log文件中提取日志。
讓我們開(kāi)始編碼……
首先,我們打開(kāi)文件。對(duì)于任何文件的IO,我們都將使用標(biāo)準(zhǔn)的Go庫(kù)os.File。
f, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
// UPDATE: close after checking error
defer file.Close() //Do not forget to close the file
打開(kāi)文件后,我們有以下兩個(gè)選項(xiàng)可以選擇:
- 逐行讀取文件,這有助于減少內(nèi)存緊張,但需要更多的時(shí)間。
- 一次將整個(gè)文件讀入內(nèi)存并處理該文件,這將消耗更多內(nèi)存,但會(huì)顯著減少時(shí)間。
由于文件太大,即16 GB,因此無(wú)法將整個(gè)文件加載到內(nèi)存中。但是第一種選擇對(duì)我們來(lái)說(shuō)也是不可行的,因?yàn)槲覀兿M趲酌腌妰?nèi)處理文件。
但你猜怎么著,還有第三種選擇。瞧……相比于將整個(gè)文件加載到內(nèi)存中,在Go語(yǔ)言中,我們還可以使用bufio.NewReader()將文件分塊加載。
r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
}
一旦我們將文件分塊,我們就可以分叉一個(gè)線程,即Go routine,同時(shí)處理多個(gè)文件區(qū)塊。上述代碼將修改為:
//sync pools to reuse the memory and decrease the preassure on Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
//process each chunk concurrently
//start -> log start time, end -> log end time
ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
wg.Done()
}()
}
wg.Wait()
}
上面的代碼,引入了兩個(gè)優(yōu)化點(diǎn):
- sync.Pool是一個(gè)強(qiáng)大的對(duì)象池,可以重用對(duì)象來(lái)減輕垃圾收集器的壓力。我們將重用各個(gè)分片的內(nèi)存,以減少內(nèi)存消耗,大大加快我們的工作。
- Go Routines幫助我們同時(shí)處理緩沖區(qū)塊,這大大提高了處理速度。
現(xiàn)在讓我們實(shí)現(xiàn)ProcessChunk函數(shù),它將處理以下格式的日志行。
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
我們將根據(jù)命令行提供的時(shí)間戳提取日志。
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow
noOfThread++
}
length := len(logsSlice)
//traverse the chunk
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
//process each chunk in saperate chunk
go func(s int, e int) {
for i:= s; i<e;i++{
text := logsSlice[i]
if len(text) == 0 {
continue
}
logParts := strings.SplitN(text, ",", 2)
logCreationTimeString := logParts[0]
logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
// check if log's timestamp is inbetween our desired period
if logCreationTime.After(start) && logCreationTime.Before(end) {
fmt.Println(text)
}
}
textSlice = nil
wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
//passing the indexes for processing
}
wg2.Wait() //wait for a chunk to finish
logsSlice = nil
}
對(duì)上面的代碼進(jìn)行基準(zhǔn)測(cè)試。以16 GB的日志文件為例,提取日志所需的時(shí)間約為25秒。
完整的代碼示例如下:
func main() {
s := time.Now()
args := os.Args[1:]
if len(args) != 6 { // for format LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
fmt.Println("Please give proper command line arguments")
return
}
startTimeArg := args[1]
finishTimeArg := args[3]
fileName := args[5]
file, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
defer file.Close() //close after checking err
queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
if err != nil {
fmt.Println("Could not able to parse the start time", startTimeArg)
return
}
queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
if err != nil {
fmt.Println("Could not able to parse the finish time", finishTimeArg)
return
}
filestat, err := file.Stat()
if err != nil {
fmt.Println("Could not able to get the file stat")
return
}
fileSize := filestat.Size()
offset := fileSize - 1
lastLineSize := 0
for {
b := make([]byte, 1)
n, err := file.ReadAt(b, offset)
if err != nil {
fmt.Println("Error reading file ", err)
break
}
char := string(b[0])
if char == "\n" {
break
}
offset--
lastLineSize += n
}
lastLine := make([]byte, lastLineSize)
_, err = file.ReadAt(lastLine, offset+1)
if err != nil {
fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
return
}
logSlice := strings.SplitN(string(lastLine), ",", 2)
logCreationTimeString := logSlice[0]
lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Println("can not able to parse time : ", err)
}
if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
Process(file, queryStartTime, queryFinishTime)
}
fmt.Println("\nTime taken - ", time.Since(s))
}
func Process(f *os.File, start time.Time, end time.Time) error {
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 250*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
ProcessChunk(buf, &linesPool, &stringPool, start, end)
wg.Done()
}()
}
wg.Wait()
return nil
}
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk)
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs)
chunkSize := 300
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 {
noOfThread++
}
for i := 0; i < (noOfThread); i++ {
wg2.Add(1)
go func(s int, e int) {
defer wg2.Done() //to avaoid deadlocks
for i := s; i < e; i++ {
text := logsSlice[i]
if len(text) == 0 {
continue
}
logSlice := strings.SplitN(text, ",", 2)
logCreationTimeString := logSlice[0]
logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
if logCreationTime.After(start) && logCreationTime.Before(end) {
//fmt.Println(text)
}
}
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
}
wg2.Wait()
logsSlice = nil
}
到此這篇關(guān)于golang實(shí)現(xiàn)大文件讀取的代碼示例的文章就介紹到這了,更多相關(guān)golang大文件讀取內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go module構(gòu)建項(xiàng)目的實(shí)現(xiàn)
本文主要介紹了go module構(gòu)建項(xiàng)目的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
Go語(yǔ)言使用Json的方法實(shí)現(xiàn)
本文主要介紹了Go語(yǔ)言使用Json的方法實(shí)現(xiàn)2024-05-05
golang bad file descriptor問(wèn)題的解決方法
這篇文章主要給大家介紹了golang bad file descriptor問(wèn)題的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02
go語(yǔ)言實(shí)現(xiàn)通過(guò)FTP庫(kù)自動(dòng)上傳web日志
這篇文章主要介紹了go語(yǔ)言實(shí)現(xiàn)通過(guò)FTP庫(kù)自動(dòng)上傳web日志,非常簡(jiǎn)單實(shí)用,需要的小伙伴快來(lái)參考下吧。2015-03-03
淺析Go使用定時(shí)器時(shí)如何避免潛在的內(nèi)存泄漏陷阱
這篇文章來(lái)和大家一起探討一下Go?中如何高效使用?timer,特別是與select?一起使用時(shí),如何防止?jié)撛诘膬?nèi)存泄漏問(wèn)題,感興趣的可以了解下2024-01-01
GO語(yǔ)言實(shí)現(xiàn)批量壓縮圖片和水印
這篇文章主要介紹了GO語(yǔ)言實(shí)現(xiàn)批量壓縮圖片和水印,主要用到了github.com/nfnt/resize這個(gè)第三方庫(kù),僅僅支持JPG圖片格式,有相同需求的小伙伴參考下吧。2015-03-03
Go語(yǔ)言實(shí)現(xiàn)字符串切片賦值的方法小結(jié)
這篇文章主要給大家介紹了Go語(yǔ)言實(shí)現(xiàn)字符串切片賦值的兩種方法,分別是在for循環(huán)的range中以及在函數(shù)的參數(shù)傳遞中實(shí)現(xiàn),有需要的朋友們可以根據(jù)自己的需要選擇使用。下面來(lái)一起看看吧。2016-10-10

