亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

PHP實現(xiàn)RabbitMQ消息列隊的示例代碼

 更新時間:2022年05月10日 16:53:03   作者:PHP開源社區(qū)  
眾所周知,php本身的運行效率存在一定的缺陷,所以如果有一個很復(fù)雜很耗時的業(yè)務(wù)時,必須開發(fā)一個常駐內(nèi)存的程序。本文將利用PHP實現(xiàn)RabbitMQ消息列隊,感興趣的可以了解一下

業(yè)務(wù)場景

項目公司是主php做開發(fā)的,框架為thinkphp。眾所周知,php本身的運行效率存在一定的缺陷,所以如果有一個很復(fù)雜很耗時的業(yè)務(wù)時,必須開發(fā)一個常駐內(nèi)存的程序。首先我想到了php的workerman與swoole,但是這里應(yīng)上面的標(biāo)題哈,想將耗時任務(wù)交給另一個服務(wù)器,同時列隊處理。所以這里我想獨立部署一個rabbitMQ服務(wù)器用于處理列隊任務(wù)。

當(dāng)rabbitMQ服務(wù)器我們準(zhǔn)備好了,建立了一個持久化命名為ceshi的列隊,如下:

項目上生產(chǎn)者和消費者的開發(fā)我這里全部采用tinkphp6+workerman,為便于管理。這里這么做也是因為發(fā)現(xiàn)workerman中對rabbitMQ的文檔解釋太少了!

所以開始踩坑!

1、首先部署好thinkphp6框架

過程去看thinkphp6手冊

2、安裝workerman擴展

過程去看thinkphp6手冊

3、生產(chǎn)者

配置一個workerman類

創(chuàng)建的Send類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
    //websocket地址,一會用于測試。
    protected $socket = 'websocket://127.0.0.1:2345';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{
        //websocket發(fā)送過來的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創(chuàng)建隊列(Queue)
             * name: ceshi         // 隊列名稱
             * passive: false      // 如果設(shè)置true存在則返回OK,否則就報錯。設(shè)置false存在返回OK,不存在則自動創(chuàng)建
             * durable: true       // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會丟失,
             *                        設(shè)置true則代表是一個持久的隊列,服務(wù)重啟之后也會存在,因為服務(wù)會把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時候,會重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項為true則隊列只對當(dāng)前連接有效,連接斷開后自動刪除
             *  auto_delete: false // 是否自動刪除,當(dāng)最后一個消費者斷開連接之后隊列是否自動被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) use($data){
            echo "發(fā)送消息內(nèi)容:".$data."\n";

            /**
             * 發(fā)送消息
             * body 發(fā)送的數(shù)據(jù)
             * headers 數(shù)據(jù)頭,建議 ['content_type' => 'text/plain'],這樣消費端是springboot注解接收直接是字符串類型
             * exchange 交換器名稱
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */

            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            //echo " [x] Sent 'Hello World!'\n";
            $client = $channel->getClient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (Client $client) {
            $client->disconnect();
        });
    }

    /**
     * 當(dāng)連接建立時觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當(dāng)連接斷開時觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當(dāng)客戶端的連接上發(fā)生錯誤時觸發(fā)
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個進程啟動
     * @param $worker
     */
    public function onWorkerStart($worker)
{


    }
}

上述都OK以后咱們可以項目路徑下通過命令啟動這個生產(chǎn)者:

php think worker:server

測試發(fā)送數(shù)據(jù):

通過這個網(wǎng)站

連接【ws://127.0.0.1:2345】后發(fā)送數(shù)據(jù)!

前往rabbitMQ控制臺

列隊中有一條消息產(chǎn)生并且等待了!

這個時候你可能問,如果我發(fā)送數(shù)據(jù)不想通過ws發(fā)送而是接口發(fā)送怎么辦?

笨思路唄:接口給內(nèi)置服務(wù)器發(fā)消息->內(nèi)置服務(wù)去發(fā)消息給rabbitMQ

將協(xié)議改為tcp

然后重新啟動服務(wù)

然后去tp6創(chuàng)建一個路由接口

接口代碼

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
    public function index(string $msg)
{
        //連接本地tcp服務(wù)
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //發(fā)送字符串
        fwrite($client, $msg."\n");
        //斷開服務(wù)
        fclose($client);
        return 'OK';
    }

}

執(zhí)行結(jié)果:

說明接口成功的將數(shù)據(jù)發(fā)送給了本地內(nèi)置的tcp服務(wù)。

同時,內(nèi)置服務(wù)將收到的數(shù)據(jù)給了rabbitMQ服務(wù)列隊中。

生產(chǎn)者完成。

4、消費者

同生產(chǎn)者一樣新創(chuàng)建一個thinkphp6及安裝workerman擴展,注意端口別和生產(chǎn)者沖突!這里我設(shè)置的是2346端口

