亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

講解如何利用 Python完成 Saga 分布式事務

 更新時間:2021年09月08日 15:11:13   作者:葉東富  
這篇文章主要介紹了如何利用 Python 完成一個 Saga 的分布式事務,需要的朋友可以參考下面文章具體的內容

銀行跨行轉賬業(yè)務是一個典型分布式事務場景,假設 A 需要跨行轉賬給 B,那么就涉及兩個銀行的數(shù)據(jù),無法通過一個數(shù)據(jù)庫的本地事務保證轉賬的 ACID,只能夠通過分布式事務來解決。

1、分布式事務

分布式事務在分布式環(huán)境下,為了滿足可用性、性能與降級服務的需要,降低一致性與隔離性的要求,一方面遵循 BASE 理論:

  • 基本業(yè)務可用性( Basic Availability )
  • 柔性狀態(tài)( Soft state )
  • 最終一致性( Eventual consistency )

另一方面,分布式事務也部分遵循 ACID 規(guī)范:

  • 原子性:嚴格遵循
  • 一致性:事務完成后的一致性嚴格遵循;事務中的一致性可適當放寬
  • 隔離性:并行事務間不可影響;事務中間結果可見性允許安全放寬
  • 持久性:嚴格遵循

2、SAGA

Saga 是這一篇數(shù)據(jù)庫論文SAGAS提到的一個分布式事務方案。其核心思想是將長事務拆分為多個本地短事務,由 Saga 事務協(xié)調器協(xié)調,如果各個本地事務成功完成那就正常完成,如果某個步驟失敗,則根據(jù)相反順序一次調用補償操作。

目前可用于 SAGA 的開源框架,主要為 Java 語言,其中以 seata 為代表。我們的例子采用 go 語言,使用的分布式事務框架為https://github.com/yedf/dtm,它對分布式事務的支持非常優(yōu)雅。下面來詳細講解 SAGA 的組成:

DTM 事務框架里,有 3 個角色,與經典的 XA 分布式事務一樣:

  • AP/應用程序,發(fā)起全局事務,定義全局事務包含哪些事務分支
  • RM/資源管理器,負責分支事務各項資源的管理
  • TM/事務管理器,負責協(xié)調全局事務的正確執(zhí)行,包括 SAGA 正向 /逆向操作的執(zhí)行

下面看一個成功完成的 SAGA 時序圖,就很容易理解 SAGA 分布式事務:

3、SAGA 實踐

對于我們要進行的銀行轉賬的例子,我們將在正向操作中,進行轉入轉出,在補償操作中,做相反的調整。

首先我們創(chuàng)建賬戶余額表:

CREATE TABLE dtm_busi.`user_account` ( 
  `id` int(11) AUTO_INCREMENT PRIMARY KEY, 
  `user_id` int(11) not NULL UNIQUE , 
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00', 
  `create_time` datetime DEFAULT now(), 
  `update_time` datetime DEFAULT now() 
); 


我們先編寫核心業(yè)務代碼,調整用戶的賬戶余額

def saga_adjust_balance(cursor, uid, amount): 
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount)) 
  if affected == 0: 
    raise Exception("update error, balance not enough") 


下面我們來編寫具體的正向操作 /補償操作的處理函數(shù)

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  saga_adjust_balance(c, out_uid, -30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransOutCompensate") 
def trans_out_compensate(): 
  saga_adjust_balance(c, out_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  saga_adjust_balance(c, in_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInCompensate") 
def trans_in_compensate(): 
  saga_adjust_balance(c, in_uid, -30) 
  return {"dtm_result": "SUCCESS"} 


到此各個子事務的處理函數(shù)已經 OK 了,然后是開啟 SAGA 事務,進行分支調用

# 這是 dtm 服務地址 
dtm = "http://localhost:8080/api/dtmsvr" 
# 這是業(yè)務微服務地址 
svc = "http://localhost:5000/api" 
 
    req = {"amount": 30} 
    s = saga.Saga(dtm, utils.gen_gid(dtm)) 
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") 
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") 
    s.submit() 


至此,一個完整的 SAGA 分布式事務編寫完成。

如果您想要完整運行一個成功的示例,那么參考這個例子yedf/dtmcli-py-sample,將它運行起來非常簡單

# 部署啟動 dtm 
# 需要 docker 版本 18 以上 
git clone https://github.com/yedf/dtm 
cd dtm 
docker-compose up 
 
# 另起一個命令行 
git clone https://github.com/yedf/dtmcli-py-sample 
cd dtmcli-py-sample 
pip3 install flask dtmcli requests 
flask run 
 
# 另起一個命令行 
curl localhost:5000/api/fireSaga 

4、處理網絡異常

假設提交給 dtm 的事務中,調用轉入操作時,出現(xiàn)短暫的故障怎么辦?按照 SAGA 事務的協(xié)議,dtm 會重試未完成的操作,這時我們要如何處理?故障有可能是轉入操作完成后出網絡故障,也有可能是轉入操作完成中出現(xiàn)機器宕機。如何處理才能夠保障賬戶余額的調整是正確無問題的?

這類網絡異常的妥當處理,是分布式事務中的大難題,異常情況包括三類:重復請求、空補償、懸掛,都需要正確處理

DTM 提供了子事務屏障功能,保證上述異常情況下的業(yè)務邏輯,只會有一次正確順序下的成功提交。(子事務屏障詳情參考分布式事務最經典的七種解決方案的子事務屏障環(huán)節(jié))

我們把處理函數(shù)調整為:

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, out_uid, -30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "SUCCESS"} 


這里的 barrier_from_req(request).call(cursor, busi_callback)調用會使用子事務屏障技術,保證 busi_callback 回調函數(shù)僅被提交一次

您可以嘗試多次調用這個 TransIn 服務,僅有一次余額調整。

5、處理回滾

假如銀行將金額準備轉入用戶 2 時,發(fā)現(xiàn)用戶 2 的賬戶異常,返回失敗,會怎么樣?我們調整處理函數(shù),讓轉入操作返回失敗

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  return {"dtm_result": "FAILURE"} 


我們給出事務失敗交互的時序圖:

這里有一點,TransIn 的正向操作什么都沒有做,就返回了失敗,此時調用 TransIn 的補償操作,會不會導致反向調整出錯了呢?

不用擔心,前面的子事務屏障技術,能夠保證 TransIn 的錯誤如果發(fā)生在提交之前,則補償為空操作;TransIn 的錯誤如果發(fā)生在提交之后,則補償操作會將數(shù)據(jù)提交一次。

我們可以將返回錯誤的 TransIn 改成:

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, in_uid, 30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "FAILURE"} 


最后的結果余額依舊會是對的,原理可以參考:分布式事務最經典的七種解決方案的子事務屏障環(huán)節(jié)

6、小結

在這篇文章里,我們介紹了 SAGA 的理論知識,也通過一個例子,完整給出了編寫一個 SAGA 事務的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。相信讀者通過這邊文章,對 SAGA 已經有了深入的理解。

文中使用的 dtm 是新開源的 Golang 分布式事務管理框架,功能強大,支持 TCC 、SAGA 、XA 、事務消息等事務模式,支持 Go 、python 、PHP 、node 、csharp 等語言的。同時提供了非常簡單易用的接口。

以上就是利用 Python 輕松完成一個 Saga 分布式事務的詳細內容,更多關于Python完成一個 Saga 分布式事務的資料請關注腳本之家其它相關文章!

相關文章

最新評論