您现在的位置是:首页 > 文章详情

PHP使用Redis的List(列表)命令实现消息队列

日期:2019-04-26点击:376

1.用到的List(列表)命令

命令 作用
lPush 将一个或多个值插入到列表头部
rpoplpush 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
lRem 删除列表内的给定值
lIndex 按索引获取列表内的值

2.队列的组成

名称 职责
生产者 发布消息
消费者 获取并处理消息
监听者 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者Producter.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:13 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); //向列表中push10条消息 for ($i = 0;$i < 10;$i++){ //为消息生成唯一标识 $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true); $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data'))); var_dump($ret); } } catch (Exception $e){ echo $e->getMessage(); }

消费者Consumer.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:14 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; //声明监听者队列-list的键名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); //队列先进先出,弹出最先加入的消息,同时放入监听队列 while (true){ $ret = $redis->rpoplpush($queueKey, $watchQueueKey); if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); //将唯一id写入缓存设置有效期 $redis->setex($retArray['uniqid'], 60, 0); //模拟失败 $rand = mt_rand(0,9); if ($rand < 3){ echo "failure:".$ret."\n"; } else { //todo //处理成功移除消息 $redis->lRem($watchQueueKey, $ret, 0); echo "success:".$ret."\n"; } } } } catch (Exception $e){ echo $e->getMessage(); }

监听者Watcher.php

<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:15 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; //声明监听者队列-list的键名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); while (true){ //取出列表尾部的一个值 $ret = $redis->lIndex($watchQueueKey, -1); //如果不存在则休眠1秒 if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); $idCache = $redis->get($retArray['uniqid']); if ($idCache === false){ //如果已过期,表示任务超时,弹回原队列 $redis->rpoplpush($watchQueueKey, $queueKey); echo "rpoplpush:".$ret."\n"; } else { //处理中,继续等待 sleep(1); } } } } catch (Exception $e){ echo $e->getMessage(); }

4.执行队列

开启监听者php Watcher.php
开启消费者php Consumer.php
执行生产者php Producter.php
生产者输出

int(1) int(2) int(3) int(4) int(5) int(6) int(7) int(8) int(9) int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"} failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"} success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"} failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"} success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"} failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
原文地址:https://www.jmsite.cn/blog-615.html

原文链接:https://yq.aliyun.com/articles/700385
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章