请先关注 [低调大师] 公众号 优秀的自媒体个人博客,低调大师,许军

低调大师

您现在的位置是:首页>文章详情

文章详情

玩转redis-延时消息队列

2020-05-05 17热度

上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。

看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a (integer) 1 127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d (integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3 1) "b" 127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5 1) "b" 2) "a"

也可以把score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES 1) "b" 2) "1" 3) "a" 4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf 1) "b" 2) "a" 3) "d" 4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个valuescore保存的是时间,也就是说,在添加一个元素时他的score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是ZRANGEBYSCORElistpop不同,pop是取出元素并且会把元素在list中删除。ZRANGEBYSCORE只会取出数据不会把数据从sorted set中删除。解决方法1,利用redis事务,先ZRANGEBYSCORE取出数据,然后再用ZREMRANGEBYSCORE 把数据删除。

具体实现-code

添加延时消息,参数delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error { if delay <= 0 { return errors.New("delay need great than zero") } tm := time.Now().Add(delay) msg := NewMessage("", body) msg.DelayTime = tm.Unix() sendData, _ := json.Marshal(msg) return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err() }

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() { go func() { ticker := time.NewTicker(s.options.RateLimitPeriod) defer func() { log.Println("stop get delay message.") ticker.Stop() }() topicName := s.topicName + zsetSuffix for { currentTime := time.Now().Unix() select { case <-s.ctx.Done(): log.Printf("context Done msg: %#v \n", s.ctx.Err()) return case <-ticker.C: var valuesCmd *redis.ZSliceCmd _, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error { valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime) pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10)) return nil }) if err != nil { log.Printf("zset pip error: %#v \n", err) continue } rev := valuesCmd.Val() for _, revBody := range rev { msg := &Message{} json.Unmarshal([]byte(revBody.Member.(string)), msg) if s.handler != nil { s.handler.HandleMessage(msg) } } } } }() }
收藏 (0)

相关文章

    文章评论

    共有0条评论来说两句吧...