PHP使用Redis的List(列表)命令实现消息队列
1.用到的List(列表)命令
命令 | 作用 |
---|---|
lPush | 将一个或多个值插入到列表头部 |
rpoplpush | 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值 |
lRem | 删除列表内的给定值 |
lIndex | 按索引获取列表内的值 |
2.队列的组成
名称 | 职责 |
---|---|
生产者 | 发布消息 |
消费者 | 获取并处理消息 |
监听者 | 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理 |
3.php实现代码
生产者Producter.php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:13 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); //向列表中push10条消息 for ($i = 0;$i < 10;$i++){ //为消息生成唯一标识 $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true); $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data'))); var_dump($ret); } } catch (Exception $e){ echo $e->getMessage(); }
消费者Consumer.php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:14 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; //声明监听者队列-list的键名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); //队列先进先出,弹出最先加入的消息,同时放入监听队列 while (true){ $ret = $redis->rpoplpush($queueKey, $watchQueueKey); if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); //将唯一id写入缓存设置有效期 $redis->setex($retArray['uniqid'], 60, 0); //模拟失败 $rand = mt_rand(0,9); if ($rand < 3){ echo "failure:".$ret."\n"; } else { //todo //处理成功移除消息 $redis->lRem($watchQueueKey, $ret, 0); echo "success:".$ret."\n"; } } } } catch (Exception $e){ echo $e->getMessage(); }
监听者Watcher.php
<?php /** * Created by PhpStorm. * User: jmsite.cn * Date: 2019/1/26 * Time: 0:15 */ try { //声明消息队列-list的键名 $queueKey = 'testQueueKey'; //声明监听者队列-list的键名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('192.168.75.132', 6379); while (true){ //取出列表尾部的一个值 $ret = $redis->lIndex($watchQueueKey, -1); //如果不存在则休眠1秒 if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); $idCache = $redis->get($retArray['uniqid']); if ($idCache === false){ //如果已过期,表示任务超时,弹回原队列 $redis->rpoplpush($watchQueueKey, $queueKey); echo "rpoplpush:".$ret."\n"; } else { //处理中,继续等待 sleep(1); } } } } catch (Exception $e){ echo $e->getMessage(); }
4.执行队列
开启监听者php Watcher.php
开启消费者php Consumer.php
执行生产者php Producter.php
生产者输出
int(1) int(2) int(3) int(4) int(5) int(6) int(7) int(8) int(9) int(10)
监听者输出
rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
消费者输出
success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"} failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"} success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"} failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"} success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"} failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
原文地址:https://www.jmsite.cn/blog-615.html
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
PHP使用Redis的Hash(哈希表)命令
1.Hash(哈希表)命令 命令 作用 返回值 hDel 删除哈希表key中一个或多个字段,传入不存在的字段将被忽略 返回成功删除的字段数 hExists 判断哈希表key中是否存在给定的字段 存在true,不存在false hGet 从哈希表key中获取给定的一个字段值 存在返回字段值,不存在返回false hGetAll 从哈希表key中获取全部的字段和值 存在返回关联数组,不存在返回空数组 hIncrBy 整形数值递增(有符号) 递增后的值 hIncrByFloat 浮点型数值递增(有符号) 递增后的值 hKeys 返回哈希表key中所有的字段 存在字段返回关联数组,不存在返回空数组 hLen 返回哈希表key中字段数量 大于等于0的整数 hMGet 返回哈希表key中给定的一个或多个字段的值 返回一个关联数租,键为查询的字段,值为查询到的值,查询的字段不存在则值为false hMSet 对哈希表key设置一组字段值 如果命令成功,返回true hSet 对哈希表key设置一个字段值 如果是设置了一个新值,返回1,如果覆盖了一个旧值,返回0 HSetnx 在哈希表key中不存在给...
- 下一篇
PHP获取Memcached的cas_token
php官方提供的方法代码 $ips = $m->get('ip_block', null, $cas); 按照php官方文档提供的代码来获取cas_token,结果$cas始终是null,查了好久,原来php5和php7中获取cas_token的方式是不同的php5的方法 $ips = $m->get('ip_block', null, $cas); var_dump($cas); php7的方法 $_val = $m->get('ip_block', null, Memcached::GET_EXTENDED); var_dump($_val['cas']); 做个判断 $cas = null; if (defined(Memcached::GET_EXTENDED)){ $_val = $m->get('ip_block', null, Memcached::GET_EXTENDED); $cas = $_val['cas']; } else { $ips = $m->get('ip_block', null, $cas); } var_dump($c...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 2048小游戏-低调大师作品
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS8安装Docker,最新的服务器搭配容器使用
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16