MongoDB 4.2 内核解析 - Change Stream
MongoDB 从3.6版本开始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增强),用于订阅 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之间的增量数据迁移、同步,也可以将 MongoDB 的增量订阅应用到其他的关联系统;比如电商场景里,MongoDB 里存储新的订单信息,业务需要根据新增的订单信息去通知库存管理系统发货。
Change Stream 与 Tailing Oplog 对比
在 change stream 功能之前,如果要获取 MongoDB 增量的修改,可以通过不断 tailing oplog
的方式来 拉取增量的 oplog ,然后针对拉取到的 oplog 集合,来过滤满足条件的 oplog。这种方式也能满足绝大部分场景的需求,但存在如下的不足。
- 使用门槛较高,用户需要针对 oplog 集合,打开特殊选项的的 tailable cursor ("tailable": true, "awaitData" : true)。
- 用户需要自己管理增量续传,当拉取应用 crash 时,用户需要记录上一条拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再继续拉取。
- 结果过滤必须在拉取侧完成,但只需要订阅部分 oplog 时,比如针对某个 DB、某个 Collection、或某种类型的操作,必须要把左右的 oplog 拉取到再进行过滤。
- 对于 update 操作,oplog 只包含操作的部分内容,比如
{$set: {x: 1}}
,而应用经常需要获取到完整的文档内容。 - 不支持 Sharded Cluster 的订阅,用户必须针对每个 shard 进行 tailing oplog,并且这个过程中不能有 moveChunk 操作,否则结果可能乱序。
MongoDB Change Stream 解决了 Tailing oplog 存在的不足
- 简单易用,提供统一的 Change Stream API,一次 API 调用,即可从 MongoDB Server 侧获取增量修改。
- 统一的进度管理,通过 resume token 来标识拉取位置,只需在 API 调用时,带上上次结果的 resume token,即可从上次的位置接着订阅。
- 支持对结果在 Server 端进行 pipeline 过滤,减少网络传输,支持针对 DB、Collection、OperationType 等维度进行结果过滤。
- 支持 fullDocument: "updateLookup" 选项,对于 update,返回当时对应文档的完整内容。
- 支持 Sharded Cluster 的修改订阅,相同的 API 请求发到 mongos ,即可获取集群维度全局有序的修改。
Change Stream 实战
以 Mongo shell 为例,使用 Change Stream 非常简单,mongo shell 封装了针对整个实例、DB、Collection 级别的订阅操作。
db.getMongo().watch() 订阅整个实例的修改 db.watch() 订阅指定DB的修改 db.collection.watch() 订阅指定Collection的修改
- 新建连接1发起订阅操作
mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000}) 最多阻塞等待 1分钟
- 新建连接2写入新数据
mytest:PRIMARY> db.coll.insert({x: 100}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 101}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 102}) WriteResult({ "nInserted" : 1 })
- 连接1上收到 Change Stream 更新
mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000}) { "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } } { "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } } { "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
- 上述 ChangeStream 结果里,_id 字段的内容即为 resume token,标识着 oplog 的某个位置,如果想从某个位置继续订阅,在 watch 时,通过 resumeAfter 指定即可。比如每个应用订阅了上述3条修改,但只有第一条已经成功消费了,下次订阅时指定第一条的 resume token 即可再次订阅到接下来的2条。
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }}) { "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } } { "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
Change Stream 内部实现
watch() wrapper
db.watch() 实际上是一个 API wrapper,实际上 Change Stream 在 MongoDB 内部实际上是一个 aggregation 命令,只是加了一个特殊的 $changestream
阶段,在发起 change stream 订阅操作后,可通过 db.currentOp() 看到对应的 aggregation/getMore 操作的详细参数。
{ "op" : "getmore", "ns" : "test.coll", "command" : { "getMore" : NumberLong("233479991942333714"), "collection" : "coll", "maxTimeMS" : 50000, "lsid" : { "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107") }, }, "planSummary" : "COLLSCAN", "cursor" : { "cursorId" : NumberLong("233479991942333714"), "createdDate" : ISODate("2019-12-31T06:35:52.479Z"), "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"), "nDocsReturned" : NumberLong(1), "nBatchesReturned" : NumberLong(1), "noCursorTimeout" : false, "tailable" : true, "awaitData" : true, "originatingCommand" : { "aggregate" : "coll", "pipeline" : [ { "$changeStream" : { "fullDocument" : "default" } } ], "cursor" : { }, "lsid" : { "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107") }, "$clusterTime" : { "clusterTime" : Timestamp(1577774144, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "$db" : "test" }, "operationUsingCursorId" : NumberLong(7019500) }, "numYields" : 2, "locks" : { } }
resume token
resume token 用来描述一个订阅点,本质上是 oplog 信息的一个封装,包含 clusterTime、uuid、documentKey等信息,当订阅 API 带上 resume token 时,MongoDB Server 会将 token 转换为对应的信息,并定位到 oplog 起点继续订阅操作。
struct ResumeTokenData { Timestamp clusterTime; int version = 0; size_t applyOpsIndex = 0; Value documentKey; boost::optional<UUID> uuid; };
ResumeTokenData 结构里包含 version 信息,在 4.0.7 以前的版本,version 均为0; 4.0.7 引入了一种新的 resume token 格式,version 为 1; 另外在 3.6 版本里,Resume Token 的编码与 4.0 也有所不同;所以在版本升级后,有可能出现不同版本 token 无法识别的问题,所以尽量要让 MongoDB Server 所有组件(Replica Set 各个成员,ConfigServer、Mongos)都保持相同的内核版本。
更详细的信息,参考 https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability
updateLookup
Change Stream 支持针对 update 操作,获取当前的文档完整内容,而不是仅更新操作本身,比如
mytest:PRIMARY> db.coll.find({_id: 101}) { "_id" : 101, "name" : "jack", "age" : 18 } mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}}) WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
上面的 update 操作,默认情况下,change stream 会收到 {_id: 101}, {$set: {age: 20}
的内容,而并不会包含这个文档其他未更新字段的信息;而加上 fullDocument: "updateLookup" 选项后,Change Stream 会根据文档 _id 去查找文档当前的内容并返回。
需要注意的是,updateLookup 选项只能保证最终一致性,比如针对上述文档,如果连续更新100次,update 的 change stream 并不会按顺序收到中间每一次的更新,因为每次都是去查找文档当前的内容,而当前的内容可能已经被后续的修改覆盖。
Sharded cluster
Change Stream 支持针对 sharded cluster 进行订阅,会保证全局有序的返回结果;为了达到全局有序这个目标,mongos 需要从每个 shard 都返回订阅结果按时间戳进行排序合并返回。
在极端情况下,如果某些 shard 写入量很少或者没有写入,change stream 的返回延时会受到影响,因为需要等到所有 shard 都返回订阅结果;默认情况下,mongod server 每10s会产生一条 Noop 的特殊oplog,这个机制会间接驱动 sharded cluster 在写入量不高的情况下也能持续运转下去。
由于需要全局排序,在 sharded cluster 写入量很高时,Change Stream 的性能很可能跟不上;如果对性能要求非常高,可以考虑关闭 Balancer,在每个 shard 上各自建立 Change Stream。
参考资料
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Android 和iOS客户端网络数据收集
1.1 Android-网络万用表 安卓推荐使用网络万用表实现常用网络检测工具包括: 域名解析,ping ,出口ip获取,本地dns ip获取,traceroute , tracert ,telnet ,curl地址,MTR,下载文件或给出文件大小及md5,http性能分析,IP定位,IP计算器,时间转化,编码转化,二维码生成。1.域名解析及Ping对输入的域名进行DNS 解析并完成ping 操作2.位置,IP地址获取设备的GPS位置(需要GPS授权)获取设备当前网络的IP(内网IP,公网IP)3.DNS信息得到用户当前网络的dns 信息(运营商),如果有多个都会遍历到4.Traceroute得到用户当前网络的到某个域名或IP 的路由信息,用于诊断网络故障使用5.端口tcptelnet 指定的IP及端口的联通性情况,有时ip禁ping ,可以进行用它进行检测网络情况.6.Curl结果抓取一个URL,并等到它的内容信息(源码展示).7.Wget 测试wget 一个文件,测试文件的下载速度,文件大小,文件md58.HttpTime 数据说明time_total 总时间,按秒计。精确到小数点...
- 下一篇
彩食鲜开源基础框架 csx-bsf-all【开源】【原创】
开源csx-bsf-all Git地址 https://gitee.com/yhcsx/csx-bsf-all 技术架构 彩食鲜技术架构概述 介绍 BSF 为 base service framework 的简写,定义为永辉彩食鲜技术团队的基础框架,用于基础服务的集成和跟业务无关的基础技术集成。 BSF集成了自研的监控报警,用来监控各个服务系统的性能及异常告警。集成并封装Apollo,Rocket MQ,Redis, Elastic Search,ELK,XXLJOB, Sharding JDBC,Cat,Eureka,七牛云等第三方中间件,提供简易使用的底层框架。 愿景 为了更好地支持业务开发,让开发人员从中间件中解放出来,专注业务以提高开发效率。同时基础框架集中统一优化中间件相关服务及使用,为开发人员提供高性能,更方便的基础服务接口及工具。 项目结构规范说明 csx-bsf-all -- csx-bsf-core (项目核心类库) -- csx-bsf-demo (项目集成使用demo) -- csx-bsf-dependencies (项目依赖pom定义) --...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS8编译安装MySQL8.0.19