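How the Go graceful-restart library overseer works
overseer provides three main features:
1. graceful shutdown of connections, 2. graceful restart without dropping connections, 3. automatic restart when the binary changes.
Each is covered in turn below.
I. Graceful shutdown of connections
The Serve loop in Go's standard net/http package does not shut connections down gracefully on its own: when the main listening goroutine exits, it does not wait for the worker goroutines that are actually handling connections to finish.
Here is the standard library code:
Go/src/net/http/server.go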
func (srv *Server) Serve(l net.Listener) error {
	if fn := testHookServerServe; fn != nil {
		fn(srv, l) // call hook with unwrapped listener
	}
	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()
	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}
	if !srv.trackListener(&l, true) {
		return ErrServerClosed
	}
	defer srv.trackListener(&l, false)
	baseCtx := context.Background()
	if srv.BaseContext != nil {
		baseCtx = srv.BaseContext(origListener)
		if baseCtx == nil {
			panic("BaseContext returned a nil context")
		}
	}
	var tempDelay time.Duration // how long to sleep on accept failure
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			if srv.shuttingDown() {
				return ErrServerClosed
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}
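When the listening socket is closed, l.Accept() returns an error and the loop exits without waiting for the go c.serve(connCtx) goroutines to finish.
overseer handles this by wrapping Go's listening socket and connection socket, and uses a sync.WaitGroup so that the main goroutine can wait for the worker goroutines to finish.
The overseer code is as follows:
overseer-v1.1.6/graceful.go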
func (l *overseerListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
	if err != nil {
		return nil, err
	}
	conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
	conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
	uconn := overseerConn{
		Conn:   conn,
		wg:     &l.wg,
		closed: make(chan bool),
	}
	go func() {
		//connection watcher
		select {
		case <-l.closeByForce:
			uconn.Close()
		case <-uconn.closed:
			//closed manually
		}
	}()
	l.wg.Add(1)
	return uconn, nil
}

//non-blocking trigger close
func (l *overseerListener) release(timeout time.Duration) {
	//stop accepting connections - release fd
	l.closeError = l.Listener.Close()
	//start timer, close by force if deadline not met
	waited := make(chan bool)
	go func() {
		l.wg.Wait()
		waited <- true
	}()
	go func() {
		select {
		case <-time.After(timeout):
			close(l.closeByForce)
		case <-waited:
			//no need to force close
		}
	}()
}

//blocking wait for close
func (l *overseerListener) Close() error {
	l.wg.Wait()
	return l.closeError
}

func (o overseerConn) Close() error {
	err := o.Conn.Close()
	if err == nil {
		o.wg.Done()
		o.closed <- true
	}
	return err
}
In (l *overseerListener) Accept, every accepted worker connection calls l.wg.Add(1); in (o overseerConn) Close, every closed worker connection calls o.wg.Done().
Both the asynchronous close path, (l *overseerListener) release, and the synchronous close path, (l *overseerListener) Close, call l.wg.Wait() to wait for the worker goroutines to finish.
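The Add/Done/Wait bookkeeping described above can be tried in isolation. The following is only a minimal sketch, not overseer's code: trackedConn and drainDemo are hypothetical names, net.Pipe stands in for real TCP connections, and unlike overseerConn this wrapper calls Done exactly once even if the underlying Close returns an error.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// trackedConn is a hypothetical wrapper (not overseer's type) that reports
// its closure to a shared WaitGroup exactly once.
type trackedConn struct {
	net.Conn
	wg   *sync.WaitGroup
	once sync.Once
}

// Close closes the underlying connection and marks it as finished.
func (c *trackedConn) Close() error {
	err := c.Conn.Close()
	c.once.Do(c.wg.Done)
	return err
}

// drainDemo opens n in-memory connections, closes each one from its own
// worker goroutine, and returns how many workers had finished by the time
// wg.Wait() returned.
func drainDemo(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0
	for i := 0; i < n; i++ {
		server, client := net.Pipe()
		wg.Add(1) // one unit per "accepted" connection
		tc := &trackedConn{Conn: server, wg: &wg}
		go func() {
			defer client.Close()
			mu.Lock()
			finished++
			mu.Unlock()
			tc.Close() // triggers wg.Done()
		}()
	}
	wg.Wait() // blocks until every tracked connection has been closed
	mu.Lock()
	defer mu.Unlock()
	return finished
}

func main() {
	fmt.Println("connections drained:", drainDemo(3))
}
```

Because each worker increments the counter before closing its connection, the value read after wg.Wait() always equals the number of connections, which is the guarantee the listener's Close relies on.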
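Shutdown sequence for the listening socket:
1. The worker process receives the restart signal, either directly or forwarded by the master process.
2. The worker's signal handler calls (l *overseerListener) release.
3. release closes the listening socket and starts an asynchronous l.wg.Wait().
4. In the standard library's net/http/server.go, l.Accept() inside (srv *Server) Serve returns an error, the accept loop exits, and the deferred l.Close() runs, which is (l *overseerListener) Close.
5. (l *overseerListener) Close calls l.wg.Wait() synchronously and blocks until the worker connections have finished.
6. When a worker connection finishes, (o overseerConn) Close() is called, which in turn calls o.wg.Done().
7. The worker sends SIGUSR1 to the master process. Note that in the proc_slave.go excerpt in section III this happens right after the release calls in step 2, without waiting for in-flight connections to finish.
8. When the master process receives SIGUSR1, it writes true to the mp.descriptorsReleased channel.
9. In (mp *master) fork, the master receives from mp.descriptorsReleased, returns from the current fork, and enters the next one.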
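The "wait, but force-close after a deadline" part of this sequence can be sketched on its own. This is a simplified illustration under stated assumptions: waitOrForce and forcedShutdown are hypothetical names, and the helper blocks until one of the two outcomes occurs, whereas overseer's release returns immediately and leaves the waiting to background goroutines.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrForce waits for wg for at most timeout. If the deadline passes
// first, it closes force (the cue for connection watchers to close their
// connections) and returns true.
func waitOrForce(wg *sync.WaitGroup, timeout time.Duration, force chan struct{}) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return false // everything finished in time
	case <-time.After(timeout):
		close(force)
		return true
	}
}

// forcedShutdown runs one simulated connection that needs workMs to finish
// and reports whether a timeoutMs deadline had to force it closed.
func forcedShutdown(workMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	force := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case <-time.After(time.Duration(workMs) * time.Millisecond):
			// request finished normally
		case <-force:
			// closed by force
		}
	}()
	forced := waitOrForce(&wg, time.Duration(timeoutMs)*time.Millisecond, force)
	wg.Wait() // the worker is done either way by now or shortly after
	return forced
}

func main() {
	fmt.Println("fast request forced:", forcedShutdown(10, 500))
	fmt.Println("slow request forced:", forcedShutdown(500, 20))
}
```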
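II. Graceful restart of connections
A graceful restart is one that does not disconnect clients and is invisible to them; for example, connections already queued on the listening socket are not dropped. To achieve this, the listening socket is handed from the old worker process to the new one through the master process, instead of the new worker process creating a fresh listening socket.
The listening sockets are created by the master process:
overseer-v1.1.6/proc_master.go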
func (mp *master) retreiveFileDescriptors() error {
	mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
	for i, addr := range mp.Config.Addresses {
		a, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return fmt.Errorf("Invalid address %s (%s)", addr, err)
		}
		l, err := net.ListenTCP("tcp", a)
		if err != nil {
			return err
		}
		f, err := l.File()
		if err != nil {
			return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
		}
		mp.slaveExtraFiles[i] = f
	}
	return nil
}
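The master takes each address from mp.Config.Addresses, opens a listening socket on it, and stores the resulting file handle in mp.slaveExtraFiles.
(l *TCPListener) Close is called along the way, but this does not affect the worker process: l.File() returns a duplicate of the descriptor, so closing l only means that the master process itself can no longer use its original listener.
For reference, the difference between close and shutdown on a network socket:
close ---- closes this process's socket descriptor only. The connection itself stays open, and other processes that hold the same socket can still read from and write to it.
shutdown ---- tears down the socket connection itself. A reader may then see EOF, and a writer may receive SIGPIPE, possibly only once the socket buffer has filled up. shutdown also takes a "how" argument: 0 disallows further reads, 1 disallows further writes, 2 disallows both.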
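The claim that closing the original listener leaves the duplicated descriptor usable can be checked with a small experiment. This is a sketch under assumptions: listenerSurvivesClose is a hypothetical helper, it runs everything inside one process on the loopback interface, and it relies on (*net.TCPListener).File, which is not supported on Windows.

```go
package main

import (
	"fmt"
	"net"
)

// listenerSurvivesClose reports whether a listener rebuilt from a duplicated
// file descriptor still accepts connections after the original listener has
// been closed.
func listenerSurvivesClose() (bool, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	addr := l.Addr().String()

	// File returns a duplicate of the listening descriptor.
	f, err := l.(*net.TCPListener).File()
	if err != nil {
		l.Close()
		return false, err
	}
	defer f.Close()

	// Closing the original drops only that descriptor; the kernel socket
	// stays open because f still refers to it.
	if err := l.Close(); err != nil {
		return false, err
	}

	// Rebuild a listener from the file, as a worker process would.
	l2, err := net.FileListener(f)
	if err != nil {
		return false, err
	}
	defer l2.Close()

	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return false, err
	}
	defer conn.Close()

	accepted, err := l2.Accept()
	if err != nil {
		return false, err
	}
	accepted.Close()
	return true, nil
}

func main() {
	ok, err := listenerSurvivesClose()
	fmt.Println("still accepting:", ok, "error:", err)
}
```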
mp.slaveExtraFiles is then passed to the child process, that is, the worker process:
overseer-v1.1.6/proc_master.go
func (mp *master) fork() error {
	mp.debugf("starting %s", mp.binPath)
	cmd := exec.Command(mp.binPath)
	//mark this new process as the "active" slave process.
	//this process is assumed to be holding the socket files.
	mp.slaveCmd = cmd
	mp.slaveID++
	//provide the slave process with some state
	e := os.Environ()
	e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
	e = append(e, envBinPath+"="+mp.binPath)
	e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
	e = append(e, envIsSlave+"=1")
	e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
	cmd.Env = e
	//inherit master args/stdfiles
	cmd.Args = os.Args
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	//include socket files
	cmd.ExtraFiles = mp.slaveExtraFiles
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("Failed to start slave process: %s", err)
	}
	//was scheduled to restart, notify success
	if mp.restarting {
		mp.restartedAt = time.Now()
		mp.restarting = false
		mp.restarted <- true
	}
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
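The statement cmd.ExtraFiles = mp.slaveExtraFiles is what passes the sockets to the child process. The field is ultimately handed to the fork/exec system call, and the listed descriptors are inherited by the child, where entry i of ExtraFiles becomes file descriptor 3+i.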
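The descriptor hand-off can be sketched without actually starting a child. Everything below is illustrative: DEMO_NUM_FDS, childCommand, childFD and describe are hypothetical names rather than overseer's, and os.Stdout is used only as a placeholder for the listener files a real master would pass. The command is built but never started.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"strconv"
)

// envNumFDs is a hypothetical variable name used only in this sketch.
const envNumFDs = "DEMO_NUM_FDS"

// childCommand prepares a command that will inherit files as extra
// descriptors and learn their count from the environment.
func childCommand(bin string, files []*os.File) *exec.Cmd {
	cmd := exec.Command(bin)
	cmd.Env = append(os.Environ(), envNumFDs+"="+strconv.Itoa(len(files)))
	cmd.ExtraFiles = files
	return cmd
}

// childFD returns the descriptor number that ExtraFiles[i] receives in the
// child: 0, 1 and 2 are stdin, stdout and stderr, so extras start at 3.
func childFD(i int) int {
	return 3 + i
}

// describe builds (but never starts) a command carrying n placeholder files
// and returns the environment entry that announces the descriptor count.
func describe(n int) string {
	files := make([]*os.File, n)
	for i := range files {
		files[i] = os.Stdout // placeholder for a listener's *os.File
	}
	cmd := childCommand(os.Args[0], files)
	return cmd.Env[len(cmd.Env)-1]
}

func main() {
	fmt.Println(describe(2))
	fmt.Println("first inherited fd:", childFD(0))
}
```

On the receiving side, a child would read the count from the environment and wrap each descriptor number with os.NewFile before turning it back into a listener.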
The child process, that is, the worker process, then picks up the inherited sockets:
overseer-v1.1.6/proc_slave.go
func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")
	sp.state.Enabled = true
	sp.state.ID = os.Getenv(envBinID)
	sp.state.StartedAt = time.Now()
	sp.state.Address = sp.Config.Address
	sp.state.Addresses = sp.Config.Addresses
	sp.state.GracefulShutdown = make(chan bool, 1)
	sp.state.BinPath = os.Getenv(envBinPath)
	if err := sp.watchParent(); err != nil {
		return err
	}
	if err := sp.initFileDescriptors(); err != nil {
		return err
	}
	sp.watchSignal()
	//run program with state
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}

func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
	if err != nil {
		return fmt.Errorf("invalid %s integer", envNumFDs)
	}
	sp.listeners = make([]*overseerListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d", i)
		}
		u := newOverseerListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
}
The child process only re-wraps the inherited sockets, via u := newOverseerListener(l); it does not create new listening sockets. These listeners are finally handed to sp.Config.Program(sp.state), which is the user's entry point:
overseer-v1.1.6/example/main.go
// convert your 'main()' into a 'prog(state)'
// 'prog()' is run in a child process
func prog(state overseer.State) {
	fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
	http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		d, _ := time.ParseDuration(r.URL.Query().Get("d"))
		time.Sleep(d)
		fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
	}))
	http.Serve(state.Listener, nil)
	fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
}

// then create another 'main' which runs the upgrades
// 'main()' is run in the initial process
func main() {
	overseer.Run(overseer.Config{
		Program: prog,
		Address: ":5001",
		Fetcher: &fetcher.File{Path: "my_app_next"},
		Debug:   true, //display log of overseer actions
		TerminateTimeout: 10 * time.Minute,
	})
}
When the user program calls http.Serve(state.Listener, nil):
1. The accept call it uses is the wrapped (l *overseerListener) Accept().
2. The deferred l.Close() is likewise the wrapped (l *overseerListener) Close().
3. Every worker connection created by (l *overseerListener) Accept() is wrapped as an overseerConn, so closing it calls (o overseerConn) Close().
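III. Automatic restart on file changes
overseer can watch for changes to the binary and trigger the restart sequence automatically when one is detected.
When the master process starts it checks its configuration, and if mp.Config.Fetcher is set it enters fetchLoop:
overseer-v1.1.6/proc_master.go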
// fetchLoop is run in a goroutine
func (mp *master) fetchLoop() {
	min := mp.Config.MinFetchInterval
	time.Sleep(min)
	for {
		t0 := time.Now()
		mp.fetch()
		//duration fetch of fetch
		diff := time.Now().Sub(t0)
		if diff < min {
			delay := min - diff
			//ensures at least MinFetchInterval delay.
			//should be throttled by the fetcher!
			time.Sleep(delay)
		}
	}
}
mp.Config.MinFetchInterval defaults to 1 second, so changes are checked for roughly once per second. It is a time.Duration, so a finer granularity can be configured.
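The throttling arithmetic in fetchLoop comes down to "sleep for whatever is left of the minimum interval". A minimal sketch of just that calculation, with remainingDelay as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// remainingDelay returns how long to sleep so that one loop iteration takes
// at least minInterval, given that the fetch itself already took elapsed.
func remainingDelay(minInterval, elapsed time.Duration) time.Duration {
	if elapsed < minInterval {
		return minInterval - elapsed
	}
	return 0 // the fetch was slower than the interval; do not sleep
}

func main() {
	fmt.Println(remainingDelay(time.Second, 300*time.Millisecond)) // prints 700ms
	fmt.Println(remainingDelay(time.Second, 2*time.Second))        // prints 0s
}
```

With the default interval, a fetch that takes 300ms is followed by a 700ms sleep, while a fetch that takes longer than the interval is followed immediately by the next one.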
The fetchers currently supported are fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.
fetcher_file.go is used as the example here.
1. Detecting that the file has changed:
overseer-v1.1.6/proc_master.go
//tee off to sha1
hash := sha1.New()
reader = io.TeeReader(reader, hash)
//write to a temp file
_, err = io.Copy(tmpBin, reader)
if err != nil {
	mp.warnf("failed to write temp binary: %s", err)
	return
}
//compare hash
newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) {
	mp.debugf("hash match - skip")
	return
}
The check is done with SHA-1: the old and new hash values are compared, and file timestamps are not considered at all.
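The "hash while copying" pattern can be reproduced in a few lines. This is a sketch with hypothetical helper names (copyAndHash, sameContent); it hashes in-memory strings and discards the copy, whereas the master writes the stream to a temporary file.

```go
package main

import (
	"bytes"
	"crypto/sha1"
	"fmt"
	"io"
	"strings"
)

// copyAndHash streams src into dst and returns the SHA-1 digest of exactly
// the bytes that were copied, reading the source only once.
func copyAndHash(dst io.Writer, src io.Reader) ([]byte, error) {
	h := sha1.New()
	if _, err := io.Copy(dst, io.TeeReader(src, h)); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

// sameContent reports whether two payloads produce the same digest, which is
// the condition under which an upgrade would be skipped.
func sameContent(a, b string) bool {
	ha, errA := copyAndHash(io.Discard, strings.NewReader(a))
	hb, errB := copyAndHash(io.Discard, strings.NewReader(b))
	if errA != nil || errB != nil {
		return false
	}
	return bytes.Equal(ha, hb)
}

func main() {
	fmt.Println("v1 vs v1 identical:", sameContent("binary v1", "binary v1"))
	fmt.Println("v1 vs v2 identical:", sameContent("binary v1", "binary v2"))
}
```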
2. Verifying that the file is executable and that it supports overseer:
overseer-v1.1.6/proc_master.go
tokenIn := token()
cmd := exec.Command(tmpBinPath)
cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
cmd.Args = os.Args
returned := false
go func() {
	time.Sleep(5 * time.Second)
	if !returned {
		mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
	}
}()
tokenOut, err := cmd.CombinedOutput()
returned = true
if err != nil {
	mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
	return
}
if tokenIn != string(tokenOut) {
	mp.warnf("sanity check failed")
	return
}
This relies on code that overseer builds into every program that uses it:
overseer-v1.1.6/overseer.go
//sanityCheck returns true if a check was performed
func sanityCheck() bool {
	//sanity check
	if token := os.Getenv(envBinCheck); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	//legacy sanity check using old env var
	if token := os.Getenv(envBinCheckLegacy); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	return false
}
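This function is called from overseer.Run when main starts. The master passes a token through a fixed environment variable, and the check succeeds if the command's output echoes that token back unchanged.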
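The token round trip can be simulated inside one process. This sketch makes several assumptions: DEMO_BIN_CHECK, echoToken and roundTrip are hypothetical names, the environment is a plain map, and the "child output" is a buffer rather than the combined output of a real subprocess.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// envCheck is a hypothetical variable name used only in this sketch.
const envCheck = "DEMO_BIN_CHECK"

// echoToken plays the child's role: if a token is present in the
// environment it writes the token to out and reports that a check ran.
func echoToken(getenv func(string) string, out io.Writer) bool {
	if token := getenv(envCheck); token != "" {
		fmt.Fprint(out, token)
		return true
	}
	return false
}

// roundTrip plays the parent's role: it supplies a token, captures the
// child's output, and accepts the binary only if the output is the token.
func roundTrip(token string) bool {
	env := map[string]string{envCheck: token}
	var out bytes.Buffer
	ran := echoToken(func(key string) string { return env[key] }, &out)
	return ran && out.String() == token
}

func main() {
	fmt.Println("check with token passes:", roundTrip("a1b2c3"))
	fmt.Println("check without token passes:", roundTrip(""))
}
```

A binary that was not built with the check would print something else, or nothing, so the comparison in the parent fails and the upgrade is rejected.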
3. Overwriting the old file and triggering the restart:
overseer-v1.1.6/proc_master.go
//overwrite!
if err := overwrite(mp.binPath, tmpBinPath); err != nil {
	mp.warnf("failed to overwrite binary: %s", err)
	return
}
mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash
//binary successfully replaced
if !mp.Config.NoRestartAfterFetch {
	mp.triggerRestart()
}
(mp *master) triggerRestart is the entry point into the restart sequence:
overseer-v1.1.6/proc_master.go
func (mp *master) triggerRestart() {
	if mp.restarting {
		mp.debugf("already graceful restarting")
		return //skip
	} else if mp.slaveCmd == nil || mp.restarting {
		mp.debugf("no slave process")
		return //skip
	}
	mp.debugf("graceful restart triggered")
	mp.restarting = true
	mp.awaitingUSR1 = true
	mp.signalledAt = time.Now()
	mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
	select {
	case <-mp.restarted:
		//success
		mp.debugf("restart success")
	case <-time.After(mp.TerminateTimeout):
		//times up mr. process, we did ask nicely!
		mp.debugf("graceful timeout, forcing exit")
		mp.sendSignal(os.Kill)
	}
}
The master sends mp.Config.RestartSignal to the child process. On receiving it, the child closes its listening sockets and then sends SIGUSR1 to the parent process:
overseer-v1.1.6/proc_slave.go
if len(sp.listeners) > 0 {
	//perform graceful shutdown
	for _, l := range sp.listeners {
		l.release(sp.Config.TerminateTimeout)
	}
	//signal release of held sockets, allows master to start
	//a new process before this child has actually exited.
	//early restarts not supported with restarts disabled.
	if !sp.NoRestart {
		sp.masterProc.Signal(SIGUSR1)
	}
	//listeners should be waiting on connections to close...
}
When the parent process receives SIGUSR1, it signals on the mp.descriptorsReleased channel that the listening sockets have been released:
overseer-v1.1.6/proc_master.go
//**during a restart** a SIGUSR1 signals
//to the master process that, the file
//descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 {
	mp.debugf("signaled, sockets ready")
	mp.awaitingUSR1 = false
	mp.descriptorsReleased <- true
} else
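Control finally returns to (mp *master) fork, which has been waiting all along for either a notification on mp.descriptorsReleased or for cmd.Wait to report that the child has exited. Once the channel notification arrives, fork returns and the next round of the fork loop begins.
overseer-v1.1.6/proc_master.go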
func (mp *master) fork() error {
	//... ...
	//... ...
	//... ...
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
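The two-way wait at the end of fork can be reduced to a small select over two channels. The sketch below is illustrative only: waitChild and simulate are hypothetical names, the channels are filled by hand instead of by cmd.Wait and a SIGUSR1 handler, and the outcomes are returned as strings rather than acted upon.

```go
package main

import (
	"errors"
	"fmt"
)

// waitChild blocks until either the child has exited or it has announced
// that its descriptors are released, and names which one happened first.
func waitChild(exited <-chan error, released <-chan bool) string {
	select {
	case err := <-exited:
		if err != nil {
			return "exited with error"
		}
		return "exited cleanly"
	case <-released:
		return "descriptors released"
	}
}

// simulate feeds exactly one event into waitChild and returns its verdict.
func simulate(release bool) string {
	exited := make(chan error, 1)
	released := make(chan bool, 1)
	if release {
		released <- true // what the SIGUSR1 handler would do
	} else {
		exited <- errors.New("exit status 1") // what cmd.Wait would report
	}
	return waitChild(exited, released)
}

func main() {
	fmt.Println(simulate(true))
	fmt.Println(simulate(false))
}
```

In the first case the caller can start the next child right away, which is what keeps the gap between old and new workers short; in the second it has to decide whether to restart or to propagate the exit.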