解決python3 pika之連接斷開的問題
問題描述
在消費rabbitMQ隊列時, 每次進入回調(diào)函數(shù)內(nèi)需要進行一些比較耗時的操作;操作完成后給rabbitMQ server發(fā)送ack信號以dequeue本條消息。
問題就發(fā)生在發(fā)送ack操作時, 程序提示鏈接已被斷開或socket error。
源碼示例
#!/usr/bin #coding: utf-8 import pika import time USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming() if __name__ == "__main__": test_main()
運行一段時間后, 就會報錯:
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None [CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed [ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
問題排查
猜測:pika客戶端沒有及時發(fā)送心跳,連接被server斷開
一開始修改了heartbeat_interval參數(shù)值, 示例如下:
def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER, PWD))) # ....
修改后運行依然報錯,后來想想應該單線程被一直占用,pika無法發(fā)送心跳;
于是又加了個心跳線程, 示例如下:
#!/usr/bin #coding: utf-8 import pika import time import logging import threading USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' class Heartbeat(threading.Thread): def __init__(self, connection): super(Heartbeat, self).__init__() self.lock = threading.Lock() self.connection = connection self.quitflag = False self.stopflag = True self.setDaemon(True) def run(self): while not self.quitflag: time.sleep(10) self.lock.acquire() if self.stopflag : self.lock.release() continue try: self.connection.process_data_events() except Exception as ex: logging.warn("Error format: %s"%(str(ex))) self.lock.release() return self.lock.release() def startHeartbeat(self): self.lock.acquire() if self.quitflag==True: self.lock.release() return self.stopflag=False self.lock.release() def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback, queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start() #開啟心跳線程 heartbeat.startHeartbeat() chan.start_consuming() if __name__ == "__main__": test_main()
嘗試運行,結果還是不行,不得不安靜下來思考自己是不是想錯了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal.
按這樣說法,應該還是沒有把心跳值給設置好。上面的程序期望是10秒發(fā)一次心跳,但是理論上發(fā)送心跳的間隔會比10秒多一點。所以艾瑪,我應該是把heartbeat_interval的作用搞錯了, 它是指超過這個時間間隔不發(fā)心跳或不給server任何信息,server就會斷開連接, 而不是說pika會按這個間隔來發(fā)心跳。 結果我把heartbeat_interval值設置高一點(比實際發(fā)送心跳/信息的間隔更長),比如上面設置成60秒,就正常運行了。
如果不指定heartbeat_interval, 它默認為None, 意味著按rabbitMQ server的配置來檢測心跳是否正常。
如果設置heartbeat_interval=0, 意味著不檢測心跳,server端將不會主動斷開連接。
以上這篇解決python3 pika之連接斷開的問題就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
詳解Python對某地區(qū)二手房房價數(shù)據(jù)分析
這篇文章主要為大家介紹了Python數(shù)據(jù)分析某地區(qū)二手房房價,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2021-12-12Python cookbook(數(shù)據(jù)結構與算法)將序列分解為單獨變量的方法
這篇文章主要介紹了Python cookbook(數(shù)據(jù)結構與算法)將序列分解為單獨變量的方法,結合實例形式分析了Python序列賦值實現(xiàn)的分解成單獨變量功能相關操作技巧,需要的朋友可以參考下2018-02-02python操作redis數(shù)據(jù)庫的三種方法
這篇文章主要介紹了python操作redis數(shù)據(jù)庫的三種方法,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-09-09淺析Python 實現(xiàn)一個自動化翻譯和替換的工具
這篇文章主要介紹了Python 實現(xiàn)一個自動化翻譯和替換的工具,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2019-04-04Pytorch使用MNIST數(shù)據(jù)集實現(xiàn)基礎GAN和DCGAN詳解
今天小編就為大家分享一篇Pytorch使用MNIST數(shù)據(jù)集實現(xiàn)基礎GAN和DCGAN詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01