剖析nsq消息队列(二) 去中心化代码源码解析
在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把nsq
实现去中心化的源码和其中的业物逻辑展示给大家看一下。
nsqd和nsqlookupd的通信实现
上一篇中在启动nsqd
时我用了以下命令,我指定了一个参数 --lookupd-tcp-address
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
--lookupd-tcp-address
用于指定nsqlookupd
的tcp
监听地址。
nsqd
和 nsqlookupd
的通信交流简单来说就是下图这样
nsqd
启动后连接nsqlookupd
,连接成功后,要发送一个魔法标识nsq.MagicV1
,这个标识有啥魔法么,当然不是,他只是用于标明,客户端和服务端双方使用的信息通信版本,不能的版本有不同的处理方式,为了后期做新的消息处理版本方便吧。nsqlookupd
的代码块
func (p *tcpServer) Handle(clientConn net.Conn) { // ... buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) // ... protocolMagic := string(buf) // ... var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // ... return } err = prot.IOLoop(clientConn) //... }
这个时候的nsqd
已经和nsqlookupd
建立好了连接,但是这时,仅仅说明他俩连接成功。nsqlookupd
也并没有把这个连接加到可用的nsqd
列表里。
建立连接完成后,nsqd
会发送IDENTIFY
命令,这个命令里包含了nsq的基本信息nsqd
的代码
ci := make(map[string]interface{}) ci["version"] = version.Binary ci["tcp_port"] = n.RealTCPAddr().Port ci["http_port"] = n.RealHTTPAddr().Port ci["hostname"] = hostname ci["broadcast_address"] = n.getOpts().BroadcastAddress cmd, err := nsq.Identify(ci) if err != nil { lp.Close() return } resp, err := lp.Command(cmd)
包含了nsqd
提供的tcp
和http
端口,主机名,版本等等,发送给nsqlookupd
,nsqlookupd
收到IDENTIFY
命令后,解析信息然后加到nsqd
的可用列表里nsqlookupd
的代码块
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) // ... body := make([]byte, bodyLen) _, err = io.ReadFull(reader, body) // ... peerInfo := PeerInfo{id: client.RemoteAddr().String()} err = json.Unmarshal(body, &peerInfo) // ... client.peerInfo = &peerInfo // 把nsqd的连接加入到可用列表里 if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // ... return response, nil }
然后每过15秒,会发送一个PING
心跳命令给nsqlookupd
,这样保持存活状态,nsqlookupd
每次收到发过来的PING
命令后,也会记下这个nsqd
的最后更新时间,这样做为一个筛选条件,如果长时间没有更新,就认为这个节点有问题,不会把这个节点的信息加入到可用列表。
到此为止,一个nsqd
就把自己的信息注册到nsqlookupd
的可用列表了,我们可以启动多个nsqd
和多个nsqlookupd
,为nsqd
指定多个nsqlookupd
,就如同我上一篇帖子写的那样
--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200
nsqd
和所有的nsqlookupd
建立连接,注册服务信息,并保持心跳,保证可用列表的更新.
nsqlookupd 挂掉的处理方式
上面我们说了nsqd
如果出现问题,nsqlookupd
的nsqd
可用列表里就会处理掉这个连接信息。如nsqlookupd
挂了怎么办呢
目前的处理方式是这样的,
无论是心跳,还是其他命令,nsqd
会给所有的nsqlookup
发送信息,当nsqd
发现nsqlookupd
出现问题时,在每次发送命令时,会不断的进行重新连接:
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) { initialState := lp.state if lp.state != stateConnected { err := lp.Connect() if err != nil { return nil, err } lp.state = stateConnected _, err = lp.Write(nsq.MagicV1) if err != nil { lp.Close() return nil, err } if initialState == stateDisconnected { lp.connectCallback(lp) } if lp.state != stateConnected { return nil, fmt.Errorf("lookupPeer connectCallback() failed") } } // ... }
如果连接成功,会再次调用connectCallback
方法,进行IDENTIFY
命令的调用等。
客户端和nsqlookupd、nsqd的通信实现
上一篇帖子里介绍了,客户端如何连接nsqlookupd
来进行通信
adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"} config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQLookupds(adds); err != nil { panic(err) }
需要注意adds
里地址的端口,是nsqlookupd
的http
端口
这里我还使用上一篇帖子中的图,给大家详细分析
调用方法c.ConnectToNSQLookupds(adds)
,他的实现是访问nsqlookupd
的http端口http://127.0.0.1:7201/lookup?topic=testTopic1
得到提供consumer
订阅的topic
所有的producers
节点信息, url返回的数据信息如下。
{ "channels": [ "nsq_to_file", "ch1" ], "producers": [ { "remote_address": "127.0.0.1:58606", "hostname": "li-peng-mc-macbook.local", "broadcast_address": "li-peng-mc-macbook.local", "tcp_port": 8000, "http_port": 8001, "version": "1.1.1-alpha" }, { "remote_address": "127.0.0.1:58627", "hostname": "li-peng-mc-macbook.local", "broadcast_address": "li-peng-mc-macbook.local", "tcp_port": 7000, "http_port": 7001, "version": "1.1.1-alpha" } ] }
方法queryLookupd
就是进行的上图的操作
- 得到提供订阅的
topic
的nsqd
列表 - 进行连接
func (r *Consumer) queryLookupd() { retries := 0 retry: endpoint := r.nextLookupdEndpoint() // ... err := apiRequestNegotiateV1("GET", endpoint, nil, &data) if err != nil { // ... } var nsqdAddrs []string for _, producer := range data.Producers { broadcastAddress := producer.BroadcastAddress port := producer.TCPPort joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port)) nsqdAddrs = append(nsqdAddrs, joined) } // 进行连接 for _, addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) if err != nil && err != ErrAlreadyConnected { r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err) continue } } }
如何刷新nsqd的可用列表
有新的nsqd加入,是如何处理的呢?
在调用ConnectToNSQLookupd
时会启动一个协程go r.lookupdLoop()
调用方法lookupdLoop
的定时循环访问 queryLookupd
更新 nsqd
的可用列表
// poll all known lookup servers every LookupdPollInterval func (r *Consumer) lookupdLoop() { // ... var ticker *time.Ticker select { case <-time.After(jitter): case <-r.exitChan: goto exit } // 设置Interval 来循环访问 queryLookupd ticker = time.NewTicker(r.config.LookupdPollInterval) for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } exit: // ... }
处理 nsqd 的单点故障
当有nsqd
出现故障时怎么办?当前的处理方式是
-
nsqdlookupd
会把这个故障节点从可用列表中去除,客户端从接口得到的可用列表永远都是可用的。 - 客户端会把这个故障节点从可用节点上移除,然后要去判断是否使用了
nsqlookup
进行了连接,如果是则case r.lookupdRecheckChan <- 1
去刷新可用列表queryLookupd
,如果不是,然后启动一个协程去定时做重试连接,如果故障恢复,连接成功,会重新加入到可用列表.
客户端实现的代码
func (r *Consumer) onConnClose(c *Conn) { // ... // remove this connections RDY count from the consumer's total delete(r.connections, c.String()) left := len(r.connections) // ... r.mtx.RLock() numLookupd := len(r.lookupdHTTPAddrs) reconnect := indexOf(c.String(), r.nsqdTCPAddrs) >= 0 // 如果使用的是nslookup则去刷新可用列表 if numLookupd > 0 { // trigger a poll of the lookupd select { case r.lookupdRecheckChan <- 1: default: } } else if reconnect { // ... }(c.String()) } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Mockito 2 关于打标(stubbing)
请参考下面有关于打标的代码。 //You can mock concrete classes, not just interfacesLinkedList mockedList = mock(LinkedList.class); //stubbingwhen(mockedList.get(0)).thenReturn("first");when(mockedList.get(1)).thenThrow(newRuntimeException()); //following prints "first"System.out.println(mockedList.get(0)); //following throws runtime exceptionSystem.out.println(mockedList.get(1)); //following prints "null" because get(999) was not stubbedSystem.out.println(mockedList.get(999)); //Although it is possible to verif...
- 下一篇
最新115道华为、京东、滴滴、美团精选Java面试题整理
京东面试题 一般sql注入怎么发现触点的,从源码阐述sqlmap如何测试注入点的。 masscan扫描端口时靠什么检测,为什么这么快? 请详述. 你写过哪些小工具,你为你使用过的工具做过什么修改. 如何提高采用python编写的扫描速度,谈谈对GIL锁的了解.5.你觉得你发现的那个漏洞影响比较大. 常见的web漏洞有哪些. 有没有玩过硬件安全,研究程度如何. 反爬虫,如果是你如何进行反爬虫,如何绕过反爬措施。使用无头浏览器被检测到了,如何绕过 nmap扫描如何进行扫描。发包与协议,握手和不握手,哪些协议握手,哪些不握手. 如何不直接接触目标服务器探测对方端口是否开放 有没有自己编写过yara扫描模块,如果要解决扫描{k1:v1, k2:v2, k3:v3} ,保证同时在k1中的v1里出现特定值,k2中出现v2特定值,以及k3,v3。怎么实现 xss什么原理,如何自己实现一个beef类似的xss平台. 既然这样实现,面临的跨域如何解决? ip 频率限制, ip信誉度模型? SCTP协议是什么?如何使用 SCTP 优化网络? 美团面试题 java虚拟机内存模型 内存溢出一般发生在哪个区?永...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- 设置Eclipse缩进为4个空格,增强代码规范
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能