亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python RabbitMQ實現(xiàn)簡單的進程間通信示例

 更新時間:2020年07月02日 11:05:14   作者:Xue__Feng  
這篇文章主要介紹了Python RabbitMQ實現(xiàn)簡單的進程間通信示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

RabbitMQ    消息隊列

PY
threading Queue
進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互
缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong
基于這個語言創(chuàng)建的一種中間商
win中需要先安裝erlong才能使用
rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or
源碼

rabbit      默認(rèn)端口15672
查看當(dāng)前時刻的隊列數(shù)
rabbitmqctl.bat list_queue

exchange
在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息
fanout:所有bind到此exchange的queue都可以收到消息
direct:通過routingkey和exchange決定唯一的queue可以接受消息
topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息
      表達式符號說明:
      # 代表一個或多個字符     * 代表任何字符

RPC
remote procedure call           雙向傳輸,指令<-------->指令執(zhí)行結(jié)果
實現(xiàn)方法:                        創(chuàng)建兩個隊列,一個隊列收指令,一個隊列發(fā)送執(zhí)行結(jié)果

用rabbitmq實現(xiàn)簡單的生產(chǎn)者消費者模型

1) rabbit_producer.py

# Author : Xuefeng

import pika

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()

# create the queue, the name of queue is "hello"
# durable=True can make the queue be exist, although the service have stopped before.
channel.queue_declare(queue="hello", durable=True)

# n RabbitMQ a message can never be sent directly to queue,it always need to go through
channel.basic_publish(exchange = " ",
      routing_key = "hello",
      body = "Hello world!",
      properties = pika.BasicPropreties(
       delivery_mode=2, # make the message persistence
      )
      )
print("[x] sent 'Hello world!'")
connection.close()

2) rabbit_consumer.py

# Author : Xuefeng

import pika

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)

def callback(ch, method, properties, body):
 '''
 Handle the recieved data
 :param ch: The address of the channel
 :param method: Information about the connection
 :param properties:
 :param body:
 :return:
 '''
 print("------>", ch, method, properties )
 print("[x] Recieved %r" % body)
 # ack by ourself
 ch.basic_ack(delivery_tag = method.delivery_tag)

# follow is for consumer to auto change with the ability
channel.basic_qos(profetch_count=1)
# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,  # If have recieved message, enable the callback() function to handle the message.
      queue = "hello",
      no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq中的fanout模式實現(xiàn)廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefeng

import pika
import sys

# 廣播模式:
# 生產(chǎn)者發(fā)送一條消息,所有的開通鏈接的消費者都可以接收到消息

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="logs",
       type="fanout")
message = ' '.join(sys.argv[1:]) or "info:Hello world!"
channel.basic_publish(
 exchange="logs",
 routing_key="",
 body=message
)
print("[x] Send %r" % message)

connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefeng


import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

channel.queue_bind(exchange="logs",
     queue=queue_name)


def callback(ch, method, properties, body):
 '''
 Handle the recieved data
 :param ch: The address of the channel
 :param method: Information about the connection
 :param properties:
 :param body:
 :return:
 '''
 print("------>", ch, method, properties )
 print("[x] Recieved %r" % body)
 # ack by ourself
 ch.basic_ack(delivery_tag = method.delivery_tag)

# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,  # If have recieved message, enable the callback() function to handle the message.
      queue = "hello",
      no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq中的direct模式實現(xiàn)消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefeng
import pika
import sys

# 消息過濾模式:
# 生產(chǎn)者發(fā)送一條消息,通過severity優(yōu)先級來確定是否可以接收到消息

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
       type="direct")
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
 exchange="direct_logs",
 routing_key=severity,
 body=message
)
print("[x] Send %r:%r" % (severity, message))

connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs",
       type="direct")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]
if not severities:
 sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
 sys.exit(1)

for severity in severities:
 channel.queue_bind(exchange="direct_logs",
      queue=queue_name,
      routing_key=severity)
 


def callback(ch, method, properties, body):
 '''
 Handle the recieved data
 :param ch: The address of the channel
 :param method: Information about the connection
 :param properties:
 :param body:
 :return:
 '''
 print("------>", ch, method, properties )
 print("[x] Recieved %r" % body)
 # ack by ourself
 ch.basic_ack(delivery_tag = method.delivery_tag)

# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,  # If have recieved message, enable the callback() function to handle the message.
      queue = "hello",
      no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming() 


用rabbitmq中的topic模式實現(xiàn)細(xì)致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefeng

import pika
import sys

# 消息細(xì)致過濾模式:
# 生產(chǎn)者發(fā)送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應(yīng)接收
connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
       type="topic")
binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
 exchange="topic_logs",
 routing_key=binding_key,
 body=message
)
print("[x] Send %r:%r" % (binding_key, message))

connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
       type="topic")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
 sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
 sys.exit(1)

for binding_key in binding_keys:
 channel.queue_bind(exchange="topic_logs",
      queue=queue_name,
      routing_key=binding_key)


def callback(ch, method, properties, body):
 '''
 Handle the recieved data
 :param ch: The address of the channel
 :param method: Information about the connection
 :param properties:
 :param body:
 :return:
 '''
 print("------>", ch, method, properties)
 print("[x] Recieved %r" % body)
 # ack by ourself
 ch.basic_ack(delivery_tag=method.delivery_tag)


# no_ack = True represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
      queue="hello",
      no_ack=True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq實現(xiàn)rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefeng

import pika
import time
import uuid

