亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

python隊列通信:rabbitMQ的使用(實例講解)

 更新時間:2017年12月22日 15:55:19   作者:ywq935  
下面小編就為大家分享一篇python隊列通信:rabbitMQ的使用(實例講解),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧

(一)、前言

為什么引入消息隊列?

1.程序解耦

2.提升性能

3.降低多業(yè)務(wù)邏輯復(fù)雜度

(二)、python操作rabbit mq

rabbitmq配置安裝基本使用參見上節(jié)文章,不再復(fù)述。

若想使用python操作rabbitmq,需安裝pika模塊,直接pip安裝:

pip install pika

1.最簡單的rabbitmq producer端與consumer端對話:

producer:

#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #save auth indo
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #connect to rabbit
channel = connection.channel() #create channel
channel.queue_declare(queue='hello') #declare queue
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
   routing_key='hello',
   body='Hello World!') #the body is the msg content
print(" [x] Sent 'Hello World!'")
connection.close()

consumer:

#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #auth info
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #connect to rabbit
channel = connection.channel()  #create channel

channel.queue_declare(queue='hello') #decalre queue
def callback(ch, method, properties, body):
 print(" [x] Received %r" % body)

channel.basic_consume(callback,
   queue='hello',
   no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消息傳遞消費過程中,可以在rabbit web管理頁面實時查看隊列消息信息。

2.持久化的消息隊列,避免宕機等意外情況造成消息隊列丟失。

consumer端無需改變,在producer端代碼內(nèi)加上兩個屬性,分別使消息持久化、隊列持久化,只選其一還是會出現(xiàn)消息丟失,必須同時開啟:

delivery_mode=2 #make msg persisdent
durable=True

屬性插入位置見如下代碼(producer端):

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth_info
 ))
channel=connection.channel()
channel.queue_declare(queue='test1',durable=True) #durable=Ture, make queue persistent

msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
 exchange='',
 routing_key='test1',
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2 #make msg persisdent
 )
)

print('Send done:',msg)
connection.close()

3.公平分發(fā)

在多consumer的情況下,默認rabbit是輪詢發(fā)送消息的,但有的consumer消費速度快,有的消費速度慢,為了資源使用更平衡,引入ack確認機制。consumer消費完消息后會給rabbit發(fā)送ack,一旦未ack的消息數(shù)量超過指定允許的數(shù)量,則不再往該consumer發(fā)送,改為發(fā)送給其他consumer。

producer端代碼不用改變,需要給consumer端代碼插入兩個屬性:

channel.basic_qos(prefetch_count= *) #define the max non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbitmq

屬性插入位置見如下代碼(consumer端):

#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 time.sleep(5)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_qos(prefetch_count=1)
'''
注意,no_ack=False 注意,這里的no_ack類型僅僅是告訴rabbit該消費者隊列是否返回ack,若要返回ack,需要在callback內(nèi)定義
prefetch_count=1,未ack的msg數(shù)量超過1個,則此consumer不再接受msg,此配置需寫在channel.basic_consume上方,否則會造成non_ack情況出現(xiàn)。
'''
channel.basic_consume(
 callback,
 queue='test2'
)

channel.start_consuming()

三、消息發(fā)布/訂閱

上方的幾種模式都是producer端發(fā)送一次,則consumer端接收一次,能不能實現(xiàn)一個producer發(fā)送,多個關(guān)聯(lián)的consumer同時接收呢?of course,rabbit支持消息發(fā)布訂閱,共支持三種模式,通過組件exchange轉(zhuǎn)發(fā)器,實現(xiàn)3種模式:

fanout: 所有bind到此exchange的queue都可以接收消息,類似廣播。

direct: 通過routingKey和exchange決定的哪個唯一的queue可以接收消息,推送給綁定了該queue的consumer,類似組播。

topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息,類似前綴列表匹配路由。

1.fanout

publish端(producer):

#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
    exchange_type='fanout'
    )
msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time()
channel.basic_publish(
 exchange='hello',
 routing_key='',
 body=msg,
 properties=pika.BasicProperties(
 delivery_mode=2
 )
)
print('send done')
connection.close()

subscribe端(consumer):

#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(
 exchange='hello',
 exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) #隨機與rabbit建立一個queue,comsumer斷開后,該queue立即刪除釋放
queue_name=random_num.method.queue
channel.basic_qos(prefetch_count=1)
channel.queue_bind(
 queue=queue_name,
 exchange='hello'
)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit

channel.basic_consume(
 callback,
 queue=queue_name,
)
channel.start_consuming()

實現(xiàn)producer一次發(fā)送,多個關(guān)聯(lián)consumer接收。

使用exchange模式時:

1.producer端不再申明queue,直接申明exchange

2.consumer端仍需綁定隊列并指定exchange來接收message

3.consumer最好創(chuàng)建隨機queue,使用完后立即釋放。

隨機隊列名在web下可以檢測到:

2.direct

