golang實(shí)現(xiàn)可中斷的流式下載功能
golang實(shí)現(xiàn)可中斷的流式下載
最近有一個需要實(shí)現(xiàn)下載功能:
從服務(wù)器上讀取文件,返回一個ReadCloser
在用戶磁盤上創(chuàng)建文件,通過io.Copy實(shí)現(xiàn)文件下載(io.Copy是流式的操作,不會出現(xiàn)因文件過大而內(nèi)存暴漲的問題)
通過context實(shí)現(xiàn)暫停
1 流式下載:io.Copy
這里拷貝文件我們選擇的是io.Copy而非是通過ioutil.ReadAll()將body中返回的數(shù)據(jù)一次性讀取到內(nèi)存
通過io.Copy可以保證內(nèi)存占用一直處于一個比較穩(wěn)定的水平
2 可中斷:context
通過封裝io.Copy實(shí)現(xiàn)
- 將io.Copy封裝為一個方法,方法里傳入context,外部通過context.WithCancel()控制流式拷貝的暫停
3 全部代碼
這里演示我通過讀取S3的一個對象下載到本地
/*
通過io.Copy實(shí)現(xiàn)可中斷的流復(fù)制
*/
var (
ak = "99999999999999999999"
sk = "9999999999999999999999999999999999999999"
endpoint = "http://xx.xx.xx.xx:8060"
bucket = "test-bucket"
key = "d_xp/2G/2G.txt"
)
func main() {
s3Client := osg.Client.GetS3Client(ak, sk, endpoint)
ctx, cancelFunc := context.WithCancel(context.Background())
object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
go func() {
time.Sleep(time.Second * 10)
cancelFunc()
log.Infof("canceled...")
}()
if err != nil {
log.Errorf("%v", err)
return
}
body := object.Body
defer body.Close()
file, err := os.Create("/Users/ziyi/GolandProjects/MyTest/demo_home/io_demo/target.txt")
if err != nil {
log.Errorf("%v", err)
return
}
defer file.Close()
_, err = FileService.Copy(ctx, file, body)
if err != nil {
log.Errorf("%v", err)
return
}
}
type fileService struct {
sem *semaphore.Weighted
}
var FileService = &fileService{
sem: semaphore.NewWeighted(1),
}
type IoCopyCancelledErr struct {
errMsg string
}
func (e *IoCopyCancelledErr) Error() string {
return fmt.Sprintf("io copy error, %s", e.errMsg)
}
func NewIoCopyCancelledErr(msg string) *IoCopyCancelledErr {
return &IoCopyCancelledErr{
errMsg: msg,
}
}
type readerFunc func(p []byte) (n int, err error)
func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }
//通過ctx實(shí)現(xiàn)可中斷的流拷貝
// Copy closable copy
func (s *fileService) Copy(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
// Copy will call the Reader and Writer interface multiple time, in order
// to copy by chunk (avoiding loading the whole file in memory).
// I insert the ability to cancel before read time as it is the earliest
// possible in the call process.
size, err := io.Copy(dst, readerFunc(func(p []byte) (int, error) {
select {
// if context has been canceled
case <-ctx.Done():
// stop process and propagate "context canceled" error
return 0, NewIoCopyCancelledErr(ctx.Err().Error())
default:
// otherwise just run default io.Reader implementation
return src.Read(p)
}
}))
return size, err
}
以上就是golang實(shí)現(xiàn)可中斷的流式下載的詳細(xì)內(nèi)容,更多關(guān)于golang實(shí)現(xiàn)流式下載的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang數(shù)組和切片作為參數(shù)和返回值的實(shí)現(xiàn)
本文主要介紹了golang數(shù)組和切片作為參數(shù)和返回值的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
Golang的循環(huán)語句和循環(huán)控制語句詳解
循環(huán)語句為了簡化程序中有規(guī)律的重復(fù)性操作,需要用到循環(huán)語句,和其他大多數(shù)編程語言一樣,GO的循環(huán)語句有for循環(huán),不同的是沒有while循環(huán),而循環(huán)控制語句可以改變循環(huán)語句的執(zhí)行過程,下面給大家介紹下go循環(huán)語句和循環(huán)控制語句的相關(guān)知識,一起看看吧2021-11-11
線上問題排查之golang使用json進(jìn)行對象copy
這篇文章主要介紹了線上問題排查之golang使用json進(jìn)行對象copy,文章圍繞golang使用json進(jìn)行對象copy的內(nèi)存溢出問題排查展開詳細(xì)內(nèi)容需要的小伙伴可以參考一下2022-06-06
Go使用Gin+mysql實(shí)現(xiàn)增刪改查的詳細(xì)實(shí)例
golang本身沒有提供連接mysql的驅(qū)動,但是定義了標(biāo)準(zhǔn)接口供第三方開發(fā)驅(qū)動,下面這篇文章主要給大家介紹了關(guān)于Go使用Gin+mysql實(shí)現(xiàn)增刪改查的相關(guān)資料,需要的朋友可以參考下2022-12-12
一鍵定位Golang線上服務(wù)內(nèi)存泄露的秘籍
內(nèi)存泄露?別讓它拖垮你的Golang線上服務(wù)!快速掌握Go語言服務(wù)內(nèi)存泄漏排查秘籍,從此問題無處遁形,一文讀懂如何精準(zhǔn)定位與有效解決Golang應(yīng)用中的內(nèi)存問題,立即閱讀,讓性能飛升!2024-01-01

