使用PHP和RabbitMQ實現(xiàn)消息隊列的延遲功能
前言
今天我們來做個小試驗,用PHP和RabbitMQ實現(xiàn)消息隊列的延遲功能。
前期準備,需要安裝好docker、docker-compose的運行環(huán)境。
需要安裝RabbitMQ的可以看下面這篇文章。
使用PHP和RabbitMQ實現(xiàn)消息隊列功能_php技巧_腳本之家 (jb51.net)
一、安裝RabbitMQ延遲插件
1、打開rabbitmq插件官網(wǎng)。
地址如下:Community Plugins | RabbitMQ
找到對應的延遲插件,rabbitmq_delayed_message_exchange,如下圖所示。
2、進入RabbitMQ容器,下載對應插件,執(zhí)行如下命令。
docker exec -ti rabbitmq bash cd /opt/rabbitmq/plugins/ wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
如下圖所示,找到自己RabbitMQ對應的版本,下載.ez文件。
3、啟用插件,執(zhí)行如下命令。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、重啟RabbitMQ服務。
5、檢查RabbitMQ已啟用哪些插件,執(zhí)行如下命令。
rabbitmq-plugins list -e
正常會返回如下內(nèi)容。
上圖說明延遲插件已啟用。
6、至此,RabbitMQ的延遲插件已安裝完成。
二、安裝php-amqplib
1、安裝php composer,執(zhí)行如下命令。
curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer
2、編寫composer.json,內(nèi)容如下,這里下載php-amqplib的版本是3.6。
vim composer.json { "require": { "php-amqplib/php-amqplib": "3.6.*" } }
3、下載包,執(zhí)行如下命令。
composer install
正常情況下,安裝完成的話,當前目錄會多一個vendor目錄,如下圖所示。
4、至此php-amqplib已安裝完成。
三、測試驗證
1、編寫生產(chǎn)者,代碼內(nèi)容如下。
vim producer.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 連接到RabbitMQ服務器 $connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明一個具有延遲插件的自定義交換機 $args = new \PhpAmqpLib\Wire\AMQPTable([ 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設我們使用 direct 類型的交換機 ]); $channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); $messageBody = 'Hello Max!'; $delay = 5000; // 延遲5秒,單位是毫秒 $headers = new \PhpAmqpLib\Wire\AMQPTable(['x-delay' => $delay]); $message = new AMQPMessage($messageBody, ['delivery_mode' => 2]); $message->set('application_headers', $headers); // 發(fā)布消息到交換機 $channel->basic_publish($message, 'delayed_exchange', 'delayed_key'); echo "Sent {$messageBody} with delay {$delay}ms\n"; $datetime = date('Y/m/d H:i:s'); echo "成功發(fā)送延遲消息 : {$messageBody} , {$datetime} \n"; // 關閉連接 $channel->close(); $connection->close();
2、編寫消費者,代碼內(nèi)容如下。
vim consumer.php <?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 連接到RabbitMQ服務器 $connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明一個具有延遲插件的自定義交換機 $args = new \PhpAmqpLib\Wire\AMQPTable([ 'x-delayed-type' => \PhpAmqpLib\Exchange\AMQPExchangeType::FANOUT // 這里假設我們使用 direct 類型的交換機 ]); $channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); // 聲明死信隊列 $channel->queue_declare( 'delayed_queue', false, true, false, false, false, new \PhpAmqpLib\Wire\AMQPTable([ 'x-dead-letter-exchange' => 'delayed' ]) ); // 綁定隊列到交換機 $channel->queue_bind('delayed_queue', 'delayed_exchange', 'delayed_key'); echo "正在等待延遲隊列消息, waiting... \n"; $callback = function (AMQPMessage $message) { //$headers = $message->get('application_headers'); //$nativeData = $headers->getNativeData(); echo $message->body . '-------' . date('Y/m/d H:i:s') . "\n"; $message->ack(); }; $channel->basic_consume( 'delayed_queue', '', false, false, false, false, $callback ); while ($channel->is_consuming()) { $channel->wait(); } // 關閉連接 $channel->close(); $connection->close();
3、啟動消費端,執(zhí)行如下命令。
php consumer.php
正常情況會返回如下內(nèi)容,等等消息。
4、運行生產(chǎn)端代,執(zhí)行如下命令。
php producer.php
正常情況會返回如下內(nèi)容。
5、再看消費端接收到的消息,正常返回如下內(nèi)容。
從上面截圖可以看出時間剛好是5秒鐘。發(fā)送時間是08:44:49,消費時間是08:44:54。
6、至此,延遲隊列的測試驗證已完成。
總結
用PHP和RabbitMQ實現(xiàn)消息隊列的延遲功能,其實依靠的是RabbitMQ的一個延遲插件,主要有以下幾個步驟。
1、安裝RabbitMQ延遲插件。
2、安裝PHP的AMQP擴展、php-amqplib代碼包。
3、編寫生產(chǎn)者、消費者進行驗證。
上面的代碼只是做個簡單的示例,如果運用到實際的項目當中需要做進一步的優(yōu)化。
到此這篇關于使用PHP和RabbitMQ實現(xiàn)消息隊列的延遲功能的文章就介紹到這了,更多相關PHP RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
PHP實現(xiàn)cookie跨域session共享的方法分析
這篇文章主要介紹了PHP實現(xiàn)cookie跨域session共享的方法,結合實例形式分析了php操作cookie的有效期、跨域、session存儲等相關操作技巧,需要的朋友可以參考下2019-08-08PHP刪除二維數(shù)組中相同元素及數(shù)組重復值的方法示例
這篇文章主要介紹了PHP刪除二維數(shù)組中相同元素及數(shù)組重復值的方法,涉及php針對數(shù)組的遍歷、判斷、比較等相關操作技巧,需要的朋友可以參考下2017-05-05php使用ob_start()實現(xiàn)圖片存入變量的方法
這篇文章主要介紹了php使用ob_start()實現(xiàn)圖片存入變量的方法,是對緩存的靈活運用,具有既定的參考借鑒價值,需要的朋友可以參考下2014-11-11PHP數(shù)據(jù)庫操作四:mongodb用法分析
這篇文章主要介紹了PHP數(shù)據(jù)庫操作mongodb用法,結合實例形式較為詳細的分析了MongoDB的功能、安裝、基本命令、使用方法及相關注意事項,需要的朋友可以參考下2017-08-08