玩转redis-延时消息队列
上一篇基于redis
的list实现了一个简单的消息队列:玩转redis-简单消息队列
产品经理经常说的一句话,我们不光要有X
功能,还要Y
功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列
才能算是一个完整的消息队列。
看看redis
的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列
redis有序集合(sorted set)
redis
有序集合,每个元素都会关联一个double
类型的分数。redis
正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score
)却可以重复。
简单操作
添加数据
127.0.0.1:6379> ZADD testSet1 5 a (integer) 1 127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d (integer) 3
读取
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3 1) "b" 127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5 1) "b" 2) "a"
也可以把score
打出来
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES 1) "b" 2) "1" 3) "a" 4) "5"
查出所有的数据
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf 1) "b" 2) "a" 3) "d" 4) "c"
删除数据
ZREMRANGEBYSCORE testSet1 0 2
延时队列的实现思路
总体的思路很简单,就是每一个value
的score
保存的是时间,也就是说,在添加一个元素时他的score
是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。
还有一个问题,就是
ZRANGEBYSCORE
和list
的pop
不同,pop
是取出元素并且会把元素在list
中删除。ZRANGEBYSCORE
只会取出数据不会把数据从sorted set
中删除。解决方法1,利用redis
的事务
,先ZRANGEBYSCORE
取出数据,然后再用ZREMRANGEBYSCORE
把数据删除。
具体实现-code
添加延时消息,参数delay
就是我们要延时多久:
func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error { if delay <= 0 { return errors.New("delay need great than zero") } tm := time.Now().Add(delay) msg := NewMessage("", body) msg.DelayTime = tm.Unix() sendData, _ := json.Marshal(msg) return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err() }
使用,比如我们想过1秒再处理
producer.PublishDelayMsg(topicName, body, time.Second)
读取消息并处理
这就比较简单了,就是在一个ticker
里循环读取小于或等于当前时间的数据:
func (s *consumer) startGetDelayMessage() { go func() { ticker := time.NewTicker(s.options.RateLimitPeriod) defer func() { log.Println("stop get delay message.") ticker.Stop() }() topicName := s.topicName + zsetSuffix for { currentTime := time.Now().Unix() select { case <-s.ctx.Done(): log.Printf("context Done msg: %#v \n", s.ctx.Err()) return case <-ticker.C: var valuesCmd *redis.ZSliceCmd _, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error { valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime) pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10)) return nil }) if err != nil { log.Printf("zset pip error: %#v \n", err) continue } rev := valuesCmd.Val() for _, revBody := range rev { msg := &Message{} json.Unmarshal([]byte(revBody.Member.(string)), msg) if s.handler != nil { s.handler.HandleMessage(msg) } } } } }() }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
两行代码险些搞垮 JavaScript 生态,受影响项目超百万
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 4 月 25 日,一个名为 is-promise 的 npm 库进行了更新并发布了 v2.2.0 版本,没想到这一次更新却使 JavaScript 生态陷入危机,据媒体报道,目前已有数百万个项目受到了影响,而事件的始作俑者竟是一个仅仅“单行”的 JavaScript 库。 事件回顾 is-promise 库主要用来测试 JavaScript 对象是否为“Promise”,并在开发时使用该函数返回布尔值 yes 或 no,开发者可以通过 one-liner 调用并在自己的项目中使用这个库。4 月 25 日,is-promise 正常进行更新,发布了 is-promise v2.2.0,但由于该版本并未遵循正确的 ES 模块标准,从而导致更新完成后,由于不正确的 ES 模块标准,所有在构建时使用 is-promise 库的项目几乎全部发生故障。虽然这一错误不会使现有项目崩溃,但它却对开发者编译自己项目的新版本造成了影响。来看一下“肇事者”: declare function isPromis...
- 下一篇
在Java代码中打日志需要注意什么?
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 为什么要打日志? 日志是什么?日志是你在代码运行时打印出来的一些数据和记录,是快速排查问题的好帮手! 做一件事情之前,先思考为什么。为什么我们在开发中,需要打日志?原因很简单,没人能保证自己写的程序没有BUG,即使你做了足够的测试,也只是能降低产生BUG的概率而已。 尤其是当今分布式环境,定位问题变得越来越复杂。所以我们想要获取一些程序“运行时”的信息,日志就是最方便的。 所以,这种福泽后来人的好东西,当然要用起来了~ Java日志框架 要说Java日志框架啊,要从远古时代的JDK 1.3之前说起。那时候大家打印日志就是直接输出到STDOUT或者STDERR流。 System.out.println()System.err.println()e.printStackTrace() 于是log4j在大牛Ceki中应运而生,后面经过一系列的发展,以及Ceki与Apache的吃瓜事件,逐渐发展为slf4j、logback、log4j2三种最主流的日志框架。 slf4j: 日志的“门面”框架,对...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,CentOS7官方镜像安装Oracle11G
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS8编译安装MySQL8.0.19
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度