Golang SSE 服務器端推送事件
寫在前面(愚蠢的我犯的錯誤)

本應該在EventStream的怎么都在響應這里出現(xiàn)
后面通過查找問題知道EventSream有特殊的回復格式為:data: [返回的內容]\n\n
示例:data: success\n\n
返回success字符串
原因
我做了一個在線點贊的實時更新的小玩意,我想著實時更新WS全雙工用不著。
SSE介紹
SSE(Server-Sent Event)是一種用于客戶端與服務器端實時通訊的技術。它允許服務器端發(fā)送事件到客戶端,客戶端可以通過 EventSource 接口來接收這些事件。通常情況下,SSE 是基于 HTTP 協(xié)議實現(xiàn)的,它不需要建立和維護長連接,但服務器可以長時間向客戶端推送數(shù)據(jù),而客戶端只需要等待并處理收到的數(shù)據(jù)即可。
Golang實現(xiàn)方式
SSE核心代碼
//sse Server-Sent-Events 服務事件流
http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
// 設置響應頭,表明這是一個 SSE 連接
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
//設置為刷新模式
flush, ok := w.(http.Flusher)
flush.Flush()
if !ok {
//判斷是否轉換成功,不成功則返回錯誤信息
responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
return
}
//這里因為我創(chuàng)建了一個Map用來存儲響應IO和Flush刷新,
//我在其他地方可以使用遍歷進行給各個通信端進行發(fā)送信息
respFlushMap[&w] = &flush
select {
case <-r.Context().Done():
delete(respFlushMap, &w)
return
}
})
發(fā)送事件請求
func main(){
//....
//點贊評論
http.HandleFunc("/favorite", favorite(client))
//...
}
// 點贊
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
var lock = sync.RWMutex{}
return func(w http.ResponseWriter, r *http.Request) {
/* 業(yè)務處理邏輯
......
*/
//核心代碼 將點贊信息發(fā)送到各個SSE
for writer, flusher := range respFlushMap {
//一定要是這個格式“data: [數(shù)據(jù)內容]\n\n”不然前端不會體現(xiàn)在ServeEvent中而出現(xiàn)在response中
fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
(*flusher).Flush()
}
}
}
全部代碼,包含了一些處理邏輯,可能比較混亂建議還是看看之前的
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/go-redis/redis/v8"
"html/template"
"log"
"math/rand"
"net"
"net/http"
"strconv"
"sync"
"time"
)
var commentNodeHashRedisKey = "commentNodeHashRedisKey"
var commentNodeSorterSetRedisKey = "commentNodeSorterSetRedisKey"
var respFlushMap = make(map[*http.ResponseWriter]*http.Flusher)
type CommentNode struct {
Content string `json:"content"` //內容
Score float64 `json:"score"` //點贊數(shù)
IP string `json:"ip"` //IP
NickName string `json:"nickName"` //昵稱
IsFavorite bool `json:"isFavorite"` //是否點贊
Member string `json:"member"` //唯一值
}
func main() {
//靜態(tài)資源文件
staticServer := http.FileServer(http.Dir("./template"))
//處理靜態(tài)資源文件
http.Handle("/static/", http.StripPrefix("/static/", staticServer))
//創(chuàng)建客戶端
client := redis.NewClient(&redis.Options{
Addr: "192.168.192.170:6379",
Password: "",
DB: 0,
})
//判斷時候連接成功
err := client.Ping(context.Background()).Err()
if err != nil {
log.Println("連接錯誤: ", err.Error())
}
log.Println("連接成功")
//添加評論
http.HandleFunc("/addComment", addComment(client))
//點贊評論
http.HandleFunc("/favorite", favorite(client))
//sse Server-Sent-Events 服務事件流
http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
// 設置響應頭,表明這是一個 SSE 連接
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
//設置為刷新模式
flush, ok := w.(http.Flusher)
flush.Flush()
if !ok {
responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
return
}
respFlushMap[&w] = &flush
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
select {
case <-r.Context().Done():
delete(respFlushMap, &w)
return
}
})
http.HandleFunc("/commentList", commentList(client))
//主頁
http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
//獲取模板
indexFile, err := template.ParseFiles("./template/index.html")
if err != nil {
log.Println(err.Error())
resp.Write([]byte("./template/index.html not found"))
return
}
//將內容輸出
indexFile.Execute(resp, nil)
})
//啟動服務
if err := http.ListenAndServe(":80", nil); err != nil {
log.Println("啟動服務失?。? + err.Error())
}
}
// 添加評論
func addComment(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
nickname := query.Get("nickName")
if nickname == "" {
nickname = "逸士"
}
//判斷內容是否為空
content := query.Get("content")
if content == "" {
responseInfo(http.StatusBadRequest, "your comment content is empty", w)
return
}
host, _, _ := net.SplitHostPort(r.RemoteAddr)
//使用時間戳
member := fmt.Sprint(time.Now().UnixMilli() ^ rand.Int63())
//序列化
comment, _ := json.Marshal(CommentNode{
Member: member,
IP: host,
NickName: nickname,
Content: content,
})
//添加到隊列中
client.HSet(r.Context(), commentNodeHashRedisKey, member, string(comment))
//更新排行
client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
Score: 0,
Member: member,
})
responseInfo(http.StatusOK, "add comment success", w)
}
}
// 點贊
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
var lock = sync.RWMutex{}
return func(w http.ResponseWriter, r *http.Request) {
lock.Lock()
defer lock.Unlock()
//查詢成員(member)是否存在
query := r.URL.Query()
member := query.Get("member")
method := r.Method
if member == "" {
responseInfo(http.StatusBadRequest, "member cannot be empty", w)
lock.Unlock()
return
}
//獲取分數(shù)
score, err := client.ZScore(r.Context(), commentNodeSorterSetRedisKey, member).Result()
//點贊減少
if method == http.MethodDelete {
score -= 2
}
score++
//更新排行
client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
Score: score,
Member: member,
})
if err != nil {
//不存在返回錯誤
responseInfo(http.StatusBadRequest, "member does not exists", w)
return
}
//更新分數(shù)
var commentNode CommentNode
commentNodeStr, err := client.HGet(r.Context(), commentNodeHashRedisKey, member).Result()
if err != nil {
responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
return
}
err = json.Unmarshal([]byte(commentNodeStr), &commentNode)
if err != nil {
responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
return
}
commentNode.Score = score
data, _ := json.Marshal(commentNode)
if err = client.HSet(r.Context(), commentNodeHashRedisKey, member, data).Err(); err != nil {
responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
return
}
//返回成功
responseInfo(http.StatusOK, "favorite comment success", w)
//將點贊信息發(fā)送到各個SSE
for writer, flusher := range respFlushMap {
fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
(*flusher).Flush()
}
}
}
// 評論列表
func commentList(client *redis.Client) func(resp http.ResponseWriter, req *http.Request) {
return func(resp http.ResponseWriter, req *http.Request) {
query := req.URL.Query()
offset, err := strconv.Atoi(query.Get("offset"))
if err != nil {
offset = 100
}
//連接人地址
connectionAddr := req.RemoteAddr
log.Printf("連接人地址: %s\n", connectionAddr)
//獲取offset偏移量的排行
result, err := client.ZRevRangeWithScores(req.Context(), commentNodeSorterSetRedisKey, 0, int64(offset-1)).Result()
if err != nil || result == nil {
responseInfo(http.StatusOK, fmt.Sprint("錯誤:", err), resp)
return
}
//獲取評論詳細信息
members := make([]string, 0)
scopeMap := make(map[string]float64)
for _, item := range result {
members = append(members, item.Member.(string))
scopeMap[item.Member.(string)] = item.Score
}
rlt, err := client.HMGet(req.Context(), commentNodeHashRedisKey, members...).Result()
if err != nil {
responseInfo(http.StatusInternalServerError, err.Error(), resp)
return
}
data, _ := json.Marshal(rlt)
responseInfo(http.StatusOK, string(data), resp)
}
}
func responseInfo(code int, info string, w http.ResponseWriter) {
w.WriteHeader(code)
w.Write([]byte(info))
}
到此這篇關于Golang SSE 服務器端推送事件的文章就介紹到這了,更多相關Golang SSE推送內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解Golang利用反射reflect動態(tài)調用方法
這篇文章主要介紹了詳解Golang利用反射reflect動態(tài)調用方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2018-11-11
Go語言實現(xiàn)一個Http Server框架(二) Server的抽象
上一篇文章對http庫的基本使用做了說明,這篇文章主要介紹了如何實現(xiàn)一個簡單地httpServer,文中代碼示例非常詳細,感興趣的朋友可以參考下2023-04-04
Go語言實現(xiàn)類似c++中的多態(tài)功能實例
Go本身不具有多態(tài)的特性,不能夠像Java、C++那樣編寫多態(tài)類、多態(tài)方法。但是,使用Go可以編寫具有多態(tài)功能的類綁定的方法。下面來一起看看吧2016-09-09

