您现在的位置是:首页 > 文章详情

从RocketMQ消息持久化设计看磁盘性能瓶颈的突破

日期:2019-05-16点击:402

从RocketMQ消息持久化设计看磁盘性能瓶颈的突破

微信公众号:IT一刻钟

大型现实非严肃主义现场,一刻钟与你分享优质技术架构与见闻,做一个有剧情的程序员。

关注可第一时间了解更多精彩内容,定期有福利相送哟。

分布式消息队列通常有高可靠性的要求,所以消息数据是需要持久化存储的。那么以什么方式来进行持久化是一个值得商榷的问题。

从存储方式和效率来看,文件系统 > KV存储 > 关系型数据库,直接操作文件系统自然是最快的一种存储方式,但是仅仅如此就可以了吗?

当然不是,在无数的过往学习中,磁盘IO性能拖累系统性能是众所周知的。那么RocketMQ是怎么解决呢?

各位看官且待我慢慢道来。

存储架构设计

首先我们回忆一下,假如现在有一个字你不认识,然后你手上正巧有一本汉语言辞典,请问该怎么做才能以最快的速度查到这个字?

凡是上过小学的人,应该都不会从汉语言辞典第一页开始一页一页的查找。

作为优秀小学毕业生的我们,肯定是先通过偏旁检索到这个字,然后根据检索上这个字的页码,到汉语言辞典里对应的页码中去找到这个字,于是你就知道它读什么了。

大道相通。

RocketMQ在文件系统中,把所有的消息都存在了同一个文件中,这就像一本厚厚的汉语言辞典,作为消费者,想要做到最大效率的实时消费,说白了就是要快速定位到这个消息在文件中的位置,肯定不能从文件偏移量0开始向下查找。

一张图顶几百字:

RocketMQ主要存储文件有三个,分别是:

CommitLog:消息存储文件,所有的消息存在这里;

ConsumeQueue:消费队列文件,消息在存储到CommitLog后,会将消息所在CommitLog偏移量、大小、tag的hashcode异步转发到消费队列存储,供消费者消费,其类似于数据库的索引文件,存储的是指向物理存储的地址,每个topic下的每个Message Queue都有一个对应的ConsumeQueue文件;

Index:索引文件,消息在存储到CommitLog后,会将消息key与消息所在CommitLog偏移量转发到索引文件存储,供消息查询。

从原理图中,我们可以看出消息的生产与消费进行了分离,Producer端发送消息最终写入的是CommitLog,Consumer端先从ConsumeQueue读取持久化消息的起始物理位置偏移量offset、大小size和消息Tag的HashCode值,再从CommitLog中进行读取待拉取消费消息的真正实体内容部分。

上面说了消费者如何快速定位到消息位置,使消费者可以高效的消费,那么下面我们说说RocketMQ中如何做到消息存储的高效性。

我们先思考一个问题,假如你是印刷厂的老板,你如何才能快速印刷出一本完整没有错误的汉语言辞典呢?

答案很简单,从第一页开始,按照顺序一页一页的印刷,不要跳页印刷,更不要随机印刷。

正如我们的磁盘写入一样,据某某调查研究表明,高性能磁盘在顺序写入的时候,速度基本可以堪比内存的写入速度,但是磁盘随机写入的时候,性能瓶颈非常明显,速度会比较慢。

所以RocketMQ采用了全部消息都存入一个CommitLog文件中,并且对写操作加锁(putMessageLock),保证串行顺序写入消息,避免磁盘竟争导致IO WAIT增高,大大提高写入效率。

我们可以用一个更详细的图来说明:

生产者按顺序写入CommitLog,消费者通过顺序读取ConsumeQueue进行消费,这里有一个地方需要注意,虽然消费者是按照顺序读取ConsumeQueue,但是并不代表它就是按照顺序读取消息,因为根据ConsumeQueue中的起始物理位置偏移量offset读取消息真实内容,在并发量非常高的情况下,实际上是随机读取CommitLog,而随机读取文件带来的性能开销影响还是比较大的,所以在这里,RocketMQ利用了操作系统的pagecache机制,批量从磁盘读取,作为cache存在内存中,加速后速的读取速度。

存储文件

我们打开RocketMQ在磁盘上持久化的目录(store目录下),便可以很直观的看到CommitLog,ConsumeQueue,Index三个文件夹。(其中config文件夹中是运行期间一些配置信息,而abort,checkpoint我会在后续的文章中讲述它们的作用,关注“IT一刻钟”吧,不要在犹豫中错过了重要内容!)

CommitLog文件夹中的内容(${ROCKET_HOME}/store/commitlog)

可以看到每个文件1G大小,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。如图所示,第一个文件的初始偏移量为9663676416,第二个文件的初始偏移量为10737418240。

CommitLog文件内部存储逻辑是,每条消息的前4个字节存储该条消息的总长度(包含长度信息本身),随后便是消息内容。如图所示:

消息的长度=消息长度信息(4字节)+ 消息内容长度。

实现消息查找的步骤:

1.消费者从消费队列中获取到某个消息的偏移量offset与长度size;

2.根据偏移量offset定位到消息所在的commitLog物理文件;

3.用偏移量与文件长度取模,得到消息在这个commitLog文件内部的偏移量;

4.从该偏移量取得size长度的内容返回即可。

