亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Golang分布式注冊中心實現(xiàn)流程講解

 更新時間:2023年05月10日 10:55:06   作者:未來誰可知  
這篇文章主要介紹了Golang分布式注冊中心實現(xiàn)流程,注冊中心可以用于服務發(fā)現(xiàn),服務注冊,配置管理等方面,在分布式系統(tǒng)中,服務的發(fā)現(xiàn)和注冊是非常重要的組成部分,需要的朋友可以參考下

動手實現(xiàn)一個分布式注冊中心

以一個日志微服務為例,將日志服務注冊到注冊中心展開!

日志服務

log/Server.go

其實這一個日志類的功能就是有基本的寫文件功能,然后就是注冊一個http的接口去寫日志進去

package log
import (
	"io/ioutil"
	stlog "log"
	"net/http"
	"os"
)
var log *stlog.Logger
type fileLog string
// 編寫日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
	f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	return f.Write(data)
}
// 啟動一個日志對象 參數(shù)為日志文件名
func Run(destination string) {
	log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注冊的一個服務方法
func RegisterHandlers() {
	http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			msg, err := ioutil.ReadAll(r.Body)
			if err != nil || len(msg) == 0 {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			write(string(msg))
		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
	})
}
func write(message string) {
	log.Printf("%v\n", message)
}

log/Client.go

提供給外部服務的接口,定義好日志的命名格式,來顯示調用接口去使用已經(jīng)注冊好的日志接口并且返回狀態(tài)

package log
import (
	"bytes"
	"distributed/registry"
	"fmt"
	"net/http"
	stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
	stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
	stlog.SetFlags(0)
	stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
	url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
	b := bytes.NewBuffer([]byte(data))
	res, err := http.Post(cl.url+"/log", "text/plain", b)
	if err != nil {
		return 0, err
	}
	if res.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
	}
	return len(data), nil
}

主啟動程序LogService

啟動服務Logservice,主要執(zhí)行start方法,里面有細節(jié)實現(xiàn)服務注冊與服務發(fā)現(xiàn)

package main
import (
	"context"
	"distributed/log"
	"distributed/registry"
	"distributed/service"
	"fmt"
	stlog "log"
)
func main() {
  // 初始化啟動一個日志文件對象
	log.Run("./distributed.log")
  // 日志服務注冊的端口和地址
	host, port := "localhost", "4000"
	serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
  // 初始化注冊對象
	r := registry.Registration{
		ServiceName:      registry.LogService, // 自身服務名
		ServiceURL:       serviceAddress,  // 自身服務地址
		RequiredServices: make([]registry.ServiceName, 0),// 依賴服務
		ServiceUpdateURL: serviceAddress + "/services", // 服務列表
		HeartbeatURL: serviceAddress + "/heartbeat",  // 心跳
	}
  // 啟動日志服務包含服務注冊,發(fā)現(xiàn)等細節(jié)
	ctx, err := service.Start(
		context.Background(),
		host,
		port,
		r,
		log.RegisterHandlers,
	)
  // 異常寫入到日志中
	if err != nil {
		stlog.Fatalln(err)
	}
  // 超時停止退出服務
	<-ctx.Done()
	fmt.Println("Shutting down log service.")
}

服務啟動與注冊

service/service.go

Start 啟動服務的主方法

/*
 host: 地址
 port: 端口號
 reg:  注冊的服務對象
 registerHandlersFunc: 注冊方法
*/
func Start(ctx context.Context, host, port string,
	reg registry.Registration,
	registerHandlersFunc func()) (context.Context, error) {
	registerHandlersFunc()  // 啟動注冊方法
	// 啟動服務
	ctx = startService(ctx, reg.ServiceName, host, port)
	// 注冊服務
	err := registry.RegisterService(reg)
	if err != nil {
		return ctx, err
	}
	return ctx, nil
}

startService

