亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

利用擴展的方式在PHP中使用Kafka的教程分享

 更新時間:2023年05月26日 09:24:23   作者:北橋蘇  
這篇文章主要為大家詳細介紹了如何利用擴展的方式實現(xiàn)在PHP中使用Kafka,具體包括擴展安裝和這種方式的基本操作,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

前言

由于之前在 PHP 中使用 Kafka 是通過 composer 包的方式,由于 nmred/kafka-php 很久沒有維護,并且網(wǎng)上相關(guān)問題的文章也比較少。所以我這次換成 PHP 擴展 RdKafka 繼續(xù)使用,主要介紹擴展安裝和這種方式的基本操作。

安裝

1. 下載

地址(找到與自己環(huán)境匹配的就可以)

2. 目錄

由于 php-rdkafka 依賴 librdkafka,linux 就需要先安裝 librdkafka 后安裝 php-rdkafka,而 windows 版本是如下幾個文件,安裝方法如下:

(1). 將 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安裝的根目錄下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安裝目錄的 ext 下。

(2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重啟 PHP。

(3). php-m 或這 phpinfo (); 就可以查看到擴展了。

通過 get_declared_classes() 也可以查看到擴展里預(yù)設(shè)的函數(shù)了。

使用

1. 生產(chǎn)

public function kafkaTest()
{
	$rk = new \RdKafka\Producer();
	$rk->addBrokers("127.0.0.1:9092");
	$topic = $rk->newTopic("shop");
	$ret = [];
	for ($i = 0; $i < 5; $i++) {
		$content = "第" . $i . "次發(fā)送失敗";
		$message = ["mobile" => "15623652142", "content" => $content];
		$payload = json_encode($message);
		// 指定向0號partition生產(chǎn)數(shù)據(jù)
		$ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i");
		// 隨機選擇partition
		//$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
		if ($rk->getOutQLen() > 0) {
			$ret[]['produce_poll'] = $rk->poll(500);
		} else {
			$ret[]['produce_poll'] = $rk->poll(0);
		}
	}
	dump($ret);
}

2. 消費(從指定的 partition 消費)

protected function execute(Input $input, Output $output)
{
	$output->writeln("!!!hello kafka!!!");
	$conf = new \RdKafka\Conf();
	$conf->set('group.id', 'sms-consumer-group');
	$rk = new \RdKafka\Consumer($conf);
	$rk->addBrokers("127.0.0.1:9092");
	$topicConf = new \RdKafka\TopicConf();
	$topicConf->set('auto.commit.interval.ms', 100);
	$topicConf->set('offset.store.method', 'file');
	$topicConf->set('offset.store.path', sys_get_temp_dir());
	$topicConf->set('auto.offset.reset', 'smallest');
	$topic = $rk->newTopic("shop", $topicConf);
	$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
	while(true) {
		// 設(shè)置消費時的時間間隔,單位毫秒,以下表示5秒消費一個
		$message = $topic->consume(0, 5000);
		if ($message) {
			echo "讀取到消息\n\r";
			// 消息對象,包括消息主題,消息創(chuàng)建時間戳,消息分區(qū)編號,消息主體,消息鍵名,消息長度等
			var_dump($message);
			switch ($message->err) {
				case RD_KAFKA_RESP_ERR_NO_ERROR:
					echo "讀取消息成功:\n\r";
					var_dump($message->payload);
					break;
				case RD_KAFKA_RESP_ERR__PARTITION_EOF:
					echo "讀取消息失敗\n\r";
					break;
				case RD_KAFKA_RESP_ERR__TIMED_OUT:
					echo "請求超時\n\r";
					break;
				default:
					throw new \Exception($message->errstr(), $message->err);
					break;
			}
		} else {
			echo "未讀取到消息\n\r";
		}
	}
	$output->writeln("!!!the end!!!");
}

其他

在執(zhí)行消費過程中,發(fā)現(xiàn) kafka 停止服務(wù),拋出的異常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。

解決方法

刪除 kafka-logs 下的所有日志,再重新啟動 Kafaka, kafka-server-start.bat ....\config\server.properties &

到此這篇關(guān)于利用擴展的方式在PHP中使用Kafka的教程分享的文章就介紹到這了,更多相關(guān)PHP使用Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論