PHP簡易延時隊列的實現(xiàn)流程詳解
更新時間:2022年11月21日 08:54:40 作者:i_zane
普通的隊列是先進先出,但是延時隊列并不是,而是加上了時間這一權重。希望到達時間點的先執(zhí)行。從某種意義上來講,延遲隊列的結(jié)構(gòu)并不像一個隊列,而更像是一種以時間為權重的有序堆結(jié)構(gòu)
需求說明
- 當用戶申請售后,商家未在n小時內(nèi)處理,系統(tǒng)自動進行退款。
- 商家拒絕后,用戶可申請客服介入,客服x天內(nèi)超時未處理,系統(tǒng)自動退款。
- 用戶收到貨物,x天自動確認收貨
- 等等需要延時操作的流程……
設計思路
- 設計一張隊列表,記錄所有隊列的參數(shù),執(zhí)行狀態(tài),重試次數(shù)
- 將創(chuàng)建隊列的
id
存于redis
中,使用zset
有序集合。按照時間戳進行排序 - 使用
croontab
定時任務每分鐘執(zhí)行一次
實現(xiàn)
新建隊列表
CREATE TABLE `delay_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `params` varchar(512) DEFAULT NULL, `message` varchar(255) DEFAULT '' COMMENT '執(zhí)行結(jié)果', `ext_string` varchar(255) DEFAULT '' COMMENT '擴展字符串,可用于快速檢索。取消該隊列', `retry_times` int(2) DEFAULT '0' COMMENT '重試次數(shù)', `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待執(zhí)行, 10 執(zhí)行成功, 20 執(zhí)行失敗,30取消執(zhí)行', `created` datetime DEFAULT NULL, `modified` datetime DEFAULT NULL, PRIMARY KEY (`id`), KEY `ext_idx` (`ext_string`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
部分隊列的操作方法,新增隊列、取消隊列、隊列執(zhí)行成功、隊列執(zhí)行失敗、隊列重試【重試時間間隔抄的微信支付的異步通知時間】
class DelayQueueService { // 重試時間,最大重試次數(shù) 15 private static $retryTimes = [ 15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60, ]; /** * @description 增加隊列至redis * @param $queueId * @param int $delay 需要延遲執(zhí)行的時間。單位秒 * @return void */ public function addDelayQueue($queueId, int $delay) { $time = time() + $delay; $redis = RedisService::getInstance(); $redis->zAdd("delay_queue_job", $time, $queueId); } // 取消redis 隊列 public function cancelDelayQueue($ext) { $row = $query->getRow(); // 使用ext_string 快速檢索到相應的記錄 if ($row) { $redis = RedisService::getInstance(); $redis->zRem('delay_queue_job', $row->id); $row->status = DelayQueueTable::STATUS_CANCEL; $table->save($row); } } /** * @description 執(zhí)行成功 * @return void */ public static function success($id, $message = null) { $table->update([ 'status' => DelayQueueTable::STATUS_SUCCESS, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 執(zhí)行失敗 * @return void */ public static function failed($id, $message = null) { $table->updateAll([ 'status' => DelayQueueTable::STATUS_FAILED, 'message' => $message ?? '', 'modified' => date('Y-m-d H:i:s'), ], [ 'id' => $id, ]); } /** * @description 失敗隊列重試,最大重試15次 * @param $id * @return void */ public static function retry($id) { $info = self::getById($id); if (!$info) { return; } $retryTimes = ++$info['retry_times']; if ($retryTimes > 15) { return; } $entity = [ 'params' => $info['params'], 'ext_string' => $info['ext_string'], 'retry_times' => $retryTimes, ]; $queueId = $table->save($entity); self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]); } }
在命令行進行任務的運行
public function execute(Arguments $args, ConsoleIo $io) { $startTimestamp = strtotime("-1 days"); $now = time(); $redis = RedisService::getInstance(); $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now); if ($queueIds) { foreach ($queueIds as $id) { $info = // 按照隊列id 獲取相應的信息 if ($info['status'] === DelayQueueTable::STATUS_PADDING) { $params = unserialize($info['params']); // 創(chuàng)建記錄的時候,需要試用serialize 將類名,方法,參數(shù)序列化 $class = $params['class']; $method = $params['method']; $data = $params['data']; try { call_user_func_array([$class, $method], [$data]); $redis->zRem('delay_queue_job', $id); $msg = date('Y-m-d H:i:s') . " [info] success: $id"; DelayQueueService::success($id, $msg); $io->success($msg); } catch (Exception $e) { $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}"; DelayQueueService::failed($id, $msg); // 自定義異常code,不進行隊列重試 if (10000 != $e->getCode()) { DelayQueueService::retry($id); } $io->error($msg); } } } } }
最后說點
- 我這邊的系統(tǒng)對實時性要求不高,所以直接使用的是
linux
的crond
服務,每分鐘運行一次。如需精確到秒級,可寫一個shell
,一分鐘循環(huán)執(zhí)行<=60
次 - 因為目前的數(shù)據(jù)較少,延時隊列加入的只有小部分。所以就在
command
里面直接執(zhí)行更新操作了,后期如果隊列多,且有比較耗時的操作,可考慮把耗時操作單獨放置一個隊列中。本方法只用于將數(shù)據(jù)塞進隊列。
附上 shell
腳本 一分鐘執(zhí)行60次
#!/bin/bash step=2 #間隔的秒數(shù),不能大于60 for (( i = 0; i < 60; i=(i+step) )); do echo $i # do something sleep $step done
到此這篇關于PHP簡易延時隊列的實現(xiàn)流程詳解的文章就介紹到這了,更多相關PHP延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
PHP 函數(shù)call_user_func和call_user_func_array用法詳解
下面來和大家分享一下這個call_user_func_array和call_user_func函數(shù)的用法,另外附贈func_get_args()函數(shù)和func_num_args()函數(shù),嘿嘿!!2014-03-03