func startService(ctx context.Context, serviceName registry.ServiceName,
	host, port string) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	var srv http.Server
	srv.Addr = ":" + port
	// 該協(xié)程為監(jiān)聽http服務,并且停止服務的時候cancel
	go func() {
		log.Println(srv.ListenAndServe())
		// 刪除對應的服務
		err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
		if err != nil {
			log.Println(err)
		}
		cancel()
	}()
	// 該協(xié)程為監(jiān)聽手動停止服務的信號
	go func() {
		fmt.Printf("%v started. Press any key to stop. \n", serviceName)
		var s string
		fmt.Scanln(&s)
		err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
		if err != nil {
			log.Println(err)
		}
		srv.Shutdown(ctx)
		cancel()
	}()
	return ctx
}

服務注冊與發(fā)現(xiàn)

registry/client.go

注冊服務的時候會連著心跳以及服務更新的方法一起注冊!

而服務更新里面的細節(jié)就是自己自定義了一個Handler然后ServeHttp方法里面去update全局的服務提供對象,

update主要是更新服務和刪除服務的最新消息

然后就是提供一個注銷服務的方法

package registry
import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"sync"
)
// 注冊服務
func RegisterService(r Registration) error {
	// 獲得心跳地址并注冊
	heartbeatURL, err := url.Parse(r.HeartbeatURL)
	if err != nil {
		return err
	}
	http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request)  {
		w.WriteHeader(http.StatusOK)
	})
   // 獲得服務更新地址,并且自定義http服務的handler,因為每次更新服務的時候,可以在ServeHttp方法里面去維護
	serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
	if err != nil {
		return err
	}
	http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
    // 寫入buf值將服務對象發(fā)送給注冊中心的services地址
	buf := new(bytes.Buffer)
	enc := json.NewEncoder(buf)
	err = enc.Encode(r)
	if err != nil {
		return err
	}
	res, err := http.Post(ServicesURL, "application/json", buf)
	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("Failed to register service. Registry service "+
			"responded with code %v", res.StatusCode)
	}
	return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	dec := json.NewDecoder(r.Body)
	var p patch
	err := dec.Decode(&p)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	fmt.Printf("Updated received %v\n", p)
	prov.Update(p) // 更新服務提供對象
}
// 刪除對應注冊中心的服務地址
func ShutdownService(url string) error {
	req, err := http.NewRequest(http.MethodDelete, ServicesURL,
		bytes.NewBuffer([]byte(url)))
	if err != nil {
		return err
	}
	req.Header.Add("Content-Type", "text/plain")
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("Failed to deregister service. Registry "+
			"service responded with code %v", res.StatusCode)
	}
	return nil
}
// 更新服務列表
func (p *providers) Update(pat patch) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
    // 將patch中有新增的進行添加
	for _, patchEntry := range pat.Added {
		if _, ok := p.services[patchEntry.Name]; !ok {
			p.services[patchEntry.Name] = make([]string, 0)
		}
		p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
			patchEntry.URL)
	}
    // 將patch中被標記刪除的
	for _, patchEntry := range pat.Removed {
		if providerURLs, ok := p.services[patchEntry.Name]; ok {
			for i := range providerURLs {
				if providerURLs[i] == patchEntry.URL {
					p.services[patchEntry.Name] = append(providerURLs[:i],
						providerURLs[i+1:]...)
				}
			}
		}
	}
}
// 根據(jù)服務名負載均衡隨機獲取服務地址
func (p providers) get(name ServiceName) (string, error) {
	providers, ok := p.services[name]
	if !ok {
		return "", fmt.Errorf("No providers available for service %v", name)
	}
	idx := int(rand.Float32() * float32(len(providers)))
	return providers[idx], nil
}
// 對外暴露生產(chǎn)者的方法
func GetProvider(name ServiceName) (string, error) {
	return prov.get(name)
}
type providers struct {
	services map[ServiceName][]string
	mutex    *sync.RWMutex
}
// 服務提供對象
var prov = providers{
	services: make(map[ServiceName][]string),  // 服務列表  服務名->集群地址集合
	mutex:    new(sync.RWMutex),  // 鎖  防止服務注冊更新時的并發(fā)情況
}

registry/registration.go

主要是一些關于服務使用到的參數(shù)以及對象!

