剖析nsq消息队列(四) 消息的负载处理
剖析nsq消息队列-目录
实际应用中,一部分服务集群可能会同时订阅同一个topic
,并且处于同一个channel
下。当nsqd
有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是我要说的消息的负载。
如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。
nsq
的处理方式是客户端主动向nsqd
报告自已的可处理消息数量(也就是RDY
命令)。nsqd
根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理
如下图所示:
客户端更新RDY
从第一篇帖子的例子中我们就有配置consumer的config
config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second
MaxInFlight
来设置最大的处理中的消息数量,会根据这个变量计算在是否更新RDY
初始化的时候 客户端会向连接的nsqd服务端来发送updateRDY来设置最大处理数,
func (r *Consumer) maybeUpdateRDY(conn *Conn) { inBackoff := r.inBackoff() inBackoffTimeout := r.inBackoffTimeout() if inBackoff || inBackoffTimeout { r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v", conn, inBackoff, inBackoffTimeout) return } remain := conn.RDY() lastRdyCount := conn.LastRDY() count := r.perConnMaxInFlight() // refill when at 1, or at 25%, or if connections have changed and we're imbalanced if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", conn, count, remain, lastRdyCount) r.updateRDY(conn, count) } else { r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", conn, count, remain, lastRdyCount) } }
当剩余的可用处理数量remain
小于等于1,或者小于最后一次设置的可用数量lastRdyCount
的1/4时,或者可用连接平均的maxInFlight大于0并且小于remain
时,则更新RDY
状态
当有多个nsqd
时,会把最大的消息进行平均计算:
// perConnMaxInFlight calculates the per-connection max-in-flight count. // // This may change dynamically based on the number of connections to nsqd the Consumer // is responsible for. func (r *Consumer) perConnMaxInFlight() int64 { b := float64(r.getMaxInFlight()) s := b / float64(len(r.conns())) return int64(math.Min(math.Max(1, s), b)) }
当有消息从nsqd
发送过来后也会调用maybeUpdateRDY
方法,计算是否需要发送RDY
命令
func (r *Consumer) onConnMessage(c *Conn, msg *Message) { atomic.AddInt64(&r.totalRdyCount, -1) atomic.AddUint64(&r.messagesReceived, 1) r.incomingMessages <- msg r.maybeUpdateRDY(c) }
上面就是主要的处理逻辑,但还有一些逻辑,就是当消息处理发生错误时,nsq
有自己的退避算法backoff
也会更新RDY
简单来说就是当发现有处理错误时,来进行重试和指数退避,在退避期间RDY
会为0,重试时会先放尝试RDY
为1看有没有错误,如果没有错误则全部放开,这个算法这篇帖子我就不详细说了。
服务端nsqd选择客户端进行发送消息
同时订阅同一topic
的客户端(comsumer)有很多个,每个客户端根据自己的配置或状态发送RDY
命令到nsqd
表明自己能处理多少消息量
nsqd服务端会检查每个客户端的的状态是否可以发送消息。也就是IsReadyForMessages
方法,判断inFlightCount是否大于readyCount,如果大于或者等于就不再给客户端发送数据,等待Ready
后才会再给客户端发送数据
func (c *clientV2) IsReadyForMessages() bool { if c.Channel.IsPaused() { return false } readyCount := atomic.LoadInt64(&c.ReadyCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount) c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount) if inFlightCount >= readyCount || readyCount <= 0 { return false } return true
每一次发送消息inFlightCount
会+1并保存到发送中的队列中,当客户端发送FIN会-1在之前的帖子中有说过。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // ... for { // 检查订阅状态和消息是否可处理状态 if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // ... flushed = true } else if flushed { memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // ... // 消息处理 case b := <-backendMsgChan: client.SendingMessage() // ... case msg := <-memoryMsgChan: client.SendingMessage() //... } } // ... }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
零代码一分钟创建 HTTP 服务(一)
一、背景 如果你想快速搭建一个 HTTP 服务来测试,以往可能需要用 Java/Node.js 等语言写个脚本部署到服务器上,但现在你多了一种完全无需写代码方案:阿里云逻辑编排。 借助逻辑编排,不要一行代码不到一分钟就可以实现 HTTP 服务。 二、实现 2.1 创建编排实例 首先进入逻辑编排控制台 https://lc.console.aliyun.com/flow,点击 “创建编排实例”: 2.2 图形化设计 创建成功后,会有个弹窗提示是否使用模板,默认不使用,我们也不需要使用,点击“确定” 直接进入到实例的图形化设计界面。 我们先不管这么多概念,直接操作。 2.2.1 接收 HTTP 请求 点击 “当收到 HTTP 请求时”,会进入下图的界面: 可以看到有几个属性: HTTP Endpointe:就是 HTTP 服务的 URL,保存后会自动生成。而我们自己编程实现 HTTP 服务,则需要我们自己绑定域名到部署代码的服务器上; Method:就是通过 GET 或 POST 方法来请求该 HTTP 服务 请求正文 JSON Schema:非必填项,先不管它 现在我们就使用默认配置,继...
- 下一篇
一篇文章带你解读Redis分布式锁的发展史和正确实现方式
前言近两年来微服务变得越来越热门,越来越多的应用部署在分布式环境中,在分布式环境中,数据一致性是一直以来需要关注并且去解决的问题,分布式锁也就成为了一种广泛使用的技术,常用的分布式实现方式为Redis,Zookeeper,其中基于Redis的分布式锁的使用更加广泛。但是在工作和网络上看到过各个版本的Redis分布式锁实现,每种实现都有一些不严谨的地方,甚至有可能是错误的实现,包括在代码中,如果不能正确的使用分布式锁,可能造成严重的生产环境故障,本文主要对目前遇到的各种分布式锁以及其缺陷做了一个整理,并对如何选择合适的Redis分布式锁给出建议。 各个版本的Redis分布式锁 V1.0 tryLock(){ SETNX Key 1 EXPIRE Key Seconds } release(){ DELETE Key } 这个版本应该是最简单的版本,也是出现频率很高的一个版本,首先给锁加一个过期时间操作是为了避免应用在服务重启或者异常导致锁无法释放后,不会出现锁一直无法被释放的情况。这个方案的一个问题在于每次提交一个Redis请求,如果执行完第一条命令后应用异常或者重启,锁将无法过期,一种...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS关闭SELinux安全模块
- CentOS7设置SWAP分区,小内存服务器的救世主
- Docker安装Oracle12C,快速搭建Oracle学习环境
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Mario游戏-低调大师作品