您现在的位置是:首页 > 文章详情

PHP使用Beanstalkd实例

日期:2019-03-19点击:285

相关笔记:
Beanstalkd消息/任务队列
CentOS编译和yum安装Beanstalkd及service和systemctl管理
Composer在Windows和Linux的安装和使用
有关Beanstalkd的基本概念,编译和yum的安装方法已经在上述笔记中记录了,今天练习下PHP使用Beanstalkd的过程,我选择的是使用Pheanstalk类来连接Beanstalkd

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/21 * Time: 10:32 */ require "../vendor/autoload.php"; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('192.168.75.135',11300); print_r($pheanstalk->stats());

生产者代码Producter.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/20 * Time: 16:30 */ require "../vendor/autoload.php"; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('192.168.75.135',11300); for ($i=0;$i<50;$i++){ $data = array( 'key' => 'testkey'.$i, 'value' => 'testvalue', 'time' => time(), ); $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR); var_dump($ret); }

消费者代码Consumer.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/20 * Time: 16:31 */ set_time_limit(0); ini_set('default_socket_timeout', 900); require "../vendor/autoload.php"; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('192.168.75.135',11300); while (true){ $job = $pheanstalk ->watch('test-tube') ->ignore('default') ->reserve(); if ($job){ sleep(2); echo $job->getData(); echo "\n"; $pheanstalk->delete($job); } } 

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

PS E:\repository\work\beanstalk> php .\Producter.php int(101) int(102) int(103) int(104) int(105) int(106) int(107) int(108) int(109) int(110) int(111) int(112) int(113) int(114) ......

由此可见,$pheanstalk->putInTube成功后返回的是job的id
查看状态

PS E:\repository\work\beanstalk> php Status.php Pheanstalk\Response\ArrayResponse Object ( [_name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [current-jobs-urgent] => 0 [current-jobs-ready] => 50 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 ......

结果中显示处于ready待读取状态的job是50个
打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争
消费者1

PS E:\repository\work\beanstalk> php .\Consumer.php {"key":"testkey0","value":"testvalue","time":1548039103} {"key":"testkey1","value":"testvalue","time":1548039103} {"key":"testkey2","value":"testvalue","time":1548039103} {"key":"testkey4","value":"testvalue","time":1548039103} {"key":"testkey6","value":"testvalue","time":1548039103} {"key":"testkey8","value":"testvalue","time":1548039103} {"key":"testkey10","value":"testvalue","time":1548039103} {"key":"testkey12","value":"testvalue","time":1548039103} {"key":"testkey14","value":"testvalue","time":1548039103} {"key":"testkey16","value":"testvalue","time":1548039103} {"key":"testkey18","value":"testvalue","time":1548039103} {"key":"testkey20","value":"testvalue","time":1548039103} {"key":"testkey22","value":"testvalue","time":1548039103} {"key":"testkey24","value":"testvalue","time":1548039103} {"key":"testkey26","value":"testvalue","time":1548039103} {"key":"testkey28","value":"testvalue","time":1548039103} {"key":"testkey30","value":"testvalue","time":1548039103} {"key":"testkey32","value":"testvalue","time":1548039103} {"key":"testkey34","value":"testvalue","time":1548039103} {"key":"testkey36","value":"testvalue","time":1548039103} {"key":"testkey38","value":"testvalue","time":1548039103} {"key":"testkey40","value":"testvalue","time":1548039103} {"key":"testkey42","value":"testvalue","time":1548039103} {"key":"testkey44","value":"testvalue","time":1548039103} {"key":"testkey46","value":"testvalue","time":1548039103} {"key":"testkey48","value":"testvalue","time":1548039103}

消费者2

PS E:\repository\work\beanstalk> php .\Consumer.php {"key":"testkey3","value":"testvalue","time":1548039103} {"key":"testkey5","value":"testvalue","time":1548039103} {"key":"testkey7","value":"testvalue","time":1548039103} {"key":"testkey9","value":"testvalue","time":1548039103} {"key":"testkey11","value":"testvalue","time":1548039103} {"key":"testkey13","value":"testvalue","time":1548039103} {"key":"testkey15","value":"testvalue","time":1548039103} {"key":"testkey17","value":"testvalue","time":1548039103} {"key":"testkey19","value":"testvalue","time":1548039103} {"key":"testkey21","value":"testvalue","time":1548039103} {"key":"testkey23","value":"testvalue","time":1548039103} {"key":"testkey25","value":"testvalue","time":1548039103} {"key":"testkey27","value":"testvalue","time":1548039103} {"key":"testkey29","value":"testvalue","time":1548039103} {"key":"testkey31","value":"testvalue","time":1548039103} {"key":"testkey33","value":"testvalue","time":1548039103} {"key":"testkey35","value":"testvalue","time":1548039103} {"key":"testkey37","value":"testvalue","time":1548039103} {"key":"testkey39","value":"testvalue","time":1548039103} {"key":"testkey41","value":"testvalue","time":1548039103} {"key":"testkey43","value":"testvalue","time":1548039103} {"key":"testkey45","value":"testvalue","time":1548039103} {"key":"testkey47","value":"testvalue","time":1548039103} {"key":"testkey49","value":"testvalue","time":1548039103}

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象
2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。
判断socket超时的代码

public function getLine($length = null) { $timeout = ini_get('default_socket_timeout'); $timer = microtime(true); do { $data = isset($length) ? $this->_wrapper()->fgets($this->_socket, $length) : $this->_wrapper()->fgets($this->_socket); if ($this->_wrapper()->feof($this->_socket)) { throw new Exception\SocketException('Socket closed by server!'); } if (($data === false) && microtime(true) - $timer > $timeout) { $this->disconnect(); throw new Exception\SocketException('Socket timed out!'); } } while ($data === false); return rtrim($data); }

原文地址:https://www.jmsite.cn/blog-572.html

原文链接:https://yq.aliyun.com/articles/694502
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章