ZooKeeper Watcher 机制
前言
在 ZooKeeper 中,客户端可以向服务端注册一个监听器,监听某个节点或者其子节点列表,当监听对象发生变化时,服务端就会向指定的客户端发送通知,这是 ZooKeeper 中的 Watcher 机制,Watcher 机制是 ZooKeeper 中一个重要的特性,这篇文章就带大家了解下,底下是 Watcher 机制的执行过程:
从上图可以看到,Watcher 机制包括三个角色:客户端线程、客户端的 WatchManager 以及 ZooKeeper 服务器。Watcher 机制就是这三个角色之间的交互,整个过程分为注册、存储和通知三个步骤:
- 客户端向 ZooKeeper 服务器注册一个 Watcher 监听,
- 把这个监听信息存储到客户端的 WatchManager 中
- 当 ZooKeeper 中的节点发生变化时,会通知客户端,客户端会调用相应 Watcher 对象中的回调方法。
了解了整体的流程之后,接下来就来看下一些细节问题。
客户端处理
要了解 Watcher 机制,首先我们得知道什么时候客户端可以注册一个 Watcher 呢?通过查看 API 我们可以了解到,**在创建 ZooKeeper 对象,或者是在读取数据时(即调用 getData、exists、getChildren 方法)可以注册一个 Watcher 监听,**他们内部的实现都是一样的,这里我们就以 getData 方法为例来探究下 Watcher 机制的实现。
/** * ZooKeeper.java */ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { ZooKeeper.WatchRegistration wcb = null; if (watcher != null) { wcb = new ZooKeeper.DataWatchRegistration(watcher, path); } // ... GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb); // ... } /** * ClientCnxn.java */ public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); // ... }
通过上面的代码我们可以了解到,Watcher 对象和其监听的路径会被封装在 WatchRegistration 对象中,然后在 ClientCnxn 还会被封装在 Packet 对象中。这个 Packet 可以被看做是一个最小的通信协议单元,用于进行客户端与服务端之间的网络传输。封装完成之后,将该请求发送给服务端,发送成功后,将 Watcher 相关信息存储到客户端的 ZKWatchManager 对象中,至此客户端的做的工作也就完成了。
不过,这里有一个细节问题,是不是每调用一次 getData 客户端都会把整个 Watcher 对象发送给客户端呢?如果是这样的话,多次 getData 的调用就会导致服务端内存的紧张,我们来看下 ZooKeeper 是怎么处理这个问题的。
/** * Packet.java */ public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } } }
通过查看 Packet 内部的 createBB 方法我们可以看到,**Packet 在进行网络传输时仅仅是把 requestHeader 和 request 两个属性进行序列化,虽然 Watcher 被封装在 Packet 中,但是其并不会通过网络传输给服务端。**request 对象里面的内容可以看上面 ZooKeeper 类中的 Request 对象,它主要给该 Packet 添加一个标识,让服务端判断该请求是否包含 Watcher 监听。
服务端处理
在源码中,服务端是由 ZooKeeperServer 实现的,在其内部,是由 RequestProcessor 接口来处理客户端的请求,我们来看下其中的一个实现类 FinalRequestProcessor。
/** * FinalRequestProcessor */ public void processRequest(Request request) { // ... ServerCnxn cnxn = request.cnxn; case OpCode.getData: { // ... byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); break; } }
当 FinalRequestProcesor 判断到该请求是一个 Watcher 监听时,会把 ServerCnxn 对象和监听路径传到 getData 方法里面去。这个 ServerCnxn 是什么呢?它是一个 ZooKeeper 客户端和服务端之间的连接接口,代表了一个客户端和服务器的连接,并且实现了 Watcher 的 process 方法,它最终会交由给 WatchManager 管理。
除了管理 Watcher,WatcherManager 还负责 Watcher 事件的触发,并移除那些已经被触发的 Watcher。由于其管理的 ServerCnxn 已经实现了 process 方法,因此当监听对象发生变化时,它就会调用 ServerCnxn 的 process 方法。
/** * NIOServerCnxn */ public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
我们可以看到,服务端执行的逻辑很简单,只是在请求头中标记 -1,表明当前是一个通知,然后将该请求发送给客户端,具体的回调逻辑都是在客户端执行的。
客户端回调 Watcher
客户端在和 ZooKeeper 建立连接时,会启动 sendThread 和 eventThread 线程。
sendThread 线程负责发送请求给服务端,同时也接收服务端发送过来的响应,当它判断到响应中的 XID 标识为 -1,便将它作为一个通知类型的响应,将响应中的信息进行序列化,交给 eventThread 线程处理。
eventThread 会根据响应内容判断该通知对应的 Watcher 类型,从 ZKWatchManager 中取出所有相关的 Watcher,然后放到 waitingEvents 队列中,该队列时一个待处理 Watcher 的队列,eventThrad 每次从中取出一个 Watcher,然后进行串行同步处理,就是依次调用队列中 Watcher 的 process 的方法。
Watcher 特性总结
上面就是 Watcher 机制的整个执行流程了,最后就简单说下我认为 Watcher 机制中两个比较显著的特点。
第一个就是一次性,在整个流程中,不管是服务端还是客户端在处理 Watcher,当 Watcher 触发之后,就会将他们从本地内存中去除掉,如果还需要监听的话就需要反复注册。如果注册一个 Watcher 一直有效的话,那么当更新频繁时,对网络带宽和服务器的压力是很大的。
第二个就是轻量,客户端发送给服务器的请求中只是表明该请求是对哪个路径的监听,并没有把全部信息传给服务端,服务端给客户端做响应也是如此,它只是告诉它监听的节点或子列表发生变化了,具体的变化信息需要重新去服务端获取,这个轻量的设计使得网络带宽和服务器的压力大大减小了。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【云栖号案例 | 教育与科研机构】红岭创投上云 成功顺应全球电子商务发展趋势
云栖号案例库:【点击查看更多上云案例】不知道怎么上云?看云栖号案例库,了解不同行业不同发展阶段的上云方案,助力你上云决策! 公司介绍 红岭创投隶属于深圳市红岭创投电子商务股份有限公司。 红岭创投顺应全球电子商务发展趋势,充分挖掘互联网市场潜力,致力于通过建立一个安全、高效、诚信、互惠互助的网络借贷平台,让人们有机会帮助到需要帮助的人,从而解决创业问题,帮助投资者及创业者。 选择阿里云 作为一家全部生产业务使用互联网中间件EDAS的企业,在快速迭代的业务模式下,互联网中间件大中台的架构完美的适配了敏捷开发的需要,并且在阿里云企业服务7*24不间断的保障护航下,所有大小问题都第一时间都可以得到快速响应解决并迭代更新,很好的满足了金融行业所要求的高稳定性。 “阿里云专家服务通过非常专业的技术及态度帮助我们将应用及数据平滑迁移到云上,并在上云前后对系统进行全方面的护航技术保障,进行全方面的容量评估和性能优化,让业务系统平稳度过业务流量高峰。” ——红岭创投CTO 徐敏 获得的成效 阿里云为红岭创投提供7*24不间断的护航保障,快速响应,助力业务系统和数据平滑上云,让业务系统平稳度过业务流量高峰...
- 下一篇
PHP 框架 CodeIgniter 4.0 正式版发布了!
今天我们隆重的宣布,完全重写的 CodeIgniter 4.0 正式版发布了!非常感谢所有贡献代码,撰写文档或提交 Issue 的朋友们。当你浏览市面上大多数的 PHP 框架时,你会发现其中大多数框架都具有某种形式的商业支持,但是对于 CodeIgniter 框架来说,是一些普普通通的开源贡献者们帮助了框架的发展,甚至在遇到一些挑战时仍在继续坚持。 回首过去,我第一次提交代码是在 2015 年 8 月 26 日。当时,我只是凭兴趣自愿提供一些代码,我从未想过我会成为这个项目的首席开发人员,但当时由于其他团队成员的时间问题,我担任了这个看起来容易的职位。在最初的三年左右的时间里,我一直在用业余时间编写代码,努力实现自己的承诺。这些年来,我有好几次想停下来,我想去开发一些更可能完成的东西,例如我想探索的四个主要思想中的任何一个,是你们让我坚持了下去。 在过去的一年半中,你们中的许多人都挺身而出,以帮助将该项目推向终点。到今天,如果没有大家的帮助这是不可能发生的。 不幸的是,一切并不都是美好的,因为我们最近因肺癌失去了出色的项目负责人 James Perry。他的杰出贡献使得这次 4.0 版...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS8编译安装MySQL8.0.19
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2整合Redis,开启缓存,提高访问速度