Python使用gRPC實(shí)現(xiàn)數(shù)據(jù)分析能力的共享
gRPC是一個(gè)高性能、開(kāi)源、通用的遠(yuǎn)程過(guò)程調(diào)用(RPC)框架,由Google推出。
它基于HTTP/2協(xié)議標(biāo)準(zhǔn)設(shè)計(jì)開(kāi)發(fā),默認(rèn)采用Protocol Buffers數(shù)據(jù)序列化協(xié)議,支持多種開(kāi)發(fā)語(yǔ)言。
在gRPC中,客戶端可以像調(diào)用本地對(duì)象一樣直接調(diào)用另一臺(tái)不同的機(jī)器上服務(wù)端應(yīng)用的方法,使得您能夠更容易地創(chuàng)建分布式應(yīng)用和服務(wù)。
gRPC支持多種語(yǔ)言,并提供了豐富的接口和庫(kù),以及簡(jiǎn)單易用的API,方便開(kāi)發(fā)者進(jìn)行快速開(kāi)發(fā)和部署。
同時(shí),gRPC的底層框架處理了所有強(qiáng)制嚴(yán)格的服務(wù)契約、數(shù)據(jù)序列化、網(wǎng)絡(luò)通訊、服務(wù)認(rèn)證、訪問(wèn)控制、服務(wù)觀測(cè)等等通常有關(guān)聯(lián)的復(fù)雜性,使得開(kāi)發(fā)者可以更加專注于業(yè)務(wù)邏輯的實(shí)現(xiàn)。
1. 為什么用 gRPC
我平時(shí)用的最多的語(yǔ)言其實(shí)是golang,但是,做數(shù)據(jù)分析相關(guān)的項(xiàng)目,不太可能繞開(kāi)python那些優(yōu)秀的庫(kù)。
于是,就想把數(shù)據(jù)分析的核心部分用python來(lái)實(shí)現(xiàn),并用gRPC接口的方式提供出來(lái)。
其他的業(yè)務(wù)部分,仍然用原先的語(yǔ)言來(lái)實(shí)現(xiàn)。
gRPC相比于http REST,性能和安全上更加有保障,而且對(duì)主流的開(kāi)發(fā)語(yǔ)言都支持的很好,不用擔(dān)心與其他語(yǔ)言開(kāi)發(fā)的業(yè)務(wù)系統(tǒng)對(duì)接的問(wèn)題。
最后,gRPC雖然接口的定義和實(shí)現(xiàn)比http REST更復(fù)雜,但是,它提供了方便的命令行工具,可以根據(jù)protocol buf的定義自動(dòng)生成對(duì)應(yīng)語(yǔ)言的類(lèi)型定義,以及stub相關(guān)的代碼等等。
實(shí)際開(kāi)發(fā)時(shí),一般只要關(guān)注接口的定義和業(yè)務(wù)功能的實(shí)現(xiàn)即可,gRPC框架需要的代碼可以通過(guò)命令行工具生成。
2. 安裝
對(duì)于Python語(yǔ)言,安裝gRPC框架本身和對(duì)應(yīng)的命令行工具即可:
$ pip install grpcio # gRPC框架 $ pip install grpcio-tools # gRPC命令行工具
3. 開(kāi)發(fā)步驟
開(kāi)發(fā)一個(gè)gPRC接口一般分為4個(gè)步驟:
- 使用
[protocal buf](https://protobuf.dev/overview)定義服務(wù)接口 - 通過(guò)命令行生成
client和server的模板代碼 - 實(shí)現(xiàn)server端代碼(具體業(yè)務(wù)功能)
- 實(shí)現(xiàn)client端代碼(具體業(yè)務(wù)功能)
下面通過(guò)一個(gè)示例演示gRPC接口的開(kāi)發(fā)步驟。
這個(gè)示例來(lái)自最近做量化分析時(shí)的一個(gè)指標(biāo)(MACD)的實(shí)現(xiàn),為了簡(jiǎn)化示例,下面實(shí)現(xiàn)MACD指標(biāo)的業(yè)務(wù)功能部分是虛擬的,不是實(shí)際的計(jì)算方法。
3.1. 定義服務(wù)接口
接口主要定義方法,參數(shù),返回值。
syntax = "proto3";
package idc;
// 定義服務(wù),也就是對(duì)外提供的功能
service Indicator {
rpc GetMACD(MACDRequest) returns (MACDReply) {}
}
// 請(qǐng)求的參數(shù)
message MACDRequest {
string start_date = 1; // 交易開(kāi)始時(shí)間
string end_date = 2; // 交易結(jié)束時(shí)間
}
// 返回值中每個(gè)對(duì)象的詳細(xì)內(nèi)容
message MACDData {
string date = 1; // 交易時(shí)間
float open = 2; // 開(kāi)盤(pán)價(jià)
float close = 3; // 收盤(pán)價(jià)
float high = 4; // 最高價(jià)
float low = 5; // 最低價(jià)
float macd = 6; // macd指標(biāo)值
}
// 返回的內(nèi)容,是一個(gè)數(shù)組
message MACDReply {
repeated MACDData macd = 1;
}
3.2. 生成模板代碼
在grpc_sample目錄下,執(zhí)行命令:
python -m grpc_tools.protoc -I./protos --python_out=. --pyi_out=. --grpc_python_out=. ./protos/indicator.proto
生成后文件結(jié)構(gòu)如下:

生成了3個(gè)文件:
indicator_pb2.py:proto文件定義的消息類(lèi)indicator_pb2_grpc.py:服務(wù)端和客戶端的模板代碼indicator_pb2.pyi:不是必須的,為了能讓mypy等工具校驗(yàn)代碼類(lèi)型是否正確
3.3. server端代碼
通過(guò)繼承indicator_pb2_grpc.py文件中的服務(wù)類(lèi),實(shí)現(xiàn)服務(wù)端功能。
# -*- coding: utf-8 -*-
from concurrent import futures
import grpc
import indicator_pb2
import indicator_pb2_grpc
class Indicator(indicator_pb2_grpc.IndicatorServicer):
def GetMACD(self, request, context):
macd = []
for i in range(1, 5):
data = indicator_pb2.MACDData(
date=request.start_date,
open=i * 1.1,
close=i * 2.1,
high=i * 3.1,
low=i * 0.1,
macd=i * 2.5,
)
macd.append(data)
return indicator_pb2.MACDReply(macd=macd)
def serve():
port = "50051"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
indicator_pb2_grpc.add_IndicatorServicer_to_server(Indicator(), server)
server.add_insecure_port("[::]:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == "__main__":
serve()
服務(wù)端需要實(shí)現(xiàn)proto文件中定義接口的具體業(yè)務(wù)功能。
3.4. client端代碼
使用indicator_pb2_grpc.py文件中的Stub來(lái)調(diào)用服務(wù)端的代碼。
# -*- coding: utf-8 -*-
import grpc
import indicator_pb2
import indicator_pb2_grpc
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = indicator_pb2_grpc.IndicatorStub(channel)
response = stub.GetMACD(
indicator_pb2.MACDRequest(
start_date="2023-01-01",
end_date="2023-12-31",
)
)
print("indicator client received: ")
print(response)
if __name__ == "__main__":
run()
3.5. 運(yùn)行效果
加入客戶端和服務(wù)端代碼后,最后的文件結(jié)構(gòu)如下:

測(cè)試時(shí),先啟動(dòng)服務(wù):
$ python.exe .\idc_server.py Server started, listening on 50051
然后啟動(dòng)客戶端看效果:
$ python.exe .\idc_client.py
indicator client received:
macd {
date: "2023-01-01"
open: 1.1
close: 2.1
high: 3.1
low: 0.1
macd: 2.5
}
macd {
date: "2023-01-01"
open: 2.2
close: 4.2
high: 6.2
low: 0.2
macd: 5
}
macd {
date: "2023-01-01"
open: 3.3
close: 6.3
high: 9.3
low: 0.3
macd: 7.5
}
macd {
date: "2023-01-01"
open: 4.4
close: 8.4
high: 12.4
low: 0.4
macd: 10
}
4. 傳輸文件/圖片
除了上面的返回列表數(shù)據(jù)的接口比較常用以外,我用的比較多的還有一種接口就是返回圖片。
將使用python的matplotlib等庫(kù)生成的分析結(jié)果圖片提供給其他系統(tǒng)使用。
開(kāi)發(fā)的步驟和上面是一樣的。
4.1. 定義服務(wù)接口
定義文件相關(guān)的服務(wù)接口,文件的部分需要加上stream關(guān)鍵字,也就是流式數(shù)據(jù)。
syntax = "proto3";
package idc;
// 定義服務(wù),也就是對(duì)外提供的功能
service IndicatorGraph {
rpc GetMACDGraph(MACDGraphRequest) returns (stream MACDGraphReply) {}
}
// 請(qǐng)求的參數(shù)
message MACDGraphRequest {
string start_date = 1; // 交易開(kāi)始時(shí)間
string end_date = 2; // 交易結(jié)束時(shí)間
}
// 返回的內(nèi)容,是一個(gè)圖片
message MACDGraphReply {
bytes macd_chunk = 1;
}
注意,定義服務(wù)接口GetMACDGraph時(shí),返回值MACDGraphReply前面加上stream關(guān)鍵字。
返回的文件內(nèi)容是 bytes 二進(jìn)制類(lèi)型。
4.2. 生成模板代碼
執(zhí)行命令:
python -m grpc_tools.protoc -I./protos --python_out=. --pyi_out=. --grpc_python_out=. ./protos/indicator_graph.proto
生成3個(gè)文件:
- indicator_graph_pb2.py
- indicator_graph_pb2.pyi
- indicator_graph_pb2_grpc.py
4.3. server端代碼
首先,生成一個(gè)MACD指標(biāo)的圖片(macd.png)。

然后,服務(wù)端的代碼主要就是按塊讀取這個(gè)文件并返回。
import grpc
import indicator_graph_pb2
import indicator_graph_pb2_grpc
class IndicatorGraph(indicator_graph_pb2_grpc.IndicatorGraphServicer):
def GetMACDGraph(self, request, context):
chunk_size = 1024
with open("./macd.png", mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
return
response = indicator_graph_pb2.MACDGraphReply(macd_chunk=chunk)
yield response
4.4. client端代碼
客戶端的代碼也要相應(yīng)修改,不再是一次性接受請(qǐng)求的結(jié)果,而是循環(huán)接受,直至結(jié)束。
import grpc
import indicator_graph_pb2
import indicator_graph_pb2_grpc
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = indicator_graph_pb2_grpc.IndicatorGraphStub(channel)
print("indicator client received: ")
with open("./received_macd.png", mode="wb") as f:
for response in stub.GetMACDGraph(
indicator_graph_pb2.MACDGraphRequest(
start_date="2023-01-01",
end_date="2023-12-31",
)
):
f.write(response.macd_chunk)
客戶端接收完成后,圖片保存在 received_macd.png 中。
實(shí)際執(zhí)行后,圖片可以正常保存并顯示。
5. 回顧
本篇是最近用gPRC封裝python數(shù)據(jù)分析相關(guān)業(yè)務(wù)過(guò)程中一些簡(jiǎn)單的總結(jié)。
這里沒(méi)有對(duì)gPRC做系統(tǒng)的介紹,它的官方文檔已經(jīng)非常完善,而且文檔中針對(duì)主流編程語(yǔ)言的示例也都有。
本篇筆記中的兩個(gè)示例雖然簡(jiǎn)單,卻是我用的最多的兩種情況:
一種是返回對(duì)象數(shù)組:是為了將pandas,numpy等庫(kù)處理后的數(shù)據(jù)返回出來(lái)供其他系統(tǒng)使用;
一種是返回文件/圖片:是為了將matplotlib,seaborn等庫(kù)生成的分析圖片返回出來(lái)供其他系統(tǒng)使用。
目前gPRC對(duì)我最大的好處是,它提供了一種穩(wěn)定可靠的,將python強(qiáng)大的數(shù)據(jù)分析能力結(jié)合到其他系統(tǒng)中的能力。
到此這篇關(guān)于Python使用gRPC實(shí)現(xiàn)數(shù)據(jù)分析能力的共享的文章就介紹到這了,更多相關(guān)Python gRPC數(shù)據(jù)分析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)戰(zhàn)小項(xiàng)目之身份證信息校驗(yàn)
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Python做一個(gè)身份證信息校驗(yàn)的小項(xiàng)目,大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-10-10
python在linux環(huán)境下安裝skimage的示例代碼
這篇文章主要介紹了python在linux環(huán)境下安裝skimage,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Python基礎(chǔ)知識(shí)學(xué)習(xí)之類(lèi)的繼承
今天帶大家學(xué)習(xí)Python的基礎(chǔ)知識(shí),文中對(duì)python類(lèi)的繼承作了非常詳細(xì)的介紹,對(duì)正在學(xué)習(xí)python基礎(chǔ)的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05
解決python中畫(huà)圖時(shí)x,y軸名稱出現(xiàn)中文亂碼的問(wèn)題
今天小編就為大家分享一篇解決python中畫(huà)圖時(shí)x,y軸名稱出現(xiàn)中文亂碼的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01
在Debian下配置Python+Django+Nginx+uWSGI+MySQL的教程
這篇文章主要介紹了在Debian下配置Python+Django+Nginx+uWSGI+MySQL的教程,Debian系統(tǒng)和Nginx服務(wù)器皆是高性能的選擇,需要的朋友可以參考下2015-04-04
pydantic-resolve嵌套數(shù)據(jù)結(jié)構(gòu)生成LoaderDepend管理contextvars
這篇文章主要為大家介紹了pydantic-resolve解決嵌套數(shù)據(jù)結(jié)構(gòu)生成LoaderDepend管理contextvars的使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪<BR>2023-04-04
Python?SQLAlchemy插入日期時(shí)間時(shí)區(qū)詳解
SQLAlchemy是一個(gè)功能強(qiáng)大且流行的?Python?庫(kù),它提供了一種靈活有效的與數(shù)據(jù)庫(kù)交互的方式,在本文中,我們將了解SQLAlchemy如何更新日期、時(shí)間和時(shí)區(qū)并將其插入數(shù)據(jù)庫(kù),感興趣的可以了解下2023-09-09

