您现在的位置是:首页 > 文章详情

Netty内存池之PoolThreadCache详解

日期:2019-04-23点击:242

       PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请。本文首先会对PoolThreadCache的数据结构进行讲解,然后会介绍Netty是如何初始化PoolThreadCache的,最后会介绍如何在PoolThreadCache中申请内存和如何将内存释放到PoolThreadCache中。

1. PoolThreadCache数据结构

       PoolThreadCache的数据结构与PoolArena的主要属性结构非常相似,但细微位置有很大的不同。在PoolThreadCache中,其维护了三个数组(我们以直接内存的缓存方式为例进行讲解),如下所示:

// 存储tiny类型的内存缓存,该数组长度为32,其中只有下标为1~31的元素缓存了有效数据,第0号位空置。 // 这里内存大小的存储方式也与PoolSubpage类似,数组的每一号元素都存储了不同等级的内存块,每个等级的 // 内存块的内存大小差值为16byte,比如第1号位维护了大小为16byte的内存块,第二号为维护了大小为32byte的 // 内存块,依次类推,第31号位维护了大小为496byte的内存块。 private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; // 存储small类型的内存缓存,该数组长度为4,数组中每个元素中维护的内存块大小也是成等级递增的,并且这里 // 的递增方式是按照2的指数次幂进行的,比如第0号为维护的是大小为512byte的内存块,第1号位维护的是大小为 // 1024byte的内存块,第2号位维护的是大小为2048byte的内存块,第3号位维护的是大小为4096byte的内存块 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; // 存储normal类型的内存缓存。需要注意的是,这里虽说是维护的normal类型的缓存,但是其只维护2<<13,2<<14 // 和2<<15三个大小的内存块,而该数组的大小也正好为3,因而这三个大小的内存块将被依次放置在该数组中。 // 如果申请的目标内存大于2<<15,那么Netty会将申请动作交由PoolArena进行。 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; 

       这三个数组分别保存了tiny,small和normal类型的缓存数据,不同于PoolArena的使用PoolSubpage和PoolChunk进行内存的维护,这里都是使用MemoryRegionCache进行的。另外,在MemoryRegionCache中保存了一个有界队列,对于tiny类型的缓存,该队列的长度为512,对于small类型的缓存,该队列的长度为256,对于normal类型的缓存,该队列的长度为64。在进行内存释放的时候,如果队列已经满了,那么就会将该内存块释放回PoolArena中。这里需要说明的是,这里的队列中的元素统一使用的是Entry这种数据结构,该结构的主要属性如下:

static final class Entry<T> { // 用于循环利用当前Entry对象的处理器,该处理器的实现原理,我们后续将进行讲解 final Handle<Entry<?>> recyclerHandle; // 记录了当前内存块是从哪一个PoolChunk中申请得来的 PoolChunk<T> chunk; // 如果是直接内存,该属性记录了当前内存块所在的ByteBuffer对象 ByteBuffer nioBuffer; // 由于当前申请的内存块在PoolChunk以及PoolSubpage中的位置是可以通过一个长整型参数来表示的, // 这个长整型参数就是这里的handle,因而这里直接将其记录下来,以便后续需要将当前内存块释放到 // PoolArena中时,能够快速获取其所在的位置 long handle = -1; } 

       PoolThreadCache中维护每一个内存块最终都是使用的一个Entry对象来进行的,从上面的属性可以看出,记录该内存块最重要的属性是chunk和handle,chunk记录了当前内存块所在的PoolChunk对象,而handle则记录了当前内存块是在PoolChunk和PoolSubpage中的哪个位置(关于PoolChunk,PoolSubpage和PoolArena的实现原理,建议读者阅读一下前面的文章,这样有助于读者快速理解相关原理)。如此,对于Netty使用的PoolThreadCache的存储结构我们就有了一个比较清晰的认识。下面我们通过一幅图来对PoolThreadCache的数据结构进行一个整体的演示:

image.png

       如上图所示展示的就是PoolThreadCache的结构示意图。从图中可以看出在一个PoolThreadCache中,主要有三个MemoryRegionCache数组用于存储tiny,small和normal类型的内存块。每个MemoryRegionCache中有一个队列,队列中的元素类型为Entry。Entry的作用就是存储缓存的内存块的,其存储的方式主要是通过记录当前内存块所在的PoolChunk和标志其在PoolChunk中位置的handle参数。对于不同类型的数组,队列的长度是不一样的,tiny类型的是512,small类型的是256,normal类型的则是64。

2. PoolThreadCache初始化