package registry
type Registration struct {
	ServiceName      ServiceName   // 服務名
	ServiceURL       string        // 服務地址
	RequiredServices []ServiceName // 依賴的服務
	ServiceUpdateURL string        // 服務更新的地址
	HeartbeatURL     string        // 心跳地址
}
type ServiceName string
// 服務名集合
const (
	LogService     = ServiceName("LogService")
	GradingService = ServiceName("GradingService")
	PortalService  = ServiceName("Portald")
)
// 服務對象參數(shù)
type patchEntry struct {
	Name ServiceName
	URL  string
}
// 更新的服務對象參數(shù)  
type patch struct {
	Added   []patchEntry
	Removed []patchEntry
}

registry/server.go

服務端的注冊中心服務的增刪改查管理以及心跳檢測,及時將最新的更新的服務消息通知回給客戶端

package registry
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"   // 注冊中心地址
// 服務對象集合
type registry struct {
	registrations []Registration
	mutex         *sync.RWMutex
}
// 添加服務
func (r *registry) add(reg Registration) error {
	r.mutex.Lock()
	r.registrations = append(r.registrations, reg)
	r.mutex.Unlock()
	err := r.sendRequiredServices(reg)
	r.notify(patch{
		Added: []patchEntry{
			patchEntry{
				Name: reg.ServiceName,
				URL:  reg.ServiceURL,
			},
		},
	})
	return err
}
// 通知服務接口請求去刷新改變后到服務
func (r registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	for _, reg := range r.registrations {
		go func(reg Registration) {
			for _, reqService := range reg.RequiredServices {
				p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
				sendUpdate := false
				for _, added := range fullPatch.Added {
					if added.Name == reqService {
						p.Added = append(p.Added, added)
						sendUpdate = true
					}
				}
				for _, removed := range fullPatch.Removed {
					if removed.Name == reqService {
						p.Removed = append(p.Removed, removed)
						sendUpdate = true
					}
				}
				if sendUpdate {
					err := r.sendPatch(p, reg.ServiceUpdateURL)
					if err != nil {
						log.Println(err)
						return
					}
				}
			}
		}(reg)
	}
}
// 更新每個服務的依賴服務
func (r registry) sendRequiredServices(reg Registration) error {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	var p patch
	for _, serviceReg := range r.registrations {
		for _, reqService := range reg.RequiredServices {
			if serviceReg.ServiceName == reqService {
				p.Added = append(p.Added, patchEntry{
					Name: serviceReg.ServiceName,
					URL:  serviceReg.ServiceURL,
				})
			}
		}
	}
	err := r.sendPatch(p, reg.ServiceUpdateURL)
	if err != nil {
		return err
	}
	return nil
}
// 告訴客戶端更新,最新的服務列表是這個
func (r registry) sendPatch(p patch, url string) error {
	d, err := json.Marshal(p)
	if err != nil {
		return err
	}
	_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
	if err != nil {
		return err
	}
	return nil
}
// 注冊中心刪除服務對象
func (r *registry) remove(url string) error {
	for i := range reg.registrations {
		if reg.registrations[i].ServiceURL == url {
			// 通知客戶端更新對象信息
			r.notify(patch{
				Removed: []patchEntry{
					{
						Name: r.registrations[i].ServiceName,
						URL:  r.registrations[i].ServiceURL,
					},
				},
			})
			r.mutex.Lock()
			reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
			r.mutex.Unlock()
			return nil
		}
	}
	return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳檢測
func (r *registry) heartbeat(freq time.Duration) {
	for {
		var wg sync.WaitGroup
		for _, reg := range r.registrations {
			wg.Add(1)
			go func(reg Registration) {
				defer wg.Done()
				success := true
				for attemps := 0; attemps < 3; attemps++ {
					res, err := http.Get(reg.HeartbeatURL)
					if err != nil {
						log.Println(err)
					} else if res.StatusCode == http.StatusOK {
						log.Printf("Heartbeat check passed for %v", reg.ServiceName)
						// 如果心跳恢復了,把服務重新注冊回來
						if !success {
							r.add(reg)
						}
						break;
					}
					// 如果執(zhí)行到這就代表著心跳沒有響應,那就代表著需要回收注銷該服務了
					log.Printf("Heartbeat check failed for %v", reg.ServiceName)
					if success {
						success = false
						r.remove(reg.ServiceURL)
					}
					time.Sleep(1 * time.Second)
				}
			}(reg)
			wg.Wait()
			time.Sleep(freq)
		}
	}
}
var once sync.Once
func SetupRegistryService() {
	// 保證執(zhí)行一次進行服務到心跳 每三秒循環(huán)一遍
	once.Do(func() {
		go reg.heartbeat(3 * time.Second)
	})
}
var reg = registry{
	registrations: make([]Registration, 0),
	mutex:         new(sync.RWMutex),
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Println("Request received")
	switch r.Method {
	case http.MethodPost:
		dec := json.NewDecoder(r.Body)
		var r Registration
		err := dec.Decode(&r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		log.Printf("Adding service: %v with URL: %s\n", r.ServiceName,
			r.ServiceURL)
		err = reg.add(r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	case http.MethodDelete:
		payload, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		url := string(payload)
		log.Printf("Removing service at URL: %s", url)
		err = reg.remove(url)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
}

到此這篇關于Golang分布式注冊中心實現(xiàn)流程講解的文章就介紹到這了,更多相關Golang分布式注冊中心內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 代碼之美:探索Go語言斷行規(guī)則的奧秘

    代碼之美:探索Go語言斷行規(guī)則的奧秘

    Go語言是一門以簡潔、清晰和高效著稱的編程語言,而斷行規(guī)則是其代碼風格的重要組成部分,通過深入研究Go語言的斷行規(guī)則,我們可以更好地理解和編寫優(yōu)雅的代碼,本文將從語法規(guī)范、代碼風格和最佳實踐等方面進行探討,幫助讀者更好地理解和應用Go語言的斷行規(guī)則
    2023-10-10
  • 淺談go中切片比數(shù)組好用在哪

    淺談go中切片比數(shù)組好用在哪

    數(shù)組和切片都是常見的數(shù)據(jù)結構,本文將介紹Go語言中數(shù)組和切片的基本概念,同時詳細探討切片的優(yōu)勢,感興趣的可以了解下
    2023-06-06
  • Golang解析JSON遇到的坑及解決方法

    Golang解析JSON遇到的坑及解決方法

    這篇文章主要為大家介紹了Golang解析JSON時會遇到的一些坑及解決方法,文中的示例代碼講解詳細,對我們學習Go語言有一點的幫助,需要的可以參考一下
    2023-02-02
  • Go語言實現(xiàn)可選參數(shù)的方法小結

    Go語言實現(xiàn)可選參數(shù)的方法小結

    這篇文章主要為大家詳細介紹了Go語言實現(xiàn)可選參數(shù)的一些常見方法,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2024-02-02
  • 詳解如何熱重啟golang服務器

    詳解如何熱重啟golang服務器

    這篇文章主要介紹了詳解如何熱重啟golang服務器,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08
  • Golang Gin框架實現(xiàn)多種數(shù)據(jù)格式返回結果詳解

    Golang Gin框架實現(xiàn)多種數(shù)據(jù)格式返回結果詳解

    這篇文章主要介紹了Golang Gin框架實現(xiàn)多種數(shù)據(jù)格式返回結果,我們都知道,一個完整的請求包含請求和處理請求以及結果返回三個步驟,在服務器端對請求處理完成以后,會將結果返回給客戶端,在gin框架中,支持返回多種請求數(shù)據(jù)格式,下面我們一起來看看
    2023-05-05
  • go語言數(shù)組及結構體繼承和初始化示例解析

    go語言數(shù)組及結構體繼承和初始化示例解析

    這篇文章主要為大家介紹了go語言數(shù)組及結構體繼承和初始化示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2022-04-04
  • 關于Golang變量初始化/類型推斷/短聲明的問題

    關于Golang變量初始化/類型推斷/短聲明的問題

    這篇文章主要介紹了關于Golang變量初始化/類型推斷/短聲明的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項目(打包)全過程

    intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項目(打包)全過程

    最近在配置idea開發(fā)go語言時碰到很多問題,所以這里給大家總結下,這篇文章主要給大家介紹了關于intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項目(打包)的相關資料,需要的朋友可以參考下
    2023-10-10
  • Golang基礎教程之字符串string實例詳解

    Golang基礎教程之字符串string實例詳解

    這篇文章主要給大家介紹了關于Golang基礎教程之字符串string的相關資料,需要的朋友可以參考下
    2022-07-07

最新評論