使用exchange同時consumer有選擇性的接收消息。隊列綁定關(guān)鍵字,producer將數(shù)據(jù)根據(jù)關(guān)鍵字發(fā)送到消息exchange,exchange根據(jù) 關(guān)鍵字 判定應(yīng)該將數(shù)據(jù)發(fā)送至指定隊列,consumer相應(yīng)接收。即在fanout基礎(chǔ)上增加了routing key.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
   exchange_type='direct',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='direct_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='direct_log',
 exchange_type='direct'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')

channel.queue_bind(
 queue=queue_name,
 exchange='direct_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[level:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

同時開啟多個consumer,其中兩個接收notice,兩個接收warning,運行效果如下:

3.topic

相較于direct,topic能實現(xiàn)模糊匹配式工作方式(在consumer端指定匹配方式),只要routing key包含指定的關(guān)鍵字,則將該msg發(fā)往綁定的queue上。

rabbitmq通配符規(guī)則:

符號“#”匹配一個或多個詞,符號“”匹配一個詞。因此“abc.#”能夠匹配到“abc.m.n”,但是“abc.*‘' 只會匹配到“abc.m”?!?'號為分割符。使用通配符匹配時必須使用‘.'號分割。

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
   exchange_type='topic',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='topic_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='topic_log',
 exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')

channel.queue_bind(
 queue=queue_name,
 exchange='topic_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[type:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

運行效果:

rabbitmq三種publish/subscribe模型簡單介紹完畢。

以上這篇python隊列通信:rabbitMQ的使用(實例講解)就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • python用folium繪制地圖并設(shè)置彈窗效果

    python用folium繪制地圖并設(shè)置彈窗效果

    這篇文章主要介紹了python用folium繪制地圖并設(shè)置彈窗,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • python使用MkDocs自動生成文檔的操作方法

    python使用MkDocs自動生成文檔的操作方法

    python代碼注釋風(fēng)格有很多,比較主流的有 reStructuredText風(fēng)格、numpy風(fēng)格、Google風(fēng)格,自動生成文檔的工具也有很多,常見的有:Pydocs,Sphinx和MkDocs,本文給大家介紹了python使用MkDocs自動生成文檔的操作方法,需要的朋友可以參考下
    2024-06-06
  • python數(shù)據(jù)結(jié)構(gòu)之二叉樹的統(tǒng)計與轉(zhuǎn)換實例

    python數(shù)據(jù)結(jié)構(gòu)之二叉樹的統(tǒng)計與轉(zhuǎn)換實例

    這篇文章主要介紹了python數(shù)據(jù)結(jié)構(gòu)之二叉樹的統(tǒng)計與轉(zhuǎn)換實例,例如統(tǒng)計二叉樹的葉子、分支節(jié)點,以及二叉樹的左右兩樹互換等,需要的朋友可以參考下
    2014-04-04
  • 用python3讀取python2的pickle數(shù)據(jù)方式

    用python3讀取python2的pickle數(shù)據(jù)方式

    今天小編就為大家分享一篇用python3讀取python2的pickle數(shù)據(jù)方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-12-12
  • python實現(xiàn)sm2和sm4國密(國家商用密碼)算法的示例

    python實現(xiàn)sm2和sm4國密(國家商用密碼)算法的示例

    這篇文章主要介紹了python實現(xiàn)sm2和sm4國密(國家商用密碼)算法的示例,幫助大家使用python加密文件,感興趣的朋友可以了解下
    2020-09-09
  • Python實現(xiàn)按學(xué)生年齡排序的實際問題詳解

    Python實現(xiàn)按學(xué)生年齡排序的實際問題詳解

    這篇文章主要給大家介紹了關(guān)于Python實現(xiàn)按學(xué)生年齡排序?qū)嶋H問題的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-08-08
  • 你需要掌握的20個Python常用技巧

    你需要掌握的20個Python常用技巧

    Python的可讀性和簡單性是其廣受歡迎的兩大原因,本文介紹20個常用的Python技巧來提高代碼的可讀性,并能幫助你節(jié)省大量時間,需要的可以參考一下
    2022-02-02
  • 解決Python報錯問題[SSL:?SSLV3_ALERT_HANDSHAKE_FAILURE]

    解決Python報錯問題[SSL:?SSLV3_ALERT_HANDSHAKE_FAILURE]

    這篇文章主要介紹了解決Python報錯問題[SSL:?SSLV3_ALERT_HANDSHAKE_FAILURE],具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • 使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解

    使用PySpark實現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實踐詳解

    在大數(shù)據(jù)處理中,PySpark?提供了強大的工具來處理海量數(shù)據(jù),特別是在數(shù)據(jù)清洗和轉(zhuǎn)換方面,本文將介紹如何使用?PySpark?進行數(shù)據(jù)清洗,并將數(shù)據(jù)格式轉(zhuǎn)換為?JSON?格式的實踐,感興趣的可以了解下
    2023-12-12
  • Python內(nèi)置函數(shù)ord()的實現(xiàn)示例

    Python內(nèi)置函數(shù)ord()的實現(xiàn)示例

    ord()函數(shù)是用于返回字符的Unicode碼點,適用于處理文本和國際化應(yīng)用,它只能處理單個字符,超過一字符或非字符串類型會引發(fā)TypeError,示例代碼展示了如何使用ord()進行字符轉(zhuǎn)換和比較
    2024-09-09

最新評論