       对于PoolThreadCache的初始化,这里单独拿出来讲解的原因是,其初始化过程是与PoolThreadLocalCache所绑定的。PoolThreadLocalCache的作用与Java中的ThreadLocal的作用非常类似,其有一个initialValue()方法,用于在无法从PoolThreadLocalCache中获取数据时,通过调用该方法初始化一个。另外其提供了一个get()方法和和remove()方法,分别用于从PoolThreadLocalCache中将当前绑定的数据给清除。这里我们首先看看获取PoolThreadCache的入口代码:

@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // 从PoolThreadLocalCache中尝试获取一个PoolThreadCache对象, // 如果不存在,则自行初始化一个返回 PoolThreadCache cache = threadCache.get(); // 由于当前方法是需要返回一个direct buffer,因而这里直接使用cache中的directArena PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { // 如果directArena不为空,则直接调用其allocate()方法申请内存 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // 如果当前缓存中由于某种原因无法获取到directArena,则直接创建一个存有直接内存的ByteBuf, // 一般情况下不会走到这一步 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } // 为ByteBuf设置内存泄露检测功能 return toLeakAwareBuffer(buf); } 

       从上面的代码中可以看出,在最开始的时候,就会通过PoolThreadLocalCache尝试获取一个PoolThreadCache对象,如果不存在,其会自行初始化一个。这里我们直接看其是如何初始化的,如下是PoolThreadLocalCache.initialValue()方法的源码:

@Override protected synchronized PoolThreadCache initialValue() { // 这里leastUsedArena()就是获取对应的PoolArena数组中最少被使用的那个Arena,将其返回。 // 这里的判断方式是通过比较PoolArena.numThreadCaches属性来进行的,该属性记录了当前PoolArena被 // 多少个线程所占用了。这里采用的思想就是,找到最少被使用的那个PoolArena,将其存入新的线程缓存中 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); Thread current = Thread.currentThread(); // 只有在指定了为每个线程使用缓存,或者当前线程是FastThreadLocalThread的子类型时,才会使用线程缓存 if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache // 对象,该对象中是不做任何缓存的,因为初始化数据都是0 return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) { if (arenas == null || arenas.length == 0) { return null; } // 在PoolArena数组中找到被最少线程占用的对象,将其返回。这样做的目的是,由于内存池是多个线程都可以 // 访问的公共区域,因而当这里就需要对内存池进行划分,以减少线程之间的竞争。 PoolArena<T> minArena = arenas[0]; for (int i = 1; i < arenas.length; i++) { PoolArena<T> arena = arenas[i]; if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) { minArena = arena; } } return minArena; } 

       从上述代码可以看出,对于PoolThreadCache的初始化,其首先会查找PoolArena数组中被最少线程占用的那个arena,然后将其封装到一个新建的PoolThreadCache中。

3. 内存申请

       需要注意的是,PoolThreadCache申请内存并不是说其会创建一块内存,或者说其会到PoolArena中申请内存,而是指,其本身已经缓存有内存块,而当前申请的内存块大小正好与其一致,就会将该内存块返回;PoolThreadCache中的内存块都是在当前线程使用完创建的ByteBuf对象后,通过调用其release()方法释放内存时直接缓存到当前PoolThreadCache中的,其并不会直接将内存块返回给PoolArena。这里我们直接看一下其allocate()方法是如何实现的:

// 申请tiny类型的内存块 boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } // 申请small类型的内存块 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); } // 申请normal类型的内存块 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } // 从MemoryRegionCache中申请内存 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { return false; } // 从MemoryRegionCache中申请内存,本质上就是从其队列中申请,如果存在,则初始化申请到的内存块 boolean allocated = cache.allocate(buf, reqCapacity); // 这里是如果当前PoolThreadCache中申请内存的次数达到了8192次,则对内存块进行一次trim()操作, // 对使用较少的内存块,将其返还给PoolArena,以供给其他线程使用 if (++allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } return allocated; } 

       这里对于内存块的申请,我们可以看到,PoolThreadCache是将其分为tiny,small和normal三种不同的方法来调用的,而具体大小的区分其实是在PoolArena中进行区分的(读者可以阅读本人前面的关于PoolArena介绍的文章)。在对应的内存数组中找到MemoryRegionCache对象之后,通过调用allocate()方法来申请内存,申请完之后还会检查当前缓存申请次数是否达到了8192次,达到了则对缓存中使用的内存块进行检测,将较少使用的内存块返还给PoolArena。这里我们首先看一下获取MemoryRegionCache的代码是如何实现的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代码:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { // 计算当前数组下标索引,由于tiny类型的内存块每一层级相差16byte,因而这里的计算方式就是 // 将目标内存大小除以16 int idx = PoolArena.tinyIdx(normCapacity); // 返回tiny类型的数组中对应位置的MemoryRegionCache if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) { // 计算当前数组下标的索引,由于small类型的内存块大小都是2的指数次幂,因而这里就是将目标内存大小 // 除以1024之后计算其偏移量 int idx = PoolArena.smallIdx(normCapacity); // 返回small类型的数组中对应位置的MemoryRegionCache if (area.isDirect()) { return cache(smallSubPageDirectCaches, idx); } return cache(smallSubPageHeapCaches, idx); } private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) { // 对于normal类型的缓存,这里也是首先将其向右位移13位,也就是8192,然后取2的对数,这样就 // 可以得到其在数组中的位置,然后返回normal类型的数组中对应位置的MemoryRegionCache if (area.isDirect()) { int idx = log2(normCapacity >> numShiftsNormalDirect); return cache(normalDirectCaches, idx); } int idx = log2(normCapacity >> numShiftsNormalHeap); return cache(normalHeapCaches, idx); } 

       这里对于数组位置的计算,主要是根据各个数组数据存储方式的不同而进行的,而它们最终都是通过一个MemoryRegionCache存储的,因而只需要返回该缓存对象即可。下面我们继续看一下MemoryRegionCache.allocate()方法是如何申请内存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { // 尝试从队列中获取,如果队列中不存在,说明没有对应的内存块,则返回false,表示申请失败 Entry<T> entry = queue.poll(); if (entry == null) { return false; } // 走到这里说明队列中存在对应的内存块,那么通过其存储的Entry对象来初始化ByteBuf对象, // 如此即表示申请内存成功 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity); // 对entry对象进行循环利用 entry.recycle(); // 更新当前已经申请的内存数量 ++allocations; return true; } 

       可以看到,MemoryRegionCache申请内存的方式主要是从队列中取,如果取到了,则使用该内存块初始化一个ByteBuf对象。

       前面我们讲到,PoolThreadCache会对其内存块使用次数进行计数,这么做的目的在于,如果一个ThreadPoolCache所缓存的内存块使用较少,那么就可以将其释放到PoolArena中,以便于其他线程可以申请使用。PoolThreadCache会在其内存总的申请次数达到8192时遍历其所有的MemoryRegionCache,然后调用其trim()方法进行内存释放,如下是该方法的源码:

public final void trim() { // size表示当前MemoryRegionCache中队列的最大可存储容量,allocations表示当前MemoryRegionCache // 的内存申请次数,size-allocations的含义就是判断当前申请的次数是否连队列的容量都没达到 int free = size - allocations; allocations = 0; // 如果申请的次数连队列的容量都没达到,则释放该内存块 if (free > 0) { free(free); } } private int free(int max) { int numFreed = 0; // 依次从队列中取出Entry数据,调用freeEntry()方法释放该Entry for (; numFreed < max; numFreed++) { Entry<T> entry = queue.poll(); if (entry != null) { freeEntry(entry); } else { return numFreed; } } return numFreed; } private void freeEntry(Entry entry) { // 通过当前Entry中保存的PoolChunk和handle等数据释放当前内存块 PoolChunk chunk = entry.chunk; long handle = entry.handle; ByteBuffer nioBuffer = entry.nioBuffer; entry.recycle(); chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer); } 

4. 内存释放

       对于内存的释放,其原理比较简单,一般的释放内存的入口在ByteBuf对象中。当调用ByteBuf.release()方法的时候,其首先会将释放动作委托给PoolChunk的free()方法,PoolChunk则会判断当前是否是池化的ByteBuf,如果是池化的ByteBuf,则调用PoolThreadCache.add()方法将其添加到PoolThreadCache中,也就是说在释放内存时,其实际上是释放到当前线程的PoolThreadCache中的。如下是add()方法的源码:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity, SizeClass sizeClass) { // 通过当前释放的内存块的大小计算其应该放到哪个等级的MemoryRegionCache中 MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass); if (cache == null) { return false; } // 将内存块释放到目标MemoryRegionCache中 return cache.add(chunk, nioBuffer, handle); } public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) { // 这里会尝试从缓存中获取一个Entry对象,如果没获取到则创建一个 Entry<T> entry = newEntry(chunk, nioBuffer, handle); // 将实例化的Entry对象放到队列里 boolean queued = queue.offer(entry); if (!queued) { entry.recycle(); } return queued; } 

5. 小结

       本文首先详细讲解了PoolThreadCache的数据结构,并且说明了其中需要注意的点,然后介绍了PoolThreadCache的实例化方式,接着从申请和释放内存两个角度介绍了PoolThreadCache源码的实现方式。

原文链接:https://my.oschina.net/zhangxufeng/blog/3040834
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章