class FibonacciRpcClient(object):
 def __init__(self):
  self.connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"))
  self.channel = self.connection.channel()
  result = self.channel.queue_declare(exclusive=True)
  self.callback_queue = result.method.queue  # 隨機的生成一個接收命令執(zhí)行結(jié)果的隊列
  self.channel.basic_consume(self.on_response, # 只要收到消息就調(diào)用
         no_ack=True,
         queue=self.callback_queue)

 def on_response(self, ch, method, props, body):
  if self.corr_id == props.correlation_id:
   self.response = body

 def call(self,n):
  self.response = None
  self.corr_id = str(uuid.uuid4())
  self.channel.basic_publish(
   exchange="",
   routing_key="rpc_queue",
   properties=pika.BasicPropreties(
    rely_to=self.callback_queue,
    correlation_id=self.corr_id   # 通過隨機生成的ID來驗證指令執(zhí)行結(jié)果與指令的匹配性
   ),
   body=str(n)
  )
  while self.response is None:
   self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續(xù)
   print("no message...")
   time.sleep(0.5)
  return int(self.response)

fibonacci_rcp = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rcp.call(30)
print("[x] Rec %r" % response)

2) Rpc_rabbit_server.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
 "localhost"
))
# statement a channel
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")

def fib(n):
 if n == 0:
  return 0
 elif n == 1:
  return 1
 else:
  return fib(n-1)+fib(n-2)

def on_request(ch, method, props, body):
 n = int(body)
 print("[.] fib(%s)" % n)
 response = fib(n)
 ch.basic_publish(
  exchange="",
  routing_key=props.rely_to,
  properties=pika.BasicPropreties(correlation_id=\
          props.correlation),
  body = str(body)
 )
 ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")

print("[x] Awaiting RPC requests")
channel.start_consumeing()



channel.exchange_declare(exchange="direct_logs",
       type="direct")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]

到此這篇關(guān)于Python RabbitMQ實現(xiàn)簡單的進程間通信示例的文章就介紹到這了,更多相關(guān)Python RabbitMQ進程間通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • OpenCV+Python3.5 簡易手勢識別的實現(xiàn)

    OpenCV+Python3.5 簡易手勢識別的實現(xiàn)

    這篇文章主要介紹了OpenCV+Python3.5 簡易手勢識別的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • 人臉識別實戰(zhàn)之Opencv+SVM實現(xiàn)人臉識別

    人臉識別實戰(zhàn)之Opencv+SVM實現(xiàn)人臉識別

    這篇文章主要介紹了通過Opencv+SVM實現(xiàn)人臉識別功能,文中的示例代碼介紹詳細(xì),對于我們學(xué)習(xí)人臉識別和OpenCV都有一定的幫助,感興趣的小伙伴可以學(xué)習(xí)一下
    2021-12-12
  • python調(diào)用DLL與EXE文件截屏對比分析

    python調(diào)用DLL與EXE文件截屏對比分析

    這篇文章主要為大家介紹了python調(diào)用DLL與EXE文件截屏對比分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2021-10-10
  • Python正則表達式re模塊講解以及其案例舉例

    Python正則表達式re模塊講解以及其案例舉例

    Python中re模塊主要功能是通過正則表達式是用來匹配處理字符串的 ,下面這篇文章主要給大家介紹了關(guān)于Python正則表達式re模塊講解以及其案例舉例的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-09-09
  • Python實現(xiàn)pdf文檔轉(zhuǎn)txt的方法示例

    Python實現(xiàn)pdf文檔轉(zhuǎn)txt的方法示例

    這篇文章主要介紹了Python實現(xiàn)pdf文檔轉(zhuǎn)txt的方法,結(jié)合實例形式分析了Python基于第三方庫pdfminier實現(xiàn)針對pdf格式文檔的讀取、轉(zhuǎn)換等相關(guān)操作技巧,需要的朋友可以參考下
    2018-01-01
  • 如何使用 Flask 做一個評論系統(tǒng)

    如何使用 Flask 做一個評論系統(tǒng)

    這篇文章主要介紹了如何使用 Flask 做一個評論系統(tǒng),幫助大家更好的理解和使用flask框架進行python web開發(fā),感興趣的朋友可以了解下
    2020-11-11
  • Python轉(zhuǎn)換itertools.chain對象為數(shù)組的方法

    Python轉(zhuǎn)換itertools.chain對象為數(shù)組的方法

    這篇文章主要介紹了Python轉(zhuǎn)換itertools.chain對象為數(shù)組的方法,通過代碼給大家介紹了itertools 的 chain() 方法,需要的朋友可以參考下
    2020-02-02
  • python之DataFrame實現(xiàn)excel合并單元格

    python之DataFrame實現(xiàn)excel合并單元格

    這篇文章主要為大家詳細(xì)介紹了python之DataFrame實現(xiàn)excel合并單元格,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • Python 數(shù)值區(qū)間處理_對interval 庫的快速入門詳解

    Python 數(shù)值區(qū)間處理_對interval 庫的快速入門詳解

    今天小編就為大家分享一篇Python 數(shù)值區(qū)間處理_對interval 庫的快速入門詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-11-11
  • 使用rst2pdf實現(xiàn)將sphinx生成PDF

    使用rst2pdf實現(xiàn)將sphinx生成PDF

    這篇文章主要介紹了使用rst2pdf實現(xiàn)將sphinx生成PDF的相關(guān)資料,以及使用過程用遇到的錯誤的處理方法,非常的全面,需要的朋友可以參考下
    2016-06-06

最新評論