創(chuàng)建的Receive類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
    protected $socket = 'tcp://127.0.0.1:2346';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{

    }

    /**
     * 當(dāng)連接建立時觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當(dāng)連接斷開時觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當(dāng)客戶端的連接上發(fā)生錯誤時觸發(fā)
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個進程啟動
     * @param $worker
     */
    public function onWorkerStart($worker)
{
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創(chuàng)建隊列(Queue)
             * name: ceshi         // 隊列名稱
             * passive: false      // 如果設(shè)置true存在則返回OK,否則就報錯。設(shè)置false存在返回OK,不存在則自動創(chuàng)建
             * durable: true       // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會丟失,
             *                        設(shè)置true則代表是一個持久的隊列,服務(wù)重啟之后也會存在,因為服務(wù)會把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時候,會重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項為true則隊列只對當(dāng)前連接有效,連接斷開后自動刪除
             *  auto_delete: false // 是否自動刪除,當(dāng)最后一個消費者斷開連接之后隊列是否自動被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            $channel->consume(
                function (Message $message, Channel $channel, Client $client) {
                    echo "接收消息內(nèi)容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });

    }
}

都OK以后咱們可以項目路徑下通過命令啟動這個消費者:

php think worker:server

此時應(yīng)該會自動消費掉rabbitMQ中等待的消息!

到這里消費者也就結(jié)束啦!

5、整體測試

接下來我用cmd來啟動兩個服務(wù),然后用接口發(fā)送消息和消費測試!

至于具體怎么靈活應(yīng)用自行開拓大腦哦~

比如php項目有些業(yè)務(wù)吃力,可以去做個java的消費端,讓java來完成任務(wù)~

以上就是PHP實現(xiàn)RabbitMQ消息列隊的示例代碼的詳細內(nèi)容,更多關(guān)于PHP RabbitMQ消息列隊的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • php實現(xiàn)異步將遠程鏈接上內(nèi)容(圖片或內(nèi)容)寫到本地的方法

    php實現(xiàn)異步將遠程鏈接上內(nèi)容(圖片或內(nèi)容)寫到本地的方法

    這篇文章主要介紹了php實現(xiàn)異步將遠程鏈接上內(nèi)容(圖片或內(nèi)容)寫到本地的方法,涉及php基于curl進行遠程文件傳輸?shù)南嚓P(guān)操作技巧,需要的朋友可以參考下
    2016-11-11
  • PHP上傳圖片進行等比縮放可增加水印功能

    PHP上傳圖片進行等比縮放可增加水印功能

    PHP上傳圖片進行等比縮放,大家可以自行添加增加水印功能,具體代碼如下,喜歡的朋友可以參考下
    2014-01-01
  • php獲取文件名后綴常用方法小結(jié)

    php獲取文件名后綴常用方法小結(jié)

    這篇文章主要介紹了php獲取文件名后綴常用方法,實例分析了五種常用的php獲取文件名后綴的技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-02-02
  • 詳解PHP中的命名空間

    詳解PHP中的命名空間

    這篇文章主要介紹了PHP中的命名空間的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)使用PHP,感興趣的朋友可以了解下
    2021-04-04
  • php生成HTML文件的類方法

    php生成HTML文件的類方法

    在本篇文章里小編給大家整理的是關(guān)于用php生成HTML文件的類的相關(guān)知識點,有需要的朋友們學(xué)習(xí)下。
    2019-10-10
  • PHP中foreach()用法匯總

    PHP中foreach()用法匯總

    這篇文章主要給大家詳細介紹了PHP中foreach()用法以及相關(guān)的示例,十分的細致,有需要的小伙伴可以參考下。
    2015-07-07
  • PHP中設(shè)置時區(qū)方法小結(jié)

    PHP中設(shè)置時區(qū)方法小結(jié)

    今天發(fā)現(xiàn)一段PHP代碼中的時間判斷語句出了點問題,研究了一下發(fā)現(xiàn)問題出在PHP的時區(qū)設(shè)置上,PHP所取的時間默認是格林威治標(biāo)準(zhǔn)時間,所以和北京時間相差8小時
    2012-06-06
  • php通過function_exists檢測函數(shù)是否存在的方法

    php通過function_exists檢測函數(shù)是否存在的方法

    這篇文章主要介紹了php通過function_exists檢測函數(shù)是否存在的方法,實例分析了php使用function_exists檢測函數(shù)是否存在及調(diào)用的相關(guān)技巧,非常具有實用價值,需要的朋友可以參考下
    2015-03-03
  • PHP也能干大事之PHP中的編碼解碼詳解

    PHP也能干大事之PHP中的編碼解碼詳解

    這篇文章主要介紹了PHP也能干大事之PHP中的編碼解碼詳解,本文講解了ASCII編解碼、URL編解碼、Base64編解碼、HTML實體編解碼、二進制、八進制、十進制、十六進制相互轉(zhuǎn)換等內(nèi)容,需要的朋友可以參考下
    2015-04-04
  • php檢索或者復(fù)制遠程文件的方法

    php檢索或者復(fù)制遠程文件的方法

    這篇文章主要介紹了php檢索或者復(fù)制遠程文件的方法,涉及php使用copy函數(shù)操作文件的技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-03-03

最新評論