注:如果只是根据消息偏移量查找消息,则首先找到文件内偏移量,然后读取前4个字节获取消息的实际长度,然后读取指定的长度。 这里有一个比较巧妙的设计,CommitLog文件并不是每次生成一个,然后写满之后再创建下一个,而是有一个预分配的机制。

即,CommitLog创建过程是把下一个文件的路径、下下个文件的路径以及文件大小作为参数封装到AllocateRequest对象并添加到队列中,后台运行的AllocateMappedFileService服务线程会不停地run,只要请求队列里存在请求对象,就会去创建下个CommitLog,同时还会将下下个CommitLog预先创建并保存至请求队列中等待下次获取时直接返回,不用再次因为等待CommitLog创建分配而产生时间延迟。

ConsumeQueue文件夹中的内容(${ROCKET_HOME}/store/consumequeue)

对于消费者来说,最关心的莫过于某个主题下的所有消息,但是在RocketMQ中,不同主题下的消息都交错杂糅在同一个文件里,想要提高查询速度,必须要构建类似于搜索索引的文件,于是就有了消费队列ConsumeQueue文件。

从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件,在上图中,00000000000012000000就是在主题为sim-online-orders,QueueId为1下的ConsumeQueue文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,即每条数据20个字节。当一个ConsumeQueue类型的文件写满了,则写入下一个文件。

ConsumeQueue文件内部存储逻辑如图:

包含消息在commitLog文件的偏移量,消息长度,消息tag的HashCode。 单个ConsumeQueue文件可以看作是ConsumeQueue条目数组,其下标是ConsumeQueue的逻辑偏移量。

消息消费队列是RocketMQ为消息订阅构建的索引文件,目的在于提高主题与消息队列检索消息的速度。

Index文件夹中的内容(${ROCKET_HOME}/store/index)

RocketMQ为了通过消息Key值查询消息真正的实体内容,引入了Hash索引机制。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引。

我们先来看看Index索引文件的内部存储逻辑:

IndexFile包含三个部分:IndexHead,Hash槽,Index条目。

1.IndexHead,包含40个字节,记录一些统计信息:

    beginTimestamp:该索引文件中包含消息的最小存储时间。

    endTimestamp:该索引文件中包含消息的最大存储时间。

    beginPhyoffset:该索引文件中包含消息的最小物理偏移量(commitlog文件偏移量)。

    endPhyoffset:该索引文件中包含消息的最大物理偏移量(commitlog文件偏移量)。

    hashslotCount: hashslot个数,并不是hash槽使用的个数,在这里意义不大。

    indexCount: Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。

2.Hash槽,默认500万个槽,每个槽位存储着该消息key的HashCode所对应的最新Index条目的下标数。

3.Index条目列表,默认一个索引文件包含2000万个条目:

    hashcode:key的HashCode。

    phyoffset:消息对应的物理偏移量。

    timedif:该消息存储时间与第一条消息的时间戳的差值,小于0该消息无效。

    preIndexNo:该HashCode上一个条目的Index索引,当出现hash冲突时,构建的链表结构。

    大家看懂了这个数据结构没有?设计的真是精妙。

如果没有理解,我给大家画个图,来体会一下这个数据结构的精妙:

首先根据key的HashCode对槽数取模,得到槽位,然后将相应的数据按顺序存入到Index条目中,同时将条目数存回对应的槽内。

如果遇到Hash冲突,Index条目会通过pre index no构建链表结构:

如图第二个槽位冲突,第5条index条目的pre index no存储原来的第二条序号。 其实就是HashMap的变形结构。

通过以上结构便可以用消息的key快速定位到消息内容。

内存映射

如果说以上内容是RocketMQ通过优化数据结构的方式来提高分布式消息队列的性能,那么这里便是通过操作系统底层来优化性能。

在Linux中,操作系统分为“用户态”和“内核态”,普通的标准IO操作文件时,首先从磁盘将数据复制到内核态内存,接着从内核态内存复制到用户态内存,完成读取操作,然后从用户态内存复制到网络驱动的内核态内存,最后从网络驱动的内核态内存复制到网卡中进行传输,完成写出操作。

这个全过程中涉及到四次复制,可以说效率是可见的低。

于是,在RocketMQ中,通过Java中的MappedByteBuffer(mmap方式)实现“零拷贝”,省去了向用户态的内存复制,提高了消息存储和网络发送的速度。

这里我们说一说什么是mmap内存映射技术。

mmap技术可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。当发生缺页中断时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用mmap的方式读/写效率和性能都非常高。如图:

使用Mmap的限制:

a.Mmap映射的内存空间释放的问题:由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;

b.MappedByteBuffer内存映射大小限制:因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;
c.使用MappedByteBuffe的其他问题:会存在内存占用率较高和文件关闭不确定性的问题;

突破性能瓶颈的处理方法有哪些?

1.简单高效的数据结构,提高检索速度;

2.磁盘的顺序写入,避免无序io竞争,提高消息存储速度;

3.预分配机制,降低文件处理等待时间;

4.依赖pagecache机制,批量从磁盘读取消息并加载到缓存,提高读取速度;

5.内存映射机制,较少用户态内核态之间的复制次数,提高处理效率。

说在后面的话

代码底下无秘密。

想要更深入的研究RocketMQ持久化处理,请点击链接:https://github.com/MrChiu/RocketMQ-Study/tree/release-4.3.2/store

里面附有我标注的注释,易于通读代码

 

原文链接:https://my.oschina.net/u/3790005/blog/3050589
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章